"""utils.py — Text2Video helper functions.

Scene extraction from a script, TTS audio generation, image generation,
and final video assembly.
"""
import constants
import os
from PIL import Image
from gradio_client import Client
import moviepy.editor as mp
from structured_output_extractor import StructuredOutputExtractor
from response_schemas import ScenesResponseSchema
from typing import List, Dict
from natsort import natsorted # Natural sorting for correct numerical order
def get_scenes(text_script: str):
    """Split a story script into scenes with image prompts.

    Builds a prompt that embeds the script and its estimated read time,
    then runs the structured-output extractor against it.

    Args:
        text_script: The complete story/script text.

    Returns:
        dict: The extractor's ``ScenesResponseSchema`` result as a plain dict.
    """
    estimated_read_time = calculate_read_time(text_script)
    prompt = f"""
ROLE: Story to Scene Generator
Tasks: For the given story
1. Read it Completely and Understand the Complete Context
2. Rewrite the story in tiny scenes(but without even changing a word) with highly detailed and context aware list of image prompts to visualize each scene
3. Never Describe complete scene in a single image prompt use multiple prompts
RULE OF THUMB: 12 image prompts / 1 min audio (Note: You will be provided with estimated read Time for each story or script)
here is the Estimated Read Time of the complete story: {estimated_read_time}\n\n
and Here is the Complete Story: {text_script}
"""
    scene_extractor = StructuredOutputExtractor(response_schema=ScenesResponseSchema)
    extraction = scene_extractor.extract(prompt)
    # Hand back a plain dictionary rather than the pydantic model instance.
    return extraction.model_dump()
def generate_video_assets(scenes: Dict, language: str, speaker: str, base_path: str = "media") -> str:
    """Generate per-scene audio and images under a fresh video folder.

    Layout created:
        <base_path>/video_<k>/audio/scene_<i>.mp3
        <base_path>/video_<k>/images/scene_<i>/scene_<i>_image_<j>.png

    Args:
        scenes: Dict with a "scenes" list; each scene holds "text" and "image_prompts".
        language: Language code forwarded to the TTS service (e.g. "Urdu").
        speaker: Speaker name forwarded to the TTS service.
        base_path: Root folder that collects all generated video folders.

    Returns:
        str: Path of the created video folder on success.
        dict: ``{"error": message}`` if an unexpected exception occurs.
    """
    try:
        os.makedirs(base_path, exist_ok=True)

        scenes_list = scenes.get("scenes", [])
        print(f"Total Scenes: {len(scenes_list)}")

        # NOTE(review): naming by directory count can collide if earlier
        # video folders are deleted; acceptable for this single-user flow.
        video_folder = os.path.join(base_path, f"video_{len(os.listdir(base_path)) + 1}")
        images_folder = os.path.join(video_folder, "images")
        audio_folder = os.path.join(video_folder, "audio")
        os.makedirs(images_folder, exist_ok=True)  # also creates video_folder
        os.makedirs(audio_folder, exist_ok=True)

        for scene_count, scene in enumerate(scenes_list):
            text: str = scene.get("text", "")
            image_prompts: List[str] = scene.get("image_prompts", [])

            # One image sub-folder per scene.
            scene_images_folder = os.path.join(images_folder, f"scene_{scene_count + 1}")
            os.makedirs(scene_images_folder, exist_ok=True)

            # generate_audio always returns a dict, so a key test is safe here.
            audio_path = os.path.join(audio_folder, f"scene_{scene_count + 1}.mp3")
            audio_result = generate_audio(text, language, speaker, path=audio_path)
            if "error" in audio_result:
                print(f"Error generating audio for scene {scene_count + 1}: {audio_result['error']}")
                continue

            image_paths = []
            for count, prompt in enumerate(image_prompts):
                image_path = os.path.join(scene_images_folder, f"scene_{scene_count + 1}_image_{count + 1}.png")
                image_result = generate_image(prompt=prompt, path=image_path)
                # BUGFIX: generate_image returns a *string* path on success, so the
                # previous `"error" in image_result` was a substring test that could
                # misclassify any successful path containing "error". Check the
                # type before probing for the error key.
                if isinstance(image_result, dict) and "error" in image_result:
                    print(f"Error generating image {count + 1} for scene {scene_count + 1}: {image_result['error']}")
                else:
                    image_paths.append(image_path)

            print(f"Scene: {scene_count + 1}\t No of Images in Scene {scene_count + 1}: {len(image_paths)}")

        # Return the path of the main video folder
        return video_folder
    except Exception as e:
        print(f"Error during video asset generation: {e}")
        return {"error": str(e)}
def generate_audio(text, language_code, speaker, path='test_audio.mp3'):
    """Synthesize speech for ``text`` via the hosted Multilingual-TTS space.

    Args:
        text: Text to narrate.
        language_code: Language code understood by the space (e.g. "Urdu").
        speaker: Speaker voice name (e.g. "Asad").
        path: Destination file for the downloaded audio.

    Returns:
        dict: ``{"audio_file": path}`` on success, ``{"error": message}`` on failure.
    """
    try:
        tts_client = Client("habib926653/Multilingual-TTS")
        # The endpoint returns a (text, audio_file_path) tuple.
        prediction = tts_client.predict(
            text=text,
            language_code=language_code,
            speaker=speaker,
            api_name="/text_to_speech_edge"
        )
        generated_audio_path = prediction[1]
        # Copy the generated file to the caller-requested location.
        with open(generated_audio_path, 'rb') as source:
            audio_payload = source.read()
        with open(path, 'wb') as destination:
            destination.write(audio_payload)
        return {"audio_file": path}
    except Exception as e:
        print(f"Error during audio generation: {e}")
        return {"error": str(e)}
