# Hugging Face Space status at capture time: running on ZeroGPU ("Zero").
# Import spaces for ZeroGPU support
try:
    import spaces
except ImportError:
    # Fallback for local development: a no-op stand-in for the `spaces` package.
    class _SpacesFallback:
        """No-op replacement for the Hugging Face ``spaces`` package.

        Supports every way the decorator may be applied so the same source runs
        with or without ZeroGPU: ``spaces(func)``, ``@spaces.GPU`` and
        ``@spaces.GPU(duration=...)``.
        """

        def __call__(self, func):
            """Return ``func`` unchanged (keeps the old ``spaces(func)`` usage working)."""
            return func

        @staticmethod
        def GPU(func=None, **_options):
            """Return ``func`` unchanged, or a pass-through decorator when given options."""
            if callable(func):
                return func
            # Called as ``spaces.GPU(duration=...)``: hand back an identity decorator.
            return lambda wrapped: wrapped

    spaces = _SpacesFallback()
import os | |
import sys | |
import logging | |
import time | |
import uuid | |
import atexit | |
from concurrent.futures import ThreadPoolExecutor | |
from typing import Union | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Import other dependencies | |
import subprocess | |
import cv2 | |
import gradio as gr | |
import numpy as np | |
import threading | |
import subprocess | |
import tempfile | |
import shutil | |
import glob | |
import json | |
import base64 | |
import struct | |
import zlib | |
import argparse | |
import socket | |
import gc | |
from typing import List | |
from pathlib import Path | |
from einops import rearrange | |
from tempfile import TemporaryDirectory | |
from http.server import SimpleHTTPRequestHandler | |
from socketserver import ThreadingTCPServer | |
import socketserver | |
import http.server | |
import torch | |
from pathlib import Path | |
# Target device is hard-coded to CUDA (no availability check is performed)
device = "cuda" | |
# Import custom modules with error handling | |
try: | |
from app_3rd.sam_utils.inference import SamPredictor, get_sam_predictor, run_inference | |
from app_3rd.spatrack_utils.infer_track import get_tracker_predictor, run_tracker, get_points_on_a_grid | |
except ImportError as e: | |
logger.error(f"Failed to import custom modules: {e}") | |
raise | |
MAX_FRAMES = 80
# Kept for backward compatibility; delete_later no longer submits work to it
# because sleeping inside a 2-worker pool serialised every pending deletion.
thread_pool_executor = ThreadPoolExecutor(max_workers=2)


def delete_later(path: Union[str, os.PathLike], delay: int = 600):
    """Delete a file or directory after ``delay`` seconds (default 10 minutes).

    Each call gets its own daemon timer, so any number of deletions can be
    pending at once and none of them keeps the interpreter alive at shutdown.
    The same deletion is also registered with ``atexit`` so the path is removed
    if the process exits before the timer fires.

    Args:
        path: File or directory to remove. Missing paths are ignored.
        delay: Seconds to wait before deleting.
    """
    def _delete():
        try:
            if os.path.isfile(path):
                os.remove(path)
            elif os.path.isdir(path):
                shutil.rmtree(path)
        except Exception as e:
            # Best-effort cleanup: never let a failed delete crash the app.
            logger.warning(f"Failed to delete {path}: {e}")

    def _delete_on_schedule():
        _delete()
        # The path is gone; drop the exit hook so callbacks do not pile up
        # over the lifetime of a long-running server.
        atexit.unregister(_delete)

    # Register before starting the timer so the unregister above can never
    # run ahead of the registration.
    atexit.register(_delete)
    timer = threading.Timer(delay, _delete_on_schedule)
    timer.daemon = True
    timer.start()
def create_user_temp_dir():
    """Make and return a fresh per-session scratch directory under ``temp/``.

    The directory name ends with the first 8 hex characters of a random UUID,
    and the directory is handed to ``delete_later`` for removal after 10 minutes.
    """
    short_id = uuid.uuid4().hex[:8]  # same 8 characters as str(uuid4())[:8]
    session_dir = os.path.join("temp", "session_" + short_id)
    os.makedirs(session_dir, exist_ok=True)
    # Schedule removal so abandoned sessions do not accumulate on disk.
    delete_later(session_dir, delay=600)
    return session_dir
# Wrap the core GPU functions with @spaces.GPU
# NOTE(review): no @spaces.GPU decorator is visible on the functions below in
# this copy of the file — confirm whether it was lost or is applied elsewhere.
def gpu_run_inference(predictor_arg, image, points, boxes):
    """Run SAM inference, creating the predictor on demand.

    Args:
        predictor_arg: An existing SAM predictor, or None to build one here
            via ``get_sam_predictor``.
        image: Image forwarded to ``run_inference``.
        points: Point prompts forwarded to ``run_inference``.
        boxes: Box prompts forwarded to ``run_inference``.

    Returns:
        Whatever ``run_inference`` returns for the given prompts.
    """
    sam = predictor_arg
    if sam is None:
        print("Initializing SAM predictor inside GPU function...")
        sam = get_sam_predictor()

    # Best effort: move the wrapped model to CUDA; a failure is only reported.
    try:
        if hasattr(sam, 'model'):
            sam.model = sam.model.to("cuda")
    except Exception as err:
        print(f"Warning: Could not move SAM model to GPU: {err}")

    return run_inference(sam, image, points, boxes)
