import os
from typing import Dict, List, Union

import moviepy.editor as mp
from gradio_client import Client
from natsort import natsorted  # Natural sorting so scene_2 sorts before scene_10
from PIL import Image

import constants
from response_schemas import ScenesResponseSchema
from structured_output_extractor import StructuredOutputExtractor


def calculate_read_time(text: str, words_per_minute: int = 155) -> str:
    """
    Calculate how long it will take to read a given text.

    Args:
        text (str): The input text to calculate reading time for.
        words_per_minute (int): Average reading speed in words per minute.
            Default is 155(an uneducated guess).

    Returns:
        str: A string describing the reading time in seconds, minutes, or hours.
             Invalid input or an internal error yields a human-readable message
             rather than raising (callers embed the result in an LLM prompt).
    """
    try:
        # Validate input: non-string or empty text gets a message, not an exception.
        if not text or not isinstance(text, str):
            return "Invalid input: Text must be a non-empty string."

        # Word count via whitespace split.
        word_count = len(text.split())

        # Total reading time in seconds.
        total_seconds = (word_count / words_per_minute) * 60

        # Break down into hours / minutes / seconds.
        hours = int(total_seconds // 3600)
        minutes = int((total_seconds % 3600) // 60)
        seconds = int(total_seconds % 60)

        # Format the output based on the duration.
        if hours > 0:
            return f"Reading time: {hours} hour(s), {minutes} minute(s), and {seconds} second(s)."
        elif minutes > 0:
            return f"Reading time: {minutes} minute(s) and {seconds} second(s)."
        else:
            return f"Reading time: {seconds} second(s)."
    except Exception as e:
        # Best-effort: never crash the prompt-building pipeline over a time estimate.
        return f"An error occurred: {e}"


def get_scenes(text_script: str) -> Dict:
    """
    Split a story script into scenes with image prompts via an LLM extractor.

    Args:
        text_script: The complete story/script text.

    Returns:
        dict: Dictionary form of a ScenesResponseSchema pydantic model
              (expected to contain a "scenes" list — see generate_video_assets).
    """
    read_time = calculate_read_time(text_script)
    prompt = f"""
    ROLE: Story to Scene Generator
    Tasks: For the given story
    1. Read it Completely and Understand the Complete Context
    2. Rewrite the story in tiny scenes(but without even changing a word) with highly detailed and context aware list of image prompts to visualize each scene
    3. Never Describe complete scene in a single image prompt use multiple prompts

    RULE OF THUMB: 12 image prompts / 1 min audio
    (Note: You will be provided with estimated read Time for each story or script)

    here is the Estimated Read Time of the complete story: {read_time}\n\n
    and Here is the Complete Story: {text_script}
    """
    extractor = StructuredOutputExtractor(response_schema=ScenesResponseSchema)
    result = extractor.extract(prompt)
    return result.model_dump()  # returns dictionary version pydantic model


def generate_video_assets(scenes: Dict, language: str, speaker: str,
                          base_path: str = "media") -> Union[str, Dict[str, str]]:
    """
    Generate the audio and image files for every scene of a video.

    Layout created under base_path:
        video_N/
            audio/scene_K.mp3
            images/scene_K/scene_K_image_J.png

    Args:
        scenes: Dict with a "scenes" list; each scene has "text" and
                "image_prompts" keys (output of get_scenes).
        language: Language code passed to the TTS service (e.g. "Urdu").
        speaker: Speaker name passed to the TTS service (e.g. "Asad").
        base_path: Root folder for generated videos. Default "media".

    Returns:
        str: Path of the created video folder on success.
        dict: {"error": message} if asset generation fails outright.
    """
    try:
        # Ensure the base folder exists.
        os.makedirs(base_path, exist_ok=True)

        # Extract scenes from the input dictionary.
        scenes_list = scenes.get("scenes", [])
        print(f"Total Scenes: {len(scenes_list)}")

        # Create a folder for the current video.
        # NOTE(review): numbering by directory-entry count is racy if two runs
        # share base_path, and miscounts if stray files exist there.
        video_folder = os.path.join(base_path, f"video_{len(os.listdir(base_path)) + 1}")
        os.makedirs(video_folder, exist_ok=True)

        # Create 'images' and 'audio' folders inside the video folder.
        images_folder = os.path.join(video_folder, "images")
        audio_folder = os.path.join(video_folder, "audio")
        os.makedirs(images_folder, exist_ok=True)
        os.makedirs(audio_folder, exist_ok=True)

        for scene_idx, scene in enumerate(scenes_list, start=1):
            text: str = scene.get("text", "")
            image_prompts: List[str] = scene.get("image_prompts", [])

            # Create a folder for the current scene inside the 'images' folder.
            scene_images_folder = os.path.join(images_folder, f"scene_{scene_idx}")
            os.makedirs(scene_images_folder, exist_ok=True)

            # Generate audio for the scene; skip the whole scene if TTS fails
            # (generate_video later skips scenes whose audio is missing).
            audio_path = os.path.join(audio_folder, f"scene_{scene_idx}.mp3")
            audio_result = generate_audio(text, language, speaker, path=audio_path)
            if "error" in audio_result:
                print(f"Error generating audio for scene {scene_idx}: {audio_result['error']}")
                continue

            # Generate images for the scene; a failed image is dropped, not fatal.
            image_paths = []
            for img_idx, prompt in enumerate(image_prompts, start=1):
                image_path = os.path.join(
                    scene_images_folder, f"scene_{scene_idx}_image_{img_idx}.png"
                )
                image_result = generate_image(prompt=prompt, path=image_path)
                if "error" in image_result:
                    print(f"Error generating image {img_idx} for scene {scene_idx}: {image_result['error']}")
                else:
                    image_paths.append(image_path)

            print(f"Scene: {scene_idx}\t No of Images in Scene {scene_idx}: {len(image_paths)}")

        # Return the path of the main video folder.
        return video_folder

    except Exception as e:
        print(f"Error during video asset generation: {e}")
        return {"error": str(e)}


def generate_audio(text, language_code, speaker, path='test_audio.mp3') -> Dict[str, str]:
    """
    Generate speech audio for text via the hosted Multilingual-TTS space.

    Args:
        text: Text to synthesize.
        language_code: Language code (e.g., "Urdu").
        speaker: Selected speaker (e.g., "Asad").
        path: Destination file for the MP3 bytes. Default 'test_audio.mp3'.

    Returns:
        dict: {"audio_file": path} on success, {"error": message} on failure.
    """
    try:
        # Initialize the Gradio Client with the hosted model.
        client = Client("habib926653/Multilingual-TTS")

        # Make the API request.
        result = client.predict(
            text=text,
            language_code=language_code,
            speaker=speaker,
            api_name="/text_to_speech_edge"
        )

        # The result is a tuple: (text, audio_file_path).
        audio_file_path = result[1]

        # Copy the generated audio from the client's temp location to `path`.
        with open(audio_file_path, 'rb') as src:
            audio_bytes = src.read()
        with open(path, 'wb') as dst:
            dst.write(audio_bytes)

        # Return the result (which includes the file path).
        return {"audio_file": path}

    except Exception as e:
        print(f"Error during audio generation: {e}")
        return {"error": str(e)}


def generate_image(prompt, path='test_image.png') -> Dict[str, str]:
    """
    Generate a 1280x720 image for a prompt via the configured HF space.

    Args:
        prompt: Text prompt for image generation.
        path: Destination file for the PNG. Default 'test_image.png'.

    Returns:
        dict: {"image_file": path} on success, {"error": message} on failure.
        (Fixed: previously returned the raw service filepath on success,
        inconsistent with generate_audio's dict contract; callers only
        check for the "error" key, so this is backward compatible.)
    """
    try:
        # Initialize the Gradio Client with Hugging Face token.
        client = Client(constants.IMAGE_GENERATION_SPACE_NAME, hf_token=constants.HF_TOKEN)

        # Make the API request.
        result = client.predict(
            prompt=prompt,
            width=1280,
            height=720,
            api_name="/generate_image"
        )

        # Re-save the returned image to the caller-requested path.
        image = Image.open(result)
        image.save(path)

        return {"image_file": path}

    except Exception as e:
        print(f"Error during image generation: {e}")
        return {"error": str(e)}


def generate_video(video_folder: str, output_filename: str = "final_video.mp4"):
    """
    Assemble the final video from the assets laid out by generate_video_assets.

    Each scene's images are shown for equal shares of that scene's audio
    duration; scene clips are concatenated in natural-sort order.

    Args:
        video_folder: Folder containing 'audio/' and 'images/' subfolders.
        output_filename: Name of the rendered file. Default "final_video.mp4".

    Returns:
        str: Path of the rendered video, or None if no scene had both
             audio and images.
    """
    audio_folder = os.path.join(video_folder, "audio")
    images_folder = os.path.join(video_folder, "images")
    final_clips = []

    # Get all scene folders with absolute paths and sorted order.
    scene_folders = [
        os.path.join(images_folder, scene)
        for scene in natsorted(os.listdir(images_folder))
        if os.path.isdir(os.path.join(images_folder, scene))
    ]

    for scene_path in scene_folders:
        scene_name = os.path.basename(scene_path)
        audio_path = os.path.join(audio_folder, f"{scene_name}.mp3")

        # Ensure audio file exists (generate_video_assets skips failed TTS).
        if not os.path.exists(audio_path):
            print(f"Warning: Audio file {audio_path} not found. Skipping scene {scene_name}.")
            continue

        # Get all image files for the scene (sorted).
        image_files = natsorted([
            os.path.join(scene_path, img)
            for img in os.listdir(scene_path)
            if img.lower().endswith(('.png', '.jpg', '.jpeg'))
        ])

        if not image_files:
            print(f"Warning: No images found in {scene_path}. Skipping scene {scene_name}.")
            continue

        # Load audio file and split its duration evenly across the images.
        audio_clip = mp.AudioFileClip(audio_path)
        duration_per_image = audio_clip.duration / len(image_files)

        # Create image clips and stitch them into one scene video with audio.
        image_clips = [mp.ImageClip(img).set_duration(duration_per_image) for img in image_files]
        scene_video = mp.concatenate_videoclips(image_clips, method="compose").set_audio(audio_clip)
        final_clips.append(scene_video)

    if not final_clips:
        print("Error: No valid scenes processed.")
        return None

    # Concatenate all scenes and render.
    final_video = mp.concatenate_videoclips(final_clips, method="compose")
    output_path = os.path.join(video_folder, output_filename)
    final_video.write_videofile(output_path, fps=24, codec='libx264')
    return output_path


# Example usage:
if __name__ == "__main__":
    short_story = """
    In a quiet village, a young girl named Lily discovered a hidden garden. Every flower in the garden glowed
    with a magical light, revealing secrets of the past. Lily knew she had found something truly extraordinary.
    """
    generate_audio(short_story, "Urdu", "Asad")