video_mcp / modal_whisper_app.py
jomasego's picture
Fix: Remove duplicate decorator causing AttributeError
e94b0dd
import modal
from fastapi import FastAPI, UploadFile, File, Body, Query
from fastapi.responses import JSONResponse
web_app = FastAPI(title="MCP Video Analysis API")
import os
import tempfile
import io # Used by Whisper for BytesIO
import httpx # For downloading videos from URLs
from typing import Optional, List, Dict, Any
import json
import hashlib
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import re # For parsing search results
import yt_dlp
import asyncio # For concurrent video processing
import gradio as gr
# Global Configuration (should be at the top of the file)
WHISPER_MODEL_NAME = "openai/whisper-large-v3" # Use latest Whisper model
CAPTION_MODEL_NAME = "microsoft/xclip-base-patch16" # For SpaceTimeGPT alternative
CAPTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # For SpaceTimeGPT's video encoder
# CAPTION_TOKENIZER_NAME = "gpt2" # For SpaceTimeGPT's text decoder (usually part of processor)
ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME)
OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50"
OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
# --- Modal Image Definition ---
video_analysis_image_v2 = (
modal.Image.debian_slim(python_version="3.10")
.apt_install("ffmpeg")
.pip_install(
"gradio==3.50.2", # Pin Gradio version for stability
"transformers[torch]", # For all Hugging Face models and PyTorch
"soundfile", # For Whisper
"av", # For video frame extraction
"Pillow", # For image processing
"timm", # Often a dependency for vision models
"torchvision",
"torchaudio",
"fastapi[standard]", # For web endpoints
"pydantic",
"yt-dlp", # For request body validation
"httpx", # For downloading video from URL
"cowsay==6.1" # Cache-busting package
)
)
# --- Modal App Definition ---
app = modal.App(name="video-analysis-gradio-pipeline") # New app name, using App
# --- Pydantic model for web endpoint request ---
class VideoAnalysisRequestPayload(BaseModel):
video_url: Optional[str] = None
class TopicAnalysisRequest(BaseModel):
topic: str
max_videos: int = Query(3, ge=1, le=10) # Default 3, min 1, max 10 videos
# --- Constants for Model Names ---
# WHISPER_MODEL_NAME = "openai/whisper-large-v3"
CAPTION_MODEL_NAME = "Neleac/SpaceTimeGPT"
CAPTION_PROCESSOR_NAME = "Neleac/SpaceTimeGPT" # Use processor from SpaceTimeGPT itself
# # CAPTION_TOKENIZER_NAME = "gpt2" # For SpaceTimeGPT's text decoder (usually part of processor)
# ACTION_MODEL_NAME = "MCG-NJU/videomae-base-finetuned-kinetics"
# ACTION_PROCESSOR_NAME = "MCG-NJU/videomae-base" # Or VideoMAEImageProcessor.from_pretrained(ACTION_MODEL_NAME)
# OBJECT_DETECTION_MODEL_NAME = "facebook/detr-resnet-50"
# OBJECT_DETECTION_PROCESSOR_NAME = "facebook/detr-resnet-50"
# --- Modal Distributed Dictionary for Caching ---
video_analysis_cache = modal.Dict.from_name("video_analysis_cache", create_if_missing=True)
# --- Hugging Face Token Secret ---
HF_TOKEN_SECRET = modal.Secret.from_name("my-huggingface-secret")
# --- Helper: Hugging Face Login ---
def _login_to_hf():
import os
from huggingface_hub import login
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
try:
login(token=hf_token)
print("Successfully logged into Hugging Face Hub.")
return True
except Exception as e:
print(f"Hugging Face Hub login failed: {e}")
return False
else:
print("HF_TOKEN secret not found. Some models might fail to load.")
return False
# === 1. Transcription with Whisper ===
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def transcribe_video_with_whisper(video_bytes: bytes) -> str:
_login_to_hf()
import torch
from transformers import pipeline
import soundfile as sf
import av # For robust audio extraction
import numpy as np
import io
print("[Whisper] Starting transcription.")
temp_audio_path = None
try:
# Robust audio extraction using PyAV
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
audio_stream = next((s for s in container.streams if s.type == 'audio'), None)
if audio_stream is None:
return "Whisper Error: No audio stream found in video."
# Decode and resample audio to 16kHz mono WAV
# Store resampled audio in a temporary WAV file
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_file_for_sf:
temp_audio_path = tmp_audio_file_for_sf.name
output_container = av.open(temp_audio_path, mode='w')
output_stream = output_container.add_stream('pcm_s16le', rate=16000, layout='mono')
for frame in container.decode(audio_stream):
for packet in output_stream.encode(frame):
output_container.mux(packet)
# Flush stream
for packet in output_stream.encode():
output_container.mux(packet)
output_container.close()
container.close()
os.remove(video_path) # Clean up temp video file
pipe = pipeline(
"automatic-speech-recognition",
model=WHISPER_MODEL_NAME,
torch_dtype=torch.float16,
device="cuda:0" if torch.cuda.is_available() else "cpu",
)
print(f"[Whisper] Pipeline loaded. Transcribing {temp_audio_path}...")
# Add robust error handling for the Whisper model
try:
outputs = pipe(temp_audio_path, chunk_length_s=30, stride_length_s=5, batch_size=8, generate_kwargs={"language": "english"}, return_timestamps=False)
except Exception as whisper_err:
print(f"[Whisper] Error during transcription: {whisper_err}")
# Try again with different settings if the first attempt failed
print(f"[Whisper] Attempting fallback transcription with smaller chunk size...")
outputs = pipe(temp_audio_path, chunk_length_s=10, stride_length_s=2, batch_size=4, generate_kwargs={"language": "english"}, return_timestamps=False)
transcription = outputs["text"]
print(f"[Whisper] Transcription successful: {transcription[:100]}...")
return transcription
except Exception as e:
print(f"[Whisper] Error: {e}")
import traceback
traceback.print_exc()
return f"Whisper Error: {str(e)}"
finally:
if temp_audio_path and os.path.exists(temp_audio_path):
os.remove(temp_audio_path)
if 'video_path' in locals() and video_path and os.path.exists(video_path):
os.remove(video_path) # Ensure temp video is cleaned up if audio extraction failed early
# === 2. Captioning with SpaceTimeGPT ===
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def generate_captions_with_spacetimegpt(video_bytes: bytes) -> str:
_login_to_hf()
import torch
from transformers import AutoProcessor, AutoModelForVision2Seq
import av
import numpy as np
import tempfile
print("[SpaceTimeGPT] Starting captioning.")
video_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
video_stream = next((s for s in container.streams if s.type == 'video'), None)
if video_stream is None:
return "SpaceTimeGPT Error: No video stream found."
num_frames_to_sample = 16
total_frames = video_stream.frames
if total_frames == 0: return "SpaceTimeGPT Error: Video has no frames."
indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
frames = []
for i in indices:
container.seek(int(i), stream=video_stream)
frame = next(container.decode(video_stream))
frames.append(frame.to_rgb().to_ndarray())
container.close()
video_frames_np = np.stack(frames)
processor = AutoProcessor.from_pretrained(CAPTION_PROCESSOR_NAME, trust_remote_code=True)
# Debug prints
print(f"[SpaceTimeGPT] DEBUG: CAPTION_MODEL_NAME is {CAPTION_MODEL_NAME}")
print(f"[SpaceTimeGPT] DEBUG: Intending to use model class: {AutoModelForVision2Seq.__name__}")
print(f"[SpaceTimeGPT] DEBUG: Type of model class object: {type(AutoModelForVision2Seq)}")
model = AutoModelForVision2Seq.from_pretrained(CAPTION_MODEL_NAME, trust_remote_code=True)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(device)
if hasattr(processor, 'tokenizer'): # Check if tokenizer exists
processor.tokenizer.padding_side = "right"
print("[SpaceTimeGPT] Model and processor loaded. Generating captions...")
inputs = processor(text=None, videos=list(video_frames_np), return_tensors="pt", padding=True).to(device)
generated_ids = model.generate(**inputs, max_new_tokens=128)
captions = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
print(f"[SpaceTimeGPT] Captioning successful: {captions}")
return captions
except Exception as e:
print(f"[SpaceTimeGPT] Error: {e}")
import traceback
traceback.print_exc()
return f"SpaceTimeGPT Error: {str(e)}"
finally:
if video_path and os.path.exists(video_path):
os.remove(video_path)
# === 3. Action Recognition with VideoMAE ===
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def generate_action_labels(video_bytes: bytes) -> List[Dict[str, Any]]:
_login_to_hf()
import torch
from transformers import VideoMAEImageProcessor, VideoMAEForVideoClassification
import av
import numpy as np
import tempfile
print("[VideoMAE] Starting action recognition.")
video_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
video_stream = next((s for s in container.streams if s.type == 'video'), None)
if video_stream is None:
return [{"error": "VideoMAE Error: No video stream found."}]
num_frames_to_sample = 16
total_frames = video_stream.frames
if total_frames == 0: return [{"error": "VideoMAE Error: Video has no frames."}]
indices = np.linspace(0, total_frames - 1, num_frames_to_sample, dtype=int)
video_frames_list = []
for i in indices:
container.seek(int(i), stream=video_stream)
frame = next(container.decode(video_stream))
video_frames_list.append(frame.to_rgb().to_ndarray())
container.close()
processor = VideoMAEImageProcessor.from_pretrained(ACTION_PROCESSOR_NAME)
model = VideoMAEForVideoClassification.from_pretrained(ACTION_MODEL_NAME)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(device)
print("[VideoMAE] Model and processor loaded. Classifying actions...")
inputs = processor(video_frames_list, return_tensors="pt").to(device)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
top_k = 5
probabilities = torch.softmax(logits, dim=-1)
top_probs, top_indices = torch.topk(probabilities, top_k)
results = []
for i in range(top_k):
label = model.config.id2label[top_indices[0, i].item()]
score = top_probs[0, i].item()
results.append({"action": label, "confidence": round(score, 4)})
print(f"[VideoMAE] Action recognition successful: {results}")
return results
except Exception as e:
print(f"[VideoMAE] Error: {e}")
import traceback
traceback.print_exc()
return [{"error": f"VideoMAE Error: {str(e)}"}]
finally:
if video_path and os.path.exists(video_path):
os.remove(video_path)
# === 4. Object Detection with DETR ===
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
gpu="any",
timeout=600
)
def generate_object_detection(video_bytes: bytes) -> List[Dict[str, Any]]:
_login_to_hf()
import torch
from transformers import DetrImageProcessor, DetrForObjectDetection
from PIL import Image # Imported but not directly used, av.frame.to_image() is used
import av
import numpy as np
import tempfile
print("[DETR] Starting object detection.")
video_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_file:
tmp_video_file.write(video_bytes)
video_path = tmp_video_file.name
container = av.open(video_path)
video_stream = next((s for s in container.streams if s.type == 'video'), None)
if video_stream is None:
return [{"error": "DETR Error: No video stream found."}]
num_frames_to_extract = 3
total_frames = video_stream.frames
if total_frames == 0: return [{"error": "DETR Error: Video has no frames."}]
frame_indices = np.linspace(0, total_frames - 1, num_frames_to_extract, dtype=int)
processor = DetrImageProcessor.from_pretrained(OBJECT_DETECTION_PROCESSOR_NAME)
model = DetrForObjectDetection.from_pretrained(OBJECT_DETECTION_MODEL_NAME)
device = "cuda:0" if torch.cuda.is_available() else "cpu"
model.to(device)
print("[DETR] Model and processor loaded.")
all_frame_detections = []
for frame_num, target_frame_index in enumerate(frame_indices):
container.seek(int(target_frame_index), stream=video_stream)
frame = next(container.decode(video_stream))
pil_image = frame.to_image()
print(f"[DETR] Processing frame {frame_num + 1}/{num_frames_to_extract} (original index {target_frame_index})...")
inputs = processor(images=pil_image, return_tensors="pt").to(device)
outputs = model(**inputs)
target_sizes = torch.tensor([pil_image.size[::-1]], device=device)
results = processor.post_process_object_detection(outputs, target_sizes=target_sizes, threshold=0.7)[0]
frame_detections = []
for score, label, box in zip(results["scores"], results["labels"], results["boxes"]):
frame_detections.append({
"label": model.config.id2label[label.item()],
"confidence": round(score.item(), 3),
"box": [round(coord) for coord in box.tolist()]
})
if frame_detections: # Only add if detections are present for this frame
all_frame_detections.append({
"frame_number": frame_num + 1,
"original_frame_index": int(target_frame_index),
"detections": frame_detections
})
container.close()
print(f"[DETR] Object detection successful: {all_frame_detections if all_frame_detections else 'No objects detected with threshold.'}")
return all_frame_detections if all_frame_detections else [{"info": "No objects detected with current threshold."}]
except Exception as e:
print(f"[DETR] Error: {e}")
import traceback
traceback.print_exc()
return [{"error": f"DETR Error: {str(e)}"}]
finally:
if video_path and os.path.exists(video_path):
os.remove(video_path)
# === 5. Comprehensive Video Analysis (Orchestrator) ===
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
gpu="any", # Request GPU as some sub-tasks will need it
timeout=1800, # Generous timeout for all models
# allow_concurrent_inputs=10, # Optional: if you expect many parallel requests
# keep_warm=1 # Optional: to keep one instance warm for faster cold starts
)
async def analyze_video_comprehensive(video_bytes: bytes) -> Dict[str, Any]:
print("[Orchestrator] Starting comprehensive video analysis.")
cache_key = hashlib.sha256(video_bytes).hexdigest()
try:
cached_result = video_analysis_cache.get(cache_key)
if cached_result:
print(f"[Orchestrator] Cache hit for key: {cache_key}")
return cached_result
except Exception as e:
# Log error but proceed with analysis if cache get fails
print(f"[Orchestrator] Cache GET error: {e}. Proceeding with fresh analysis.")
print(f"[Orchestrator] Cache miss for key: {cache_key}. Performing full analysis.")
results = {}
print("[Orchestrator] Calling transcription...")
try:
# .call() is synchronous in the context of the Modal function execution
results["transcription"] = transcribe_video_with_whisper.remote(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in transcription: {e}")
results["transcription"] = f"Transcription Error: {str(e)}"
print("[Orchestrator] Calling captioning...")
try:
results["caption"] = generate_captions_with_spacetimegpt.remote(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in captioning: {e}")
results["caption"] = f"Captioning Error: {str(e)}"
print("[Orchestrator] Calling action recognition...")
try:
results["actions"] = generate_action_labels.remote(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in action recognition: {e}")
results["actions"] = [{"error": f"Action Recognition Error: {str(e)}"}] # Ensure list type for error
print("[Orchestrator] Calling object detection...")
try:
results["objects"] = generate_object_detection.remote(video_bytes)
except Exception as e:
print(f"[Orchestrator] Error in object detection: {e}")
results["objects"] = [{"error": f"Object Detection Error: {str(e)}"}] # Ensure list type for error
print("[Orchestrator] All analyses attempted. Storing results in cache.")
try:
video_analysis_cache.put(cache_key, results)
print(f"[Orchestrator] Successfully cached results for key: {cache_key}")
except Exception as e:
print(f"[Orchestrator] Cache PUT error: {e}")
return results
# === FastAPI Endpoint for Video Analysis ===
@web_app.post("/process_video_analysis")
def process_video_analysis(payload: VideoAnalysisRequestPayload):
"""FastAPI endpoint for comprehensive video analysis."""
print(f"[FastAPI Endpoint] Received request for video analysis")
video_url = payload.video_url
if not video_url:
return JSONResponse(status_code=400, content={"error": "video_url must be provided in JSON payload."})
print(f"[FastAPI Endpoint] Processing video_url: {video_url}")
try:
# Download video using yt-dlp with enhanced options for robustness
import yt_dlp
import tempfile
import os
import subprocess
import shutil
video_bytes = None
with tempfile.TemporaryDirectory() as tmpdir:
output_base = os.path.join(tmpdir, 'video')
output_path = output_base + '.mp4'
# Enhanced yt-dlp options for more reliable downloads
ydl_opts = {
# Request specific formats in priority order
'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'outtmpl': output_base,
'quiet': False, # Temporarily enable output for debugging
'verbose': True, # More verbose output to diagnose issues
'no_warnings': False, # Show warnings for debugging
'noplaylist': True,
# Force remux to ensure valid container
'merge_output_format': 'mp4',
# Add postprocessors to ensure valid MP4
'postprocessors': [{
'key': 'FFmpegVideoConvertor',
'preferedformat': 'mp4',
'postprocessor_args': ['-movflags', '+faststart'],
}],
# Force ffmpeg to create a valid MP4 with moov atom at the beginning
'prefer_ffmpeg': True,
'http_headers': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
},
}
try:
print(f"[FastAPI Endpoint] Downloading video with enhanced yt-dlp options from {video_url}")
download_success = False
# Try yt-dlp first
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_url])
# Find the actual output file (might have a different extension)
downloaded_files = [f for f in os.listdir(tmpdir) if f.startswith('video')]
if downloaded_files:
actual_file = os.path.join(tmpdir, downloaded_files[0])
print(f"[FastAPI Endpoint] Found downloaded file: {actual_file}")
download_success = True
except Exception as e:
print(f"[FastAPI Endpoint] yt-dlp download failed: {e}. Trying direct download...")
# Fallback to direct download if it's a direct video URL
if not download_success and (video_url.endswith('.mp4') or 'commondatastorage.googleapis.com' in video_url):
import requests
try:
print(f"[FastAPI Endpoint] Attempting direct download for {video_url}")
actual_file = os.path.join(tmpdir, 'direct_video.mp4')
with requests.get(video_url, stream=True) as r:
r.raise_for_status()
with open(actual_file, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
print(f"[FastAPI Endpoint] Direct download successful: {actual_file}")
download_success = True
except Exception as e:
print(f"[FastAPI Endpoint] Direct download failed: {e}")
# For testing: Try a sample video if all downloads failed (Big Buck Bunny)
if not download_success:
test_url = "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"
print(f"[FastAPI Endpoint] All downloads failed. Falling back to sample video: {test_url}")
import requests
try:
actual_file = os.path.join(tmpdir, 'fallback_video.mp4')
with requests.get(test_url, stream=True) as r:
r.raise_for_status()
with open(actual_file, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
print(f"[FastAPI Endpoint] Fallback download successful")
download_success = True
except Exception as e:
print(f"[FastAPI Endpoint] Even fallback download failed: {e}")
raise Exception("All download methods failed")
# Ensure it's a properly formatted MP4 using ffmpeg directly
final_output = os.path.join(tmpdir, 'final_video.mp4')
try:
# Use ffmpeg to re-encode the file, ensuring proper moov atom placement
print(f"[FastAPI Endpoint] Reprocessing with ffmpeg to ensure valid MP4 format")
subprocess.run(
["ffmpeg", "-i", actual_file, "-c:v", "copy", "-c:a", "copy", "-movflags", "faststart", final_output],
check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
if os.path.exists(final_output) and os.path.getsize(final_output) > 0:
with open(final_output, 'rb') as f:
video_bytes = f.read()
print(f"[FastAPI Endpoint] Successfully reprocessed video, size: {len(video_bytes)} bytes")
else:
print(f"[FastAPI Endpoint] ffmpeg reprocessing failed to produce valid output")
except subprocess.SubprocessError as se:
print(f"[FastAPI Endpoint] ffmpeg reprocessing failed: {se}")
# If ffmpeg fails, try with the original file
if os.path.exists(actual_file) and os.path.getsize(actual_file) > 0:
with open(actual_file, 'rb') as f:
video_bytes = f.read()
print(f"[FastAPI Endpoint] Using original download, size: {len(video_bytes)} bytes")
except yt_dlp.utils.DownloadError:
# Fallback to httpx for direct links if yt-dlp fails
print(f"[FastAPI Endpoint] yt-dlp failed, falling back to httpx for {video_url}")
try:
import httpx
with httpx.Client() as client:
response = client.get(video_url, follow_redirects=True, timeout=60.0)
response.raise_for_status()
video_bytes = response.content
except httpx.RequestError as he:
return JSONResponse(status_code=400, content={"error": f"Failed to download video from URL using both yt-dlp and httpx. Details: {he}"})
if not video_bytes:
return JSONResponse(status_code=400, content={"error": f"Downloaded video from URL {video_url} is empty or download failed."})
print(f"[FastAPI Endpoint] Successfully downloaded and validated {len(video_bytes)} bytes from {video_url} using enhanced downloader.")
# Call comprehensive analysis
analysis_results = analyze_video_comprehensive.remote(video_bytes)
print("[FastAPI Endpoint] Comprehensive analysis finished.")
return JSONResponse(status_code=200, content=analysis_results)
except httpx.RequestError as e:
print(f"[FastAPI Endpoint] httpx.RequestError downloading video: {e}")
return JSONResponse(status_code=400, content={"error": f"Error downloading video from URL: {video_url}. Details: {str(e)}"})
except Exception as e:
print(f"[FastAPI Endpoint] Unexpected Exception during analysis: {e}")
return JSONResponse(status_code=500, content={"error": f"Unexpected server error during analysis: {str(e)}"})
# === FastAPI Endpoint for Topic Analysis ===
@web_app.post("/analyze_topic")
async def handle_analyze_topic_request(request: TopicAnalysisRequest):
"""
Handles a request to analyze videos based on a topic.
1. Finds video URLs for the topic using YouTube search.
2. Concurrently analyzes these videos.
3. Returns aggregated results.
"""
print(f"[TopicAPI] Received request to analyze topic: '{request.topic}', max_videos: {request.max_videos}")
try:
# Use .aio for async call if the Modal function is async, or just .remote if it's sync
# Assuming find_video_urls_for_topic is sync as defined, but can be called with .remote()
# If find_video_urls_for_topic itself becomes async, then .remote.aio() is appropriate.
# For now, let's assume it's called as a standard remote Modal function.
video_urls = await find_video_urls_for_topic.remote.aio(request.topic, request.max_videos)
if not video_urls:
print(f"[TopicAPI] No video URLs found for topic: '{request.topic}'")
return JSONResponse(
status_code=404,
content={
"status": "error",
"message": "No videos found for the specified topic.",
"topic": request.topic,
"details": "The YouTube search did not return any relevant video URLs."
}
)
print(f"[TopicAPI] Found {len(video_urls)} URLs for topic '{request.topic}', proceeding to analysis.")
# analyze_videos_by_topic is an async Modal function, so use .remote.aio()
analysis_results = await analyze_videos_by_topic.remote.aio(video_urls, request.topic)
print(f"[TopicAPI] Successfully analyzed videos for topic: '{request.topic}'")
return analysis_results
except Exception as e:
print(f"[TopicAPI] Error during topic analysis for '{request.topic}': {e}")
import traceback
traceback.print_exc()
return JSONResponse(
status_code=500,
content={
"status": "error",
"message": "An internal server error occurred during topic analysis.",
"topic": request.topic,
"error_details_str": str(e) # Keep it simple for JSON
}
)
# === 6. Topic-Based Video Search ===
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
timeout=300
)
def find_video_urls_for_topic(topic: str, max_results: int = 3) -> List[str]:
"""Finds video URLs (YouTube) for a given topic using yt-dlp."""
print(f"[TopicSearch] Finding video URLs for topic: '{topic}', max_results={max_results}")
video_urls = []
try:
# Add a common user-agent to avoid getting blocked
# Let yt-dlp find ffmpeg in the PATH instead of hardcoding it
ydl_opts = {
'quiet': True,
'extract_flat': 'discard_in_playlist',
'force_generic_extractor': False,
'default_search': f"ytsearch{max_results}",
'noplaylist': True,
'prefer_ffmpeg': True,
'http_headers': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
}
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
# extract_info with a search query like 'ytsearchN:query' returns a playlist dictionary
search_result = ydl.extract_info(topic, download=False)
if search_result and 'entries' in search_result:
for entry in search_result['entries']:
# Ensure entry is a dictionary and has 'webpage_url'
if isinstance(entry, dict) and entry.get('webpage_url'):
video_urls.append(entry['webpage_url'])
# yt-dlp search might return more than max_results, so we cap it here
if len(video_urls) >= max_results:
break
# Sometimes a single video result might not be in 'entries'
elif isinstance(search_result, dict) and search_result.get('webpage_url'):
video_urls.append(search_result['webpage_url'])
# Ensure we don't exceed max_results if the loop didn't break early enough
video_urls = video_urls[:max_results]
print(f"[TopicSearch] Found {len(video_urls)} video URLs for topic '{topic}': {video_urls}")
except Exception as e:
print(f"[TopicSearch] Error finding videos for topic '{topic}': {e}")
import traceback
traceback.print_exc()
return video_urls
# Helper function (not a Modal function) to extract video URLs from search results
def extract_video_urls_from_search(search_results: List[Dict[str, str]], max_urls: int = 3) -> List[str]:
"""Extracts video URLs from a list of search result dictionaries."""
video_urls = []
seen_urls = set()
# Regex for YouTube, Vimeo, and common video file extensions
# Simplified YouTube regex to catch most common video and shorts links
youtube_regex = r"(?:https?://)?(?:www\.)?(?:youtube\.com/(?:watch\?v=|embed/|shorts/)|youtu\.be/)([a-zA-Z0-9_-]{11})"
vimeo_regex = r"(?:https?://)?(?:www\.)?vimeo\.com/(\d+)"
direct_video_regex = r"https?://[^\s]+\.(mp4|mov|avi|webm|mkv)(\?[^\s]*)?"
patterns = [
re.compile(youtube_regex),
re.compile(vimeo_regex),
re.compile(direct_video_regex)
]
for item in search_results:
url = item.get("link") or item.get("url") # Common keys for URL in search results
if not url:
continue
for pattern in patterns:
match = pattern.search(url)
if match:
# Reconstruct canonical YouTube URL if it's a short link or embed
if pattern.pattern == youtube_regex and match.group(1):
normalized_url = f"https://www.youtube.com/watch?v={match.group(1)}"
else:
normalized_url = url
if normalized_url not in seen_urls:
video_urls.append(normalized_url)
seen_urls.add(normalized_url)
if len(video_urls) >= max_urls:
break
if len(video_urls) >= max_urls:
break
# === 7. Topic-Based Video Analysis Orchestrator ===
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
timeout=1800,
)
async def _analyze_video_worker(video_url: str) -> dict:
"""
Worker function to download a video from a URL and run comprehensive analysis.
This is designed to be called concurrently.
"""
print(f"[Worker] Starting analysis for {video_url}")
try:
async with httpx.AsyncClient() as client:
print(f"[Worker] Downloading video from {video_url}")
response = await client.get(video_url, follow_redirects=True, timeout=60.0)
response.raise_for_status()
video_bytes = await response.aread()
print(f"[Worker] Downloaded {len(video_bytes)} bytes from {video_url}")
if not video_bytes:
raise ValueError("Downloaded video content is empty.")
analysis_result = await analyze_video_comprehensive.coro(video_bytes)
if isinstance(analysis_result, dict) and any("error" in str(v).lower() for v in analysis_result.values()):
print(f"[Worker] Comprehensive analysis for {video_url} reported errors: {analysis_result}")
return {"url": video_url, "status": "error", "error_type": "analysis_error", "details": analysis_result}
else:
return {"url": video_url, "status": "success", "analysis": analysis_result}
except httpx.HTTPStatusError as e:
print(f"[Worker] HTTP error downloading {video_url}: {e}")
return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"HTTP {e.response.status_code}"}
except httpx.RequestError as e:
print(f"[Worker] Request error downloading {video_url}: {e}")
return {"url": video_url, "status": "error", "error_type": "download_error", "details": f"Failed to download: {str(e)}"}
except Exception as e:
print(f"[Worker] Error processing video {video_url}: {e}")
import traceback
return {"url": video_url, "status": "error", "error_type": "processing_error", "details": str(e), "traceback": traceback.format_exc()[:1000]}
@app.function(
image=video_analysis_image_v2,
secrets=[HF_TOKEN_SECRET],
timeout=3600,
gpu="any",
)
async def analyze_videos_by_topic(video_urls: List[str], topic: str) -> Dict[str, Any]:
"""Analyzes a list of videos (by URL) concurrently and aggregates results for a topic."""
print(f"[TopicAnalysis] Starting concurrent analysis for topic: '{topic}' with {len(video_urls)} video(s).")
results_aggregator = {
"topic": topic,
"analyzed_videos": [],
"errors": []
}
if not video_urls:
results_aggregator["errors"].append({"topic_error": "No video URLs provided or found for the topic."})
return results_aggregator
# Use .map to run the worker function concurrently on all video URLs
# The list() call forces the generator to execute and retrieve all results.
individual_results = list(_analyze_video_worker.map(video_urls))
for result in individual_results:
if isinstance(result, dict):
if result.get("status") == "error":
results_aggregator["errors"].append(result)
else:
results_aggregator["analyzed_videos"].append(result)
else:
# This case handles unexpected return types from the worker, like exceptions
print(f"[TopicAnalysis] Received an unexpected result type from worker: {type(result)}")
results_aggregator["errors"].append({"url": "unknown", "error_type": "unexpected_result", "details": str(result)})
print(f"[TopicAnalysis] Finished concurrent analysis for topic '{topic}'.")
return results_aggregator
# === Gradio Interface ===
def video_analyzer_gradio_ui():
print("[Gradio] UI function called to define interface.")
def analyze_video_all_models(video_filepath):
print(f"[Gradio] Received video filepath for analysis: {video_filepath}")
if not video_filepath or not os.path.exists(video_filepath):
return "Error: Video file path is invalid or does not exist.", "", "[]", "[]"
with open(video_filepath, "rb") as f:
video_bytes_content = f.read()
print(f"[Gradio] Read {len(video_bytes_content)} bytes from video path: {video_filepath}")
if not video_bytes_content:
return "Error: Could not read video bytes.", "", "[]", "[]"
print("[Gradio] Calling Whisper...")
transcription = transcribe_video_with_whisper.call(video_bytes_content)
print(f"[Gradio] Whisper result length: {len(transcription)}")
print("[Gradio] Calling SpaceTimeGPT...")
captions = generate_captions_with_spacetimegpt.call(video_bytes_content)
print(f"[Gradio] SpaceTimeGPT result: {captions}")
print("[Gradio] Calling VideoMAE...")
action_labels = generate_action_labels.call(video_bytes_content)
print(f"[Gradio] VideoMAE result: {action_labels}")
print("[Gradio] Calling DETR...")
object_detections = generate_object_detection.call(video_bytes_content)
print(f"[Gradio] DETR result: {object_detections}")
return transcription, captions, str(action_labels), str(object_detections)
with gr.Blocks(title="Comprehensive Video Analyzer", theme=gr.themes.Soft()) as demo:
gr.Markdown("# Comprehensive Video Analyzer")
gr.Markdown("Upload a video to get transcription, captions, action labels, and object detections.")
with gr.Row():
video_input = gr.Video(label="Upload Video", sources=["upload"], type="filepath")
submit_button = gr.Button("Analyze Video", variant="primary")
with gr.Tabs():
with gr.TabItem("Transcription (Whisper)"):
transcription_output = gr.Textbox(label="Transcription", lines=10, interactive=False)
with gr.TabItem("Dense Captions (SpaceTimeGPT)"):
caption_output = gr.Textbox(label="Captions", lines=10, interactive=False)
with gr.TabItem("Action Recognition (VideoMAE)"):
action_output = gr.Textbox(label="Predicted Actions (JSON format)", lines=10, interactive=False)
with gr.TabItem("Object Detection (DETR)"):
object_output = gr.Textbox(label="Detected Objects (JSON format)", lines=10, interactive=False)
submit_button.click(
fn=analyze_video_all_models,
inputs=[video_input],
outputs=[transcription_output, caption_output, action_output, object_output]
)
gr.Markdown("### Example Video")
gr.Markdown("You can test with a short video. Processing may take a few minutes depending on video length and model inference times.")
print("[Gradio] UI definition complete.")
return gr.routes.App.create_app(demo)