def gpu_run_tracker(tracker_model_arg, tracker_viser_arg, temp_dir, video_name, grid_size, vo_points, fps):
    """Track a grid of query points through a video and save ``result.npz``.

    Reads ``<temp_dir>/<video_name>.mp4`` and, when it exists, the mask
    ``<temp_dir>/<video_name>.png``; writes ``<temp_dir>/results/result.npz``.

    Args:
        tracker_model_arg: Tracker model, or None to build one here.
        tracker_viser_arg: Track visualiser, or None to build one here.
        temp_dir: Session directory holding the video and the optional mask.
        video_name: Base name (no extension) of the video and mask files.
        grid_size: Query grid size; replaced by 10 when no mask file exists.
        vo_points: Forwarded to ``get_tracker_predictor`` when models are built here.
        fps: Used as a frame stride: every ``fps``-th frame is kept.

    Returns:
        None. Results are communicated through the files written to disk.
    """
    import torchvision.transforms as T
    import decord

    results_dir = os.path.join(temp_dir, "results")

    # Build the tracker and visualiser here when the caller did not supply them.
    if tracker_model_arg is None or tracker_viser_arg is None:
        print("Initializing tracker models inside GPU function...")
        os.makedirs(results_dir, exist_ok=True)
        tracker_model_arg, tracker_viser_arg = get_tracker_predictor(results_dir, vo_points=vo_points)

    # Input and output locations inside the session directory.
    video_path = os.path.join(temp_dir, f"{video_name}.mp4")
    mask_path = os.path.join(temp_dir, f"{video_name}.png")
    os.makedirs(results_dir, exist_ok=True)

    # Decode every frame and reorder the axes to (N, C, H, W).
    reader = decord.VideoReader(video_path)
    decoded = reader.get_batch(range(len(reader))).asnumpy()
    video_tensor = torch.from_numpy(decoded).permute(0, 3, 1, 2)

    # Downscale so that the shortest side becomes 336 (never upscale).
    h, w = video_tensor.shape[2:]
    scale = max(336 / h, 336 / w)
    if scale < 1:
        new_h, new_w = int(h * scale), int(w * scale)
        video_tensor = T.Resize((new_h, new_w))(video_tensor)

    # Subsample with stride `fps`, then cap the clip at MAX_FRAMES frames.
    video_tensor = video_tensor[::fps].float()[:MAX_FRAMES]
    video_tensor = video_tensor.cuda()
    print(f"Video tensor shape: {video_tensor.shape}, device: {video_tensor.device}")

    # Foreground mask: taken from disk when present, otherwise the whole frame.
    has_mask = os.path.exists(mask_path)
    if has_mask:
        mask = cv2.imread(mask_path)
        mask = cv2.resize(mask, (video_tensor.shape[3], video_tensor.shape[2]))
        mask = mask.sum(axis=-1) > 0
    else:
        mask = np.ones_like(video_tensor[0, 0].cpu().numpy()) > 0
        grid_size = 10  # without a mask a fixed grid size is used

    # Query points on a regular grid over the (resized) frame.
    frame_H, frame_W = video_tensor.shape[2:]
    grid_pts = get_points_on_a_grid(grid_size, (frame_H, frame_W), device="cuda")

    # Keep only the grid points that land on a non-zero mask pixel.
    if has_mask:
        pixel_idx = grid_pts[0].long().cpu()
        keep = mask[pixel_idx[..., 1], pixel_idx[..., 0]]
        grid_pts = grid_pts[:, keep]

    # Prepend a zero column to each point (the queries' first component).
    zeros_col = torch.zeros_like(grid_pts[:, :, :1])
    query_xyt = torch.cat([zeros_col, grid_pts], dim=2)[0].cpu().numpy()
    print(f"Query points shape: {query_xyt.shape}")

    # NOTE(review): the source's indentation was lost; the post-processing
    # below is kept inside the autocast context — confirm against the original.
    with torch.amp.autocast(device_type="cuda", dtype=torch.bfloat16):
        (
            c2w_traj, intrs, point_map, conf_depth,
            track3d_pred, track2d_pred, vis_pred, conf_pred, video
        ) = tracker_model_arg.forward(video_tensor, depth=None,
                                      intrs=None, extrs=None,
                                      queries=query_xyt,
                                      fps=1, full_point=False, iters_track=4,
                                      query_no_BA=True, fixed_cam=False, stage=1,
                                      support_frame=len(video_tensor)-1, replace_ratio=0.2)

        # Shrink the outputs (longest side at most 336) to limit I/O size.
        max_size = 336
        h, w = video.shape[2:]
        scale = min(max_size / h, max_size / w)
        if scale < 1:
            new_h, new_w = int(h * scale), int(w * scale)
            shrink = T.Resize((new_h, new_w))
            video = shrink(video)
            video_tensor = shrink(video_tensor)
            point_map = shrink(point_map)
            # Pixel coordinates and intrinsics must follow the image resize.
            track2d_pred[..., :2] = track2d_pred[..., :2] * scale
            intrs[:, :2, :] = intrs[:, :2, :] * scale
            conf_depth = shrink(conf_depth)

        # Render the 2D tracks through the visualiser.
        tracker_viser_arg.visualize(video=video[None],
                                    tracks=track2d_pred[None][..., :2],
                                    visibility=vis_pred[None], filename="test")

        # Assemble the payload in tapip3d format.
        rotated = torch.einsum("tij,tnj->tni", c2w_traj[:, :3, :3], track3d_pred[:, :, :3].cpu())
        data_npz_load = {
            "coords": (rotated + c2w_traj[:, :3, 3][:, None, :]).numpy(),
            "extrinsics": torch.inverse(c2w_traj).cpu().numpy(),
            "intrinsics": intrs.cpu().numpy(),
            "depths": point_map[:, 2, ...].cpu().numpy(),
            "video": (video_tensor).cpu().numpy() / 255,
            "visibs": vis_pred.cpu().numpy(),
            "confs": conf_pred.cpu().numpy(),
            "confs_depth": conf_depth.cpu().numpy(),
        }
        np.savez(os.path.join(results_dir, 'result.npz'), **data_npz_load)

    return None
