import modal
import os
import tempfile
import io  # Used by Whisper for BytesIO
import json
import hashlib
import re  # For parsing search results
import asyncio  # For concurrent video processing
from typing import Optional, List, Dict, Any

import gradio as gr
import httpx  # For downloading videos from URLs
import yt_dlp
from fastapi import FastAPI, UploadFile, File, Body, Query
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

web_app = FastAPI(title="MCP Video Analysis API")
# --- Global Configuration: Model Names ---
# (Consolidated here; these are the values actually used by the functions below.)
WHISPER_MODEL_NAME = "openai/whisper-large-v3"  # Latest Whisper model
CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"  # Dense video captioning
CAPTION_PROCESSOR_NAME = "Neleac/SpaceTimeGPT"  # Use the processor that ships with SpaceTimeGPT
ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base"  # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME)
OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50"
OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
# --- Modal Image Definition ---
video_analysis_image_v2 = (
    modal.Image.debian_slim(python_version="3.10")
    .apt_install("ffmpeg")
    .pip_install(
        "gradio==3.50.2",       # Pin Gradio version for stability
        "transformers[torch]",  # For all Hugging Face models and PyTorch
        "soundfile",            # For Whisper
        "av",                   # For video frame extraction
        "Pillow",               # For image processing
        "timm",                 # Often a dependency for vision models
        "torchvision",
        "torchaudio",
        "fastapi[standard]",    # For web endpoints
        "pydantic",             # For request body validation
        "yt-dlp",               # For video search and downloads
        "httpx",                # For downloading video from URL
        "cowsay==6.1",          # Cache-busting package
    )
)
# --- Modal App Definition ---
app = modal.App(name="video-analysis-gradio-pipeline")  # New app name, using App

# --- Pydantic models for web endpoint requests ---
class VideoAnalysisRequestPayload(BaseModel):
    video_url: Optional[str] = None

class TopicAnalysisRequest(BaseModel):
    topic: str
    # pydantic.Field (not fastapi.Query) is the correct way to constrain a model attribute.
    max_videos: int = Field(3, ge=1, le=10)  # Default 3, min 1, max 10 videos
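
# Illustrative request bodies for the endpoints defined further down (the mount
# paths are assumptions; adjust to wherever the routes are actually registered):
#   POST /analyze        -> {"video_url": "https://example.com/clip.mp4"}
#   POST /analyze_topic  -> {"topic": "kite surfing tricks", "max_videos": 2}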
# --- Modal Distributed Dictionary for Caching ---
video_analysis_cache = modal.Dict.from_name("video_analysis_cache", create_if_missing=True)

# --- Hugging Face Token Secret ---
HF_TOKEN_SECRET = modal.Secret.from_name("my-huggingface-secret")
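# The secret is expected to expose an HF_TOKEN environment variable. A sketch of
# creating it with the Modal CLI (the token value is a placeholder):
#   modal secret create my-huggingface-secret HF_TOKEN=hf_xxx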
# --- Helper: Hugging Face Login ---
def _login_to_hf():
    import os
    from huggingface_hub import login
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        try:
            login(token=hf_token)
            print("Successfully logged into Hugging Face Hub.")
            return True
        except Exception as e:
            print(f"Hugging Face Hub login failed: {e}")
            return False
    else:
        print("HF_TOKEN secret not found. Some models might fail to load.")
        return False
# === 1. Transcription with Whisper ===
# The functions in sections 1-7 are invoked via .remote()/.map() elsewhere in this
# file, so they must be registered as Modal functions. The decorator arguments
# (GPU type, timeout) are assumptions; the original registration is not shown here.
@app.function(image=video_analysis_image_v2, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)
def transcribe_video_with_whisper(video_bytes: bytes) -> str:
    _login_to_hf()
    import torch
    from transformers import pipeline
    import soundfile as sf
    import av  # For robust audio extraction
    import numpy as np
    import io

    print("[Whisper] Starting transcription.")
    temp_audio_path = None
    try:
        # Robust audio extraction using PyAV
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        audio_stream = next((s for s in container.streams if s.type == 'audio'), None)
        if audio_stream is None:
            return "Whisper Error: No audio stream found in video."

        # Decode and resample audio to 16kHz mono WAV,
        # stored in a temporary WAV file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_file_for_sf:
            temp_audio_path = tmp_audio_file_for_sf.name
        output_container = av.open(temp_audio_path, mode='w')
        output_stream = output_container.add_stream('pcm_s16le', rate=16000, layout='mono')
        for frame in container.decode(audio_stream):
            for packet in output_stream.encode(frame):
                output_container.mux(packet)
        # Flush the encoder
        for packet in output_stream.encode():
            output_container.mux(packet)
        output_container.close()
        container.close()
        os.remove(video_path)  # Clean up temp video file

        pipe = pipeline(
            "automatic-speech-recognition",
            model=WHISPER_MODEL_NAME,
            torch_dtype=torch.float16,
            device="cuda:0" if torch.cuda.is_available() else "cpu",
        )
        print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...")
        # Robust error handling: retry with smaller chunks if the first pass fails.
        try:
            outputs = pipe(temp_audio_path, chunk_length_s=30, stride_length_s=5, batch_size=8,
                           generate_kwargs={"language": "english"}, return_timestamps=False)
        except Exception as whisper_err:
            print(f"[Whisper] Error during transcription: {whisper_err}")
            print("[Whisper] Attempting fallback transcription with smaller chunk size...")
            outputs = pipe(temp_audio_path, chunk_length_s=10, stride_length_s=2, batch_size=4,
                           generate_kwargs={"language": "english"}, return_timestamps=False)
        transcription = outputs["text"]
        print(f"[Whisper] Transcription successful: {transcription[:100]}...")
        return transcription
    except Exception as e:
        print(f"[Whisper] Error: {e}")
        import traceback
        traceback.print_exc()
        return f"Whisper Error: {str(e)}"
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
        if 'video_path' in locals() and video_path and os.path.exists(video_path):
            os.remove(video_path)  # Ensure temp video is cleaned up if audio extraction failed early
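
# For reference, the PyAV extraction above is equivalent to this ffmpeg one-liner
# (a sketch; Whisper's pipeline expects 16 kHz mono input):
#   ffmpeg -i input.mp4 -vn -ac 1 -ar 16000 -c:a pcm_s16le output.wav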
# === 2. Captioning with SpaceTimeGPT ===
@app.function(image=video_analysis_image_v2, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)  # arguments are assumptions
def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
    _login_to_hf()
    import torch
    from transformers import AutoProcessor, AutoModelForVision2Seq
    import av
    import numpy as np
    import tempfile

    print("[SpaceTimeGPT] Starting captioning.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return "SpaceTimeGPT Error: No video stream found."

        num_frames_to_sample = 16
        total_frames = video_stream.frames
        if total_frames == 0:
            return "SpaceTimeGPT Error: Video has no frames."
        # Sample frames uniformly across the whole video
        indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
        frames = []
        for i in indices:
            # NOTE: av's seek() expects an offset in stream.time_base units; passing a
            # raw frame index is an approximation that works for typical MP4 inputs.
            container.seek(int(i), stream=video_stream)
            frame = next(container.decode(video_stream))
            frames.append(frame.to_rgb().to_ndarray())
        container.close()
        video_frames_np = np.stack(frames)

        processor = AutoProcessor.from_pretrained(CAPTION_PROCESSOR_NAME, trust_remote_code=True)
        # Debug prints
        print(f"[SpaceTimeGPT] DEBUG: CAPTION_MODEL_NAME is {CAPTION_MODEL_NAME}")
        print(f"[SpaceTimeGPT] DEBUG: Intending to use model class: {AutoModelForVision2Seq.__name__}")
        print(f"[SpaceTimeGPT] DEBUG: Type of model class object: {type(AutoModelForVision2Seq)}")
        model = AutoModelForVision2Seq.from_pretrained(CAPTION_MODEL_NAME, trust_remote_code=True)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        if hasattr(processor, 'tokenizer'):  # Check if a tokenizer exists
            processor.tokenizer.padding_side = "right"
        print("[SpaceTimeGPT] Model and processor loaded. Generating captions...")
        inputs = processor(text=None, videos=list(video_frames_np), return_tensors="pt", padding=True).to(device)
        generated_ids = model.generate(**inputs, max_new_tokens=128)
        captions = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
        print(f"[SpaceTimeGPT] Captioning successful: {captions}")
        return captions
    except Exception as e:
        print(f"[SpaceTimeGPT] Error: {e}")
        import traceback
        traceback.print_exc()
        return f"SpaceTimeGPT Error: {str(e)}"
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 3. Action Recognition with VideoMAE ===
@app.function(image=video_analysis_image_v2, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)  # arguments are assumptions
def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]:
    _login_to_hf()
    import torch
    from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
    import av
    import numpy as np
    import tempfile

    print("[VideoMAE] Starting action recognition.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return [{"error": "VideoMAE Error: No video stream found."}]
        num_frames_to_sample = 16
        total_frames = video_stream.frames
        if total_frames == 0:
            return [{"error": "VideoMAE Error: Video has no frames."}]
        indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
        video_frames_list = []
        for i in indices:
            container.seek(int(i), stream=video_stream)
            frame = next(container.decode(video_stream))
            video_frames_list.append(frame.to_rgb().to_ndarray())
        container.close()

        processor = VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME)
        model = VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        print("[VideoMAE] Model and processor loaded. Classifying actions...")
        inputs = processor(video_frames_list, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
        # Report the top-5 Kinetics classes with softmax confidences
        top_k = 5
        probabilities = torch.softmax(logits, dim=-1)
        top_probs, top_indices = torch.topk(probabilities, top_k)
        results = []
        for i in range(top_k):
            label = model.config.id2label[top_indices[0, i].item()]
            score = top_probs[0, i].item()
            results.append({"action": label, "confidence": round(score, 4)})
        print(f"[VideoMAE] Action recognition successful: {results}")
        return results
    except Exception as e:
        print(f"[VideoMAE] Error: {e}")
        import traceback
        traceback.print_exc()
        return [{"error": f"VideoMAE Error: {str(e)}"}]
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 4. Object Detection with DETR ===
@app.function(image=video_analysis_image_v2, gpu="any", secrets=[HF_TOKEN_SECRET], timeout=600)  # arguments are assumptions
def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]:
    _login_to_hf()
    import torch
    from transformers import DetrImageProcessor, DetrForObjectDetection
    from PIL import Image  # Not used directly; frame.to_image() returns a PIL Image
    import av
    import numpy as np
    import tempfile

    print("[DETR] Starting object detection.")
    video_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
            tmp_video_file.write(video_bytes)
            video_path = tmp_video_file.name

        container = av.open(video_path)
        video_stream = next((s for s in container.streams if s.type == 'video'), None)
        if video_stream is None:
            return [{"error": "DETR Error: No video stream found."}]
        num_frames_to_extract = 3
        total_frames = video_stream.frames
        if total_frames == 0:
            return [{"error": "DETR Error: Video has no frames."}]
        frame_indices = np.linspace(0, total_frames - 1, num_frames_to_extract, dtype=int)

        processor = DetrImageProcessor.from_pretrained(OBJECT_DETECTION_PROCESSOR_NAME)
        model = DetrForObjectDetection.from_pretrained(OBJECT_DETECTION_MODEL_NAME)
        device = "cuda:0" if torch.cuda.is_available() else "cpu"
        model.to(device)
        print("[DETR] Model and processor loaded.")

        all_frame_detections = []
        for frame_num, target_frame_index in enumerate(frame_indices):
            container.seek(int(target_frame_index), stream=video_stream)
            frame = next(container.decode(video_stream))
            pil_image = frame.to_image()
            print(f"[DETR] Processing frame {frame_num + 1}/{num_frames_to_extract} (original index {target_frame_index})...")
            inputs = processor(images=pil_image, return_tensors="pt").to(device)
            outputs = model(**inputs)
            target_sizes = torch.tensor([pil_image.size[::-1]], device=device)
            results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.7)[0]
            frame_detections = []
            for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
                frame_detections.append({
                    "label": model.config.id2label[label.item()],
                    "confidence": round(score.item(), 3),
                    "box": [round(coord) for coord in box.tolist()]
                })
            if frame_detections:  # Only record frames that produced detections
                all_frame_detections.append({
                    "frame_number": frame_num + 1,
                    "original_frame_index": int(target_frame_index),
                    "detections": frame_detections
                })
        container.close()
        print(f"[DETR] Object detection successful: {all_frame_detections if all_frame_detections else 'No objects detected with threshold.'}")
        return all_frame_detections if all_frame_detections else [{"info": "No objects detected with current threshold."}]
    except Exception as e:
        print(f"[DETR] Error: {e}")
        import traceback
        traceback.print_exc()
        return [{"error": f"DETR Error: {str(e)}"}]
    finally:
        if video_path and os.path.exists(video_path):
            os.remove(video_path)
# === 5. Comprehensive Video Analysis (Orchestrator) ===
@app.function(image=video_analysis_image_v2, timeout=1800)  # arguments are assumptions
async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
    print("[Orchestrator] Starting comprehensive video analysis.")
    cache_key = hashlib.sha256(video_bytes).hexdigest()
    try:
        cached_result = video_analysis_cache.get(cache_key)
        if cached_result:
            print(f"[Orchestrator] Cache hit for key: {cache_key}")
            return cached_result
    except Exception as e:
        # Log the error but proceed with a fresh analysis if the cache read fails
        print(f"[Orchestrator] Cache GET error: {e}. Proceeding with fresh analysis.")

    print(f"[Orchestrator] Cache miss for key: {cache_key}. Performing full analysis.")
    results = {}
    print("[Orchestrator] Calling transcription...")
    try:
        # .remote() blocks until the remote function returns
        results["transcription"] = transcribe_video_with_whisper.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in transcription: {e}")
        results["transcription"] = f"Transcription Error: {str(e)}"
    print("[Orchestrator] Calling captioning...")
    try:
        results["caption"] = generate_captions_with_spacetimegpt.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in captioning: {e}")
        results["caption"] = f"Captioning Error: {str(e)}"
    print("[Orchestrator] Calling action recognition...")
    try:
        results["actions"] = generate_action_labels.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in action recognition: {e}")
        results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}]  # Keep list type on error
    print("[Orchestrator] Calling object detection...")
    try:
        results["objects"] = generate_object_detection.remote(video_bytes)
    except Exception as e:
        print(f"[Orchestrator] Error in object detection: {e}")
        results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}]  # Keep list type on error
    print("[Orchestrator] All analyses attempted. Storing results in cache.")
    try:
        video_analysis_cache.put(cache_key, results)
        print(f"[Orchestrator] Successfully cached results for key: {cache_key}")
    except Exception as e:
        print(f"[Orchestrator] Cache PUT error: {e}")
    return results
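
# Minimal local usage sketch: fetch a clip and run the orchestrator remotely.
# (Assumption: invoked as `modal run <thisfile>.py::main --url <video-url>`;
# this entrypoint is illustrative and not part of the original registration.)
@app.local_entrypoint()
def main(url: str = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"):
    import httpx
    video_bytes = httpx.get(url, follow_redirects=True, timeout=60.0).content
    print(analyze_video_comprehensive.remote(video_bytes))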
# === FastAPI Endpoint for Video Analysis ===
# The route decorator is an assumption: the original registration of this
# handler on `web_app` is not shown in this excerpt.
@web_app.post("/analyze")
def process_video_analysis(payload: VideoAnalysisRequestPayload):
    """FastAPI endpoint for comprehensive video analysis."""
    print("[FastAPI Endpoint] Received request for video analysis")
    video_url = payload.video_url
    if not video_url:
        return JSONResponse(status_code=400, content={"error": "video_url must be provided in JSON payload."})
    print(f"[FastAPI Endpoint] Processing video_url: {video_url}")
    try:
        # Download video using yt-dlp with enhanced options for robustness
        import yt_dlp
        import tempfile
        import os
        import subprocess
        import shutil

        video_bytes = None
        with tempfile.TemporaryDirectory() as tmpdir:
            output_base = os.path.join(tmpdir, 'video')
            output_path = output_base + '.mp4'
            # Enhanced yt-dlp options for more reliable downloads
            ydl_opts = {
                # Request specific formats in priority order
                'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
                'outtmpl': output_base,
                'quiet': False,        # Temporarily enable output for debugging
                'verbose': True,       # More verbose output to diagnose issues
                'no_warnings': False,  # Show warnings for debugging
                'noplaylist': True,
                # Force remux to ensure a valid container
                'merge_output_format': 'mp4',
                # Add postprocessors to ensure a valid MP4
                'postprocessors': [{
                    'key': 'FFmpegVideoConvertor',
                    'preferedformat': 'mp4',
                    'postprocessor_args': ['-movflags', '+faststart'],
                }],
                # Force ffmpeg to create a valid MP4 with the moov atom at the beginning
                'prefer_ffmpeg': True,
                'http_headers': {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
                },
            }
            try:
                print(f"[FastAPI Endpoint] Downloading video with enhanced yt-dlp options from {video_url}")
                download_success = False
                # Try yt-dlp first
                try:
                    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                        ydl.download([video_url])
                    # Find the actual output file (it might have a different extension)
                    downloaded_files = [f for f in os.listdir(tmpdir) if f.startswith('video')]
                    if downloaded_files:
                        actual_file = os.path.join(tmpdir, downloaded_files[0])
                        print(f"[FastAPI Endpoint] Found downloaded file: {actual_file}")
                        download_success = True
                except Exception as e:
                    print(f"[FastAPI Endpoint] yt-dlp download failed: {e}. Trying direct download...")
                # Fall back to direct download if it's a direct video URL
                if not download_success and (video_url.endswith('.mp4') or 'commondatastorage.googleapis.com' in video_url):
                    import requests
                    try:
                        print(f"[FastAPI Endpoint] Attempting direct download for {video_url}")
                        actual_file = os.path.join(tmpdir, 'direct_video.mp4')
                        with requests.get(video_url, stream=True) as r:
                            r.raise_for_status()
                            with open(actual_file, 'wb') as f:
                                for chunk in r.iter_content(chunk_size=8192):
                                    f.write(chunk)
                        print(f"[FastAPI Endpoint] Direct download successful: {actual_file}")
                        download_success = True
                    except Exception as e:
                        print(f"[FastAPI Endpoint] Direct download failed: {e}")
                # For testing: fall back to a sample video (Big Buck Bunny) if all downloads failed
                if not download_success:
                    test_url = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
                    print(f"[FastAPI Endpoint] All downloads failed. Falling back to sample video: {test_url}")
                    import requests
                    try:
                        actual_file = os.path.join(tmpdir, 'fallback_video.mp4')
                        with requests.get(test_url, stream=True) as r:
                            r.raise_for_status()
                            with open(actual_file, 'wb') as f:
                                for chunk in r.iter_content(chunk_size=8192):
                                    f.write(chunk)
                        print("[FastAPI Endpoint] Fallback download successful")
                        download_success = True
                    except Exception as e:
                        print(f"[FastAPI Endpoint] Even fallback download failed: {e}")
                        raise Exception("All download methods failed")
                # Ensure it's a properly formatted MP4 using ffmpeg directly
                final_output = os.path.join(tmpdir, 'final_video.mp4')
                try:
                    # Remux with ffmpeg (stream copy) to ensure proper moov atom placement
                    print("[FastAPI Endpoint] Reprocessing with ffmpeg to ensure valid MP4 format")
                    subprocess.run(
                        ["ffmpeg", "-i", actual_file, "-c:v", "copy", "-c:a", "copy", "-movflags", "faststart", final_output],
                        check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                    )
                    if os.path.exists(final_output) and os.path.getsize(final_output) > 0:
                        with open(final_output, 'rb') as f:
                            video_bytes = f.read()
                        print(f"[FastAPI Endpoint] Successfully reprocessed video, size: {len(video_bytes)} bytes")
                    else:
                        print("[FastAPI Endpoint] ffmpeg reprocessing failed to produce valid output")
                except subprocess.SubprocessError as se:
                    print(f"[FastAPI Endpoint] ffmpeg reprocessing failed: {se}")
                    # If ffmpeg fails, try with the original file
                    if os.path.exists(actual_file) and os.path.getsize(actual_file) > 0:
                        with open(actual_file, 'rb') as f:
                            video_bytes = f.read()
                        print(f"[FastAPI Endpoint] Using original download, size: {len(video_bytes)} bytes")
            except yt_dlp.utils.DownloadError:
                # Fall back to httpx for direct links if yt-dlp fails
                print(f"[FastAPI Endpoint] yt-dlp failed, falling back to httpx for {video_url}")
                try:
                    import httpx
                    with httpx.Client() as client:
                        response = client.get(video_url, follow_redirects=True, timeout=60.0)
                        response.raise_for_status()
                        video_bytes = response.content
                except httpx.RequestError as he:
                    return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL using both yt-dlp and httpx. Details: {he}"})

        if not video_bytes:
            return JSONResponse(status_code=400, content={"error": f"Downloaded video from URL {video_url} is empty or download failed."})
        print(f"[FastAPI Endpoint] Successfully downloaded and validated {len(video_bytes)} bytes from {video_url} using enhanced downloader.")
        # Call comprehensive analysis
        analysis_results = analyze_video_comprehensive.remote(video_bytes)
        print("[FastAPI Endpoint] Comprehensive analysis finished.")
        return JSONResponse(status_code=200, content=analysis_results)
    except httpx.RequestError as e:
        print(f"[FastAPI Endpoint] httpx.RequestError downloading video: {e}")
        return JSONResponse(status_code=400, content={"error": f"Error downloading video from URL: {video_url}. Details: {str(e)}"})
    except Exception as e:
        print(f"[FastAPI Endpoint] Unexpected Exception during analysis: {e}")
        return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"})
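
# Illustrative invocation once deployed (the deployment URL and path are assumptions):
#   curl -X POST "https://<your-modal-deployment>/analyze" \
#        -H "Content-Type: application/json" \
#        -d '{"video_url": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"}'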
# === FastAPI Endpoint for Topic Analysis ===
# As above, the route decorator is an assumption.
@web_app.post("/analyze_topic")
async def handle_analyze_topic_request(request: TopicAnalysisRequest):
    """
    Handles a request to analyze videos based on a topic.
    1. Finds video URLs for the topic using YouTube search.
    2. Concurrently analyzes these videos.
    3. Returns aggregated results.
    """
    print(f"[TopicAPI] Received request to analyze topic: '{request.topic}', max_videos: {request.max_videos}")
    try:
        # Both helpers are Modal functions; .remote.aio() awaits the remote call
        # without blocking this async handler.
        video_urls = await find_video_urls_for_topic.remote.aio(request.topic, request.max_videos)
        if not video_urls:
            print(f"[TopicAPI] No video URLs found for topic: '{request.topic}'")
            return JSONResponse(
                status_code=404,
                content={
                    "status": "error",
                    "message": "No videos found for the specified topic.",
                    "topic": request.topic,
                    "details": "The YouTube search did not return any relevant video URLs."
                }
            )
        print(f"[TopicAPI] Found {len(video_urls)} URLs for topic '{request.topic}', proceeding to analysis.")
        analysis_results = await analyze_videos_by_topic.remote.aio(video_urls, request.topic)
        print(f"[TopicAPI] Successfully analyzed videos for topic: '{request.topic}'")
        return analysis_results
    except Exception as e:
        print(f"[TopicAPI] Error during topic analysis for '{request.topic}': {e}")
        import traceback
        traceback.print_exc()
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "message": "An internal server error occurred during topic analysis.",
                "topic": request.topic,
                "error_details_str": str(e)  # Keep it simple for JSON
            }
        )
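
# Illustrative invocation (the deployment URL and path are assumptions):
#   curl -X POST "https://<your-modal-deployment>/analyze_topic" \
#        -H "Content-Type: application/json" \
#        -d '{"topic": "kite surfing tricks", "max_videos": 2}'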
# === 6. Topic-Based Video Search ===
@app.function(image=video_analysis_image_v2, timeout=300)  # arguments are assumptions
def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]:
    """Finds video URLs (YouTube) for a given topic using yt-dlp."""
    print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}")
    video_urls = []
    try:
        # Use a common user-agent to avoid getting blocked, and let yt-dlp find
        # ffmpeg on the PATH instead of hardcoding its location.
        ydl_opts = {
            'quiet': True,
            'extract_flat': 'discard_in_playlist',
            'force_generic_extractor': False,
            'default_search': f"ytsearch{max_results}",
            'noplaylist': True,
            'prefer_ffmpeg': True,
            'http_headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
            }
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # With default_search set, extract_info on a bare query behaves like
            # 'ytsearchN:query' and returns a playlist dictionary.
            search_result = ydl.extract_info(topic, download=False)
            if search_result and 'entries' in search_result:
                for entry in search_result['entries']:
                    # Ensure the entry is a dictionary and has 'webpage_url'
                    if isinstance(entry, dict) and entry.get('webpage_url'):
                        video_urls.append(entry['webpage_url'])
                        # The search may return more than max_results, so cap it here
                        if len(video_urls) >= max_results:
                            break
            # Sometimes a single video result might not be in 'entries'
            elif isinstance(search_result, dict) and search_result.get('webpage_url'):
                video_urls.append(search_result['webpage_url'])
        # Ensure we don't exceed max_results if the loop didn't break early enough
        video_urls = video_urls[:max_results]
        print(f"[TopicSearch] Found {len(video_urls)} video URLs for topic '{topic}': {video_urls}")
    except Exception as e:
        print(f"[TopicSearch] Error finding videos for topic '{topic}': {e}")
        import traceback
        traceback.print_exc()
    return video_urls
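
# Equivalent explicit form, without relying on default_search (a sketch):
#   with yt_dlp.YoutubeDL({'quiet': True, 'extract_flat': True}) as ydl:
#       info = ydl.extract_info(f"ytsearch{max_results}:{topic}", download=False)
#       urls = [e['url'] for e in info.get('entries', []) if e.get('url')]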
# Helper function (not a Modal function) to extract video URLs from search results
def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]:
    """Extracts video URLs from a list of search result dictionaries."""
    video_urls = []
    seen_urls = set()
    # Regexes for YouTube, Vimeo, and common direct video file extensions.
    # The YouTube pattern is simplified to catch most watch/embed/shorts links.
    youtube_regex = r"(?:https?://)?(?:www\.)?(?:youtube\.com/(?:watch\?v=|embed/|shorts/)|youtu\.be/)([a-zA-Z0-9_-]{11})"
    vimeo_regex = r"(?:https?://)?(?:www\.)?vimeo\.com/(\d+)"
    direct_video_regex = r"https?://[^\s]+\.(mp4|mov|avi|webm|mkv)(\?[^\s]*)?"
    patterns = [
        re.compile(youtube_regex),
        re.compile(vimeo_regex),
        re.compile(direct_video_regex)
    ]
    for item in search_results:
        url = item.get("link") or item.get("url")  # Common keys for the URL in search results
        if not url:
            continue
        for pattern in patterns:
            match = pattern.search(url)
            if match:
                # Reconstruct the canonical YouTube URL for short links and embeds
                if pattern.pattern == youtube_regex and match.group(1):
                    normalized_url = f"https://www.youtube.com/watch?v={match.group(1)}"
                else:
                    normalized_url = url
                if normalized_url not in seen_urls:
                    video_urls.append(normalized_url)
                    seen_urls.add(normalized_url)
                if len(video_urls) >= max_urls:
                    break
        if len(video_urls) >= max_urls:
            break
    return video_urls
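
# Usage sketch with hypothetical search-result dictionaries:
#   hits = [{"link": "https://youtu.be/dQw4w9WgXcQ"},
#           {"url": "https://example.com/clips/surf.mp4"}]
#   extract_video_urls_from_search(hits, max_urls=2)
#   # -> ['https://www.youtube.com/watch?v=dQw4w9WgXcQ', 'https://example.com/clips/surf.mp4']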
# === 7. Topic-Based Video Analysis Orchestrator ===
# Registered as a Modal function (arguments are assumptions) so it can be fanned
# out with .map() below.
@app.function(image=video_analysis_image_v2, timeout=1800)
async def _analyze_video_worker(video_url: str) -> dict:
    """
    Worker that downloads a video from a URL and runs the comprehensive analysis.
    Designed to be called concurrently.
    """
    print(f"[Worker] Starting analysis for {video_url}")
    try:
        async with httpx.AsyncClient() as client:
            print(f"[Worker] Downloading video from {video_url}")
            response = await client.get(video_url, follow_redirects=True, timeout=60.0)
            response.raise_for_status()
            video_bytes = await response.aread()
        print(f"[Worker] Downloaded {len(video_bytes)} bytes from {video_url}")
        if not video_bytes:
            raise ValueError("Downloaded video content is empty.")
        # Await the remote call (async variant of .remote())
        analysis_result = await analyze_video_comprehensive.remote.aio(video_bytes)
        if isinstance(analysis_result, dict) and any("error" in str(v).lower() for v in analysis_result.values()):
            print(f"[Worker] Comprehensive analysis for {video_url} reported errors: {analysis_result}")
            return {"url": video_url, "status": "error", "error_type": "analysis_error", "details": analysis_result}
        else:
            return {"url": video_url, "status": "success", "analysis": analysis_result}
    except httpx.HTTPStatusError as e:
        print(f"[Worker] HTTP error downloading {video_url}: {e}")
        return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"HTTP {e.response.status_code}"}
    except httpx.RequestError as e:
        print(f"[Worker] Request error downloading {video_url}: {e}")
        return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"Failed to download: {str(e)}"}
    except Exception as e:
        print(f"[Worker] Error processing video {video_url}: {e}")
        import traceback
        return {"url": video_url, "status": "error", "error_type": "processing_error", "details": str(e), "traceback": traceback.format_exc()[:1000]}
@app.function(image=video_analysis_image_v2, timeout=3600)  # arguments are assumptions
async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]:
    """Analyzes a list of videos (by URL) concurrently and aggregates results for a topic."""
    print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).")
    results_aggregator = {
        "topic": topic,
        "analyzed_videos": [],
        "errors": []
    }
    if not video_urls:
        results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."})
        return results_aggregator
    # Use .map to run the worker concurrently over all video URLs;
    # list() forces the generator to execute and collect every result.
    individual_results = list(_analyze_video_worker.map(video_urls))
    for result in individual_results:
        if isinstance(result, dict):
            if result.get("status") == "error":
                results_aggregator["errors"].append(result)
            else:
                results_aggregator["analyzed_videos"].append(result)
        else:
            # Handles unexpected return types from the worker, such as exceptions
            print(f"[TopicAnalysis] Received an unexpected result type from worker: {type(result)}")
            results_aggregator["errors"].append({"url": "unknown", "error_type": "unexpected_result", "details": str(result)})
    print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.")
    return results_aggregator
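
# Note: by default .map() re-raises the first worker exception; to collect
# exceptions alongside successes (matching the defensive branch above), Modal
# supports, e.g.:
#   individual_results = list(_analyze_video_worker.map(video_urls, return_exceptions=True))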
# === Gradio Interface ===
# Served as an ASGI app; the decorators are assumptions, since the original
# registration is not shown in this excerpt.
@app.function(image=video_analysis_image_v2)
@modal.asgi_app()
def video_analyzer_gradio_ui():
    print("[Gradio] UI function called to define interface.")

    def analyze_video_all_models(video_filepath):
        print(f"[Gradio] Received video filepath for analysis: {video_filepath}")
        if not video_filepath or not os.path.exists(video_filepath):
            return "Error: Video file path is invalid or does not exist.", "", "[]", "[]"
        with open(video_filepath, "rb") as f:
            video_bytes_content = f.read()
        print(f"[Gradio] Read {len(video_bytes_content)} bytes from video path: {video_filepath}")
        if not video_bytes_content:
            return "Error: Could not read video bytes.", "", "[]", "[]"
        print("[Gradio] Calling Whisper...")
        transcription = transcribe_video_with_whisper.remote(video_bytes_content)
        print(f"[Gradio] Whisper result length: {len(transcription)}")
        print("[Gradio] Calling SpaceTimeGPT...")
        captions = generate_captions_with_spacetimegpt.remote(video_bytes_content)
        print(f"[Gradio] SpaceTimeGPT result: {captions}")
        print("[Gradio] Calling VideoMAE...")
        action_labels = generate_action_labels.remote(video_bytes_content)
        print(f"[Gradio] VideoMAE result: {action_labels}")
        print("[Gradio] Calling DETR...")
        object_detections = generate_object_detection.remote(video_bytes_content)
        print(f"[Gradio] DETR result: {object_detections}")
        return transcription, captions, str(action_labels), str(object_detections)

    with gr.Blocks(title="Comprehensive Video Analyzer", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Comprehensive Video Analyzer")
        gr.Markdown("Upload a video to get transcription, captions, action labels, and object detections.")
        with gr.Row():
            # Gradio 3.x (pinned above) uses the singular `source` parameter
            video_input = gr.Video(label="Upload Video", source="upload", type="filepath")
            submit_button = gr.Button("Analyze Video", variant="primary")
        with gr.Tabs():
            with gr.TabItem("Transcription (Whisper)"):
                transcription_output = gr.Textbox(label="Transcription", lines=10, interactive=False)
            with gr.TabItem("Dense Captions (SpaceTimeGPT)"):
                caption_output = gr.Textbox(label="Captions", lines=10, interactive=False)
            with gr.TabItem("Action Recognition (VideoMAE)"):
                action_output = gr.Textbox(label="Predicted Actions (JSON format)", lines=10, interactive=False)
            with gr.TabItem("Object Detection (DETR)"):
                object_output = gr.Textbox(label="Detected Objects (JSON format)", lines=10, interactive=False)
        submit_button.click(
            fn=analyze_video_all_models,
            inputs=[video_input],
            outputs=[transcription_output, caption_output, action_output, object_output]
        )
        gr.Markdown("### Example Video")
        gr.Markdown("You can test with a short video. Processing may take a few minutes depending on video length and model inference times.")
    print("[Gradio] UI definition complete.")
    return gr.routes.App.create_app(demo)
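
# Deployment sketch (assumptions: file name, and that the FastAPI routes above are
# served alongside the Gradio UI, which is already registered via @modal.asgi_app()):
#   modal serve video_pipeline.py   # hot-reloading dev server
#   modal deploy video_pipeline.py  # persistent deployment
# A sketch of exposing web_app (the /analyze and /analyze_topic routes) as well:
# @app.function(image=video_analysis_image_v2)
# @modal.asgi_app(label="api")
# def fastapi_app():
#     return web_app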