def generate_image(prompt, path='test_image.png'):
    """Generate a 1280x720 image for ``prompt`` and save a copy at ``path``.

    Args:
        prompt: Text prompt for the image-generation space.
        path: Where to save a local copy of the generated image.

    Returns:
        The raw API result (a file path) on success, or ``{"error": message}``
        on failure. NOTE(review): success/error return types differ — callers
        must type-check before probing for "error".
    """
    try:
        image_client = Client(constants.IMAGE_GENERATION_SPACE_NAME, hf_token=constants.HF_TOKEN)
        generated = image_client.predict(
            prompt=prompt,
            width=1280,
            height=720,
            api_name="/generate_image"
        )
        # Persist a copy at the requested path; the API's own file path is returned.
        Image.open(generated).save(path)
        return generated
    except Exception as e:
        print(f"Error during image generation: {e}")
        return {"error": str(e)}
def generate_video(video_folder: str, output_filename: str = "final_video.mp4"):
    """Assemble the final video from per-scene audio and images.

    Each ``images/scene_k`` folder becomes a clip whose images evenly share the
    duration of the matching ``audio/scene_k.mp3``; the scene clips are then
    concatenated and rendered.

    Args:
        video_folder: Folder produced by ``generate_video_assets`` (contains
            "audio" and "images" sub-folders).
        output_filename: File name of the rendered video inside ``video_folder``.

    Returns:
        str: Path of the rendered video, or ``None`` if no scene could be built.
    """
    audio_folder = os.path.join(video_folder, "audio")
    images_folder = os.path.join(video_folder, "images")
    final_clips = []
    # Keep every opened clip so their file handles can be released after the
    # render — moviepy reads sources lazily during write_videofile, so they
    # must stay open until then. (BUGFIX: the originals were never closed,
    # leaking handles and locking files on Windows.)
    opened_clips = []

    # Scene folders in natural numeric order (scene_2 before scene_10).
    scene_folders = [
        os.path.join(images_folder, scene)
        for scene in natsorted(os.listdir(images_folder))
        if os.path.isdir(os.path.join(images_folder, scene))
    ]

    try:
        for scene_path in scene_folders:
            scene_name = os.path.basename(scene_path)
            audio_path = os.path.join(audio_folder, f"{scene_name}.mp3")

            if not os.path.exists(audio_path):
                print(f"Warning: Audio file {audio_path} not found. Skipping scene {scene_name}.")
                continue

            # Image files for the scene, naturally sorted.
            image_files = natsorted([
                os.path.join(scene_path, img)
                for img in os.listdir(scene_path)
                if img.lower().endswith(('.png', '.jpg', '.jpeg'))
            ])
            if not image_files:
                print(f"Warning: No images found in {scene_path}. Skipping scene {scene_name}.")
                continue

            audio_clip = mp.AudioFileClip(audio_path)
            opened_clips.append(audio_clip)

            # Each image gets an equal slice of the narration.
            duration_per_image = audio_clip.duration / len(image_files)
            image_clips = [mp.ImageClip(img).set_duration(duration_per_image) for img in image_files]

            scene_video = mp.concatenate_videoclips(image_clips, method="compose").set_audio(audio_clip)
            final_clips.append(scene_video)

        if not final_clips:
            print("Error: No valid scenes processed.")
            return None

        final_video = mp.concatenate_videoclips(final_clips, method="compose")
        opened_clips.append(final_video)
        output_path = os.path.join(video_folder, output_filename)
        final_video.write_videofile(output_path, fps=24, codec='libx264')
        return output_path
    finally:
        # Release all file handles once rendering is done (or has failed).
        for clip in opened_clips:
            try:
                clip.close()
            except Exception:
                pass  # best-effort cleanup; never mask the real error
def calculate_read_time(text: str, words_per_minute: int = 155) -> str:
    """
    Calculate how long it will take to read a given text.

    Args:
        text (str): The input text to calculate reading time for.
        words_per_minute (int): Average reading speed in words per minute.
            Default is 155 (an uneducated guess). Must be positive.

    Returns:
        str: A string describing the reading time in seconds, minutes, or hours,
        or a validation message for bad input.
    """
    try:
        # Validate input
        if not text or not isinstance(text, str):
            return "Invalid input: Text must be a non-empty string."
        # BUGFIX: guard the division below — the old code surfaced a raw
        # "An error occurred: division by zero" for words_per_minute == 0
        # and produced nonsense for negative values.
        if not isinstance(words_per_minute, (int, float)) or words_per_minute <= 0:
            return "Invalid input: words_per_minute must be a positive number."

        # Word count by whitespace splitting.
        word_count = len(text.split())

        # Total reading time in seconds.
        total_seconds = (word_count / words_per_minute) * 60

        # Break down into hours, minutes, and seconds.
        hours = int(total_seconds // 3600)
        minutes = int((total_seconds % 3600) // 60)
        seconds = int(total_seconds % 60)

        # Format the output based on the duration
        if hours > 0:
            return f"Reading time: {hours} hour(s), {minutes} minute(s), and {seconds} second(s)."
        elif minutes > 0:
            return f"Reading time: {minutes} minute(s) and {seconds} second(s)."
        else:
            return f"Reading time: {seconds} second(s)."
    except Exception as e:
        return f"An error occurred: {e}"
# Example usage:
if __name__ == "__main__":
    # Quick manual smoke test: narrate a short sample story in Urdu.
    short_story = """
In a quiet village, a young girl named Lily discovered a hidden garden.
Every flower in the garden glowed with a magical light, revealing secrets of the past.
Lily knew she had found something truly extraordinary.
"""
    generate_audio(short_story, "Urdu", "Asad")