# Spaces: Runtime error
# (Hosting-page status banner that was captured together with this file; it is not part of the program.)
import datetime
import logging
import os
import tempfile
import uuid

import google.generativeai as genai
import moviepy.config as mp_config
import srt
from flask import (
    Flask,
    abort,
    flash,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from gtts import gTTS
from moviepy.editor import *
from moviepy.video.tools.subtitles import SubtitlesClip
from werkzeug.utils import secure_filename
# --- Configuration ---
# Suppress moviepy logs which can be verbose
logging.basicConfig(level=logging.INFO)
logging.getLogger("moviepy").setLevel(logging.ERROR)

# Flag read by the routes and by create_final_video(): when True, subtitle
# burn-in is skipped because a TextClip could not be built at startup.
IMAGEMAGICK_NOT_FOUND = False
try:
    # This will raise an exception if ImageMagick is not found
    TextClip("test", font='Arial', fontsize=24, color='white')
except Exception as e:
    # Whatever the cause, a TextClip that cannot be built here cannot be built
    # for real subtitles either, so disable burn-in instead of letting the
    # whole video render fail later.
    IMAGEMAGICK_NOT_FOUND = True
    if "ImageMagick is not installed" in str(e) or "magick: not found" in str(e):
        logging.warning("ImageMagick is not installed or not found in the system's PATH. Subtitle generation will be disabled.")
        logging.warning("Please install ImageMagick and ensure its 'magick' command is accessible from your shell.")
    else:
        # Handle other potential startup errors from TextClip
        logging.error(f"An unexpected error occurred with MoviePy/ImageMagick: {e}")
# Configure Gemini API
# IMPORTANT: Set your GEMINI_API_KEY as an environment variable for this to work.
if "GEMINI_API_KEY" in os.environ:
    genai.configure(api_key=os.environ["GEMINI_API_KEY"])
# When the variable is missing nothing is configured here; the Flask routes
# check for the key again and show a message to the user.
# --- Flask App Initialization ---
app = Flask(__name__)

# Uploads live in a 'user_uploads' directory under the current working
# directory; create it up front if it doesn't exist.
UPLOAD_FOLDER = os.path.join(os.getcwd(), 'user_uploads')
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# A fresh random secret key is generated every time this module is loaded.
app.config.update(
    SECRET_KEY=os.urandom(24),
    UPLOAD_FOLDER=UPLOAD_FOLDER,
)
# --- Model and Language Configuration ---
# Generation settings passed to genai.GenerativeModel in get_gemini_model()
generation_config = dict(
    temperature=0.7,
    top_p=0.9,
    top_k=40,
    max_output_tokens=8192,
    response_mime_type="text/plain",
)
# Language names offered in the UI. The first entry, "Auto Detect", is a
# sentinel rather than a language; the order below is the display order.
SUPPORTED_LANGUAGES = [
    "Auto Detect",
    "English", "Chinese", "German", "Spanish", "Russian",
    "Korean", "French", "Japanese", "Portuguese", "Turkish",
    "Polish", "Catalan", "Dutch", "Arabic", "Swedish",
    "Italian", "Indonesian", "Hindi", "Finnish", "Vietnamese",
    "Hebrew", "Ukrainian", "Greek", "Malay", "Czech",
    "Romanian", "Danish", "Hungarian", "Tamil", "Norwegian",
    "Thai", "Urdu", "Croatian", "Bulgarian", "Lithuanian",
    "Latin", "Maori", "Malayalam", "Welsh", "Slovak",
    "Telugu", "Persian", "Latvian", "Bengali", "Serbian",
    "Azerbaijani", "Slovenian", "Kannada", "Estonian", "Macedonian",
    "Breton", "Basque", "Icelandic", "Armenian", "Nepali",
    "Mongolian", "Bosnian", "Kazakh", "Albanian", "Swahili",
    "Galician", "Marathi", "Punjabi", "Sinhala", "Khmer",
    "Shona", "Yoruba", "Somali", "Afrikaans", "Occitan",
    "Georgian", "Belarusian", "Tajik", "Sindhi", "Gujarati",
    "Amharic", "Yiddish", "Lao", "Uzbek", "Faroese",
    "Haitian Creole", "Pashto", "Turkmen", "Nynorsk", "Maltese",
    "Sanskrit", "Luxembourgish", "Burmese", "Tibetan", "Tagalog",
    "Malagasy", "Assamese", "Tatar", "Hawaiian", "Lingala",
    "Hausa", "Bashkir", "Javanese", "Sundanese",
]
# Maps a UI language name to the code handed to gTTS in generate_tts_audio().
# Languages missing from this table fall back to English there.
LANGUAGE_CODES = {
    "English": "en", "Chinese": "zh", "German": "de", "Spanish": "es",
    "Russian": "ru", "Korean": "ko", "French": "fr", "Japanese": "ja",
    "Portuguese": "pt", "Turkish": "tr", "Polish": "pl", "Catalan": "ca",
    "Dutch": "nl", "Arabic": "ar", "Swedish": "sv", "Italian": "it",
    "Indonesian": "id", "Hindi": "hi", "Finnish": "fi", "Vietnamese": "vi",
    "Hebrew": "iw", "Ukrainian": "uk", "Greek": "el", "Malay": "ms",
    "Czech": "cs", "Romanian": "ro", "Danish": "da", "Hungarian": "hu",
    "Tamil": "ta", "Norwegian": "no", "Thai": "th", "Urdu": "ur",
    "Croatian": "hr", "Bulgarian": "bg", "Lithuanian": "lt", "Latin": "la",
    "Malayalam": "ml", "Welsh": "cy", "Slovak": "sk", "Telugu": "te",
    "Persian": "fa", "Latvian": "lv", "Bengali": "bn", "Serbian": "sr",
    "Slovenian": "sl", "Kannada": "kn", "Estonian": "et", "Macedonian": "mk",
    "Armenian": "hy", "Nepali": "ne", "Mongolian": "mn", "Bosnian": "bs",
    "Kazakh": "kk", "Albanian": "sq", "Swahili": "sw", "Gujarati": "gu",
    "Lao": "lo", "Uzbek": "uz", "Maltese": "mt", "Burmese": "my",
    "Tagalog": "tl", "Javanese": "jw", "Sundanese": "su", "Afrikaans": "af",
}
# --- Core Processing Functions ---
def get_gemini_model():
    """Build the Gemini model used for transcription and translation.

    Returns:
        A ``genai.GenerativeModel`` for "gemini-1.5-flash" configured with the
        module-level ``generation_config``, or ``None`` when the
        GEMINI_API_KEY environment variable is not present.
    """
    api_key_present = "GEMINI_API_KEY" in os.environ
    if api_key_present:
        return genai.GenerativeModel(
            model_name="gemini-1.5-flash",
            generation_config=generation_config,
        )
    return None
def extract_audio_from_video(video_path, audio_path):
    """Extract the audio track of a video into an audio file.

    Args:
        video_path: Path of the video to read.
        audio_path: Destination path for the extracted audio (written at 16 kHz).

    Returns:
        ``audio_path`` on success, or ``None`` when the video cannot be read,
        has no audio track, or the audio cannot be written.
    """
    try:
        with VideoFileClip(video_path) as video:
            # A silent video exposes no audio clip; report that explicitly
            # instead of failing with an AttributeError on None.
            if video.audio is None:
                logging.error(f"Error extracting audio: no audio track found in {video_path}")
                return None
            video.audio.write_audiofile(audio_path, fps=16000, logger=None)
        return audio_path
    except Exception as e:
        logging.error(f"Error extracting audio: {e}")
        return None
def transcribe_audio_with_gemini(audio_path, source_language):
    """Transcribe audio using Gemini, requesting output in SRT format.

    Args:
        audio_path: Path of the audio file to upload and transcribe.
        source_language: Language name to mention in the prompt, or
            "Auto Detect" to leave the language unspecified.

    Returns:
        The stripped response text (expected to be SRT), or ``None`` when no
        model is available or the upload/request fails.
    """
    model = get_gemini_model()
    if not model:
        return None
    audio_file = None
    try:
        logging.info(f"Uploading audio file: {audio_path}")
        audio_file = genai.upload_file(path=audio_path)
        language_prompt = f"in {source_language}" if source_language != "Auto Detect" else ""
        # The example keeps a blank line between cues, as the SRT format requires.
        prompt = f"""
You are a professional transcriber. Transcribe this audio accurately and verbatim {language_prompt}.
Your output MUST be in the SRT (SubRip) format.
Example:
1
00:00:01,234 --> 00:00:05,678
This is the first sentence.

2
00:00:06,123 --> 00:00:09,456
This is the second sentence.
Ensure timestamps are precise. Respond ONLY with the transcription in the SRT format.
Do not add explanations, notes, or any other text outside of the valid SRT content.
"""
        logging.info("Sending transcription request to Gemini...")
        response = model.generate_content([prompt, audio_file], request_options={"timeout": 600})
        logging.info("Transcription received from Gemini.")
        return response.text.strip()
    except Exception as e:
        logging.error(f"Error during Gemini transcription: {e}")
        return None
    finally:
        # Always remove the uploaded file, even when the request failed, and
        # never let a cleanup problem discard a transcript we already have.
        if audio_file is not None:
            try:
                genai.delete_file(audio_file.name)
            except Exception as cleanup_error:
                logging.warning(f"Could not delete uploaded audio file from Gemini: {cleanup_error}")
def translate_srt(srt_text, target_language):
    """Ask Gemini to translate SRT subtitles while keeping the SRT layout.

    Args:
        srt_text: Subtitle content in SRT format.
        target_language: Name of the language to translate into.

    Returns:
        The stripped response text, or ``None`` when no model is available or
        the request fails.
    """
    gemini = get_gemini_model()
    if not gemini:
        return None
    try:
        translation_request = f"""
Translate the following SRT subtitles into {target_language}.
Preserve the SRT format perfectly (index numbers, timestamps, and structure).
Translate only the subtitle text itself.
Your output must be only the translated and valid SRT file content.
Do not add any explanations or extra text.
SRT Content to Translate:
{srt_text}
"""
        logging.info(f"Sending translation request to Gemini for {target_language}...")
        reply = gemini.generate_content(translation_request)
        logging.info("Translation received.")
        translated = reply.text.strip()
    except Exception as e:
        logging.error(f"Error during translation: {e}")
        translated = None
    return translated
def generate_tts_audio(srt_text, language, tts_audio_path):
    """Synthesize one speech file from all subtitle text in an SRT document.

    Args:
        srt_text: Subtitle content in SRT format.
        language: UI language name, looked up in ``LANGUAGE_CODES``; unknown
            names flash a warning and fall back to English.
        tts_audio_path: Destination path for the generated audio.

    Returns:
        ``tts_audio_path`` on success, or ``None`` when there is no text to
        speak or anything fails along the way.
    """
    try:
        # Flatten every cue to a single line, then join the cues with spaces
        spoken_lines = [cue.content.replace('\n', ' ') for cue in srt.parse(srt_text)]
        narration = " ".join(spoken_lines)
        if not narration:
            return None

        lang_code = LANGUAGE_CODES.get(language)
        if not lang_code:
            flash(f"Language '{language}' not supported for TTS, defaulting to English.", "warning")
            lang_code = "en"

        logging.info(f"Generating TTS audio in '{language}' ({lang_code})...")
        gTTS(text=narration, lang=lang_code, slow=False).save(tts_audio_path)
        logging.info(f"TTS audio saved to {tts_audio_path}")
        return tts_audio_path
    except Exception as e:
        logging.error(f"Error generating TTS audio: {e}")
        return None
def create_final_video(original_video_path, srt_text, new_audio_path, output_path):
    """Render the final video, optionally with burned-in subtitles and new audio.

    Args:
        original_video_path: Path of the source video.
        srt_text: SRT content to burn in, or a falsy value for no subtitles.
            Ignored when ``IMAGEMAGICK_NOT_FOUND`` is set.
        new_audio_path: Path of an audio file that replaces the original
            track, or a falsy value to keep the original audio. Audio shorter
            than the video is looped to the video's duration.
        output_path: Destination path of the rendered video.

    Returns:
        ``output_path`` on success, or ``None`` on any failure.

    Note:
        ``SubtitlesClip`` is not provided by ``from moviepy.editor import *``;
        it is imported from ``moviepy.video.tools.subtitles`` at the top of
        the file.
    """
    original_clip = None
    tts_audio_clip = None
    final_video = None
    srt_filepath = None
    try:
        logging.info("Creating final video...")
        original_clip = VideoFileClip(original_video_path)
        final_clip = original_clip

        # If new audio is provided, replace the original audio track
        if new_audio_path:
            tts_audio_clip = AudioFileClip(new_audio_path)
            audio_track = tts_audio_clip
            # If the new audio is shorter than the video, loop it.
            if audio_track.duration < original_clip.duration:
                audio_track = audio_track.fx(vfx.loop, duration=original_clip.duration)
            final_clip = original_clip.set_audio(audio_track)

        # If subtitle text is provided, burn it into the video
        if srt_text and not IMAGEMAGICK_NOT_FOUND:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.srt', delete=False, encoding='utf-8') as temp_srt:
                temp_srt.write(srt_text)
                srt_filepath = temp_srt.name

            # Subtitles take 80% of the video width (whole pixels)
            caption_width = int(final_clip.w * 0.8)

            def generator(txt):
                """Build the styled text clip for one subtitle cue."""
                return TextClip(
                    txt, font='Arial-Bold', fontsize=24, color='white',
                    stroke_color='black', stroke_width=1, method='caption',
                    size=(caption_width, None),
                )

            subtitles_clip = SubtitlesClip(srt_filepath, generator)
            # Composite the video with the subtitles
            final_video = CompositeVideoClip([final_clip, subtitles_clip.set_position(('center', 'bottom'))])
        else:
            final_video = final_clip

        # Write the final video file
        final_video.write_videofile(output_path, codec='libx264', audio_codec='aac', threads=4, logger=None)
        logging.info(f"Final video saved to {output_path}")
        return output_path
    except Exception as e:
        logging.error(f"Error creating final video: {e}")
        return None
    finally:
        # Clean up on success AND failure: the temp subtitle file and every
        # clip that was opened (each distinct object closed once).
        if srt_filepath and os.path.exists(srt_filepath):
            try:
                os.remove(srt_filepath)
            except OSError as cleanup_error:
                logging.warning(f"Could not remove temporary subtitle file: {cleanup_error}")
        closed_ids = set()
        for clip in (final_video, tts_audio_clip, original_clip):
            if clip is None or id(clip) in closed_ids:
                continue
            closed_ids.add(id(clip))
            try:
                clip.close()
            except Exception as cleanup_error:
                logging.warning(f"Could not close clip: {cleanup_error}")
# --- Flask Routes ---
# Registered as a route so url_for('index') can resolve.
@app.route('/')
def index():
    """Render the main page, forgetting the previous run's session id.

    Returns:
        The rendered ``index.html`` with the language list, the ImageMagick
        status and whether GEMINI_API_KEY is set.
    """
    # Remove only our own key. session.clear() would also erase the messages
    # queued by flash() just before a redirect to this page, so the user
    # would never see them.
    session.pop('session_id', None)
    api_key_status = "GEMINI_API_KEY" in os.environ
    return render_template(
        'index.html',
        supported_languages=SUPPORTED_LANGUAGES,
        imagemagick_missing=IMAGEMAGICK_NOT_FOUND,
        api_key_set=api_key_status,
    )
# NOTE(review): the URL rule is an assumption — confirm it matches the form
# action used by index.html (templates using url_for('process') are unaffected).
@app.route('/process', methods=['POST'])
def process():
    """Handle the video processing request from the form.

    Steps: save the upload into a per-request directory, extract its audio,
    transcribe it with Gemini, optionally translate the subtitles, optionally
    synthesize TTS audio, and optionally render a final video with burned-in
    subtitles and/or the TTS track. Fatal problems flash an error and
    redirect to the index page; optional steps that fail flash a warning and
    processing continues.

    Returns:
        The rendered ``index.html`` listing the generated files, or a
        redirect to the index route after a fatal error.
    """
    if "GEMINI_API_KEY" not in os.environ:
        flash('Your GEMINI_API_KEY environment variable is not set. Please set it to use the application.', 'error')
        return redirect(url_for('index'))

    video_file = request.files.get('video')
    if video_file is None or not video_file.filename:
        flash('No video file selected. Please upload a video.', 'error')
        return redirect(url_for('index'))

    # --- Get form options ---
    source_language = request.form.get('source_language', 'Auto Detect')
    translate_to = request.form.get('translate_to', 'None')
    add_tts = 'add_tts' in request.form
    add_subtitles = 'add_subtitles' in request.form
    if IMAGEMAGICK_NOT_FOUND and add_subtitles:
        flash("Cannot add subtitles because ImageMagick is not installed. The video will be processed without them.", "warning")
        add_subtitles = False

    # --- Setup a unique session directory for this request ---
    session_id = str(uuid.uuid4())
    session['session_id'] = session_id
    session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
    os.makedirs(session_dir, exist_ok=True)

    # secure_filename() can return an empty string, which would make the save
    # path the directory itself. The "source_" prefix also keeps the upload
    # from overwriting generated files such as final_video.mp4.
    safe_name = secure_filename(video_file.filename) or "video"
    original_video_path = os.path.join(session_dir, f"source_{safe_name}")
    video_file.save(original_video_path)
    results = {}

    # 1. Extract Audio
    audio_path = os.path.join(session_dir, "extracted_audio.wav")
    if not extract_audio_from_video(original_video_path, audio_path):
        flash('Failed to extract audio from the video. The file might be corrupted or in an unsupported format.', 'error')
        return redirect(url_for('index'))

    # 2. Transcribe Audio
    original_srt_text = transcribe_audio_with_gemini(audio_path, source_language)
    os.remove(audio_path)  # Clean up extracted audio immediately
    if not original_srt_text:
        flash('Failed to transcribe the audio. The API call might have failed or the audio is silent.', 'error')
        return redirect(url_for('index'))
    original_srt_path = os.path.join(session_dir, "original_subtitles.srt")
    with open(original_srt_path, "w", encoding="utf-8") as f:
        f.write(original_srt_text)
    results['original_srt_file'] = "original_subtitles.srt"

    srt_for_final_video = None
    tts_audio_path = None
    final_srt_text = original_srt_text
    translated = False  # True only when a translation was actually produced

    # 3. Translate Subtitles (if requested)
    if translate_to != "None":
        translated_srt_text = translate_srt(original_srt_text, translate_to)
        if translated_srt_text:
            translated_srt_path = os.path.join(session_dir, "translated_subtitles.srt")
            with open(translated_srt_path, "w", encoding="utf-8") as f:
                f.write(translated_srt_text)
            results['translated_srt_file'] = "translated_subtitles.srt"
            final_srt_text = translated_srt_text  # Use translated text for subsequent steps
            translated = True
        else:
            flash(f'Failed to translate subtitles to {translate_to}. Using original subtitles.', 'warning')

    # 4. Generate TTS Audio (if requested)
    if add_tts:
        # Speak in the language the text is really in: the target language
        # only when translation succeeded, otherwise the source language.
        tts_lang = translate_to if translated else source_language
        if tts_lang == 'Auto Detect':
            flash('TTS language cannot be "Auto Detect". Defaulting to English.', 'warning')
            tts_lang = 'English'
        tts_audio_path_out = os.path.join(session_dir, "tts_audio.mp3")
        tts_audio_path = generate_tts_audio(final_srt_text, tts_lang, tts_audio_path_out)
        if tts_audio_path:
            results['tts_audio_file'] = "tts_audio.mp3"
        else:
            flash('Failed to generate Text-to-Speech audio.', 'warning')
            tts_audio_path = None  # Ensure it's None if generation failed

    # 5. Determine which subtitles to burn into the video (if requested)
    if add_subtitles:
        srt_for_final_video = final_srt_text

    # 6. Create the final output video if any processing was requested
    if add_subtitles or add_tts:
        output_video_path = os.path.join(session_dir, "final_video.mp4")
        if create_final_video(original_video_path, srt_for_final_video, tts_audio_path, output_video_path):
            results['output_video_file'] = "final_video.mp4"
        else:
            flash('Failed to create the final processed video.', 'error')

    api_key_status = "GEMINI_API_KEY" in os.environ
    return render_template(
        'index.html',
        supported_languages=SUPPORTED_LANGUAGES,
        results=results,
        session_id=session_id,
        imagemagick_missing=IMAGEMAGICK_NOT_FOUND,
        api_key_set=api_key_status,
    )
# NOTE(review): the URL rule is an assumption — confirm it matches the links
# in index.html (links built with url_for('download_file', ...) are unaffected).
@app.route('/download/<session_id>/<filename>')
def download_file(session_id, filename):
    """Serve a file from the session-specific directory as a download.

    Args:
        session_id: Directory name under the upload folder; must be a
            canonical UUID string as produced by ``str(uuid.uuid4())``.
        filename: Name of the file inside that directory.

    Returns:
        The file as an attachment. Responds with 404 when ``session_id`` is
        not a canonical UUID.
    """
    # session_id comes straight from the URL. Without this check a value such
    # as ".." would point the download directory outside the upload folder.
    try:
        is_canonical = str(uuid.UUID(session_id)) == session_id
    except ValueError:
        is_canonical = False
    if not is_canonical:
        abort(404)
    directory = os.path.join(app.config['UPLOAD_FOLDER'], session_id)
    return send_from_directory(directory, filename, as_attachment=True)
# --- Run the App ---
if __name__ == '__main__':
    # Only when executed directly: listen on all interfaces, port 7860
    app.run(port=7860, host="0.0.0.0")