# Constants
# Point markers are drawn on frames that were already converted to RGB, so the
# colours are given in RGB and match the UI instructions (red / green).
COLORS = [(255, 0, 0), (0, 255, 0)]  # RGB: red for negative, green for positive
# OpenCV marker types: 1 = MARKER_TILTED_CROSS (negative), 5 = MARKER_TRIANGLE_UP (positive)
MARKERS = [1, 5]
MARKER_SIZE = 8  # Marker size in pixels
VIZ_SCRIPT = "tapip3d_viz.py"
TRACK_SCRIPT = "inference.py"
# VIZ_HTML = "temp/3d_viz.html"
VIZ_HTML = "debug.html"
VIZ_PORT = 9089
# Sample videos for the gallery, with the tracker settings applied when one is
# selected. Each row is (name, grid_size, vo_points, fps); the video itself is
# expected at examples/<name>.mp4. Add your own sample videos here.
_EXAMPLE_SETTINGS = [
    ("kiss", 45, 700, 10),
    ("backpack", 40, 600, 2),
    ("kitchen", 60, 800, 3),
    ("pillow", 35, 500, 2),
    ("biker", 45, 700, 2),
    ("running", 45, 700, 2),
    ("drifting", 35, 1000, 6),
    ("ball", 45, 700, 2),
    ("dancer", 45, 700, 2),
    ("skate_sunset", 25, 1800, 6),
    ("ego_kc1", 45, 500, 4),
    ("vertical_place", 45, 500, 3),
    ("droid_robot", 35, 400, 8),
]
EXAMPLE_VIDEOS = [
    {
        "name": example_name,
        "path": f"examples/{example_name}.mp4",
        "grid_size": example_grid,
        "vo_points": example_vo,
        "fps": example_fps,
    }
    for example_name, example_grid, example_vo, example_fps in _EXAMPLE_SETTINGS
]
# Register the _viz folder of the current working directory as a Gradio static path.
_viz_static_dir = Path.cwd().absolute() / "_viz"
gr.set_static_paths(paths=[_viz_static_dir])
def compress_and_write(filename, header, blob):
    """Write a length-prefixed JSON header followed by ``blob`` to ``filename``.

    File layout: a 4-byte little-endian unsigned header length, the header
    serialised as UTF-8 JSON, then the blob bytes. Despite the name, nothing is
    compressed here; ``blob`` is written exactly as given.
    """
    encoded_header = json.dumps(header).encode("utf-8")
    size_prefix = struct.pack("<I", len(encoded_header))
    with open(filename, "wb") as out:
        for chunk in (size_prefix, encoded_header, blob):
            out.write(chunk)
def _pack_arrays(arrays):
    """Concatenate arrays into one byte blob and describe each slice in a header dict."""
    header = {}
    parts = []
    offset = 0
    for key, value in arrays.items():
        arr = np.ascontiguousarray(value)
        raw = arr.tobytes()
        header[key] = {
            "dtype": str(arr.dtype),
            "shape": arr.shape,
            "offset": offset,
            "length": len(raw),
        }
        parts.append(raw)
        offset += len(raw)
    return header, b"".join(parts)


