import os import google.generativeai as genai from moviepy.editor import * import moviepy.config as mp_config import tempfile import logging import srt import datetime from gtts import gTTS from flask import Flask, request, render_template, send_from_directory, url_for, flash, session, redirect from werkzeug.utils import secure_filename import uuid # --- Configuration --- # Suppress moviepy logs which can be verbose logging.basicConfig(level=logging.INFO) logging.getLogger("moviepy").setLevel(logging.ERROR) # Set a marker to check if ImageMagick is not found IMAGEMAGICK_NOT_FOUND = False try: # This will raise an exception if ImageMagick is not found TextClip("test", font='Arial', fontsize=24, color='white') except Exception as e: if "ImageMagick is not installed" in str(e) or "magick: not found" in str(e): IMAGEMAGICK_NOT_FOUND = True logging.warning("ImageMagick is not installed or not found in the system's PATH. Subtitle generation will be disabled.") logging.warning("Please install ImageMagick and ensure its 'magick' command is accessible from your shell.") else: # Handle other potential startup errors from TextClip logging.error(f"An unexpected error occurred with MoviePy/ImageMagick: {e}") # Configure Gemini API # IMPORTANT: Set your GEMINI_API_KEY as an environment variable for this to work. try: genai.configure(api_key=os.environ["GEMINI_API_KEY"]) except KeyError: # This will be handled in the Flask app to show a message to the user pass # --- Flask App Initialization --- app = Flask(__name__) app.config['SECRET_KEY'] = os.urandom(24) # Create a permanent directory for uploads if it doesn't exist UPLOAD_FOLDER = os.path.join(os.getcwd(), 'user_uploads') os.makedirs(UPLOAD_FOLDER, exist_ok=True) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER # --- Model and Language Configuration --- # Create the Gemini model generation_config = { "temperature": 0.7, "top_p": 0.9, "top_k": 40, "max_output_tokens": 8192, "response_mime_type": "text/plain", } # List of all supported languages for the UI SUPPORTED_LANGUAGES = [ "Auto Detect", "English", "Chinese", "German", "Spanish", "Russian", "Korean", "French", "Japanese", "Portuguese", "Turkish", "Polish", "Catalan", "Dutch", "Arabic", "Swedish", "Italian", "Indonesian", "Hindi", "Finnish", "Vietnamese", "Hebrew", "Ukrainian", "Greek", "Malay", "Czech", "Romanian", "Danish", "Hungarian", "Tamil", "Norwegian", "Thai", "Urdu", "Croatian", "Bulgarian", "Lithuanian", "Latin", "Maori", "Malayalam", "Welsh", "Slovak", "Telugu", "Persian", "Latvian", "Bengali", "Serbian", "Azerbaijani", "Slovenian", "Kannada", "Estonian", "Macedonian", "Breton", "Basque", "Icelandic", "Armenian", "Nepali", "Mongolian", "Bosnian", "Kazakh", "Albanian", "Swahili", "Galician", "Marathi", "Punjabi", "Sinhala", "Khmer", "Shona", "Yoruba", "Somali", "Afrikaans", "Occitan", "Georgian", "Belarusian", "Tajik", "Sindhi", "Gujarati", "Amharic", "Yiddish", "Lao", "Uzbek", "Faroese", "Haitian Creole", "Pashto", "Turkmen", "Nynorsk", "Maltese", "Sanskrit", "Luxembourgish", "Burmese", "Tibetan", "Tagalog", "Malagasy", "Assamese", "Tatar", "Hawaiian", "Lingala", "Hausa", "Bashkir", "Javanese", "Sundanese" ] # Language code mapping for Google Text-to-Speech (gTTS) LANGUAGE_CODES = { "English": "en", "Chinese": "zh", "German": "de", "Spanish": "es", "Russian": "ru", "Korean": "ko", "French": "fr", "Japanese": "ja", "Portuguese": "pt", "Turkish": "tr", "Polish": "pl", "Catalan": "ca", "Dutch": "nl", "Arabic": "ar", "Swedish": "sv", "Italian": "it", "Indonesian": "id", "Hindi": "hi", "Finnish": "fi", "Vietnamese": "vi", "Hebrew": "iw", "Ukrainian": "uk", "Greek": "el", "Malay": "ms", "Czech": "cs", "Romanian": "ro", "Danish": "da", "Hungarian": "hu", "Tamil": "ta", "Norwegian": "no", "Thai": "th", "Urdu": "ur", "Croatian": "hr", "Bulgarian": "bg", "Lithuanian": "lt", "Latin": "la", "Malayalam": "ml", "Welsh": "cy", "Slovak": "sk", "Telugu": "te", "Persian": "fa", "Latvian": "lv", "Bengali": "bn", "Serbian": "sr", "Slovenian": "sl", "Kannada": "kn", "Estonian": "et", "Macedonian": "mk", "Armenian": "hy", "Nepali": "ne", "Mongolian": "mn", "Bosnian": "bs", "Kazakh": "kk", "Albanian": "sq", "Swahili": "sw", "Gujarati": "gu", "Lao": "lo", "Uzbek": "uz", "Maltese": "mt", "Burmese": "my", "Tagalog": "tl", "Javanese": "jw", "Sundanese": "su", "Afrikaans": "af" } # --- Core Processing Functions --- def get_gemini_model(): """Initializes and returns the Gemini model, checking for API key.""" if "GEMINI_API_KEY" not in os.environ: return None return genai.GenerativeModel( model_name="gemini-1.5-flash", generation_config=generation_config, ) def extract_audio_from_video(video_path, audio_path): """Extract audio from a video file into a WAV file.""" try: with VideoFileClip(video_path) as video: video.audio.write_audiofile(audio_path, fps=16000, logger=None) return audio_path except Exception as e: logging.error(f"Error extracting audio: {e}") return None def transcribe_audio_with_gemini(audio_path, source_language): """Transcribe audio using Gemini, requesting output in SRT format.""" model = get_gemini_model() if not model: return None try: logging.info(f"Uploading audio file: {audio_path}") audio_file = genai.upload_file(path=audio_path) language_prompt = f"in {source_language}" if source_language != "Auto Detect" else "" prompt = f""" You are a professional transcriber. Transcribe this audio accurately and verbatim {language_prompt}. Your output MUST be in the SRT (SubRip) format. Example: 1 00:00:01,234 --> 00:00:05,678 This is the first sentence. 2 00:00:06,123 --> 00:00:09,456 This is the second sentence. Ensure timestamps are precise. Respond ONLY with the transcription in the SRT format. Do not add explanations, notes, or any other text outside of the valid SRT content. """ logging.info("Sending transcription request to Gemini...") response = model.generate_content([prompt, audio_file], request_options={"timeout": 600}) genai.delete_file(audio_file.name) # Clean up the uploaded file logging.info("Transcription received from Gemini.") return response.text.strip() except Exception as e: logging.error(f"Error during Gemini transcription: {e}") return None def translate_srt(srt_text, target_language): """Translate an SRT file using Gemini while preserving its structure.""" model = get_gemini_model() if not model: return None try: prompt = f""" Translate the following SRT subtitles into {target_language}. Preserve the SRT format perfectly (index numbers, timestamps, and structure). Translate only the subtitle text itself. Your output must be only the translated and valid SRT file content. Do not add any explanations or extra text. SRT Content to Translate: {srt_text} """ logging.info(f"Sending translation request to Gemini for {target_language}...") response = model.generate_content(prompt) logging.info("Translation received.") return response.text.strip() except Exception as e: logging.error(f"Error during translation: {e}") return None def generate_tts_audio(srt_text, language, tts_audio_path): """Generate a single TTS audio file from the text content of an SRT file.""" try: subtitles = list(srt.parse(srt_text)) full_text = " ".join([sub.content.replace('\n', ' ') for sub in subtitles]) if not full_text: return None lang_code = LANGUAGE_CODES.get(language) if not lang_code: flash(f"Language '{language}' not supported for TTS, defaulting to English.", "warning") lang_code = "en" logging.info(f"Generating TTS audio in '{language}' ({lang_code})...") tts = gTTS(text=full_text, lang=lang_code, slow=False) tts.save(tts_audio_path) logging.info(f"TTS audio saved to {tts_audio_path}") return tts_audio_path except Exception as e: logging.error(f"Error generating TTS audio: {e}") return None def create_final_video(original_video_path, srt_text, new_audio_path, output_path): """ Creates the final video. - If srt_text is provided, subtitles are burned in. - If new_audio_path is provided, the original audio is replaced. """ try: logging.info("Creating final video...") original_clip = VideoFileClip(original_video_path) final_clip = original_clip # If new audio is provided, replace the original audio track if new_audio_path: tts_audio_clip = AudioFileClip(new_audio_path) # If the new audio is shorter than the video, loop it. if tts_audio_clip.duration < original_clip.duration: tts_audio_clip = tts_audio_clip.fx(vfx.loop, duration=original_clip.duration) final_clip = original_clip.set_audio(tts_audio_clip) # If subtitle text is provided, burn it into the video if srt_text and not IMAGEMAGICK_NOT_FOUND: with tempfile.NamedTemporaryFile(mode='w', suffix='.srt', delete=False, encoding='utf-8') as temp_srt: temp_srt.write(srt_text) srt_filepath = temp_srt.name # Subtitle styling generator = lambda txt: TextClip( txt, font='Arial-Bold', fontsize=24, color='white', stroke_color='black', stroke_width=1, method='caption', size=(final_clip.w * 0.8, None) # Subtitles take 80% of video width ) subtitles_clip = SubtitlesClip(srt_filepath, generator) # Composite the video with the subtitles final_video = CompositeVideoClip([final_clip, subtitles_clip.set_position(('center', 'bottom'))]) os.remove(srt_filepath) # Clean up temp srt file else: final_video = final_clip # Write the final video file final_video.write_videofile(output_path, codec='libx264', audio_codec='aac', threads=4, logger=None) # Close all clips to release memory if 'tts_audio_clip' in locals() and tts_audio_clip: tts_audio_clip.close() final_video.close() original_clip.close() logging.info(f"Final video saved to {output_path}") return output_path except Exception as e: logging.error(f"Error creating final video: {e}") return None # --- Flask Routes --- @app.route('/') def index(): """Render the main page. Clear session for a fresh start.""" session.clear() api_key_status = "GEMINI_API_KEY" in os.environ return render_template('index.html', supported_languages=SUPPORTED_LANGUAGES, imagemagick_missing=IMAGEMAGICK_NOT_FOUND, api_key_set=api_key_status) @app.route('/process', methods=['POST']) def process(): """Handle the video processing request from the form.""" if "GEMINI_API_KEY" not in os.environ: flash('Your GEMINI_API_KEY environment variable is not set. Please set it to use the application.', 'error') return redirect(url_for('index')) if 'video' not in request.files or request.files['video'].filename == '': flash('No video file selected. Please upload a video.', 'error') return redirect(url_for('index')) video_file = request.files['video'] # --- Get form options --- source_language = request.form.get('source_language', 'Auto Detect') translate_to = request.form.get('translate_to', 'None') add_tts = 'add_tts' in request.form add_subtitles = 'add_subtitles' in request.form if IMAGEMAGICK_NOT_FOUND and add_subtitles: flash("Cannot add subtitles because ImageMagick is not installed. The video will be processed without them.", "warning") add_subtitles = False # --- Setup a unique session directory for this request --- session_id = str(uuid.uuid4()) session['session_id'] = session_id session_dir = os.path.join(app.config['UPLOAD_FOLDER'], session_id) os.makedirs(session_dir, exist_ok=True) filename = secure_filename(video_file.filename) original_video_path = os.path.join(session_dir, filename) video_file.save(original_video_path) results = {} # 1. Extract Audio audio_path = os.path.join(session_dir, "extracted_audio.wav") if not extract_audio_from_video(original_video_path, audio_path): flash('Failed to extract audio from the video. The file might be corrupted or in an unsupported format.', 'error') return redirect(url_for('index')) # 2. Transcribe Audio original_srt_text = transcribe_audio_with_gemini(audio_path, source_language) os.remove(audio_path) # Clean up extracted audio immediately if not original_srt_text: flash('Failed to transcribe the audio. The API call might have failed or the audio is silent.', 'error') return redirect(url_for('index')) original_srt_path = os.path.join(session_dir, "original_subtitles.srt") with open(original_srt_path, "w", encoding="utf-8") as f: f.write(original_srt_text) results['original_srt_file'] = "original_subtitles.srt" srt_for_final_video = None tts_audio_path = None final_srt_text = original_srt_text # 3. Translate Subtitles (if requested) if translate_to != "None": translated_srt_text = translate_srt(original_srt_text, translate_to) if translated_srt_text: translated_srt_path = os.path.join(session_dir, "translated_subtitles.srt") with open(translated_srt_path, "w", encoding="utf-8") as f: f.write(translated_srt_text) results['translated_srt_file'] = "translated_subtitles.srt" final_srt_text = translated_srt_text # Use translated text for subsequent steps else: flash(f'Failed to translate subtitles to {translate_to}. Using original subtitles.', 'warning') # 4. Generate TTS Audio (if requested) if add_tts: tts_lang = translate_to if translate_to != "None" else source_language if tts_lang == 'Auto Detect': flash('TTS language cannot be "Auto Detect". Defaulting to English.', 'warning') tts_lang = 'English' tts_audio_path_out = os.path.join(session_dir, "tts_audio.mp3") tts_audio_path = generate_tts_audio(final_srt_text, tts_lang, tts_audio_path_out) if tts_audio_path: results['tts_audio_file'] = "tts_audio.mp3" else: flash('Failed to generate Text-to-Speech audio.', 'warning') tts_audio_path = None # Ensure it's None if generation failed # 5. Determine which subtitles to burn into the video (if requested) if add_subtitles: srt_for_final_video = final_srt_text # 6. Create the final output video if any processing was requested if add_subtitles or add_tts: output_video_path = os.path.join(session_dir, "final_video.mp4") if create_final_video(original_video_path, srt_for_final_video, tts_audio_path, output_video_path): results['output_video_file'] = "final_video.mp4" else: flash('Failed to create the final processed video.', 'error') api_key_status = "GEMINI_API_KEY" in os.environ return render_template('index.html', supported_languages=SUPPORTED_LANGUAGES, results=results, session_id=session_id, imagemagick_missing=IMAGEMAGICK_NOT_FOUND, api_key_set=api_key_status) @app.route('/download//') def download_file(session_id, filename): """Serve files from the session-specific directory for download.""" directory = os.path.join(app.config['UPLOAD_FOLDER'], session_id) return send_from_directory(directory, filename, as_attachment=True) # --- Run the App --- if __name__ == '__main__': app.run(host="0.0.0.0", port=7860)