Spaces:
Runtime error
Runtime error
import os | |
import cv2 | |
import tempfile | |
from dotenv import load_dotenv | |
from mllm_tools.utils import _prepare_text_video_inputs | |
from eval_suite.prompts_raw import _video_eval_new | |
from eval_suite.utils import extract_json, convert_score_fields | |
load_dotenv() | |
def _open_video_writer(output_path, fps, frame_size):
    """
    Open a cv2.VideoWriter for output_path using the first codec that initializes.
    Args:
        output_path (str): Desired output path. A '.mp4' suffix is swapped for '.avi'
            once the loop falls back to an AVI-only codec.
        fps (float): Frame rate to record in the output container
        frame_size (tuple): (width, height) of the frames that will be written
    Returns:
        tuple: (opened cv2.VideoWriter, path the writer is actually writing to)
    Raises:
        RuntimeError: If none of the candidate codecs can be initialized
    """
    # Try different codecs in order of preference
    codecs = [
        ('avc1', '.mp4'),  # H.264 codec
        ('mp4v', '.mp4'),  # MP4V codec
        ('XVID', '.avi'),  # XVID codec
        ('MJPG', '.avi'),  # Motion JPEG codec
    ]
    for codec, ext in codecs:
        if output_path.endswith('.mp4') and not ext.endswith('.mp4'):
            # If we're switching to AVI format, change the extension
            output_path = output_path[:-4] + ext
        fourcc = cv2.VideoWriter_fourcc(*codec)
        writer = cv2.VideoWriter(output_path, fourcc, fps, frame_size)
        if writer.isOpened():
            print(f"Successfully initialized video writer with codec: {codec}")
            return writer, output_path
        # This codec is unavailable: drop the writer and any stub file it left behind
        writer.release()
        if os.path.exists(output_path):
            os.remove(output_path)
    raise RuntimeError("Could not initialize video writer with any available codec")


def reduce_video_framerate(input_path, target_fps=1, output_path=None):
    """
    Reduces the frame rate of a video by only keeping frames at the target interval.
    Args:
        input_path (str): Path to the input video
        target_fps (int): Target frames per second (default: 1). Must be positive.
        output_path (str, optional): Path to save the processed video. If None, uses a temporary file.
    Returns:
        str: Path to the processed video. This can differ from the requested output_path:
            a '.mp4' suffix is replaced by '.avi' when only an AVI codec is available.
    Raises:
        ValueError: If target_fps is not positive, or the input video cannot be opened or has invalid FPS
        RuntimeError: If video writer initialization fails or output video creation fails
    """
    # Validate before touching any resource; a zero rate would otherwise surface
    # later as a ZeroDivisionError.
    if target_fps is None or target_fps <= 0:
        raise ValueError(f"target_fps must be a positive number, got {target_fps}")

    cap = cv2.VideoCapture(input_path)
    out = None
    frames_written = 0
    try:
        if not cap.isOpened():
            raise ValueError(f"Could not open input video: {input_path}")
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        if original_fps <= 0:
            raise ValueError(f"Invalid FPS ({original_fps}) detected in input video")

        if target_fps > original_fps:
            # Nothing to drop: keep every frame and keep the source rate so the
            # duration is unchanged. (int(original_fps / target_fps) would be 0
            # here and break the modulo below.)
            frame_interval = 1
            writer_fps = original_fps
        else:
            frame_interval = max(1, int(original_fps / target_fps))
            writer_fps = target_fps

        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Use provided output path or create temporary file
        if output_path is None:
            # Only the name is needed; close the handle right away so it is not leaked
            with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as temp_output:
                output_path = temp_output.name

        # Ensure output directory exists (a bare filename has no directory part,
        # and os.makedirs('') raises FileNotFoundError)
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        out, output_path = _open_video_writer(output_path, writer_fps, (width, height))

        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Only write frames at the specified interval
            if frame_count % frame_interval == 0:
                out.write(frame)
                frames_written += 1
            frame_count += 1
    finally:
        # Release the capture and writer on every path, including errors
        cap.release()
        if out is not None:
            out.release()

    # Verify the output
    verify_cap = cv2.VideoCapture(output_path)
    if not verify_cap.isOpened():
        raise RuntimeError(f"Failed to create output video at {output_path}")
    actual_fps = verify_cap.get(cv2.CAP_PROP_FPS)
    total_frames = verify_cap.get(cv2.CAP_PROP_FRAME_COUNT)
    verify_cap.release()
    if actual_fps <= 0:
        print("Warning: Output video reports invalid FPS. This might be a codec issue.")
        actual_fps = writer_fps  # Use the rate we asked the writer for in the duration calculation
    print(f"Created video with {frames_written} frames at {actual_fps} FPS")
    print(f"Total duration: {total_frames/actual_fps:.2f} seconds")
    print(f"Video saved to: {output_path}")
    return output_path
def evaluate_video_chunk_new(model, video_path, transcript="No transcript provided", description="No description provided",
                             save_processed_video=None, target_fps=None, retry_limit=5):
    """
    Evaluate a single video chunk using a multimodal model.
    Args:
        model: The multimodal model to use for evaluation; called as model(inputs)
        video_path (str): Path to the video file to evaluate
        transcript (str, optional): Video transcript text. Defaults to "No transcript provided".
            NOTE(review): this value is not used anywhere in the function body; only
            `description` is formatted into the prompt. Confirm whether that is intended.
        description (str, optional): Video description text. Defaults to "No description provided"
        save_processed_video (str, optional): Path to save processed video. If None, uses temporary file
        target_fps (int, optional): Target frames per second for video processing. If None, no processing
        retry_limit (int, optional): Maximum number of attempts. Must be at least 1. Defaults to 5
    Returns:
        dict: Evaluation results as a JSON object with scores converted to integers
    Raises:
        ValueError: If retry_limit is less than 1
        FileNotFoundError: If video file does not exist
        Exception: If evaluation fails after all retry attempts (the last error is re-raised)
    """
    # With zero attempts the retry loop would never run and the function would
    # silently return None, so reject that up front.
    if retry_limit < 1:
        raise ValueError(f"retry_limit must be at least 1, got {retry_limit}")
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Only process video if target_fps is specified
    processed_video_path = None
    if target_fps is not None:
        processed_video_path = reduce_video_framerate(video_path, target_fps=target_fps, output_path=save_processed_video)
        video_to_use = processed_video_path
    else:
        video_to_use = video_path

    try:
        # Prompt and input preparation sit inside the try block so that a failure
        # here still removes the temporary processed video below.
        prompt = _video_eval_new.format(description=description)
        inputs = _prepare_text_video_inputs(prompt, video_to_use)
        for attempt in range(retry_limit):
            try:
                response = model(inputs)
                response_json = extract_json(response)
                response_json = convert_score_fields(response_json)
                return response_json
            except Exception as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt + 1 == retry_limit:
                    print("Reached maximum retry limit. Evaluation failed.")
                    raise
    finally:
        # Clean up the temporary processed video if we created one; a video saved
        # to a caller-supplied path is kept.
        if processed_video_path is not None and save_processed_video is None and os.path.exists(processed_video_path):
            os.unlink(processed_video_path)