def process_point_cloud_data(npz_file, width=256, height=192, fps=4):
    """Convert a tracker ``.npz`` result into a self-contained HTML visualisation.

    The RGB video, depth maps, camera matrices and trajectories are resized and
    re-expressed relative to the first camera, packed into one zlib-compressed
    blob, base64-encoded and embedded into a copy of ``./_viz/viz_template.html``.

    Args:
        npz_file: Path to an ``.npz`` file with the keys ``extrinsics``,
            ``intrinsics``, ``coords``, ``video`` and ``depths``, plus the
            optional key ``confs_depth``.
        width: Output frame width in pixels.
        height: Output frame height in pixels.
        fps: Stored in the header metadata as ``baseFrameRate``.

    Returns:
        Path of the generated HTML file inside ``./_viz``.
    """
    fixed_size = (width, height)

    # Read everything up front so the archive handle is closed promptly.
    with np.load(npz_file) as data:
        extrinsics = data["extrinsics"]
        intrinsics = data["intrinsics"]
        trajs = data["coords"]
        video = data["video"]
        depth_video = data["depths"].astype(np.float32)
        confs_depth = data["confs_depth"] if "confs_depth" in data.files else None

    T, C, H, W = video.shape

    # Field of view and aspect ratio derived from the first frame's intrinsics.
    fx = intrinsics[0, 0, 0]
    fy = intrinsics[0, 1, 1]
    fov_y = 2 * np.arctan(H / (2 * fy)) * (180 / np.pi)
    fov_x = 2 * np.arctan(W / (2 * fx)) * (180 / np.pi)
    original_aspect_ratio = (W / fx) / (H / fy)

    rgb_video = (rearrange(video, "T C H W -> T H W C") * 255).astype(np.uint8)
    rgb_video = np.stack([cv2.resize(frame, fixed_size, interpolation=cv2.INTER_AREA)
                          for frame in rgb_video])

    # Zero out depth wherever the confidence is 0.5 or lower.
    if confs_depth is not None:
        confs = (confs_depth.astype(np.float32) > 0.5).astype(np.float32)
        depth_video = depth_video * confs
    depth_video = np.stack([cv2.resize(frame, fixed_size, interpolation=cv2.INTER_NEAREST)
                            for frame in depth_video])

    # Intrinsics must follow the resize to the fixed output resolution.
    scale_x = fixed_size[0] / W
    scale_y = fixed_size[1] / H
    intrinsics = intrinsics.copy()
    intrinsics[:, 0, :] *= scale_x
    intrinsics[:, 1, :] *= scale_y

    # Quantise depth to 16 bits, split over the first two colour channels.
    min_depth = float(depth_video.min()) * 0.8
    max_depth = float(depth_video.max()) * 1.5
    depth_span = max_depth - min_depth
    if depth_span > 0:
        depth_normalized = np.clip((depth_video - min_depth) / depth_span, 0.0, 1.0)
    else:
        # Degenerate range (e.g. all-zero depth): avoid dividing by zero.
        depth_normalized = np.zeros_like(depth_video)
    depth_int = (depth_normalized * ((1 << 16) - 1)).astype(np.uint16)

    depths_rgb = np.zeros((T, fixed_size[1], fixed_size[0], 3), dtype=np.uint8)
    depths_rgb[:, :, :, 0] = (depth_int & 0xFF).astype(np.uint8)
    depths_rgb[:, :, :, 1] = ((depth_int >> 8) & 0xFF).astype(np.uint8)

    # Express cameras and trajectories relative to the first frame.
    first_frame_inv = np.linalg.inv(extrinsics[0])
    normalized_extrinsics = first_frame_inv @ extrinsics

    # Same affine map as multiplying homogeneous points, done for all frames at once.
    rotation = first_frame_inv[:3, :3].astype(np.float64)
    translation = first_frame_inv[:3, 3].astype(np.float64)
    normalized_trajs = trajs.astype(np.float64) @ rotation.T + translation

    arrays = {
        "rgb_video": rgb_video,
        "depths_rgb": depths_rgb,
        "intrinsics": intrinsics,
        "extrinsics": normalized_extrinsics,
        "inv_extrinsics": np.linalg.inv(normalized_extrinsics),
        "trajectories": normalized_trajs.astype(np.float32),
        "cameraZ": 0.0
    }

    header, raw_blob = _pack_arrays(arrays)
    compressed_blob = zlib.compress(raw_blob, level=9)

    header["meta"] = {
        "depthRange": [min_depth, max_depth],
        "totalFrames": int(T),
        "resolution": fixed_size,
        "baseFrameRate": fps,
        "numTrajectoryPoints": normalized_trajs.shape[1],
        "fov": float(fov_y),
        "fov_x": float(fov_x),
        "original_aspect_ratio": float(original_aspect_ratio),
        "fixed_aspect_ratio": float(fixed_size[0]/fixed_size[1])
    }

    # Serialise through a private scratch directory (avoids races on a shared
    # data.bin); the directory is removed automatically, even on error.
    with tempfile.TemporaryDirectory() as scratch_dir:
        temp_bin_file = os.path.join(scratch_dir, "data.bin")
        compress_and_write(temp_bin_file, header, compressed_blob)
        with open(temp_bin_file, "rb") as f:
            encoded_blob = base64.b64encode(f.read()).decode("ascii")

    # Timestamp plus a random suffix keeps concurrent requests from colliding.
    random_path = f'./_viz/_{time.time()}_{uuid.uuid4().hex[:8]}.html'
    with open('./_viz/viz_template.html', encoding="utf-8") as f:
        html_template = f.read()

    # Inject the payload once, right after the first <head> tag.
    html_out = html_template.replace(
        "<head>",
        f"<head>\n<script>window.embeddedBase64 = `{encoded_blob}`;</script>",
        1,
    )
    with open(random_path, 'w', encoding="utf-8") as f:
        f.write(html_out)
    return random_path
