Spaces:
Paused
Paused
import torch | |
import numpy as np | |
from PIL import Image | |
import trimesh | |
import tempfile | |
from typing import Union, Optional, Dict, Any | |
from pathlib import Path | |
import os | |
import logging | |
import random | |
import time | |
import threading | |
from huggingface_hub import snapshot_download | |
import shutil | |
# Module-level logger used by every stage of 3D generation; the root logger is
# configured at INFO so those stage messages are emitted.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class TimeoutError(Exception):
    """Custom timeout exception.

    NOTE: this name shadows the built-in ``TimeoutError`` inside this module;
    it derives directly from ``Exception``, not from ``OSError``.
    """
class Hunyuan3DGenerator:
    """3D model generation using Hunyuan3D-2.1 directly.

    ``image_to_3d`` is the main entry point. Depending on how ``load_model``
    went, it dispatches to one of three strategies, tracked by ``self.model``:

    * ``"direct_model"`` - the Hunyuan3D shape + paint pipelines,
    * ``"simplified"``   - a brightness-based depth map turned into a mesh,
    * anything else / any failure - a plain height-map mesh.

    Every strategy returns the path of a mesh file on disk.
    """

    def __init__(self, device: str = "cuda"):
        """Store configuration; no model is downloaded or loaded here.

        Args:
            device: Preferred torch device. It is replaced by ``"cpu"`` when
                CUDA is not available.
        """
        logger.info(f"π§ Initializing Hunyuan3DGenerator with device: {device}")
        self.device = device if torch.cuda.is_available() else "cpu"
        logger.info(f"π§ Final device selection: {self.device}")

        # ``model`` is a state marker (None / "direct_model" / "simplified"),
        # not the network itself; the real objects live in the two pipelines.
        self.model = None
        self.preprocessor = None
        self.shape_pipeline = None
        self.paint_pipeline = None

        # Model configuration
        self.model_id = "tencent/Hunyuan3D-2.1"
        self.model_path = None

        # Generation parameters
        self.num_inference_steps = 30  # Reduced for faster generation
        self.guidance_scale = 7.5
        self.resolution = 256  # 3D resolution

        # Timeout configuration (seconds). NOTE(review): not enforced by any
        # code visible in this class.
        self.generation_timeout = 180

        logger.info("π§ Using full Hunyuan3D-2.1 model")
        logger.info(f"β±οΈ Generation timeout set to: {self.generation_timeout} seconds")

    def _check_vram(self) -> bool:
        """Return True when CUDA device 0 reports more than 12 GiB of memory."""
        logger.info("π Checking VRAM availability...")
        if not torch.cuda.is_available():
            logger.info("β CUDA not available")
            return False
        try:
            bytes_per_gb = 1024 * 1024 * 1024
            vram = torch.cuda.get_device_properties(0).total_memory
            vram_gb = vram / bytes_per_gb
            logger.info(f"π Available VRAM: {vram_gb:.2f} GB")
            # Need at least 12GB for full model
            has_enough = vram > 12 * bytes_per_gb
            logger.info(f"π Has enough VRAM (>12GB): {has_enough}")
            return has_enough
        except Exception as e:
            logger.error(f"β Error checking VRAM: {e}")
            return False

    def _run_setup_command(self, command, cwd):
        """Run one setup command in ``cwd`` and log its captured output.

        Raises:
            subprocess.CalledProcessError: when the command exits non-zero
                (re-raised after its stdout/stderr have been logged).
        """
        import subprocess

        logger.info(f"Running command: {' '.join(command)} in {cwd}")
        try:
            process = subprocess.run(
                command,
                check=True,
                capture_output=True,
                text=True,
                cwd=cwd,
            )
        except subprocess.CalledProcessError as e:
            logger.error(f"β Command failed with exit code {e.returncode}")
            logger.error(f"STDOUT:\n{e.stdout}")
            logger.error(f"STDERR:\n{e.stderr}")
            raise  # Re-raise so the caller halts setup and sees the error
        logger.info("β Command successful.")
        if process.stdout:
            logger.info(f"STDOUT:\n{process.stdout}")
        if process.stderr:
            logger.warning(f"STDERR:\n{process.stderr}")

    def load_model(self):
        """Download the Hunyuan3D repository, build its extensions, load pipelines.

        Does nothing when a mode has already been selected. On any failure the
        error is logged and ``self.model`` is set to ``"simplified"`` so that
        generation can continue without the real pipelines.
        """
        if self.model is not None:
            return

        logger.info("π Starting Hunyuan3D model loading and setup...")
        try:
            import sys

            # Download model repository if not already present
            logger.info(f"π₯ Downloading Hunyuan3D repository from {self.model_id}...")
            self.model_path = snapshot_download(
                repo_id=self.model_id,
                repo_type="space",
                cache_dir="./models/hunyuan3d_cache",
            )
            logger.info(f"β Model repository downloaded to: {self.model_path}")

            # --- Installation and Compilation ---
            # Steps 1-2 (installing the repository's requirements file and a
            # pinned torch build) are disabled; only the native extensions
            # below are built.
            logger.info("π§ Running Hunyuan3D setup scripts with detailed logging...")

            # 3. Install custom rasterizer
            rasterizer_path = os.path.join(
                self.model_path, 'hy3dpaint', 'packages', 'custom_rasterizer'
            )
            if os.path.exists(rasterizer_path):
                pip_command_rasterizer = [
                    sys.executable, '-m', 'pip', 'install',
                    '--no-build-isolation', '-e', '.',
                ]
                self._run_setup_command(pip_command_rasterizer, cwd=rasterizer_path)

            # 4. Compile mesh painter
            renderer_path = os.path.join(
                self.model_path, 'hy3dpaint', 'DifferentiableRenderer'
            )
            compile_script_path = os.path.join(renderer_path, 'compile_mesh_painter.sh')
            if os.path.exists(compile_script_path):
                self._run_setup_command(['bash', compile_script_path], cwd=renderer_path)

            logger.info("β Hunyuan3D setup completed successfully.")

            # --- Pipeline Initialization ---
            logger.info("βοΈ Initializing Hunyuan3D pipelines...")

            # Add subdirectories to the Python path (hy3dpaint ends up first),
            # without piling up duplicates if setup is attempted again.
            for subdir in ('hy3dshape', 'hy3dpaint'):
                package_dir = os.path.join(self.model_path, subdir)
                if package_dir not in sys.path:
                    sys.path.insert(0, package_dir)

            # Import the correct pipelines
            from hy3dshape.pipelines import Hunyuan3DDiTFlowMatchingPipeline
            from textureGenPipeline import Hunyuan3DPaintPipeline, Hunyuan3DPaintConfig

            logger.info("Instantiating shape pipeline...")
            self.shape_pipeline = Hunyuan3DDiTFlowMatchingPipeline.from_pretrained(
                self.model_path, torch_dtype=torch.bfloat16
            ).to(self.device)

            logger.info("Instantiating paint pipeline...")
            paint_config = Hunyuan3DPaintConfig(
                max_num_view=8, resolution=1024, pbr_optimization=True
            )
            self.paint_pipeline = Hunyuan3DPaintPipeline(paint_config)

            self.model = "direct_model"
            logger.info("β Hunyuan3D pipelines loaded successfully.")
        except Exception as e:
            logger.error(f"β Failed to set up Hunyuan3D pipeline: {e}", exc_info=True)
            logger.warning("π Falling back to simplified 3D generation...")
            self.model = "simplified"

    def image_to_3d(self,
                    image: Union[str, "Image.Image", np.ndarray],
                    remove_background: bool = True,
                    texture_resolution: int = 1024) -> Union[str, "trimesh.Trimesh"]:
        """Convert 2D image to 3D model using local Hunyuan3D.

        Args:
            image: A file path, a PIL image, or a numpy array.
            remove_background: Strip the background before generation (only
                used by the direct-model strategy).
            texture_resolution: Forwarded to the direct-model strategy.

        Returns:
            Path of the generated mesh file. Any error in the selected
            strategy is logged and the height-map fallback is used instead.
        """
        logger.info("π― Starting image-to-3D conversion process...")
        logger.info(f"π― Input type: {type(image)}")
        logger.info(f"π― Remove background: {remove_background}")
        logger.info(f"π― Texture resolution: {texture_resolution}")
        try:
            # Load model if needed
            logger.info("π Checking if model needs loading...")
            if self.model is None:
                logger.info("π¦ Model not loaded, initiating loading...")
                self.load_model()
            else:
                logger.info("β Model already loaded")

            # Prepare image
            logger.info("πΌοΈ Preparing input image...")
            if isinstance(image, str):
                logger.info(f"πΌοΈ Loading image from path: {image}")
                image = Image.open(image)
            elif isinstance(image, np.ndarray):
                logger.info("πΌοΈ Converting numpy array to PIL Image")
                image = Image.fromarray(image)

            # Ensure image is PIL Image
            if not isinstance(image, Image.Image):
                logger.error("β Invalid image type")
                raise ValueError("Image must be PIL Image, numpy array, or path string")
            logger.info(f"πΌοΈ Image mode: {image.mode}, size: {image.size}")

            # Process based on model type
            if self.model == "direct_model":
                logger.info("π Using direct Hunyuan3D model for 3D generation...")
                return self._generate_with_direct_model(
                    image, remove_background, texture_resolution
                )
            if self.model == "simplified":
                logger.info("π Using simplified Hunyuan3D generation...")
                return self._generate_simplified_3d(image)

            # Fallback to simple 3D generation
            logger.info("π Using fallback 3D generation...")
            return self._generate_fallback_3d(image)
        except Exception as e:
            logger.error(f"β 3D generation error: {e}")
            logger.error(f"β Error type: {type(e).__name__}")
            logger.info("π Falling back to simple 3D generation...")
            return self._generate_fallback_3d(image)

    def _generate_with_direct_model(self, image: "Image.Image", remove_background: bool,
                                    texture_resolution: int) -> str:
        """Generate 3D model using the official Hunyuan3D pipelines.

        Intermediate files (the input PNG and, if needed, the exported
        untextured mesh) are removed once the final mesh has been copied out.
        ``texture_resolution`` is accepted for interface compatibility but is
        not forwarded to either pipeline.

        Raises:
            Exception: whatever the pipelines raise, after logging it.
        """
        scratch_files = []
        try:
            # Remove background if requested
            if remove_background:
                logger.info("π Removing background...")
                image = self._remove_background(image)

            # Save image to a temporary file, as pipelines expect a path
            temp_image_path = self._save_temp_image(image)
            scratch_files.append(temp_image_path)

            # 1. Generate the untextured mesh
            logger.info("π² Generating 3D shape with Hunyuan3DDiTFlowMatchingPipeline...")
            # The pipeline returns a list of meshes, we take the first one
            mesh_untextured_path = self.shape_pipeline(
                image=temp_image_path,
                num_inference_steps=self.num_inference_steps,
                guidance_scale=self.guidance_scale,
                seed=random.randint(1, 10000),
            )[0]
            # The paint stage is given ``mesh_path``; if the shape stage handed
            # back an in-memory mesh instead of a path, write it to disk first.
            if not isinstance(mesh_untextured_path, (str, os.PathLike)):
                mesh_untextured_path = self._save_mesh(mesh_untextured_path)
                scratch_files.append(mesh_untextured_path)
            logger.info(f"β Untextured mesh saved to: {mesh_untextured_path}")

            # 2. Generate the texture for the mesh
            # NOTE(review): confirm that the paint pipeline accepts
            # ``guidance_scale`` / ``seed``; kept exactly as before.
            logger.info("π¨ Generating texture with Hunyuan3DPaintPipeline...")
            mesh_textured_path = self.paint_pipeline(
                mesh_path=mesh_untextured_path,
                image_path=temp_image_path,
                guidance_scale=self.guidance_scale,
                seed=random.randint(1, 10000),
            )
            logger.info(f"β Textured mesh saved to: {mesh_textured_path}")

            # 3. Save the final output to a consistent location
            output_path = self._save_output_mesh(mesh_textured_path)
            logger.info(f"β 3D model generation successful. Final model at: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"β Direct model generation failed: {e}", exc_info=True)
            raise
        finally:
            # Best-effort cleanup; a leftover temp file must not mask the result.
            for scratch in scratch_files:
                try:
                    os.remove(scratch)
                except OSError:
                    pass

    def _generate_simplified_3d(self, image: "Image.Image") -> str:
        """Generate a mesh from a brightness-derived depth map.

        Falls back to ``_generate_fallback_3d`` if anything here fails
        (for example when scipy is missing).
        """
        logger.info("π§ Using simplified 3D generation with PyTorch...")
        try:
            from scipy.ndimage import gaussian_filter

            # Simple depth estimation based on image brightness
            logger.info("π Generating depth map...")
            gray_image = image.convert('L')
            depth_array = np.array(gray_image.resize((64, 64))) / 255.0

            # Apply some smoothing and scaling
            depth_array = gaussian_filter(depth_array, sigma=2)
            depth_array = depth_array * 0.3 + 0.1  # Scale depth

            # Generate mesh from depth map
            logger.info("π² Creating mesh from depth map...")
            mesh = self._depthmap_to_mesh(depth_array, image)

            output_path = self._save_mesh(mesh)
            logger.info(f"β Simplified 3D model generated: {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"β Simplified generation failed: {e}")
            return self._generate_fallback_3d(image)

    @staticmethod
    def _grid_vertices(z_values: np.ndarray) -> np.ndarray:
        """Return an (h*w, 3) row-major vertex array for a 2-D grid of z values.

        x and y are ``(index - size/2) / size * 2``, i.e. they start at -1.
        """
        h, w = z_values.shape
        rows, cols = np.mgrid[0:h, 0:w]
        x = (cols - w / 2) / w * 2
        y = (rows - h / 2) / h * 2
        z = np.asarray(z_values, dtype=float)
        return np.column_stack([x.ravel(), y.ravel(), z.ravel()])

    @staticmethod
    def _grid_faces(h: int, w: int) -> np.ndarray:
        """Return the (n, 3) triangle index array for an h-by-w vertex grid.

        Each grid square yields two consecutive triangles,
        ``[v1, v2, v3]`` then ``[v2, v4, v3]``.
        """
        index = np.arange(h * w).reshape(h, w)
        v1 = index[:-1, :-1].ravel()
        v2 = index[:-1, 1:].ravel()
        v3 = index[1:, :-1].ravel()
        v4 = index[1:, 1:].ravel()
        first = np.column_stack([v1, v2, v3])
        second = np.column_stack([v2, v4, v3])
        # Interleave so both triangles of a square stay adjacent.
        return np.stack([first, second], axis=1).reshape(-1, 3)

    def _depthmap_to_mesh(self, depth_map: np.ndarray,
                          texture_image: "Image.Image") -> "trimesh.Trimesh":
        """Convert depth map to a vertex-coloured 3D mesh with a base underneath.

        Args:
            depth_map: 2-D array of z values.
            texture_image: Image sampled (after resizing to the depth-map
                size) for per-vertex colours.
        """
        h, w = depth_map.shape
        vertices = self._grid_vertices(depth_map)
        faces = self._grid_faces(h, w)

        # Resize texture to match depth map; force RGB so palette, grayscale
        # and alpha-carrying images all give exactly three channels.
        texture_resized = texture_image.resize((w, h)).convert("RGB")
        vertex_colors = np.asarray(texture_resized, dtype=np.uint8).reshape(-1, 3)

        mesh = trimesh.Trimesh(
            vertices=vertices,
            faces=faces,
            vertex_colors=vertex_colors,
        )
        # Apply smoothing
        mesh = mesh.smoothed()

        # Add a base to make it more stable
        base_vertices, base_faces = self._create_base(vertices, w, h)
        if len(base_faces) == 0:
            return mesh
        base_mesh = trimesh.Trimesh(vertices=base_vertices, faces=base_faces)
        return trimesh.util.concatenate([mesh, base_mesh])

    def _create_base(self, vertices: np.ndarray, w: int, h: int) -> tuple:
        """Create a flat fan of triangles below the grid's boundary.

        Args:
            vertices: (n, 3) row-major grid vertices.
            w: Grid width (columns).
            h: Grid height (rows).

        Returns:
            ``(base_vertices, base_faces)``: the boundary ring projected to
            ``min(z) - 0.1`` followed by its centroid, and one triangle per
            ring edge joined to that centroid. Face indices are local to
            ``base_vertices``. Two empty arrays if no boundary vertex exists.
        """
        base_z = vertices[:, 2].min() - 0.1

        # Walk the boundary as one closed loop: top edge (without corners),
        # right edge downwards, bottom edge right-to-left, left edge upwards.
        boundary_indices = [
            *range(1, w - 1),
            *(i * w + w - 1 for i in range(h)),
            *((h - 1) * w + j for j in range(w - 2, 0, -1)),
            *(i * w for i in range(h - 1, -1, -1)),
        ]
        # The loop is closed implicitly, so drop a repeated end point.
        if boundary_indices and boundary_indices[0] == boundary_indices[-1]:
            boundary_indices = boundary_indices[:-1]

        # Safety check: ignore indices that fall outside ``vertices``.
        ring_indices = [idx for idx in boundary_indices if idx < len(vertices)]
        if not ring_indices:
            return np.array([]), np.array([])

        ring = vertices[ring_indices].copy()
        ring[:, 2] = base_z
        center = ring.mean(axis=0)
        base_vertices = np.vstack([ring, center])

        # Faces are built from the ring actually kept, so every index is valid
        # even when the safety check dropped something.
        ring_size = len(ring)
        center_idx = ring_size
        base_faces = [
            [i, (i + 1) % ring_size, center_idx] for i in range(ring_size)
        ]
        return base_vertices, np.array(base_faces)

    def _remove_background(self, image: "Image.Image") -> "Image.Image":
        """Remove background from image.

        Uses ``rembg`` when it can be imported and runs successfully;
        otherwise makes near-white pixels (all of R, G, B above 230)
        fully transparent white. The input image is not modified.
        """
        try:
            # Try using rembg if available
            from rembg import remove
            return remove(image)
        except Exception as exc:
            logger.warning(
                "rembg background removal unavailable (%s); "
                "using white-threshold fallback", exc
            )

        # Fallback: simple white background removal on an RGBA copy
        rgba = np.array(image.convert("RGBA"))
        near_white = (rgba[..., :3] > 230).all(axis=-1)
        rgba[near_white] = (255, 255, 255, 0)
        return Image.fromarray(rgba)

    def _generate_fallback_3d(self, image: Union["Image.Image", np.ndarray]) -> str:
        """Generate fallback 3D model when main model fails.

        Builds a 64x64 height map from the mean of the RGB channels and saves
        the resulting mesh.

        Raises:
            ValueError: if ``image`` is not a PIL image, numpy array or path.
        """
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        elif isinstance(image, str):
            image = Image.open(image)
        if not isinstance(image, Image.Image):
            raise ValueError("Image must be PIL Image, numpy array, or path string")

        # Convert to RGB so alpha / palette indices never count as brightness.
        image_array = np.array(image.resize((64, 64)).convert("RGB"))
        height_map = image_array.mean(axis=2) / 255.0

        mesh = self._heightmap_to_mesh(height_map)
        return self._save_mesh(mesh)

    def _heightmap_to_mesh(self, heightmap: np.ndarray) -> "trimesh.Trimesh":
        """Convert heightmap to 3D mesh; heights are scaled by 0.5."""
        h, w = heightmap.shape
        vertices = self._grid_vertices(np.asarray(heightmap) * 0.5)
        faces = self._grid_faces(h, w)
        mesh = trimesh.Trimesh(vertices=vertices, faces=faces)
        # Apply smoothing
        return mesh.smoothed()

    def _save_mesh(self, mesh: "trimesh.Trimesh") -> str:
        """Export ``mesh`` to a new temporary ``.glb`` file and return its path."""
        # Reserve a unique name; the file is kept (delete=False) for the caller.
        with tempfile.NamedTemporaryFile(suffix='.glb', delete=False) as tmp:
            mesh_path = tmp.name
        mesh.export(mesh_path)
        return mesh_path

    def _save_temp_image(self, image: "Image.Image") -> str:
        """Save PIL image to a new temporary PNG file and return its path."""
        with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
            image_path = tmp.name
        image.save(image_path, 'PNG')
        logger.info(f"πΎ Saved temp image to: {image_path}")
        return image_path

    def _save_output_mesh(self, source_mesh_path: str) -> str:
        """Copy generated mesh to our output location and return the new path.

        The copy is named ``hunyuan3d_mesh_<random><ext>`` where ``<ext>`` is
        the source file's extension (``.glb`` when it has none).
        """
        output_dir = "/tmp/hunyuan3d_output"
        os.makedirs(output_dir, exist_ok=True)

        # Keep the real format's extension so the copy is not mislabelled.
        suffix = os.path.splitext(source_mesh_path)[1] or ".glb"
        # Reserve a unique file name atomically (mktemp() is deprecated and
        # race-prone); copy2 then overwrites the empty placeholder.
        with tempfile.NamedTemporaryFile(
            prefix="hunyuan3d_mesh_", suffix=suffix, dir=output_dir, delete=False
        ) as reserved:
            output_path = reserved.name

        shutil.copy2(source_mesh_path, output_path)
        logger.info(f"π Copied mesh from {source_mesh_path} to {output_path}")
        return output_path

    def text_to_3d(self, text_prompt: str) -> str:
        """Generate 3D model from text description.

        Raises:
            NotImplementedError: always; an image generator would be needed
                to turn the prompt into an image first.
        """
        raise NotImplementedError("Text to 3D requires image generation first")

    def to(self, device: str):
        """Update device preference (already-loaded pipelines are not moved)."""
        self.device = device
        logger.info(f"π§ Device preference updated to: {device}")

    def __del__(self):
        """Cleanup when object is destroyed."""
        # ``getattr`` keeps this safe for instances whose __init__ never ran.
        if getattr(self, "model", None) in (None, "fallback_mode", "simplified"):
            return
        try:
            # Drop the pipelines themselves; ``model`` is only a marker string.
            self.shape_pipeline = None
            self.paint_pipeline = None
            del self.model
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except Exception:
            # Finalizers may run during interpreter shutdown, when module
            # globals are already gone; never raise from here.
            pass