# TheoremExplainAgent / src/core/video_renderer.py
# Uploaded project file (commit d9486d1, ~22 kB).
import os
import re
import subprocess
import asyncio
from PIL import Image
from typing import Optional, List
import traceback
import sys
from src.core.parse_video import (
get_images_from_video,
image_with_most_non_black_space
)
from mllm_tools.vertex_ai import VertexAIWrapper
from mllm_tools.gemini import GeminiWrapper
class VideoRenderer:
"""Class for rendering and combining Manim animation videos."""
def __init__(self, output_dir="output", print_response=False, use_visual_fix_code=False):
"""Initialize the VideoRenderer.
Args:
output_dir (str, optional): Directory for output files. Defaults to "output".
print_response (bool, optional): Whether to print responses. Defaults to False.
use_visual_fix_code (bool, optional): Whether to use visual fix code. Defaults to False.
"""
self.output_dir = output_dir
self.print_response = print_response
self.use_visual_fix_code = use_visual_fix_code
async def render_scene(self, code: str, file_prefix: str, curr_scene: int, curr_version: int, code_dir: str, media_dir: str, max_retries: int = 3, use_visual_fix_code=False, visual_self_reflection_func=None, banned_reasonings=None, scene_trace_id=None, topic=None, session_id=None):
"""Render a single scene and handle error retries and visual fixes.
Args:
code (str): The Manim code to render
file_prefix (str): Prefix for output files
curr_scene (int): Current scene number
curr_version (int): Current version number
code_dir (str): Directory for code files
media_dir (str): Directory for media output
max_retries (int, optional): Maximum retry attempts. Defaults to 3.
use_visual_fix_code (bool, optional): Whether to use visual fix code. Defaults to False.
visual_self_reflection_func (callable, optional): Function for visual self-reflection. Defaults to None.
banned_reasonings (list, optional): List of banned reasoning strings. Defaults to None.
scene_trace_id (str, optional): Scene trace identifier. Defaults to None.
topic (str, optional): Topic name. Defaults to None.
session_id (str, optional): Session identifier. Defaults to None.
Returns:
tuple: (code, error_message) where error_message is None on success
"""
retries = 0
while retries < max_retries:
try:
# Execute manim in a thread to prevent blocking
file_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py")
result = await asyncio.to_thread(
subprocess.run,
["manim", "-qh", file_path, "--media_dir", media_dir, "--progress_bar", "none"],
capture_output=True,
text=True
)
# if result.returncode != 0, it means that the code is not rendered successfully
# so we need to fix the code by returning the code and the error message
if result.returncode != 0:
raise Exception(result.stderr)
if use_visual_fix_code and visual_self_reflection_func and banned_reasonings:
# Get the rendered video path
video_path = os.path.join(
media_dir,
"videos",
f"{file_prefix}_scene{curr_scene}_v{curr_version}.mp4"
)
# For Gemini/Vertex AI models, pass the video directly
if self.scene_model.model_name.startswith(('gemini/', 'vertex_ai/')):
media_input = video_path
else:
# For other models, use image snapshot
media_input = self.create_snapshot_scene(
topic, curr_scene, curr_version, return_type="path"
)
new_code, log = visual_self_reflection_func(
code,
media_input,
scene_trace_id=scene_trace_id,
topic=topic,
scene_number=curr_scene,
session_id=session_id
)
with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_vfix_log.txt"), "w") as f:
f.write(log)
# Check for termination markers
if "<LGTM>" in new_code or any(word in new_code for word in banned_reasonings):
break
code = new_code
curr_version += 1
with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py"), "w") as f:
f.write(code)
print(f"Code saved to scene{curr_scene}/code/{file_prefix}_scene{curr_scene}_v{curr_version}.py")
retries = 0
continue
break # Exit retry loop on success
except Exception as e:
print(f"Error: {e}")
print(f"Retrying {retries+1} of {max_retries}...")
with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_error.log"), "a") as f:
f.write(f"\nError in attempt {retries}:\n{str(e)}\n")
retries += 1
return code, str(e) # Indicate failure and return error message
print(f"Successfully rendered {file_path}")
with open(os.path.join(self.output_dir, file_prefix, f"scene{curr_scene}", "succ_rendered.txt"), "w") as f:
f.write("")
return code, None # Indicate success
def run_manim_process(self,
topic: str):
"""Run manim on all generated manim code for a specific topic.
Args:
topic (str): Topic name to process
Returns:
subprocess.CompletedProcess: Result of the final manim process
"""
file_prefix = topic.lower()
file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
search_path = os.path.join(self.output_dir, file_prefix)
# Iterate through scene folders
scene_folders = [f for f in os.listdir(search_path) if os.path.isdir(os.path.join(search_path, f))]
scene_folders.sort() # Sort to process scenes in order
for folder in scene_folders:
folder_path = os.path.join(search_path, folder)
# Get all Python files in version order
py_files = [f for f in os.listdir(folder_path) if f.endswith('.py')]
py_files.sort(key=lambda x: int(x.split('_v')[-1].split('.')[0])) # Sort by version number
for file in py_files:
file_path = os.path.join(folder_path, file)
try:
media_dir = os.path.join(self.output_dir, file_prefix, "media")
result = subprocess.run(
f"manim -qh {file_path} --media_dir {media_dir}",
shell=True,
capture_output=True,
text=True
)
if result.returncode != 0:
raise Exception(result.stderr)
print(f"Successfully rendered {file}")
break # Move to next scene folder if successful
except Exception as e:
print(f"Error rendering {file}: {e}")
error_log_path = os.path.join(folder_path, f"{file.split('.')[0]}_error.log") # drop the extra py
with open(error_log_path, "w") as f:
f.write(f"Error:\n{str(e)}\n")
print(f"Error log saved to {error_log_path}")
return result
def create_snapshot_scene(self, topic: str, scene_number: int, version_number: int, return_type: str = "image"):
"""Create a snapshot of the video for a specific topic and scene.
Args:
topic (str): Topic name
scene_number (int): Scene number
version_number (int): Version number
return_type (str, optional): Type of return value - "path" or "image". Defaults to "image".
Returns:
Union[str, PIL.Image]: Path to saved image or PIL Image object
Raises:
FileNotFoundError: If no mp4 files found in video folder
"""
file_prefix = topic.lower()
file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
search_path = os.path.join(self.output_dir, file_prefix)
video_folder_path = os.path.join(search_path, "media", "videos", f"{file_prefix}_scene{scene_number}_v{version_number}", "1080p60")
os.makedirs(video_folder_path, exist_ok=True)
snapshot_path = os.path.join(video_folder_path, "snapshot.png")
# Get the mp4 video file from the video folder path
video_files = [f for f in os.listdir(video_folder_path) if f.endswith('.mp4')]
if not video_files:
raise FileNotFoundError(f"No mp4 files found in {video_folder_path}")
video_path = os.path.join(video_folder_path, video_files[0])
saved_image = image_with_most_non_black_space(get_images_from_video(video_path), snapshot_path, return_type=return_type)
return saved_image
    def combine_videos(self, topic: str):
        """Combine all videos and subtitle files for a specific topic using ffmpeg.

        Args:
            topic (str): Topic name to combine videos for

        This function will:
        - Find all scene videos and subtitles
        - Combine videos with or without audio
        - Merge subtitle files with correct timing
        - Save combined video and subtitles to output directory
        """
        # Normalize the topic into the slug used for all output paths.
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
        search_path = os.path.join(self.output_dir, file_prefix, "media", "videos")
        # Create output directory if it doesn't exist.
        video_output_dir = os.path.join(self.output_dir, file_prefix)
        os.makedirs(video_output_dir, exist_ok=True)
        output_video_path = os.path.join(video_output_dir, f"{file_prefix}_combined.mp4")
        output_srt_path = os.path.join(video_output_dir, f"{file_prefix}_combined.srt")
        # Skip work entirely when both combined artifacts already exist.
        if os.path.exists(output_video_path) and os.path.exists(output_srt_path):
            print(f"Combined video and subtitles already exist at {output_video_path}, not combining again.")
            return
        # Get the expected scene count from the scene outline file.
        scene_outline_path = os.path.join(self.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt")
        if not os.path.exists(scene_outline_path):
            print(f"Warning: Scene outline file not found at {scene_outline_path}. Cannot determine scene count.")
            return
        with open(scene_outline_path) as f:
            plan = f.read()
        # Count <SCENE_N> opening tags inside the <SCENE_OUTLINE> block; the
        # [^<] guard skips the matching </SCENE_N> close tags.
        scene_outline = re.search(r'(<SCENE_OUTLINE>.*?</SCENE_OUTLINE>)', plan, re.DOTALL).group(1)
        scene_count = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline))
        # Find all scene folders under the media/videos tree.
        scene_folders = []
        for root, dirs, files in os.walk(search_path):
            for dir in dirs:
                if dir.startswith(file_prefix + "_scene"):
                    scene_folders.append(os.path.join(root, dir))
        scene_videos = []
        scene_subtitles = []
        for scene_num in range(1, scene_count + 1):
            # Match folders for this scene number, then keep the highest version.
            folders = [f for f in scene_folders if int(f.split("scene")[-1].split("_")[0]) == scene_num]
            if not folders:
                print(f"Warning: Missing scene {scene_num}")
                continue
            folders.sort(key=lambda f: int(f.split("_v")[-1]))
            folder = folders[-1]
            video_found = False
            subtitles_found = False
            # manim -qh places its outputs in the 1080p60 quality subfolder.
            for filename in os.listdir(os.path.join(folder, "1080p60")):
                if filename.endswith('.mp4'):
                    scene_videos.append(os.path.join(folder, "1080p60", filename))
                    video_found = True
                elif filename.endswith('.srt'):
                    scene_subtitles.append(os.path.join(folder, "1080p60", filename))
                    subtitles_found = True
            if not video_found:
                print(f"Warning: Missing video for scene {scene_num}")
            if not subtitles_found:
                # Placeholder keeps scene_subtitles aligned with scene_videos.
                scene_subtitles.append(None)
        # Abort unless every scene produced a video.
        if len(scene_videos) != scene_count:
            print("Not all videos/subtitles are found, aborting video combination.")
            return
        try:
            import ffmpeg  # You might need to install ffmpeg-python package: pip install ffmpeg-python
            from tqdm import tqdm
            print("Analyzing video streams...")
            # Check which videos carry an audio stream.
            has_audio = []
            for video in tqdm(scene_videos, desc="Checking audio streams"):
                probe = ffmpeg.probe(video)
                audio_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'audio']
                has_audio.append(len(audio_streams) > 0)
            print("Preparing video combination...")
            # If any video has audio, every input must contribute an audio stream
            # (silent where missing) so ffmpeg.concat gets uniform stream pairs.
            if any(has_audio):
                # Interleaved [video, audio, video, audio, ...] stream list.
                streams = []
                for video, has_aud in tqdm(list(zip(scene_videos, has_audio)), desc="Processing videos"):
                    if has_aud:
                        # Video has audio, use as is.
                        input_vid = ffmpeg.input(video)
                        streams.extend([input_vid['v'], input_vid['a']])
                    else:
                        # Video lacks audio, add silent audio.
                        input_vid = ffmpeg.input(video)
                        # Generate silent audio for the duration of the video.
                        probe = ffmpeg.probe(video)
                        duration = float(probe['streams'][0]['duration'])
                        silent_audio = ffmpeg.input(f'anullsrc=channel_layout=stereo:sample_rate=44100',
                                                    f='lavfi', t=duration)['a']
                        streams.extend([input_vid['v'], silent_audio])
                print("Combining videos with audio...")
                try:
                    # Concatenate all streams using optimized CPU encoding settings.
                    concat = ffmpeg.concat(*streams, v=1, a=1, unsafe=True)
                    process = (
                        concat
                        .output(output_video_path,
                                **{'c:v': 'libx264',
                                   'c:a': 'aac',
                                   'preset': 'veryfast',  # speed/quality balance over ultrafast
                                   'crf': '28',
                                   'threads': '0',  # use all CPU threads
                                   'tune': 'fastdecode',
                                   'profile:v': 'baseline',  # simpler profile, faster encoding
                                   'level': '4.0',
                                   'x264-params': 'aq-mode=0:no-deblock:no-cabac:ref=1:subme=0:trellis=0:weightp=0',  # aggressive speed optimizations
                                   'movflags': '+faststart',
                                   'stats': None,
                                   'progress': 'pipe:1'})
                        .overwrite_output()
                        .run_async(pipe_stdout=True, pipe_stderr=True)
                    )
                    # Stream ffmpeg's progress lines to the console.
                    while True:
                        line = process.stdout.readline().decode('utf-8')
                        if not line:
                            break
                        if 'frame=' in line:
                            sys.stdout.write('\rProcessing: ' + line.strip())
                            sys.stdout.flush()
                    # Wait for the process to complete and capture output.
                    stdout, stderr = process.communicate()
                    print("\nEncoding complete!")
                except ffmpeg.Error as e:
                    print(f"FFmpeg stdout:\n{e.stdout.decode('utf8')}")
                    print(f"FFmpeg stderr:\n{e.stderr.decode('utf8')}")
                    raise
            else:
                # No videos have audio, concatenate video streams only.
                streams = []
                for video in tqdm(scene_videos, desc="Processing videos"):
                    streams.append(ffmpeg.input(video)['v'])
                print("Combining videos without audio...")
                try:
                    concat = ffmpeg.concat(*streams, v=1, unsafe=True)
                    process = (
                        concat
                        .output(output_video_path,
                                **{'c:v': 'libx264',
                                   'preset': 'medium',
                                   'crf': '23',
                                   'stats': None,  # enable progress stats
                                   'progress': 'pipe:1'})  # output progress to pipe
                        .overwrite_output()
                        .run_async(pipe_stdout=True, pipe_stderr=True)
                    )
                    # Stream ffmpeg's progress lines to the console.
                    while True:
                        line = process.stdout.readline().decode('utf-8')
                        if not line:
                            break
                        if 'frame=' in line:
                            sys.stdout.write('\rProcessing: ' + line.strip())
                            sys.stdout.flush()
                    # Wait for the process to complete and capture output.
                    stdout, stderr = process.communicate()
                    print("\nEncoding complete!")
                except ffmpeg.Error as e:
                    print(f"FFmpeg stdout:\n{e.stdout.decode('utf8')}")
                    print(f"FFmpeg stderr:\n{e.stderr.decode('utf8')}")
                    raise
            print(f"Successfully combined videos into {output_video_path}")
            # Merge subtitles: renumber entries and shift each scene's timecodes
            # by the accumulated duration of all preceding scene videos.
            if scene_subtitles:
                with open(output_srt_path, 'w', encoding='utf-8') as outfile:
                    current_time_offset = 0
                    subtitle_index = 1
                    for srt_file, video_file in zip(scene_subtitles, scene_videos):
                        if srt_file is None:
                            continue
                        with open(srt_file, 'r', encoding='utf-8') as infile:
                            lines = infile.readlines()
                        i = 0
                        while i < len(lines):
                            line = lines[i].strip()
                            if line.isdigit():  # subtitle index line starts an entry
                                outfile.write(f"{subtitle_index}\n")
                                subtitle_index += 1
                                i += 1
                                # Time codes line: "HH:MM:SS,mmm --> HH:MM:SS,mmm".
                                time_line = lines[i].strip()
                                start_time, end_time = time_line.split(' --> ')
                                # Convert time codes and add the running offset.
                                def adjust_time(time_str, offset):
                                    h, m, s = time_str.replace(',', '.').split(':')
                                    total_seconds = float(h) * 3600 + float(m) * 60 + float(s) + offset
                                    h = int(total_seconds // 3600)
                                    m = int((total_seconds % 3600) // 60)
                                    s = total_seconds % 60
                                    return f"{h:02d}:{m:02d}:{s:06.3f}".replace('.', ',')
                                new_start = adjust_time(start_time, current_time_offset)
                                new_end = adjust_time(end_time, current_time_offset)
                                outfile.write(f"{new_start} --> {new_end}\n")
                                i += 1
                                # Subtitle text (could be multiple lines, ends at a blank line).
                                while i < len(lines) and lines[i].strip():
                                    outfile.write(lines[i])
                                    i += 1
                                outfile.write('\n')
                            else:
                                i += 1
                        # Advance the offset by this scene's duration via ffprobe.
                        probe = ffmpeg.probe(video_file)
                        duration = float(probe['streams'][0]['duration'])
                        current_time_offset += duration
            print(f"Successfully combined videos into {output_video_path}")
            if scene_subtitles:
                print(f"Successfully combined subtitles into {output_srt_path}")
        except Exception as e:
            print(f"Error combining videos and subtitles: {e}")
            traceback.print_exc()