def numpy_to_base64(arr):
    """Serialise the raw bytes of ``arr`` as a base64 text string.

    Shape and dtype are not part of the output and must be stored separately.
    """
    raw = arr.tobytes()
    encoded = base64.b64encode(raw)
    return encoded.decode('utf-8')
def base64_to_numpy(b64_str, shape, dtype):
    """Rebuild an array from base64 text, given its ``shape`` and ``dtype``.

    The result is a view over the decoded bytes; copy it before modifying it.
    """
    raw = base64.b64decode(b64_str)
    flat = np.frombuffer(raw, dtype=dtype)
    return flat.reshape(shape)
def get_video_name(video_path):
    """Return the file name of ``video_path`` with directory and last extension removed."""
    filename = os.path.basename(video_path)
    stem, _extension = os.path.splitext(filename)
    return stem
def handle_video_upload(video):
    """Store an uploaded video in a new session directory and extract its first frame.

    Args:
        video: A path string, a file-like object exposing ``name`` and
            ``read()``, or None.

    Returns:
        ``(state_json, frame, [])``. ``state_json`` is a JSON string holding the
        base64-encoded RGB frame, its shape and dtype, and the session
        directory; ``frame`` is the resized RGB first frame. Returns
        ``(None, None, [])`` when no video is given or no frame can be read.
    """
    if video is None:
        return None, None, []

    # Every upload gets its own scratch directory.
    session_dir = create_user_temp_dir()

    # Save the video under its original base name, always with an .mp4 suffix.
    given_as_path = isinstance(video, str)
    video_name = get_video_name(video if given_as_path else video.name)
    video_path = os.path.join(session_dir, f"{video_name}.mp4")
    if given_as_path:
        shutil.copy(video, video_path)
    else:
        with open(video_path, 'wb') as dst:
            dst.write(video.read())
    print(f"Video saved to: {video_path}")

    # Grab the first frame only.
    capture = cv2.VideoCapture(video_path)
    success, frame = capture.read()
    capture.release()
    if not success:
        return None, None, []

    # Scale so the shorter side is 336, rounding both sides down to even numbers.
    src_h, src_w = frame.shape[:2]
    scale = 336 / min(src_h, src_w)
    new_h = int(src_h * scale) // 2 * 2
    new_w = int(src_w * scale) // 2 * 2
    frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Serialise the frame together with the session directory for later callbacks.
    frame_data = {
        'data': numpy_to_base64(frame),
        'shape': frame.shape,
        'dtype': str(frame.dtype),
        'temp_dir': session_dir
    }
    return json.dumps(frame_data), frame, []
def save_masks(o_masks, video_name, temp_dir):
    """Write binary masks as 8-bit PNGs into the session directory.

    Every mask is written to the same ``<temp_dir>/<video_name>.png`` path, so
    a later mask overwrites an earlier one.
    NOTE(review): presumably only one mask is ever passed — confirm.

    Returns:
        A list with the written path, once per mask in ``o_masks``.
    """
    mask_file = os.path.join(temp_dir, f"{video_name}.png")
    written = []
    for mask, _ in o_masks:
        # Scale the mask to 0/255 before saving it as an image.
        cv2.imwrite(mask_file, np.uint8(mask.squeeze() * 255))
        written.append(mask_file)
    return written
