# NOTE: removed page-scrape artifact ("Spaces: / Running / Running" HF Space status header)
import os
from typing import Dict, List

import moviepy.editor as mp
from gradio_client import Client
from natsort import natsorted  # Natural sorting for correct numerical order
from PIL import Image

import constants
from response_schemas import ScenesResponseSchema
from structured_output_extractor import StructuredOutputExtractor
def get_scenes(text_script: str):
    """Split a story into scenes with per-scene image prompts.

    Embeds the script and its estimated read time into an LLM prompt, runs
    the structured extractor against ScenesResponseSchema, and returns the
    extracted result as a plain dictionary.
    """
    read_time = calculate_read_time(text_script)
    prompt = f"""
    ROLE: Story to Scene Generator
    Tasks: For the given story
    1. Read it Completely and Understand the Complete Context
    2. Rewrite the story in tiny scenes(but without even changing a word) with highly detailed and context aware list of image prompts to visualize each scene
    3. Never Describe complete scene in a single image prompt use multiple prompts
    RULE OF THUMB: 12 image prompts / 1 min audio (Note: You will be provided with estimated read Time for each story or script)
    here is the Estimated Read Time of the complete story: {read_time}\n\n
    and Here is the Complete Story: {text_script}
    """
    scene_extractor = StructuredOutputExtractor(response_schema=ScenesResponseSchema)
    extraction = scene_extractor.extract(prompt)
    # Convert the pydantic model into a plain dict for downstream consumers.
    return extraction.model_dump()
def generate_video_assets(scenes: Dict, language: str, speaker: str, base_path: str = "media") -> str:
    """Generate the audio and image assets for every scene of a video.

    Args:
        scenes: Dict with a "scenes" key holding a list of scene dicts, each
            with "text" (narration) and "image_prompts" (list of prompts).
        language: Language code forwarded to the TTS service (e.g. "Urdu").
        speaker: Speaker name forwarded to the TTS service (e.g. "Asad").
        base_path: Root folder under which a new video_<n> folder is created.

    Returns:
        Path of the created video folder on success, or an {"error": ...}
        dict if an unexpected exception occurred (callers should check for
        a dict before treating the result as a path).
    """
    try:
        # Ensure the base folder exists
        os.makedirs(base_path, exist_ok=True)

        # Extract scenes from the input dictionary
        scenes_list = scenes.get("scenes", [])
        print(f"Total Scenes: {len(scenes_list)}")

        # Create a folder for the current video.
        # NOTE(review): numbering by entry count can collide if earlier
        # video_<n> folders were deleted; fine for append-only usage.
        video_folder = os.path.join(base_path, f"video_{len(os.listdir(base_path)) + 1}")
        os.makedirs(video_folder, exist_ok=True)

        # Create 'images' and 'audio' folders inside the video folder
        images_folder = os.path.join(video_folder, "images")
        audio_folder = os.path.join(video_folder, "audio")
        os.makedirs(images_folder, exist_ok=True)
        os.makedirs(audio_folder, exist_ok=True)

        for scene_count, scene in enumerate(scenes_list):
            text: str = scene.get("text", "")
            image_prompts: List[str] = scene.get("image_prompts", [])

            # Create a folder for the current scene inside the 'images' folder
            scene_images_folder = os.path.join(images_folder, f"scene_{scene_count + 1}")
            os.makedirs(scene_images_folder, exist_ok=True)

            # Generate audio for the scene (generate_audio always returns a dict)
            audio_path = os.path.join(audio_folder, f"scene_{scene_count + 1}.mp3")
            audio_result = generate_audio(text, language, speaker, path=audio_path)
            if "error" in audio_result:
                print(f"Error generating audio for scene {scene_count + 1}: {audio_result['error']}")
                continue

            # Generate images for the scene
            image_paths = []
            for count, prompt in enumerate(image_prompts):
                image_path = os.path.join(scene_images_folder, f"scene_{scene_count + 1}_image_{count + 1}.png")
                image_result = generate_image(prompt=prompt, path=image_path)
                # BUG FIX: generate_image returns a plain path string on
                # success and a dict only on failure; the previous
                # `"error" in image_result` did a *substring* match on the
                # path and could misflag successful generations.
                if isinstance(image_result, dict) and "error" in image_result:
                    print(f"Error generating image {count + 1} for scene {scene_count + 1}: {image_result['error']}")
                else:
                    image_paths.append(image_path)

            print(f"Scene: {scene_count + 1}\t No of Images in Scene {scene_count + 1}: {len(image_paths)}")

        # Return the path of the main video folder
        return video_folder
    except Exception as e:
        print(f"Error during video asset generation: {e}")
        return {"error": str(e)}
