|
import re |
|
import subprocess |
|
import os |
|
import glob |
|
import logging |
|
import platform |
|
|
|
|
|
def get_scene_name(manim_code):
    """Extract the name of the first scene class defined in Manim code.

    A scene class is any ``class Name(Base):`` whose single base class name
    ends in ``Scene`` (or ``scene``), optionally module-qualified, e.g.
    ``Scene``, ``ThreeDScene``, ``MovingCameraScene`` or ``manim.Scene``.

    Args:
        manim_code: Python source text to search.

    Returns:
        The class name of the first matching class definition.

    Raises:
        ValueError: If no matching class definition is found.
    """
    # ``(?:\w+\.)*`` permits a dotted module prefix; ``\w*`` permits any
    # prefix on the base name so specialised scene bases are recognised too.
    match = re.search(
        r"class\s+(\w+)\s*\(\s*(?:\w+\.)*\w*[Ss]cene\s*\)", manim_code
    )
    if match:
        return match.group(1)
    raise ValueError("No Scene class found in generated code")
|
|
|
|
|
def sanitize_path_for_ffmpeg(path: str) -> str:
    """Return *path* with platform-specific characters backslash-escaped.

    On Windows every backslash is doubled and every colon is prefixed with a
    backslash. On any other platform a single quote becomes ``'\\''`` and
    colons, commas and square brackets are each prefixed with a backslash.

    Args:
        path: The path string to escape.

    Returns:
        The escaped path string.
    """
    if platform.system() == "Windows":
        escapes = {"\\": "\\\\", ":": "\\:"}
    else:
        escapes = {
            "'": "'\\''",
            ":": "\\:",
            ",": "\\,",
            "[": "\\[",
            "]": "\\]",
        }
    # One pass over the input: each original character is rewritten at most
    # once, so an inserted backslash is never escaped a second time.
    return path.translate(str.maketrans(escapes))
|
|
|
|
|
def _run_logged(command, tool_name):
    """Run *command*; on a non-zero exit, log its stderr and raise CalledProcessError."""
    completed = subprocess.run(command, check=False, capture_output=True, text=True)
    if completed.returncode != 0:
        logging.error(f"{tool_name} failed with stderr:\n{completed.stderr}")
        raise subprocess.CalledProcessError(
            completed.returncode,
            command,
            output=completed.stdout,
            stderr=completed.stderr,
        )
    return completed


def _probe_duration(media_path):
    """Return the duration in seconds that ffprobe reports for *media_path*."""
    probe_cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        media_path,
    ]
    return float(subprocess.check_output(probe_cmd).decode("utf-8").strip())


def create_manim_video(video_data, manim_code, audio_file=None, subtitle_file=None):
    """Render Manim code to a video and mux optional audio and subtitles.

    The code is written to ``generated_video.py`` in the current working
    directory and rendered with ``manim -qh``. The rendered file is expected
    at ``media/videos/generated_video/1080p60/<scene>.mp4``. If an existing
    audio file is given and it is longer than the video, the video is padded
    by cloning its last frame for the difference. A final ffmpeg pass then
    writes ``final_output.mp4``, mapping the audio track and applying the
    subtitle file through the ``ass`` filter when those files exist.

    Temporary files (``generated_video.py`` and ``extended_video.mp4``) are
    removed whether or not the pipeline succeeds.

    Args:
        video_data: Not used by this function; kept for caller compatibility.
        manim_code: Python source containing the scene class to render.
        audio_file: Optional path to an audio file; ignored if it is falsy or
            does not exist.
        subtitle_file: Optional path to a subtitle file; ignored if it is
            falsy or does not exist.

    Returns:
        The path of the final video, ``"final_output.mp4"``.

    Raises:
        ValueError: If ``get_scene_name`` finds no scene class in the code.
        subprocess.CalledProcessError: If manim, ffprobe or ffmpeg exits with
            a non-zero status.
        FileNotFoundError: If manim succeeds but the expected output file is
            missing.
    """
    script_path = "generated_video.py"
    final_output = "final_output.mp4"
    extended_video_temp = "extended_video.mp4"

    logging.info("Starting to create Manim video")
    try:
        with open(script_path, "w", encoding="utf-8") as f:
            f.write(manim_code)

        scene_name = get_scene_name(manim_code)
        logging.info(f"Identified scene name: {scene_name}")

        command = ["manim", "-qh", script_path, scene_name]
        logging.info(f"Running Manim with command: {' '.join(command)}")
        # check=False inside the helper so stderr can be logged before raising.
        _run_logged(command, "Manim")

        video_path = os.path.join(
            "media", "videos", "generated_video", "1080p60", f"{scene_name}.mp4"
        )
        if not os.path.exists(video_path):
            logging.error(f"No rendered video found at: {video_path}")
            raise FileNotFoundError(
                f"No rendered video found for scene {scene_name}"
            )

        input_video = video_path
        # Evaluate once so every later step agrees on whether audio is present.
        has_audio = bool(audio_file) and os.path.exists(audio_file)

        if has_audio:
            logging.info(f"Audio file found: {audio_file}")

            video_duration = _probe_duration(input_video)
            audio_duration = _probe_duration(audio_file)
            logging.info(
                f"Video duration: {video_duration}s, Audio duration: {audio_duration}s"
            )

            if audio_duration > video_duration:
                logging.info(
                    "Audio is longer than video, extending video with freeze frame."
                )
                extend_cmd = [
                    "ffmpeg",
                    "-y",
                    "-i",
                    input_video,
                    "-vf",
                    f"tpad=stop_mode=clone:stop_duration={audio_duration - video_duration}",
                    "-c:v",
                    "libx264",
                    extended_video_temp,
                ]
                logging.info(f"Extending video with command: {' '.join(extend_cmd)}")
                _run_logged(extend_cmd, "ffmpeg")
                input_video = extended_video_temp

        merge_cmd = ["ffmpeg", "-y", "-i", input_video]
        maps = ["-map", "0:v:0"]
        if has_audio:
            merge_cmd.extend(["-i", audio_file])
            maps.extend(["-map", "1:a:0"])

        video_filters = []
        if subtitle_file and os.path.exists(subtitle_file):
            sanitized_path = sanitize_path_for_ffmpeg(os.path.abspath(subtitle_file))
            video_filters.append(f"ass='{sanitized_path}'")

        if video_filters:
            merge_cmd.extend(["-vf", ",".join(video_filters)])

        merge_cmd.extend(maps)
        merge_cmd.extend(
            ["-c:v", "libx264", "-c:a", "aac", "-shortest", final_output]
        )

        logging.info(f"Merging with final command: {' '.join(merge_cmd)}")
        _run_logged(merge_cmd, "ffmpeg")
    finally:
        # Clean up on failure as well, so a failed run leaves no stale files.
        if os.path.exists(extended_video_temp):
            os.remove(extended_video_temp)
            logging.info("Removed temporary extended video file.")
        if os.path.exists(script_path):
            os.remove(script_path)
            logging.info("Removed generated_video.py")

    logging.info(f"Final video created at: {final_output}")
    return final_output
|
|