Spaces:
Running
Running
File size: 9,472 Bytes
e98f22b 9c1a326 e98f22b b3d9d61 e98f22b ab4d4f6 f7dccf1 ab4d4f6 e98f22b ab4d4f6 b3d9d61 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 e98f22b 9c1a326 b3d9d61 e98f22b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
import constants
import os
from PIL import Image
from gradio_client import Client
import moviepy.editor as mp
from structured_output_extractor import StructuredOutputExtractor
from response_schemas import ScenesResponseSchema
from typing import List, Dict
from natsort import natsorted # Natural sorting for correct numerical order
def get_scenes(text_script: str):
    """Split a story into tiny scenes, each paired with image prompts.

    Feeds the full script plus its estimated read time to the structured
    LLM extractor and returns the dict form of the resulting
    ScenesResponseSchema pydantic model.
    """
    read_time = calculate_read_time(text_script)
    # Prompt template: the model must keep the story text verbatim and
    # budget roughly 12 image prompts per minute of narration.
    prompt = f"""
    ROLE: Story to Scene Generator
    Tasks: For the given story
    1. Read it Completely and Understand the Complete Context
    2. Rewrite the story in tiny scenes(but without even changing a word) with highly detailed and context aware list of image prompts to visualize each scene
    3. Never Describe complete scene in a single image prompt use multiple prompts
    RULE OF THUMB: 12 image prompts / 1 min audio (Note: You will be provided with estimated read Time for each story or script)
    here is the Estimated Read Time of the complete story: {read_time}\n\n
    and Here is the Complete Story: {text_script}
    """
    extractor = StructuredOutputExtractor(response_schema=ScenesResponseSchema)
    # model_dump() converts the pydantic model into a plain dictionary.
    return extractor.extract(prompt).model_dump()
def generate_video_assets(scenes: Dict, language: str, speaker: str, base_path: str = "media") -> str:
    """Generate per-scene audio and image assets under a new video folder.

    Layout created: <base_path>/video_N/{audio/scene_K.mp3,
    images/scene_K/scene_K_image_J.png}.

    Args:
        scenes: Dict with a "scenes" key holding a list of scene dicts,
            each expected to carry "text" and "image_prompts" entries.
        language: Language code forwarded to the TTS backend (e.g. "Urdu").
        speaker: Speaker name forwarded to the TTS backend.
        base_path: Root folder under which per-video folders are created.

    Returns:
        Path of the created video folder on success. NOTE(review): on
        failure this returns an {"error": str} dict despite the -> str
        annotation; callers must check for "error" membership.
    """
    try:
        # exist_ok avoids the check-then-create race of the exists()/makedirs pair.
        os.makedirs(base_path, exist_ok=True)

        scenes_list = scenes.get("scenes", [])
        print(f"Total Scenes: {len(scenes_list)}")

        # NOTE(review): numbering by entry count can collide if earlier
        # video folders are deleted; kept for backward-compatible naming.
        video_folder = os.path.join(base_path, f"video_{len(os.listdir(base_path)) + 1}")
        os.makedirs(video_folder, exist_ok=True)

        images_folder = os.path.join(video_folder, "images")
        audio_folder = os.path.join(video_folder, "audio")
        os.makedirs(images_folder, exist_ok=True)
        os.makedirs(audio_folder, exist_ok=True)

        for scene_count, scene in enumerate(scenes_list, start=1):
            text: str = scene.get("text", "")
            image_prompts: List[str] = scene.get("image_prompts", [])

            # Each scene gets its own image subfolder.
            scene_images_folder = os.path.join(images_folder, f"scene_{scene_count}")
            os.makedirs(scene_images_folder, exist_ok=True)

            # Narration first: a scene without audio is skipped entirely.
            audio_path = os.path.join(audio_folder, f"scene_{scene_count}.mp3")
            audio_result = generate_audio(text, language, speaker, path=audio_path)
            if "error" in audio_result:
                print(f"Error generating audio for scene {scene_count}: {audio_result['error']}")
                continue

            # Generate images; individual failures are logged but do not
            # abort the scene.
            image_paths = []
            for count, prompt in enumerate(image_prompts, start=1):
                image_path = os.path.join(scene_images_folder, f"scene_{scene_count}_image_{count}.png")
                image_result = generate_image(prompt=prompt, path=image_path)
                if "error" in image_result:
                    print(f"Error generating image {count} for scene {scene_count}: {image_result['error']}")
                else:
                    image_paths.append(image_path)

            print(f"Scene: {scene_count}\t No of Images in Scene {scene_count}: {len(image_paths)}")

        return video_folder
    except Exception as e:
        print(f"Error during video asset generation: {e}")
        return {"error": str(e)}
