KingNish's picture
Update app.py
41c24bd verified
import gradio as gr
import torch
import spaces
import numpy as np
import random
import os
import yaml
import requests
import shutil
from pathlib import Path
import imageio
import tempfile
from PIL import Image
from huggingface_hub import hf_hub_download
from inference import (
create_ltx_video_pipeline,
create_latent_upsampler,
load_image_to_tensor_with_resize_and_crop,
seed_everething,
get_device,
calculate_padding,
load_media_file
)
from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXMultiScalePipeline, LTXVideoPipeline
from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy
# Configuration
config_file_path = "configs/ltxv-13b-0.9.7-distilled.yaml"
with open(config_file_path, "r") as file:
PIPELINE_CONFIG_YAML = yaml.safe_load(file)
LTX_REPO = "Lightricks/LTX-Video"
MAX_IMAGE_SIZE = 1024
MAX_NUM_FRAMES = 257
FPS = 24.0
# Default values
DEFAULT_NEGATIVE_PROMPT = "worst quality, inconsistent motion, blurry, jittery, distorted"
DEFAULT_GUIDANCE_SCALE = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("guidance_scale", 1.0)
DEFAULT_SEED = 42
DEFAULT_IMPROVE_TEXTURE = True
TARGET_FIXED_SIDE = 768
# Global variables for loaded models
pipeline_instance = None
latent_upsampler_instance = None
models_dir = "downloaded_models_gradio_cpu_init"
Path(models_dir).mkdir(parents=True, exist_ok=True)
def video_to_url(video_path):
"""
Uploads a video to a temporary hosting service and returns its URL.
"""
try:
url = 'https://uguu.se/upload'
with open(video_path, 'rb') as f:
files = {'files[]': (os.path.basename(video_path), f)}
response = requests.post(url, files=files)
response_json = response.json()
return response_json['files'][0]['url']
except FileNotFoundError:
return "Error: File not found. Please check the video path."
except Exception as e:
return f"An error occurred: {e}"
def calculate_new_dimensions(orig_w, orig_h):
"""
Calculates new dimensions for height and width based on original media dimensions.
Rules:
1. If any dimension > 1024, resize so max dimension is 1024, maintaining aspect ratio.
2. Then ensure both dimensions are multiples of 32 by rounding to the nearest multiple.
3. Ensure dimensions are within [256, 1024].
"""
if orig_w == 0 or orig_h == 0:
return TARGET_FIXED_SIDE, TARGET_FIXED_SIDE
# Step 1: Handle dimensions > 1024
new_w, new_h = orig_w, orig_h
if max(orig_w, orig_h) > 1024:
max_dim = max(orig_w, orig_h)
scale = 1024 / max_dim
new_w = int(orig_w * scale)
new_h = int(orig_h * scale)
# Step 2: Round to nearest multiples of 32
def round_to_multiple(x, multiple=32):
return round(x / multiple) * multiple
new_w = round_to_multiple(new_w)
new_h = round_to_multiple(new_h)
# Step 3: Ensure within bounds
new_w = max(256, min(new_w, MAX_IMAGE_SIZE))
new_h = max(256, min(new_h, MAX_IMAGE_SIZE))
return new_h, new_w
def resize_and_squash_image(image_path, target_width, target_height):
"""
Resize and potentially squash/stretch an image to the exact target dimensions.
This ensures no cropping will be needed later.
"""
img = Image.open(image_path)
# Resize to exact dimensions, possibly distorting aspect ratio
img = img.resize((target_width, target_height), Image.LANCZOS)
# Save to temporary file
temp_path = os.path.join(tempfile.gettempdir(), f"resized_{os.path.basename(image_path)}")
img.save(temp_path)
return temp_path
def initialize_models():
"""Initialize and download models"""
global pipeline_instance, latent_upsampler_instance
print("Downloading models (if not present)...")
distilled_model_actual_path = hf_hub_download(
repo_id=LTX_REPO,
filename=PIPELINE_CONFIG_YAML["checkpoint_path"],
local_dir=models_dir,
local_dir_use_symlinks=False
)
PIPELINE_CONFIG_YAML["checkpoint_path"] = distilled_model_actual_path
spatial_upscaler_actual_path = hf_hub_download(
repo_id=LTX_REPO,
filename=PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"],
local_dir=models_dir,
local_dir_use_symlinks=False
)
PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"] = spatial_upscaler_actual_path
print("Creating LTX Video pipeline on CPU...")
pipeline_instance = create_ltx_video_pipeline(
ckpt_path=PIPELINE_CONFIG_YAML["checkpoint_path"],
precision=PIPELINE_CONFIG_YAML["precision"],
text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"],
sampler=PIPELINE_CONFIG_YAML["sampler"],
device="cpu",
enhance_prompt=False,
prompt_enhancer_image_caption_model_name_or_path=PIPELINE_CONFIG_YAML["prompt_enhancer_image_caption_model_name_or_path"],
prompt_enhancer_llm_model_name_or_path=PIPELINE_CONFIG_YAML["prompt_enhancer_llm_model_name_or_path"],
)
if PIPELINE_CONFIG_YAML.get("spatial_upscaler_model_path"):
print("Creating latent upsampler on CPU...")
latent_upsampler_instance = create_latent_upsampler(
PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"],
device="cpu"
)
target_inference_device = "cuda"
print(f"Target inference device: {target_inference_device}")
pipeline_instance.to(target_inference_device)
if latent_upsampler_instance:
latent_upsampler_instance.to(target_inference_device)
@spaces.GPU(duration=45)
def generate(prompt, input_image_url=None, final_image_url=None, duration_ui=2, progress=gr.Progress(track_tqdm=True)):
"""Generate video from image(s) and prompt"""
# Validate input - at least one image must be provided
if input_image_url is None and final_image_url is None:
raise gr.Error("Please provide at least one input image (either first frame or last frame)")
input_image_filepath = input_image_url
final_image_filepath = final_image_url
# Set default values
negative_prompt = DEFAULT_NEGATIVE_PROMPT
seed_ui = DEFAULT_SEED
randomize_seed = True
ui_guidance_scale = DEFAULT_GUIDANCE_SCALE
improve_texture_flag = DEFAULT_IMPROVE_TEXTURE
if randomize_seed:
seed_ui = random.randint(0, 2**32 - 1)
seed_everething(int(seed_ui))
# Calculate target frames
target_frames_ideal = duration_ui * FPS
target_frames_rounded = max(1, round(target_frames_ideal))
n_val = round((float(target_frames_rounded) - 1.0) / 8.0)
actual_num_frames = max(9, min(MAX_NUM_FRAMES, int(n_val * 8 + 1)))
# Calculate dimensions based on the provided image(s)
if input_image_filepath:
try:
img = Image.open(input_image_filepath)
orig_w, orig_h = img.size
actual_height, actual_width = calculate_new_dimensions(orig_w, orig_h)
except Exception as e:
print(f"Error processing input image: {e}")
if final_image_filepath:
try:
img = Image.open(final_image_filepath)
orig_w, orig_h = img.size
actual_height, actual_width = calculate_new_dimensions(orig_w, orig_h)
except Exception as e:
print(f"Error processing final image: {e}")
actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE
else:
actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE
elif final_image_filepath:
try:
img = Image.open(final_image_filepath)
orig_w, orig_h = img.size
actual_height, actual_width = calculate_new_dimensions(orig_w, orig_h)
except Exception as e:
print(f"Error processing final image: {e}")
actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE
else:
actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE
# Since we're handling all resizing ourselves, we don't need padding
height_padded = actual_height
width_padded = actual_width
num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1
padding_values = calculate_padding(actual_height, actual_width, height_padded, width_padded)
# Prepare call arguments
call_kwargs = {
"prompt": prompt,
"negative_prompt": negative_prompt,
"height": height_padded,
"width": width_padded,
"num_frames": num_frames_padded,
"frame_rate": int(FPS),
"generator": torch.Generator(device="cuda").manual_seed(int(seed_ui)),
"output_type": "pt",
"conditioning_items": None,
"media_items": None,
"decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"],
"decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"],
"stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"],
"image_cond_noise_scale": 0.15,
"is_video": True,
"vae_per_channel_normalize": True,
"mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"),
"offload_to_cpu": False,
"enhance_prompt": False,
}
# Set skip layer strategy
stg_mode_str = PIPELINE_CONFIG_YAML.get("stg_mode", "attention_values")
if stg_mode_str.lower() in ["stg_av", "attention_values"]:
call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.AttentionValues
elif stg_mode_str.lower() in ["stg_as", "attention_skip"]:
call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.AttentionSkip
elif stg_mode_str.lower() in ["stg_r", "residual"]:
call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.Residual
elif stg_mode_str.lower() in ["stg_t", "transformer_block"]:
call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.TransformerBlock
# Add image conditioning if provided
conditioning_items = []
# Add initial frame conditioning if provided
if input_image_filepath:
try:
# First resize and squash the image to the exact dimensions we want
resized_image_path = resize_and_squash_image(input_image_filepath, actual_width, actual_height)
# Now load this pre-resized image with load_image_to_tensor_with_resize_and_crop
# Since it's already the correct size, the "crop" part will be a no-op
media_tensor = load_image_to_tensor_with_resize_and_crop(
resized_image_path, actual_height, actual_width
)
# Clean up temporary file
if os.path.exists(resized_image_path):
os.remove(resized_image_path)
media_tensor = torch.nn.functional.pad(media_tensor, padding_values)
conditioning_items.append(ConditioningItem(media_tensor.to("cuda"), 0, 1.0))
except Exception as e:
print(f"Error loading initial image: {e}")
raise gr.Error(f"Could not load initial image: {e}")
# Add final frame conditioning if provided
if final_image_filepath:
try:
# First resize and squash the final image to match the initial image dimensions
resized_final_path = resize_and_squash_image(
final_image_filepath, actual_width, actual_height
)
# Now load this pre-resized image with load_image_to_tensor_with_resize_and_crop
# Since it's already the correct size, the "crop" part will be a no-op
final_media_tensor = load_image_to_tensor_with_resize_and_crop(
resized_final_path, actual_height, actual_width
)
# Clean up temporary file
if os.path.exists(resized_final_path):
os.remove(resized_final_path)
final_media_tensor = torch.nn.functional.pad(final_media_tensor, padding_values)
conditioning_items.append(ConditioningItem(final_media_tensor.to("cuda"), num_frames_padded - 1, 1.0))
except Exception as e:
print(f"Error loading final image: {e}")
raise gr.Error(f"Could not load final image: {e}")
if conditioning_items:
call_kwargs["conditioning_items"] = conditioning_items
# Generate video
active_latent_upsampler = latent_upsampler_instance if improve_texture_flag else None
if improve_texture_flag and active_latent_upsampler:
multi_scale_pipeline_obj = LTXMultiScalePipeline(pipeline_instance, active_latent_upsampler)
first_pass_args = PIPELINE_CONFIG_YAML.get("first_pass", {}).copy()
first_pass_args["guidance_scale"] = float(ui_guidance_scale)
first_pass_args.pop("num_inference_steps", None)
second_pass_args = PIPELINE_CONFIG_YAML.get("second_pass", {}).copy()
second_pass_args["guidance_scale"] = float(ui_guidance_scale)
second_pass_args.pop("num_inference_steps", None)
multi_scale_call_kwargs = call_kwargs.copy()
multi_scale_call_kwargs.update({
"downscale_factor": PIPELINE_CONFIG_YAML["downscale_factor"],
"first_pass": first_pass_args,
"second_pass": second_pass_args,
})
result_images_tensor = multi_scale_pipeline_obj(**multi_scale_call_kwargs).images
else:
single_pass_call_kwargs = call_kwargs.copy()
first_pass_config_from_yaml = PIPELINE_CONFIG_YAML.get("first_pass", {})
single_pass_call_kwargs.update({
"timesteps": first_pass_config_from_yaml.get("timesteps"),
"guidance_scale": float(ui_guidance_scale),
"stg_scale": first_pass_config_from_yaml.get("stg_scale"),
"rescaling_scale": first_pass_config_from_yaml.get("rescaling_scale"),
"skip_block_list": first_pass_config_from_yaml.get("skip_block_list")
})
for key in ["num_inference_steps", "first_pass", "second_pass", "downscale_factor"]:
single_pass_call_kwargs.pop(key, None)
result_images_tensor = pipeline_instance(**single_pass_call_kwargs).images
if result_images_tensor is None:
raise gr.Error("Generation failed.")
# Process the result
pad_left, pad_right, pad_top, pad_bottom = padding_values
slice_h_end = -pad_bottom if pad_bottom > 0 else None
slice_w_end = -pad_right if pad_right > 0 else None
result_images_tensor = result_images_tensor[
:, :, :actual_num_frames, pad_top:slice_h_end, pad_left:slice_w_end
]
video_np = result_images_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy()
video_np = np.clip(video_np, 0, 1)
video_np = (video_np * 255).astype(np.uint8)
# Save video temporarily
temp_dir = tempfile.mkdtemp()
output_video_path = os.path.join(temp_dir, f"output_{random.randint(10000, 99999)}.mp4")
try:
with imageio.get_writer(output_video_path, fps=call_kwargs["frame_rate"], macro_block_size=1) as video_writer:
for frame_idx in range(video_np.shape[0]):
progress(frame_idx / video_np.shape[0], desc="Saving video")
video_writer.append_data(video_np[frame_idx])
except Exception as e:
print(f"Error saving video: {e}")
try:
with imageio.get_writer(output_video_path, fps=call_kwargs["frame_rate"], format='FFMPEG', codec='libx264', quality=8) as video_writer:
for frame_idx in range(video_np.shape[0]):
progress(frame_idx / video_np.shape[0], desc="Saving video (fallback)")
video_writer.append_data(video_np[frame_idx])
except Exception as e2:
shutil.rmtree(temp_dir, ignore_errors=True)
raise gr.Error(f"Failed to save video: {e2}")
# Upload video and get URL
video_url = video_to_url(output_video_path)
shutil.rmtree(temp_dir, ignore_errors=True)
return video_url
# Initialize models
initialize_models()
# Gradio interface
css = """
#col-container {
margin: 0 auto;
max-width: 900px;
}
.gradio-container {
background-color: #f5f5f5;
}
"""
with gr.Blocks(css=css) as demo:
gr.Markdown("# LTX Video Generator")
gr.Markdown("Generate videos from images using AI. Provide at least one input image (first frame or last frame) and a prompt.")
with gr.Row():
with gr.Column():
gr.Markdown("### Input Options")
input_image_input = gr.Image(label="First Frame Image (Optional)", type="filepath", sources=["upload", "webcam", "clipboard"])
final_image_input = gr.Image(label="Last Frame Image (Optional)", type="filepath", sources=["upload", "webcam", "clipboard"])
prompt_input = gr.Textbox(label="Prompt", value="The creature from the image starts to move", lines=3)
duration_input = gr.Slider(
label="Video Duration (seconds)",
minimum=1,
maximum=8,
value=2,
step=0.1,
info="Target video duration (1s to 8s)"
)
generate_button = gr.Button("Generate Video", variant="primary")
with gr.Column():
gr.Markdown("### Output")
video_output = gr.Textbox(label="Generated Video URL", interactive=False)
video_preview = gr.Video(label="Video Preview", interactive=False, visible=False)
gr.Markdown("**Note:** You must provide at least one input image (either first frame or last frame).")
generate_button.click(
fn=generate,
inputs=[prompt_input, input_image_input, final_image_input, duration_input],
outputs=[video_output],
api_name="generate_video"
)
if __name__ == "__main__":
demo.launch()