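"""Automatic evaluation of theorem explanation videos with LLMs.

Evaluates text transcripts, videos, and sampled frames with the configured model
wrappers and writes the resulting JSON scores to an output folder.

Example invocation (script name and paths are illustrative):

    python evaluate.py --file_path output/my_theorem --output_folder eval_results \
        --eval_type all --combine
"""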
import os
import json
import argparse
import tempfile
from typing import Dict, List, Union
from datetime import datetime

from dotenv import load_dotenv
from moviepy import VideoFileClip

from mllm_tools.litellm import LiteLLMWrapper
from mllm_tools.gemini import GeminiWrapper
from eval_suite.utils import calculate_geometric_mean
from eval_suite.text_utils import parse_srt_to_text, fix_transcript, evaluate_text
from eval_suite.video_utils import evaluate_video_chunk_new
from eval_suite.image_utils import evaluate_sampled_images

load_dotenv()
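
# Whitelist of model identifiers accepted by the --model_text and --model_image
# options defined in main() below.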
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "src", "utils", "allowed_models.json")) as f:
    ALLOWED_MODELS = json.load(f)["allowed_models"]


def combine_results(output_folder: str, combined_file: str, results: Dict[str, Dict]) -> None:
    """
    Combine all evaluation results into a single file.

    Args:
        output_folder (str): Directory to store the combined file.
        combined_file (str): Name of the combined file.
        results (Dict[str, Dict]): Dictionary of evaluation results with file names as keys.

    Returns:
        None
    """
    combined_path = os.path.join(output_folder, combined_file)
    with open(combined_path, 'w') as output_file:
        json.dump(results, output_file, indent=4)


def save_individual_result(output_folder: str, file_name: str, result: Dict) -> None:
    """
    Save individual evaluation result to a file.

    Args:
        output_folder (str): Directory to store the evaluation file.
        file_name (str): Name of the file.
        result (Dict): Evaluation result.

    Returns:
        None
    """
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    result_file = f"evaluation_{file_name}_{current_time}.json"
    os.makedirs(output_folder, exist_ok=True)
    result_path = os.path.join(output_folder, result_file)
    with open(result_path, 'w') as output_file:
        json.dump(result, output_file, indent=4)


def evaluate_text_file(model, transcript_path, retry_limit):
    """
    Evaluate a text file using the provided model.

    Args:
        model: The model to use for evaluation.
        transcript_path (str): Path to the transcript file (.srt or .txt).
        retry_limit (int): Number of retry attempts for evaluation.

    Returns:
        Dict or None: Evaluation results if successful, None if file format unsupported.
    """
    if not transcript_path.endswith(('.srt', '.txt')):
        print(f"Skipping {transcript_path}: Unsupported file format for text evaluation.")
        return None

    if transcript_path.endswith(".srt"):
        transcript = parse_srt_to_text(transcript_path)
    elif transcript_path.endswith(".txt"):
        with open(transcript_path) as f:
            transcript = f.read().strip()
    else:
        raise ValueError("Unrecognized transcript file format.")
    alpha_count = sum(1 for c in transcript if c.isalpha())
    if alpha_count:  # Guard against a zero division when the transcript has no letters
        capital_letter_proportion = sum(1 for c in transcript if c.isupper()) / alpha_count
        if capital_letter_proportion < 0.01:
            transcript = fix_transcript(model, transcript)

    print(f"Performing text evaluation: {os.path.basename(transcript_path)}")
    result = evaluate_text(model, transcript, retry_limit)
    return result


def evaluate_video_file(model, video_path, transcript_path, description_path, target_fps=None, output_folder=None):
    """
    Evaluate a video file using the provided model.

    Args:
        model: The model to use for evaluation.
        video_path (str): Path to the video file.
        transcript_path (str): Path to the transcript file.
        description_path (str): Path to the description file.
        target_fps (int, optional): Target frames per second for video processing.
        output_folder (str, optional): Directory to store output files.

    Returns:
        Dict or None: Evaluation results if successful, None if file format unsupported.
    """
    if not video_path.endswith(('.mp4', '.mkv')):
        print(f"Skipping {video_path}: Unsupported file format for video evaluation.")
        return None
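
    # The video is evaluated in chunks: each chunk is re-encoded to a temporary file,
    # scored independently, and the per-chunk scores are aggregated further below.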
    # Resolve output locations; fall back to the current directory if no output folder was given
    output_parent = output_folder or os.getcwd()
    moviepy_temp_dir = os.path.join(output_parent, "moviepy_temp")
    os.makedirs(moviepy_temp_dir, exist_ok=True)
    processed_videos_dir = os.path.join(output_parent, "processed_videos")
    os.makedirs(processed_videos_dir, exist_ok=True)

    # Chunking
    num_chunks = 10
    results = []
    with VideoFileClip(video_path) as clip:
        duration = clip.duration
        chunk_duration = duration / num_chunks
        # Create a temporary directory for the raw chunk files inside the output folder
        with tempfile.TemporaryDirectory(dir=output_parent) as temp_dir:
            for i in range(num_chunks):
                start = i * chunk_duration
                end = min(start + chunk_duration, duration)
                chunk = clip.subclipped(start, end)
                chunk_path = os.path.join(temp_dir, f"chunk_{i+1}.mp4")
                # Explicitly set the temp_audiofile path with a matching codec
                temp_audiofile = os.path.join(moviepy_temp_dir, f"temp_audio_chunk_{i+1}.m4a")
                chunk.write_videofile(
                    chunk_path,
                    codec="libx264",
                    audio_codec="aac",
                    temp_audiofile=temp_audiofile,
                    audio_bitrate="192k",
                    preset="ultrafast",  # Speed up encoding
                    logger=None
                )
                save_path = os.path.join(processed_videos_dir, f"processed_chunk_{i+1}.mp4")
                result = evaluate_video_chunk_new(
                    model,
                    chunk_path,
                    transcript_path,
                    description_path,
                    target_fps=target_fps,
                    save_processed_video=save_path
                )
                results.append(result)
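
    # Aggregate each evaluation criterion across chunks with a geometric mean
    # (a single low-scoring chunk weighs heavily on the combined score).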
    score_dict = {}
    for key in results[0]["evaluation"].keys():
        score_dict[key] = []
        for result in results:
            score_dict[key].append(result["evaluation"][key]["score"])

    evaluation = {}
    for key, scores in score_dict.items():
        evaluation[key] = {"score": calculate_geometric_mean(scores)}

    result_json = {
        "evaluation": evaluation,
        "video_chunks": results
    }
    return result_json


def extract_scores(data: Union[Dict, List]) -> List[int]:
    """
    Extract all score values from a nested dictionary or list structure.

    Args:
        data (Union[Dict, List]): The data structure to extract scores from.

    Returns:
        List[int]: List of extracted score values.
    """
    scores = []
    if isinstance(data, dict):
        for key, value in data.items():
            if "chunks" in key:
                continue
            elif isinstance(value, (dict, list)):
                scores.extend(extract_scores(value))
            elif key == 'score':
                scores.append(value)
    elif isinstance(data, list):
        for item in data:
            scores.extend(extract_scores(item))
    return scores


def calculate_overall_score(result: Dict) -> float:
    """
    Calculate the overall score from evaluation results.

    Args:
        result (Dict): Dictionary containing evaluation results.

    Returns:
        float: The calculated overall score.
    """
    scores = extract_scores(result)
    overall_score = calculate_geometric_mean(scores)
    return overall_score


def process_topic_name(topic_name: str) -> str:
    """
    Process a topic name by capitalizing words and handling special characters.

    Args:
        topic_name (str): The topic name to process.

    Returns:
        str: The processed topic name.
    """
    words = topic_name.replace("_s_", "'s_").split("_")
    return " ".join([word.capitalize() for word in words])


def merge_dicts(dict1: dict, dict2: dict) -> dict:
    """
    Recursively merge two dictionaries.

    Args:
        dict1 (dict): First dictionary.
        dict2 (dict): Second dictionary.

    Returns:
        dict: Merged dictionary.
    """
    merged = dict1.copy()
    for key, value in dict2.items():
        if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key] = merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged
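
# Illustrative example (values abbreviated, keys hypothetical): merging
# {"evaluation": {"a": {...}}} with {"evaluation": {"b": {...}}} yields
# {"evaluation": {"a": {...}, "b": {...}}}, which is how the per-modality results
# are combined when --eval_type all is used.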


def process_theorem(models, file_path: str, eval_type: str, retry_limit: int,
                    target_fps: int = None, use_parent_folder_as_topic: bool = False,
                    output_folder: str = None) -> tuple[str, dict]:
    """
    Process a theorem file or directory for evaluation.

    Args:
        models: Dictionary of models for different evaluation types.
        file_path (str): Path to the file or directory to evaluate.
        eval_type (str): Type of evaluation to perform.
        retry_limit (int): Number of retry attempts.
        target_fps (int, optional): Target frames per second for video processing.
        use_parent_folder_as_topic (bool, optional): Use the parent folder name as the topic.
        output_folder (str, optional): Directory to store output files.

    Returns:
        tuple[str, dict]: Tuple of file name and evaluation results.
    """
    ext_map = {
        'text': ('.txt', '.srt'),
        'video': ('.mp4', '.mkv')
    }

    # Handle single-file evaluation
    if os.path.isfile(file_path):
        file_ext = os.path.splitext(file_path)[1].lower()
        file_name = os.path.basename(file_path)
        if eval_type == "text" and file_ext in ext_map['text']:
            return file_name, evaluate_text_file(models['text'], file_path, retry_limit)
        elif eval_type == "video" and file_ext in ext_map['video']:
            if use_parent_folder_as_topic:
                topic_name = process_topic_name(os.path.basename(os.path.dirname(file_path)))
            else:
                topic_name = None
            return file_name, evaluate_video_file(models['video'], file_path, None, topic_name, target_fps, output_folder)
        elif eval_type == "image" and file_ext in ext_map['video']:
            if use_parent_folder_as_topic:
                topic_name = process_topic_name(os.path.basename(os.path.dirname(file_path)))
            else:
                topic_name = None
            return file_name, evaluate_sampled_images(models['image'], file_path, topic_name, num_chunks=10, output_folder=output_folder)
        elif eval_type == "all":
            raise ValueError("Evaluation type 'all' is not supported for a single file. Try passing a folder with both a video and a subtitle file.")
        else:
            raise ValueError(f"File type of {file_path} does not match evaluation type {eval_type!r}")

    # Handle directory evaluation
    theorem_dir = file_path
    all_files = os.listdir(theorem_dir)

    # Look for transcript files, prioritizing .srt over .txt if both exist
    transcript_file_candidates = [f for f in all_files if f.endswith(ext_map['text']) and not f.endswith('_scene_outline.txt')]
    srt_files = [f for f in transcript_file_candidates if f.endswith('.srt')]
    txt_files = [f for f in transcript_file_candidates if f.endswith('.txt')]

    transcript_path = None
    if srt_files:
        transcript_path = os.path.join(theorem_dir, srt_files[0])
    elif txt_files:
        transcript_path = os.path.join(theorem_dir, txt_files[0])

    video_file_candidates = [f for f in all_files if f.endswith(ext_map['video'])]
    video_path = os.path.join(theorem_dir, video_file_candidates[0]) if len(video_file_candidates) == 1 else None

    topic_name = process_topic_name(os.path.basename(theorem_dir))

    if not video_path:
        print(f"Skipping {theorem_dir}: No video file found")
        return None, None

    text_result = video_result = image_result = None
    if eval_type == "text" or eval_type == "all":
        if transcript_path is None:
            print(f"Warning: No suitable transcript file found in {theorem_dir}")
        else:
            text_result = evaluate_text_file(models['text'], transcript_path, retry_limit)
    if eval_type == "video" or eval_type == "all":
        assert video_path is not None, f"Expected 1 video file, got {len(video_file_candidates)} for {theorem_dir}"
        video_result = evaluate_video_file(models['video'], video_path, transcript_path, topic_name, target_fps, output_folder)
    if eval_type == "image" or eval_type == "all":
        assert video_path is not None, f"Expected 1 video file, got {len(video_file_candidates)} for {theorem_dir}"
        image_result = evaluate_sampled_images(models['image'], video_path, topic_name, num_chunks=10, output_folder=output_folder)

    if eval_type == "all":
        result = {}
        if text_result:
            result = merge_dicts(result, text_result)
        if video_result:
            result = merge_dicts(result, video_result)
        if image_result:
            result = merge_dicts(result, image_result)
        if result:
            result["evaluation"]["overall_score"] = calculate_overall_score(result)
    else:
        result = text_result if eval_type == "text" else video_result if eval_type == "video" else image_result if eval_type == "image" else None

    file_name = os.path.basename(theorem_dir)
    return file_name, result


def main():
    """
    Main function to run the evaluation script.

    Parses command line arguments and orchestrates the evaluation process
    for text, video, and image content using the specified AI models.
    """
    parser = argparse.ArgumentParser(description='Automatic evaluation of theorem explanation videos with LLMs')
    parser.add_argument('--model_text', type=str,
                        choices=ALLOWED_MODELS,
                        default='azure/gpt-4o',
                        help='Select the AI model to use for text evaluation')
    parser.add_argument('--model_video', type=str,
                        choices=['gemini/gemini-1.5-pro-002',
                                 'gemini/gemini-2.0-flash-exp',
                                 'gemini/gemini-2.0-pro-exp-02-05'],
                        default='gemini/gemini-1.5-pro-002',
                        help='Select the AI model to use for video evaluation')
    parser.add_argument('--model_image', type=str,
                        choices=ALLOWED_MODELS,
                        default='azure/gpt-4o',
                        help='Select the AI model to use for image evaluation')
    parser.add_argument('--eval_type', type=str, choices=['text', 'video', 'image', 'all'], default='all',
                        help='Type of evaluation to perform')
    parser.add_argument('--file_path', type=str, required=True,
                        help='Path to a file or a theorem folder')
    parser.add_argument('--output_folder', type=str, required=True,
                        help='Directory to store the evaluation files')
    parser.add_argument('--retry_limit', type=int, default=3,
                        help='Number of retry attempts for each inference')
    parser.add_argument('--combine', action='store_true',
                        help='Combine all results into a single JSON file')
    parser.add_argument('--bulk_evaluate', action='store_true', default=False,
                        help='Evaluate a folder of theorems together')
    parser.add_argument('--target_fps', type=int, required=False,
                        help='Target FPS for video processing. If not set, the original video FPS will be used')
    parser.add_argument('--use_parent_folder_as_topic', action='store_true', default=True,
                        help='Use parent folder name as topic name for single file evaluation')
    parser.add_argument('--max_workers', type=int, default=4,
                        help='Maximum number of concurrent workers for parallel processing')
    args = parser.parse_args()

    # Initialize separate models for each evaluation type
    text_model = LiteLLMWrapper(
        model_name=args.model_text,
        temperature=0.0,
    )
    video_model = GeminiWrapper(
        model_name=args.model_video,
        temperature=0.0,
    )
    image_model = LiteLLMWrapper(
        model_name=args.model_image,
        temperature=0.0,
    )
    models = {
        'text': text_model,
        'video': video_model,
        'image': image_model
    }
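
    # Collect the theorem directories to evaluate: in bulk mode, walk the tree and keep
    # every folder that contains an .mp4; otherwise treat file_path itself as a single
    # theorem folder (a single file is handled inside process_theorem).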
    theorem_dirs = []
    if args.bulk_evaluate:
        assert os.path.isdir(args.file_path), "File path must be a folder for --bulk_evaluate"
        for root, _, files in os.walk(args.file_path):
            if not any(f.endswith(".mp4") for f in files):
                continue
            theorem_dirs.append(root)
    elif os.path.isdir(args.file_path):
        assert any(f.endswith(".mp4") for f in os.listdir(args.file_path)), "The provided folder must contain a video file"
        theorem_dirs.append(args.file_path)

    # Create the output directory and its temp subdirectories if they don't exist
    os.makedirs(args.output_folder, exist_ok=True)
    moviepy_temp_dir = os.path.join(args.output_folder, "moviepy_temp")
    os.makedirs(moviepy_temp_dir, exist_ok=True)
    VideoFileClip.DEFAULT_TEMP_DIR = moviepy_temp_dir
    processed_videos_dir = os.path.join(args.output_folder, "processed_videos")
    os.makedirs(processed_videos_dir, exist_ok=True)

    results = {}
    if theorem_dirs:
        for theorem_dir in theorem_dirs:
            file_name, result = process_theorem(
                models,
                theorem_dir,
                args.eval_type,
                args.retry_limit,
                args.target_fps,
                args.use_parent_folder_as_topic,
                args.output_folder
            )
            if result is not None:
                results[file_name] = result
                if not args.combine:
                    save_individual_result(args.output_folder, file_name, result)
    else:
        file_name, result = process_theorem(
            models,
            args.file_path,
            args.eval_type,
            args.retry_limit,
            args.target_fps,
            args.use_parent_folder_as_topic,
            args.output_folder
        )
        if result is not None:
            results[file_name] = result
            if not args.combine:
                save_individual_result(args.output_folder, file_name, result)
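
    # With --combine, results from multiple theorems are merged into one timestamped JSON
    # file; if only a single result was produced, it is still saved individually.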
    if args.combine:
        if len(results) > 1:
            current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
            combined_file = f"evaluation_{current_time}.json"
            combine_results(args.output_folder, combined_file, results)
            print("Combining results completed.")
        else:
            for file_name, result in results.items():
                save_individual_result(args.output_folder, file_name, result)

    os.rmdir(moviepy_temp_dir)


if __name__ == "__main__":
    main()