def generate_audio(text, language_code, speaker, path='test_audio.mp3'):
    """Synthesize narration for *text* via the hosted Multilingual-TTS Space.

    Copies the Space's generated audio file to *path*.

    Returns:
        {"audio_file": path} on success, {"error": message} on failure.
    """
    try:
        tts_client = Client("habib926653/Multilingual-TTS")
        # The endpoint returns a tuple; index 1 is the generated file path.
        generated_path = tts_client.predict(
            text=text,
            language_code=language_code,
            speaker=speaker,
            api_name="/text_to_speech_edge"
        )[1]
        # Copy the generated file to the caller's requested destination.
        with open(generated_path, 'rb') as src, open(path, 'wb') as dst:
            dst.write(src.read())
        return {"audio_file": path}
    except Exception as e:
        print(f"Error during audio generation: {e}")
        return {"error": str(e)}
def generate_image(prompt, path='test_image.png'):
    """Generate an image for *prompt* via the configured HF Space and save it.

    Args:
        prompt: Text prompt for the image model.
        path: Destination file for the saved PNG.

    Returns:
        {"image_file": path} on success (mirrors generate_audio's contract),
        {"error": message} on failure. Previously the raw Space result path
        was returned on success, which made the caller's `"error" in result`
        check a substring test on a file path — a latent false positive.
    """
    try:
        client = Client(constants.IMAGE_GENERATION_SPACE_NAME, hf_token=constants.HF_TOKEN)
        result = client.predict(
            prompt=prompt,  # Text prompt for image generation
            width=1280,
            height=720,
            api_name="/generate_image"
        )
        # Re-save through PIL so the output at `path` is a valid image file.
        image = Image.open(result)
        image.save(path)
        return {"image_file": path}
    except Exception as e:
        print(f"Error during image generation: {e}")
        return {"error": str(e)}
def generate_video(video_folder: str, output_filename: str = "final_video.mp4"):
    """Assemble the final video from previously generated scene assets.

    Each scene folder under <video_folder>/images is paired with the
    same-named mp3 under <video_folder>/audio; the scene's images split the
    narration duration equally. Scenes missing audio or images are skipped.

    Returns:
        Path of the written video file, or None if no scene was usable.
    """
    audio_dir = os.path.join(video_folder, "audio")
    images_dir = os.path.join(video_folder, "images")

    scene_clips = []
    # natsorted keeps scene_2 before scene_10 (numeric, not lexicographic).
    for entry in natsorted(os.listdir(images_dir)):
        scene_dir = os.path.join(images_dir, entry)
        if not os.path.isdir(scene_dir):
            continue
        scene_name = os.path.basename(scene_dir)

        narration_path = os.path.join(audio_dir, f"{scene_name}.mp3")
        if not os.path.exists(narration_path):
            print(f"Warning: Audio file {narration_path} not found. Skipping scene {scene_name}.")
            continue

        frames = natsorted(
            os.path.join(scene_dir, name)
            for name in os.listdir(scene_dir)
            if name.lower().endswith(('.png', '.jpg', '.jpeg'))
        )
        if not frames:
            print(f"Warning: No images found in {scene_dir}. Skipping scene {scene_name}.")
            continue

        narration = mp.AudioFileClip(narration_path)
        # Spread the narration evenly across this scene's images.
        seconds_per_frame = narration.duration / len(frames)
        stills = [mp.ImageClip(frame).set_duration(seconds_per_frame) for frame in frames]
        scene_clips.append(
            mp.concatenate_videoclips(stills, method="compose").set_audio(narration)
        )

    if not scene_clips:
        print("Error: No valid scenes processed.")
        return None

    final_video = mp.concatenate_videoclips(scene_clips, method="compose")
    output_path = os.path.join(video_folder, output_filename)
    final_video.write_videofile(output_path, fps=24, codec='libx264')
    return output_path
def calculate_read_time(text: str, words_per_minute: int = 155) -> str:
    """
    Calculate how long it will take to read a given text.

    Args:
        text (str): The input text to calculate reading time for.
        words_per_minute (int): Average reading speed in words per minute.
            Default is 155 (an uneducated guess).

    Returns:
        str: A string describing the reading time in seconds, minutes, or
        hours; an invalid-input message for empty/non-string input; or an
        error message if the computation fails (e.g. words_per_minute == 0).
    """
    try:
        # Guard: reject empty or non-string input up front.
        if not text or not isinstance(text, str):
            return "Invalid input: Text must be a non-empty string."

        word_count = len(text.split())
        # Whole seconds of reading time at the assumed pace.
        total_seconds = int(word_count / words_per_minute * 60)

        hours, remainder = divmod(total_seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        # Largest non-zero unit decides the message shape.
        if hours > 0:
            return f"Reading time: {hours} hour(s), {minutes} minute(s), and {seconds} second(s)."
        if minutes > 0:
            return f"Reading time: {minutes} minute(s) and {seconds} second(s)."
        return f"Reading time: {seconds} second(s)."
    except Exception as e:
        return f"An error occurred: {e}"
# Example usage: synthesizes Urdu narration for a short sample story.
if __name__ == "__main__":
    sample_story = """
    In a quiet village, a young girl named Lily discovered a hidden garden.
    Every flower in the garden glowed with a magical light, revealing secrets of the past.
    Lily knew she had found something truly extraordinary.
    """
    generate_audio(sample_story, "Urdu", "Asad")
|