def select_point(original_img: str, sel_pix: list, point_type: str, evt: gr.SelectData):
    """Add a clicked point, re-run SAM, and return the annotated frame.

    Args:
        original_img: JSON state produced by ``handle_video_upload``, or None.
        sel_pix: Previously selected ``(point, label)`` pairs.
        point_type: ``'positive_point'`` gives label 1; anything else gives 0.
        evt: Gradio select event; ``evt.index`` is the clicked position.

    Returns:
        ``(display_img, new_sel_pix)``, or ``(None, [])`` when there is no frame.
    """
    if original_img is None:
        return None, []

    # Rebuild the frame and recover the session directory from the stored state.
    frame_data = json.loads(original_img)
    original_img = base64_to_numpy(frame_data['data'], frame_data['shape'], frame_data['dtype'])
    temp_dir = frame_data.get('temp_dir', 'temp')

    # Draw on a copy so the decoded frame itself stays untouched.
    display_img = original_img.copy()

    # Build a new list rather than mutating the one held by Gradio state.
    label = 1 if point_type == 'positive_point' else 0
    new_sel_pix = list(sel_pix) if sel_pix else []
    new_sel_pix.append((evt.index, label))

    # Passing None makes the GPU function build its own predictor.
    o_masks = gpu_run_inference(None, original_img, new_sel_pix, [])

    # Mark every selected point on the display image.
    for point, point_label in new_sel_pix:
        cv2.drawMarker(display_img, point, COLORS[point_label], markerType=MARKERS[point_label], markerSize=MARKER_SIZE, thickness=2)

    if o_masks:
        # Tint the first mask's pixels and blend the tint over the frame.
        mask = o_masks[0][0]
        overlay = display_img.copy()
        overlay[mask.squeeze() != 0] = [20, 60, 200]  # applied to an RGB frame
        display_img = cv2.addWeighted(overlay, 0.6, display_img, 0.4, 0)

    # Save the masks under the name of the session's video, if one exists.
    video_files = glob.glob(os.path.join(temp_dir, "*.mp4"))
    if video_files:
        video_name = get_video_name(video_files[0])
        save_masks(o_masks, video_name, temp_dir)

    return display_img, new_sel_pix
def reset_points(original_img: str, sel_pix):
    """Drop all selected points and delete the session's saved mask images.

    Args:
        original_img: JSON state produced by ``handle_video_upload``, or None.
        sel_pix: Current selection (unused; the result is always empty).

    Returns:
        ``(frame_copy, [])``, or ``(None, [])`` when there is no stored frame.
    """
    if original_img is None:
        return None, []

    # Decode the stored frame and find the session directory.
    state = json.loads(original_img)
    frame = base64_to_numpy(state['data'], state['shape'], state['dtype'])
    session_dir = state.get('temp_dir', 'temp')

    # The display goes back to a plain copy of the original frame.
    pristine = frame.copy()

    # Remove every PNG in the session directory; failures are only logged.
    for stale_mask in glob.glob(os.path.join(session_dir, "*.png")):
        try:
            os.remove(stale_mask)
        except Exception as e:
            logger.warning(f"Failed to remove mask file {stale_mask}: {e}")

    return pristine, []
def run_tracker_and_save(video_path, mask_path, grid_size, vo_points, fps, temp_dir):
    """Run the tracker for one session and build its HTML visualisation.

    Args:
        video_path: Video path; only its base name is used.
        mask_path: Unused here; the tracker looks the mask up by video name.
        grid_size: Forwarded to ``gpu_run_tracker``.
        vo_points: Forwarded to ``gpu_run_tracker``.
        fps: Forwarded to ``gpu_run_tracker``.
        temp_dir: Session directory containing the video.

    Returns:
        ``(html_out_path, track2d_video)``: the generated HTML file and the
        expected path of the 2D track video (which may not exist).
    """
    video_name = get_video_name(video_path)
    out_dir = os.path.join(temp_dir, "results")
    os.makedirs(out_dir, exist_ok=True)

    # Passing None for both models makes the GPU function build them itself.
    gpu_run_tracker(None, None, temp_dir, video_name, grid_size, vo_points, fps)

    # Files the tracker is expected to have produced.
    npz_path = os.path.join(out_dir, "result.npz")
    track2d_video = os.path.join(out_dir, "test_pred_track.mp4")
    html_out_path = process_point_cloud_data(npz_path)

    # Schedule removal of everything that was generated.
    delete_later(html_out_path, delay=600)
    for generated in (track2d_video, npz_path):
        if os.path.exists(generated):
            delete_later(generated, delay=600)

    return html_out_path, track2d_video