def generate_audio(text, language_code, speaker, path='test_audio.mp3'):
    """Synthesize speech for *text* via the hosted Multilingual-TTS Space.

    The remote endpoint writes a temporary audio file; its bytes are copied
    into *path*.  Returns {"audio_file": path} on success, or
    {"error": message} when anything goes wrong.
    """
    try:
        tts_client = Client("habib926653/Multilingual-TTS")

        # The endpoint returns a (text, generated_audio_file_path) tuple.
        response = tts_client.predict(
            text=text,
            language_code=language_code,
            speaker=speaker,
            api_name="/text_to_speech_edge"
        )
        generated_path = response[1]

        # Copy the generated audio into the caller-requested location.
        with open(generated_path, 'rb') as src, open(path, 'wb') as dst:
            dst.write(src.read())

        return {"audio_file": path}
    except Exception as e:
        print(f"Error during audio generation: {e}")
        return {"error": str(e)}
def generate_image(prompt, path='test_image.png'):
    """Generate one image for *prompt* via the configured HF Space.

    The remote result (a local file path) is re-saved at *path*.  Returns
    the raw API result on success (a path string — NOT a dict), or
    {"error": message} on failure; callers must handle both shapes.
    """
    try:
        image_client = Client(constants.IMAGE_GENERATION_SPACE_NAME, hf_token=constants.HF_TOKEN)
        result = image_client.predict(
            prompt=prompt,
            width=1280,
            height=720,
            api_name="/generate_image"
        )
        # Persist the returned file at the requested location.
        img = Image.open(result)
        img.save(path)
        return result
    except Exception as e:
        print(f"Error during image generation: {e}")
        return {"error": str(e)}
def generate_video(video_folder: str, output_filename: str = "final_video.mp4"):
    """Assemble the final video from per-scene image folders and audio files.

    Each scene_<n> folder under images/ is paired with scene_<n>.mp3 under
    audio/; that scene's images split the audio duration equally.  Scenes
    missing audio or images are skipped with a warning.

    Returns the output file path, or None when no scene could be processed.
    """
    audio_dir = os.path.join(video_folder, "audio")
    images_dir = os.path.join(video_folder, "images")

    # Scene folders in natural numeric order (scene_2 before scene_10).
    scene_dirs = []
    for entry in natsorted(os.listdir(images_dir)):
        candidate = os.path.join(images_dir, entry)
        if os.path.isdir(candidate):
            scene_dirs.append(candidate)

    assembled = []
    for scene_dir in scene_dirs:
        label = os.path.basename(scene_dir)
        narration = os.path.join(audio_dir, f"{label}.mp3")

        if not os.path.exists(narration):
            print(f"Warning: Audio file {narration} not found. Skipping scene {label}.")
            continue

        frames = natsorted(
            os.path.join(scene_dir, name)
            for name in os.listdir(scene_dir)
            if name.lower().endswith(('.png', '.jpg', '.jpeg'))
        )
        if not frames:
            print(f"Warning: No images found in {scene_dir}. Skipping scene {label}.")
            continue

        narration_clip = mp.AudioFileClip(narration)
        # Spread the narration evenly across this scene's images.
        per_image = narration_clip.duration / len(frames)
        stills = [mp.ImageClip(frame).set_duration(per_image) for frame in frames]
        assembled.append(
            mp.concatenate_videoclips(stills, method="compose").set_audio(narration_clip)
        )

    if not assembled:
        print("Error: No valid scenes processed.")
        return None

    movie = mp.concatenate_videoclips(assembled, method="compose")
    destination = os.path.join(video_folder, output_filename)
    movie.write_videofile(destination, fps=24, codec='libx264')
    return destination
def calculate_read_time(text: str, words_per_minute: int = 155) -> str:
    """
    Estimate how long it takes to read a given text.

    Args:
        text (str): The input text to calculate reading time for.
        words_per_minute (int): Average reading speed in words per minute.
            Default is 155 (an uneducated guess).

    Returns:
        str: A human-readable description of the reading time in seconds,
        minutes, or hours — or an error message for invalid input.
    """
    try:
        # Reject non-strings and empty strings up front.
        if not isinstance(text, str) or not text:
            return "Invalid input: Text must be a non-empty string."

        # Whitespace-delimited word count.
        word_count = len(text.split())

        # Total reading time in seconds at the given pace.
        total_seconds = (word_count / words_per_minute) * 60

        hours = int(total_seconds // 3600)
        minutes = int((total_seconds % 3600) // 60)
        seconds = int(total_seconds % 60)

        # The largest non-zero unit decides the message shape.
        if hours > 0:
            return f"Reading time: {hours} hour(s), {minutes} minute(s), and {seconds} second(s)."
        if minutes > 0:
            return f"Reading time: {minutes} minute(s) and {seconds} second(s)."
        return f"Reading time: {seconds} second(s)."
    except Exception as e:
        return f"An error occurred: {e}"
# Example usage:
if __name__ == "__main__":
    short_story = """
    In a quiet village, a young girl named Lily discovered a hidden garden.
    Every flower in the garden glowed with a magical light, revealing secrets of the past.
    Lily knew she had found something truly extraordinary.
    """
    # Smoke-test the TTS path with a short narration.
    generate_audio(short_story, "Urdu", "Asad")