TheoremExplainAgent / src /core /video_renderer.py
dfdfdsfgs's picture
Upload project files
d9486d1
import os
import re
import subprocess
import asyncio
from PIL import Image
from typing import Optional, List
import traceback
import sys
from src.core.parse_video import (
get_images_from_video,
image_with_most_non_black_space
)
from mllm_tools.vertex_ai import VertexAIWrapper
from mllm_tools.gemini import GeminiWrapper
class VideoRenderer:
"""Class for rendering and combining Manim animation videos."""
def __init__(self, output_dir="output", print_response=False, use_visual_fix_code=False):
"""Initialize the VideoRenderer.
Args:
output_dir (str, optional): Directory for output files. Defaults to "output".
print_response (bool, optional): Whether to print responses. Defaults to False.
use_visual_fix_code (bool, optional): Whether to use visual fix code. Defaults to False.
"""
self.output_dir = output_dir
self.print_response = print_response
self.use_visual_fix_code = use_visual_fix_code
async def render_scene(self, code: str, file_prefix: str, curr_scene: int, curr_version: int, code_dir: str, media_dir: str, max_retries: int = 3, use_visual_fix_code=False, visual_self_reflection_func=None, banned_reasonings=None, scene_trace_id=None, topic=None, session_id=None):
"""Render a single scene and handle error retries and visual fixes.
Args:
code (str): The Manim code to render
file_prefix (str): Prefix for output files
curr_scene (int): Current scene number
curr_version (int): Current version number
code_dir (str): Directory for code files
media_dir (str): Directory for media output
max_retries (int, optional): Maximum retry attempts. Defaults to 3.
use_visual_fix_code (bool, optional): Whether to use visual fix code. Defaults to False.
visual_self_reflection_func (callable, optional): Function for visual self-reflection. Defaults to None.
banned_reasonings (list, optional): List of banned reasoning strings. Defaults to None.
scene_trace_id (str, optional): Scene trace identifier. Defaults to None.
topic (str, optional): Topic name. Defaults to None.
session_id (str, optional): Session identifier. Defaults to None.
Returns:
tuple: (code, error_message) where error_message is None on success
"""
retries = 0
while retries < max_retries:
try:
# Execute manim in a thread to prevent blocking
file_path = os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py")
result = await asyncio.to_thread(
subprocess.run,
["manim", "-qh", file_path, "--media_dir", media_dir, "--progress_bar", "none"],
capture_output=True,
text=True
)
# if result.returncode != 0, it means that the code is not rendered successfully
# so we need to fix the code by returning the code and the error message
if result.returncode != 0:
raise Exception(result.stderr)
if use_visual_fix_code and visual_self_reflection_func and banned_reasonings:
# Get the rendered video path
video_path = os.path.join(
media_dir,
"videos",
f"{file_prefix}_scene{curr_scene}_v{curr_version}.mp4"
)
# For Gemini/Vertex AI models, pass the video directly
if self.scene_model.model_name.startswith(('gemini/', 'vertex_ai/')):
media_input = video_path
else:
# For other models, use image snapshot
media_input = self.create_snapshot_scene(
topic, curr_scene, curr_version, return_type="path"
)
new_code, log = visual_self_reflection_func(
code,
media_input,
scene_trace_id=scene_trace_id,
topic=topic,
scene_number=curr_scene,
session_id=session_id
)
with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_vfix_log.txt"), "w") as f:
f.write(log)
# Check for termination markers
if "<LGTM>" in new_code or any(word in new_code for word in banned_reasonings):
break
code = new_code
curr_version += 1
with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}.py"), "w") as f:
f.write(code)
print(f"Code saved to scene{curr_scene}/code/{file_prefix}_scene{curr_scene}_v{curr_version}.py")
retries = 0
continue
break # Exit retry loop on success
except Exception as e:
print(f"Error: {e}")
print(f"Retrying {retries+1} of {max_retries}...")
with open(os.path.join(code_dir, f"{file_prefix}_scene{curr_scene}_v{curr_version}_error.log"), "a") as f:
f.write(f"\nError in attempt {retries}:\n{str(e)}\n")
retries += 1
return code, str(e) # Indicate failure and return error message
print(f"Successfully rendered {file_path}")
with open(os.path.join(self.output_dir, file_prefix, f"scene{curr_scene}", "succ_rendered.txt"), "w") as f:
f.write("")
return code, None # Indicate success
def run_manim_process(self,
topic: str):
"""Run manim on all generated manim code for a specific topic.
Args:
topic (str): Topic name to process
Returns:
subprocess.CompletedProcess: Result of the final manim process
"""
file_prefix = topic.lower()
file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
search_path = os.path.join(self.output_dir, file_prefix)
# Iterate through scene folders
scene_folders = [f for f in os.listdir(search_path) if os.path.isdir(os.path.join(search_path, f))]
scene_folders.sort() # Sort to process scenes in order
for folder in scene_folders:
folder_path = os.path.join(search_path, folder)
# Get all Python files in version order
py_files = [f for f in os.listdir(folder_path) if f.endswith('.py')]
py_files.sort(key=lambda x: int(x.split('_v')[-1].split('.')[0])) # Sort by version number
for file in py_files:
file_path = os.path.join(folder_path, file)
try:
media_dir = os.path.join(self.output_dir, file_prefix, "media")
result = subprocess.run(
f"manim -qh {file_path} --media_dir {media_dir}",
shell=True,
capture_output=True,
text=True
)
if result.returncode != 0:
raise Exception(result.stderr)
print(f"Successfully rendered {file}")
break # Move to next scene folder if successful
except Exception as e:
print(f"Error rendering {file}: {e}")
error_log_path = os.path.join(folder_path, f"{file.split('.')[0]}_error.log") # drop the extra py
with open(error_log_path, "w") as f:
f.write(f"Error:\n{str(e)}\n")
print(f"Error log saved to {error_log_path}")
return result
def create_snapshot_scene(self, topic: str, scene_number: int, version_number: int, return_type: str = "image"):
"""Create a snapshot of the video for a specific topic and scene.
Args:
topic (str): Topic name
scene_number (int): Scene number
version_number (int): Version number
return_type (str, optional): Type of return value - "path" or "image". Defaults to "image".
Returns:
Union[str, PIL.Image]: Path to saved image or PIL Image object
Raises:
FileNotFoundError: If no mp4 files found in video folder
"""
file_prefix = topic.lower()
file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
search_path = os.path.join(self.output_dir, file_prefix)
video_folder_path = os.path.join(search_path, "media", "videos", f"{file_prefix}_scene{scene_number}_v{version_number}", "1080p60")
os.makedirs(video_folder_path, exist_ok=True)
snapshot_path = os.path.join(video_folder_path, "snapshot.png")
# Get the mp4 video file from the video folder path
video_files = [f for f in os.listdir(video_folder_path) if f.endswith('.mp4')]
if not video_files:
raise FileNotFoundError(f"No mp4 files found in {video_folder_path}")
video_path = os.path.join(video_folder_path, video_files[0])
saved_image = image_with_most_non_black_space(get_images_from_video(video_path), snapshot_path, return_type=return_type)
return saved_image
def combine_videos(self, topic: str):
"""Combine all videos and subtitle files for a specific topic using ffmpeg.
Args:
topic (str): Topic name to combine videos for
This function will:
- Find all scene videos and subtitles
- Combine videos with or without audio
- Merge subtitle files with correct timing
- Save combined video and subtitles to output directory
"""
file_prefix = topic.lower()
file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
search_path = os.path.join(self.output_dir, file_prefix, "media", "videos")
# Create output directory if it doesn't exist
video_output_dir = os.path.join(self.output_dir, file_prefix)
os.makedirs(video_output_dir, exist_ok=True)
output_video_path = os.path.join(video_output_dir, f"{file_prefix}_combined.mp4")
output_srt_path = os.path.join(video_output_dir, f"{file_prefix}_combined.srt")
if os.path.exists(output_video_path) and os.path.exists(output_srt_path):
print(f"Combined video and subtitles already exist at {output_video_path}, not combining again.")
return
# Get scene count from outline
scene_outline_path = os.path.join(self.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt")
if not os.path.exists(scene_outline_path):
print(f"Warning: Scene outline file not found at {scene_outline_path}. Cannot determine scene count.")
return
with open(scene_outline_path) as f:
plan = f.read()
scene_outline = re.search(r'(<SCENE_OUTLINE>.*?</SCENE_OUTLINE>)', plan, re.DOTALL).group(1)
scene_count = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline))
# Find all scene folders and videos
scene_folders = []
for root, dirs, files in os.walk(search_path):
for dir in dirs:
if dir.startswith(file_prefix + "_scene"):
scene_folders.append(os.path.join(root, dir))
scene_videos = []
scene_subtitles = []
for scene_num in range(1, scene_count + 1):
folders = [f for f in scene_folders if int(f.split("scene")[-1].split("_")[0]) == scene_num]
if not folders:
print(f"Warning: Missing scene {scene_num}")
continue
folders.sort(key=lambda f: int(f.split("_v")[-1]))
folder = folders[-1]
video_found = False
subtitles_found = False
for filename in os.listdir(os.path.join(folder, "1080p60")):
if filename.endswith('.mp4'):
scene_videos.append(os.path.join(folder, "1080p60", filename))
video_found = True
elif filename.endswith('.srt'):
scene_subtitles.append(os.path.join(folder, "1080p60", filename))
subtitles_found = True
if not video_found:
print(f"Warning: Missing video for scene {scene_num}")
if not subtitles_found:
scene_subtitles.append(None)
if len(scene_videos) != scene_count:
print("Not all videos/subtitles are found, aborting video combination.")
return
try:
import ffmpeg # You might need to install ffmpeg-python package: pip install ffmpeg-python
from tqdm import tqdm
print("Analyzing video streams...")
# Check if videos have audio streams
has_audio = []
for video in tqdm(scene_videos, desc="Checking audio streams"):
probe = ffmpeg.probe(video)
audio_streams = [stream for stream in probe['streams'] if stream['codec_type'] == 'audio']
has_audio.append(len(audio_streams) > 0)
print("Preparing video combination...")
# If any video has audio, we need to ensure all videos have audio streams
if any(has_audio):
# Create list to store video and audio streams
streams = []
for video, has_aud in tqdm(list(zip(scene_videos, has_audio)), desc="Processing videos"):
if has_aud:
# Video has audio, use as is
input_vid = ffmpeg.input(video)
streams.extend([input_vid['v'], input_vid['a']])
else:
# Video lacks audio, add silent audio
input_vid = ffmpeg.input(video)
# Generate silent audio for the duration of the video
probe = ffmpeg.probe(video)
duration = float(probe['streams'][0]['duration'])
silent_audio = ffmpeg.input(f'anullsrc=channel_layout=stereo:sample_rate=44100',
f='lavfi', t=duration)['a']
streams.extend([input_vid['v'], silent_audio])
print("Combining videos with audio...")
try:
# Concatenate all streams using optimized CPU encoding settings
concat = ffmpeg.concat(*streams, v=1, a=1, unsafe=True)
process = (
concat
.output(output_video_path,
**{'c:v': 'libx264',
'c:a': 'aac',
'preset': 'veryfast', # Changed from ultrafast for better speed/quality balance
'crf': '28', # Same quality setting
'threads': '0', # Use all CPU threads
'tune': 'fastdecode', # Optimize for decoding speed
'profile:v': 'baseline', # Simpler profile for faster encoding
'level': '4.0',
'x264-params': 'aq-mode=0:no-deblock:no-cabac:ref=1:subme=0:trellis=0:weightp=0', # Added aggressive speed optimizations
'movflags': '+faststart',
'stats': None,
'progress': 'pipe:1'})
.overwrite_output()
.run_async(pipe_stdout=True, pipe_stderr=True)
)
# Process progress output
while True:
line = process.stdout.readline().decode('utf-8')
if not line:
break
if 'frame=' in line:
sys.stdout.write('\rProcessing: ' + line.strip())
sys.stdout.flush()
# Wait for the process to complete and capture output
stdout, stderr = process.communicate()
print("\nEncoding complete!")
except ffmpeg.Error as e:
print(f"FFmpeg stdout:\n{e.stdout.decode('utf8')}")
print(f"FFmpeg stderr:\n{e.stderr.decode('utf8')}")
raise
else:
# No videos have audio, concatenate video streams only
streams = []
for video in tqdm(scene_videos, desc="Processing videos"):
streams.append(ffmpeg.input(video)['v'])
print("Combining videos without audio...")
try:
concat = ffmpeg.concat(*streams, v=1, unsafe=True)
process = (
concat
.output(output_video_path,
**{'c:v': 'libx264',
'preset': 'medium',
'crf': '23',
'stats': None, # Enable progress stats
'progress': 'pipe:1'}) # Output progress to pipe
.overwrite_output()
.run_async(pipe_stdout=True, pipe_stderr=True)
)
# Process progress output
while True:
line = process.stdout.readline().decode('utf-8')
if not line:
break
if 'frame=' in line:
sys.stdout.write('\rProcessing: ' + line.strip())
sys.stdout.flush()
# Wait for the process to complete and capture output
stdout, stderr = process.communicate()
print("\nEncoding complete!")
except ffmpeg.Error as e:
print(f"FFmpeg stdout:\n{e.stdout.decode('utf8')}")
print(f"FFmpeg stderr:\n{e.stderr.decode('utf8')}")
raise
print(f"Successfully combined videos into {output_video_path}")
# Handle subtitle combination (existing subtitle code remains the same)
if scene_subtitles:
with open(output_srt_path, 'w', encoding='utf-8') as outfile:
current_time_offset = 0
subtitle_index = 1
for srt_file, video_file in zip(scene_subtitles, scene_videos):
if srt_file is None:
continue
with open(srt_file, 'r', encoding='utf-8') as infile:
lines = infile.readlines()
i = 0
while i < len(lines):
line = lines[i].strip()
if line.isdigit(): # Subtitle index
outfile.write(f"{subtitle_index}\n")
subtitle_index += 1
i += 1
# Time codes line
time_line = lines[i].strip()
start_time, end_time = time_line.split(' --> ')
# Convert time codes and add offset
def adjust_time(time_str, offset):
h, m, s = time_str.replace(',', '.').split(':')
total_seconds = float(h) * 3600 + float(m) * 60 + float(s) + offset
h = int(total_seconds // 3600)
m = int((total_seconds % 3600) // 60)
s = total_seconds % 60
return f"{h:02d}:{m:02d}:{s:06.3f}".replace('.', ',')
new_start = adjust_time(start_time, current_time_offset)
new_end = adjust_time(end_time, current_time_offset)
outfile.write(f"{new_start} --> {new_end}\n")
i += 1
# Subtitle text (could be multiple lines)
while i < len(lines) and lines[i].strip():
outfile.write(lines[i])
i += 1
outfile.write('\n')
else:
i += 1
# Update time offset using ffprobe
probe = ffmpeg.probe(video_file)
duration = float(probe['streams'][0]['duration'])
current_time_offset += duration
print(f"Successfully combined videos into {output_video_path}")
if scene_subtitles:
print(f"Successfully combined subtitles into {output_srt_path}")
except Exception as e:
print(f"Error combining videos and subtitles: {e}")
traceback.print_exc()