def launch_viz(grid_size, vo_points, fps, original_image_state):
    """Run tracking for the current session and return the viewer HTML.

    Args:
        grid_size: Forwarded to ``run_tracker_and_save``.
        vo_points: Forwarded to ``run_tracker_and_save``.
        fps: Forwarded to ``run_tracker_and_save``.
        original_image_state: JSON state produced by ``handle_video_upload``
            (carries the session directory), or None.

    Returns:
        ``(iframe_html, track2d_video)``, or ``(None, None)`` when there is no
        state or the session directory holds no ``.mp4`` file.
    """
    if original_image_state is None:
        return None, None

    # Recover the session directory; fall back to the shared 'temp' folder when
    # the state is malformed, without swallowing unrelated exceptions such as
    # KeyboardInterrupt.
    try:
        frame_data = json.loads(original_image_state)
        temp_dir = frame_data.get('temp_dir', 'temp')
    except (TypeError, ValueError, AttributeError) as e:
        logger.warning(f"Could not read session state, falling back to 'temp': {e}")
        temp_dir = 'temp'

    # The mask is optional; the video is required.
    mask_files = glob.glob(os.path.join(temp_dir, "*.png"))
    if not mask_files:
        mask_files = [None]
    video_files = glob.glob(os.path.join(temp_dir, "*.mp4"))
    if not video_files:
        return None, None

    video_path = video_files[0]
    html_path, track2d_video = run_tracker_and_save(video_path, mask_files[0], grid_size, vo_points, fps, temp_dir)

    # The generated page is served through Gradio's file route inside an iframe.
    iframe_html = f"""
    <div style='border: 3px solid #3b82f6; border-radius: 10px; overflow: hidden; box-shadow: 0 8px 32px rgba(59, 130, 246, 0.3);'>
    <iframe id="viz_iframe" src="/gradio_api/file={html_path}" width="100%" height="950px" style="border:none;"></iframe>
    </div>
    """
    return iframe_html, track2d_video
def clear_all():
    """Return empty values for the video input, the display image and the point list.

    Nothing is deleted on disk; this only produces the cleared component values.
    """
    cleared_video = None
    cleared_image = None
    cleared_points = []
    return cleared_video, cleared_image, cleared_points
# Build UI
# Default tracker settings, shared by the sliders and the video-change handler
# so the two cannot drift apart.
DEFAULT_GRID_SIZE = 50
DEFAULT_VO_POINTS = 756
DEFAULT_FPS = 3

