Spaces:
Runtime error
Runtime error
import os | |
import tempfile | |
import numpy as np | |
from PIL import Image, ImageOps | |
from moviepy import VideoFileClip | |
from eval_suite.prompts_raw import _image_eval | |
from eval_suite.utils import extract_json, convert_score_fields, calculate_geometric_mean | |
from mllm_tools.utils import _prepare_text_image_inputs | |
from src.core.parse_video import image_with_most_non_black_space | |
def extract_key_frames(video_path, output_dir, num_chunks):
    """Extract key frames from a video by dividing it into chunks and selecting representative frames.

    The video is sampled at one frame per second. The sampled frames are split
    into consecutive chunks of equal size, and for every chunk
    ``image_with_most_non_black_space`` is asked to write one frame to
    ``output_dir`` as ``key_frame_<n>.jpg`` (``n`` starts at 1).

    When the video yields fewer sampled frames than ``num_chunks``, each frame
    becomes its own chunk, so at most ``total_frames`` key frames are produced.
    Sampled frames beyond ``num_chunks * frames_per_chunk`` (the remainder of
    the integer division) are not considered.

    Args:
        video_path (str): Path to the input video file
        output_dir (str): Directory where extracted frames will be saved
        num_chunks (int): Number of chunks to divide the video into

    Returns:
        list: List of paths to the extracted key frames. Empty when no frames
            could be read from the video.

    Raises:
        ValueError: If the video has frames but ``num_chunks`` is not positive.
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    clip = VideoFileClip(video_path)
    # try/finally so the clip is released on the early return and on errors,
    # not only on the normal path.
    try:
        # Extract all frames from the video, one frame every second
        frames = list(clip.iter_frames(fps=1))
        total_frames = len(frames)
        if total_frames == 0:
            print("No frames extracted from the video.")
            return []

        if num_chunks < 1:
            raise ValueError(f"num_chunks must be a positive integer, got {num_chunks}")

        # Clamp to 1: a video with fewer sampled frames than requested chunks
        # would otherwise give a chunk size of 0 and a division by zero below.
        frames_per_chunk = max(1, total_frames // num_chunks)
        # Never iterate over more chunks than the frames can fill.
        num_chunks = min(num_chunks, (total_frames + frames_per_chunk - 1) // frames_per_chunk)

        key_frames = []
        for i in range(num_chunks):
            start_idx = i * frames_per_chunk
            end_idx = min(start_idx + frames_per_chunk, total_frames)
            chunk_frames = frames[start_idx:end_idx]

            if not chunk_frames:
                print(f"No frames in chunk {i+1}. Skipping.")
                continue

            # Save the frame with most non-black space
            output_path = os.path.join(output_dir, f"key_frame_{i+1}.jpg")
            result = image_with_most_non_black_space(chunk_frames, output_path)
            # A None result is treated as "nothing was saved for this chunk"
            # (presumably the helper's failure signal -- confirm in parse_video).
            if result is not None:
                key_frames.append(output_path)

        return key_frames
    finally:
        clip.close()
def evaluate_sampled_images(model, video_path, description="No description provided", num_chunks=10, output_folder=None):
    """Evaluate sampled frames from a video using an image evaluation model.

    Key frames are extracted into a temporary directory (removed again before
    this function returns), each frame is sent to ``model`` together with the
    image-evaluation prompt, and the per-frame scores are combined per
    criterion with ``calculate_geometric_mean``.

    Args:
        model: The image evaluation model to use. It is called with the
            prepared text+image inputs and its return value is passed to
            ``extract_json``.
        video_path (str): Path to the input video file
        description (str, optional): Description of the video content. Defaults to "No description provided"
        num_chunks (int, optional): Number of chunks to divide the video into. Defaults to 10
        output_folder (str, optional): Parent directory in which the temporary
            directory is created. Defaults to None (system default location)

    Returns:
        dict: Dictionary containing evaluation scores and individual frame assessments with keys:
            - evaluation: Dictionary mapping each criterion to
              ``{"score": <geometric mean over frames>}``
            - image_chunks: List of individual frame evaluation results
            Both are empty when no key frame could be extracted.
    """
    responses = []
    with tempfile.TemporaryDirectory(dir=output_folder) as temp_dir:
        key_frames = extract_key_frames(video_path, temp_dir, num_chunks)

        prompt = _image_eval.format(description=description)

        # The model must be queried while the temporary directory still
        # exists, because the key frame files live inside it.
        for key_frame in key_frames:
            inputs = _prepare_text_image_inputs(prompt, key_frame)
            response = model(inputs)
            response_json = convert_score_fields(extract_json(response))
            responses.append(response_json)

    if not responses:
        # Without this guard, indexing the first response would raise an
        # opaque IndexError for videos that yield no key frames.
        print("No key frames were evaluated. Returning empty evaluation.")
        return {
            "evaluation": {},
            "image_chunks": responses
        }

    # Collect scores per criterion. setdefault keeps first-seen ordering and
    # tolerates a response that reports a criterion the first one did not.
    scores_dict = {}
    for response in responses:
        for key, val in response["evaluation"].items():
            scores_dict.setdefault(key, []).append(val["score"])

    res_score = {
        key: {"score": calculate_geometric_mean(scores)}
        for key, scores in scores_dict.items()
    }

    return {
        "evaluation": res_score,
        "image_chunks": responses
    }