# NOTE(review): the nesting of rows/columns below is reconstructed from the
# component order and comments because the source indentation was lost — confirm.
with gr.Blocks(css="""
    #advanced_settings .wrap {
        font-size: 14px !important;
    }
    #advanced_settings .gr-slider {
        font-size: 13px !important;
    }
    #advanced_settings .gr-slider .gr-label {
        font-size: 13px !important;
        margin-bottom: 5px !important;
    }
    #advanced_settings .gr-slider .gr-info {
        font-size: 12px !important;
    }
    #point_label_radio .gr-radio-group {
        flex-direction: row !important;
        gap: 15px !important;
    }
    #point_label_radio .gr-radio-group label {
        margin-right: 0 !important;
        margin-bottom: 0 !important;
    }
    /* Style for example videos label */
    .gr-examples .gr-label {
        font-weight: bold !important;
        font-size: 16px !important;
    }
    /* Simple horizontal scroll for examples */
    .gr-examples .gr-table-wrapper {
        overflow-x: auto !important;
        overflow-y: hidden !important;
    }
    .gr-examples .gr-table {
        display: flex !important;
        flex-wrap: nowrap !important;
        min-width: max-content !important;
    }
    .gr-examples .gr-table tbody {
        display: flex !important;
        flex-direction: row !important;
        flex-wrap: nowrap !important;
    }
    .gr-examples .gr-table tbody tr {
        display: flex !important;
        flex-direction: column !important;
        min-width: 150px !important;
        margin-right: 10px !important;
    }
    .gr-examples .gr-table tbody tr td {
        text-align: center !important;
        padding: 5px !important;
    }
""") as demo:
    # Per-session state. The predictor is deliberately not kept in State
    # because it can't be pickled.
    selected_points = gr.State([])
    original_image_state = gr.State()  # JSON-encoded first frame + session dir

    with gr.Row():
        gr.Markdown("""
        # ✨ SpatialTrackerV2
        <div style='background-color: #eff6ff; padding: 20px; border-radius: 10px; margin: 10px 0;'>
        <p style='font-size: 22px;'>Welcome to <a href="https://github.com/henry123-boy/SpaTrack2/tree/v2_release" target="_blank" style="color: #3b82f6;">SpatialTracker V2</a>! This interface allows you to track any pixels in 3D using our model.</p>
        <h2 style='color: #1d4ed8; margin-bottom: 15px;'>Instructions:</h2>
        <ol style='font-size: 20px; line-height: 1.6;'>
        <li>🎬 Upload a video or select from examples below</li>
        <li>🎯 Add a segmentation mask by selecting positive points (green) and negative points (red) on the first frame</li>
        <li>⚡ Click 'Run Tracker and Visualize' when done</li>
        <li>🔍 The reconstructed dynamic 3D scene with point tracks will be shown on the right. The 2D tracking result is also shown on the left.</li>
        </ol>
        <p style='font-size: 22px;'>❗ We limit the max number of frames to 80 in Huggingface Spaces</p>
        </div>
        """)

    with gr.Row():
        with gr.Column(scale=1):
            video_input = gr.Video(label="Upload Video", format="mp4", height=300)

            # Interactive frame and 2D tracking result sit under the video upload.
            with gr.Row():
                display_image = gr.Image(type="numpy", label="📸 Interactive Frame", height=250)
                track_video = gr.Video(label="🎯 2D Tracking Result", height=250)

            with gr.Row():
                fg_bg_radio = gr.Radio(choices=['positive_point', 'negative_point'],
                                       label='Point label',
                                       value='positive_point',
                                       elem_id="point_label_radio")
                reset_button = gr.Button("Reset points")
                clear_button = gr.Button("Clear All", variant="secondary")

            with gr.Accordion("⚙️ Advanced Settings", open=False, elem_id="advanced_settings"):
                grid_size = gr.Slider(minimum=10, maximum=100, value=DEFAULT_GRID_SIZE, step=1,
                                      label="Grid Size", info="Size of the tracking grid")
                vo_points = gr.Slider(minimum=256, maximum=4096, value=DEFAULT_VO_POINTS, step=50,
                                      label="VO Points", info="Number of points for solving camera pose")
                # NOTE(review): gpu_run_tracker uses this value as a frame
                # stride (every N-th frame is kept) — confirm the info text.
                fps_slider = gr.Slider(minimum=1, maximum=10, value=DEFAULT_FPS, step=1,
                                       label="FPS", info="FPS of the output video")

            viz_button = gr.Button("🚀 Run Tracker and Visualize", variant="primary", size="lg")

        with gr.Column(scale=2):
            # Example videos; selecting one fills the video input.
            examples_component = gr.Examples(
                examples=[
                    "examples/robot_1.mp4",
                    "examples/robot_2.mp4",
                    "examples/robot_3.mp4",
                    "examples/kiss.mp4",
                    "examples/backpack.mp4",
                    "examples/kitchen.mp4",
                    "examples/pillow.mp4",
                    "examples/biker.mp4",
                    "examples/running.mp4",
                    "examples/drifting.mp4",
                    "examples/skate_sunset.mp4",
                    "examples/dancer.mp4",
                    "examples/ego_kc1.mp4",
                    "examples/vertical_place.mp4",
                    "examples/droid_robot.mp4"
                ],
                inputs=[video_input],
                label="📁 Example Videos",
                examples_per_page=20  # Show all examples on one page to enable scrolling
            )

            # Start with the template page; launch_viz replaces it with a result.
            viz_iframe = gr.HTML("""
            <div style='border: 3px solid #3b82f6; border-radius: 10px; overflow: hidden; box-shadow: 0 8px 32px rgba(59, 130, 246, 0.3);'>
            <iframe id="viz_iframe" src="/gradio_api/file=_viz/viz_template.html" width="100%" height="950px" style="border:none;"></iframe>
            </div>
            """)

            # Attribution shown below the visualization.
            gr.HTML("""
            <div style='text-align: center; margin-top: 15px; color: #666; font-size: 14px;'>
            🎮 Interactive 3D visualization adapted from <a href="https://tapip3d.github.io/" target="_blank" style="color: #3b82f6;">TAPIP3D</a>
            </div>
            """)

    def handle_video_change(video):
        """Handle a new video from either a manual upload or an example selection.

        Returns the stored frame state, the first frame, an empty point list,
        and the grid size / VO points / FPS values for the three sliders.
        Example videos listed in EXAMPLE_VIDEOS get their own settings; every
        other video resets the sliders to the defaults.
        """
        defaults = (DEFAULT_GRID_SIZE, DEFAULT_VO_POINTS, DEFAULT_FPS)
        if video is None:
            return (None, None, [], *defaults)

        # Save the video and extract its first frame.
        image_state, first_frame, points = handle_video_upload(video)

        # Example videos are recognised by their base file name.
        video_path = video if isinstance(video, str) else video.name
        video_name = os.path.splitext(os.path.basename(video_path))[0]
        config = next((cfg for cfg in EXAMPLE_VIDEOS if cfg["name"] == video_name), None)

        if config is None:
            settings = defaults
        else:
            settings = (config["grid_size"], config["vo_points"], config["fps"])
        return (image_state, first_frame, points, *settings)

    # Bind events
    video_input.change(
        handle_video_change,
        inputs=[video_input],
        outputs=[original_image_state, display_image, selected_points, grid_size, vo_points, fps_slider]
    )
    reset_button.click(reset_points,
                       inputs=[original_image_state, selected_points],
                       outputs=[display_image, selected_points])
    clear_button.click(clear_all,
                       outputs=[video_input, display_image, selected_points])
    display_image.select(select_point,
                         inputs=[original_image_state, selected_points, fg_bg_radio],
                         outputs=[display_image, selected_points])
    viz_button.click(launch_viz,
                     inputs=[grid_size, vo_points, fps_slider, original_image_state],
                     outputs=[viz_iframe, track_video],
                     )
# Launch the demo with simplified parameters for Hugging Face Spaces.
# Guarded so that importing this module (tests, tooling) does not start a server.
if __name__ == "__main__":
    demo.launch()
    # For local debugging use instead:
    # demo.launch(debug=True, share=False)  # Enable debug mode and sharing