import gradio as gr import torch import spaces import numpy as np import random import os import yaml import requests import shutil from pathlib import Path import imageio import tempfile from PIL import Image from huggingface_hub import hf_hub_download from inference import ( create_ltx_video_pipeline, create_latent_upsampler, load_image_to_tensor_with_resize_and_crop, seed_everething, get_device, calculate_padding, load_media_file ) from ltx_video.pipelines.pipeline_ltx_video import ConditioningItem, LTXMultiScalePipeline, LTXVideoPipeline from ltx_video.utils.skip_layer_strategy import SkipLayerStrategy # Configuration config_file_path = "configs/ltxv-13b-0.9.7-distilled.yaml" with open(config_file_path, "r") as file: PIPELINE_CONFIG_YAML = yaml.safe_load(file) LTX_REPO = "Lightricks/LTX-Video" MAX_IMAGE_SIZE = 1024 MAX_NUM_FRAMES = 257 FPS = 24.0 # Default values DEFAULT_NEGATIVE_PROMPT = "worst quality, inconsistent motion, blurry, jittery, distorted" DEFAULT_GUIDANCE_SCALE = PIPELINE_CONFIG_YAML.get("first_pass", {}).get("guidance_scale", 1.0) DEFAULT_SEED = 42 DEFAULT_IMPROVE_TEXTURE = True TARGET_FIXED_SIDE = 768 # Global variables for loaded models pipeline_instance = None latent_upsampler_instance = None models_dir = "downloaded_models_gradio_cpu_init" Path(models_dir).mkdir(parents=True, exist_ok=True) def video_to_url(video_path): """ Uploads a video to a temporary hosting service and returns its URL. """ try: url = 'https://uguu.se/upload' with open(video_path, 'rb') as f: files = {'files[]': (os.path.basename(video_path), f)} response = requests.post(url, files=files) response_json = response.json() return response_json['files'][0]['url'] except FileNotFoundError: return "Error: File not found. Please check the video path." except Exception as e: return f"An error occurred: {e}" def calculate_new_dimensions(orig_w, orig_h): """ Calculates new dimensions for height and width based on original media dimensions. Rules: 1. If any dimension > 1024, resize so max dimension is 1024, maintaining aspect ratio. 2. Then ensure both dimensions are multiples of 32 by rounding to the nearest multiple. 3. Ensure dimensions are within [256, 1024]. """ if orig_w == 0 or orig_h == 0: return TARGET_FIXED_SIDE, TARGET_FIXED_SIDE # Step 1: Handle dimensions > 1024 new_w, new_h = orig_w, orig_h if max(orig_w, orig_h) > 1024: max_dim = max(orig_w, orig_h) scale = 1024 / max_dim new_w = int(orig_w * scale) new_h = int(orig_h * scale) # Step 2: Round to nearest multiples of 32 def round_to_multiple(x, multiple=32): return round(x / multiple) * multiple new_w = round_to_multiple(new_w) new_h = round_to_multiple(new_h) # Step 3: Ensure within bounds new_w = max(256, min(new_w, MAX_IMAGE_SIZE)) new_h = max(256, min(new_h, MAX_IMAGE_SIZE)) return new_h, new_w def resize_and_squash_image(image_path, target_width, target_height): """ Resize and potentially squash/stretch an image to the exact target dimensions. This ensures no cropping will be needed later. """ img = Image.open(image_path) # Resize to exact dimensions, possibly distorting aspect ratio img = img.resize((target_width, target_height), Image.LANCZOS) # Save to temporary file temp_path = os.path.join(tempfile.gettempdir(), f"resized_{os.path.basename(image_path)}") img.save(temp_path) return temp_path def initialize_models(): """Initialize and download models""" global pipeline_instance, latent_upsampler_instance print("Downloading models (if not present)...") distilled_model_actual_path = hf_hub_download( repo_id=LTX_REPO, filename=PIPELINE_CONFIG_YAML["checkpoint_path"], local_dir=models_dir, local_dir_use_symlinks=False ) PIPELINE_CONFIG_YAML["checkpoint_path"] = distilled_model_actual_path spatial_upscaler_actual_path = hf_hub_download( repo_id=LTX_REPO, filename=PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"], local_dir=models_dir, local_dir_use_symlinks=False ) PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"] = spatial_upscaler_actual_path print("Creating LTX Video pipeline on CPU...") pipeline_instance = create_ltx_video_pipeline( ckpt_path=PIPELINE_CONFIG_YAML["checkpoint_path"], precision=PIPELINE_CONFIG_YAML["precision"], text_encoder_model_name_or_path=PIPELINE_CONFIG_YAML["text_encoder_model_name_or_path"], sampler=PIPELINE_CONFIG_YAML["sampler"], device="cpu", enhance_prompt=False, prompt_enhancer_image_caption_model_name_or_path=PIPELINE_CONFIG_YAML["prompt_enhancer_image_caption_model_name_or_path"], prompt_enhancer_llm_model_name_or_path=PIPELINE_CONFIG_YAML["prompt_enhancer_llm_model_name_or_path"], ) if PIPELINE_CONFIG_YAML.get("spatial_upscaler_model_path"): print("Creating latent upsampler on CPU...") latent_upsampler_instance = create_latent_upsampler( PIPELINE_CONFIG_YAML["spatial_upscaler_model_path"], device="cpu" ) target_inference_device = "cuda" print(f"Target inference device: {target_inference_device}") pipeline_instance.to(target_inference_device) if latent_upsampler_instance: latent_upsampler_instance.to(target_inference_device) @spaces.GPU(duration=45) def generate(prompt, input_image_url=None, final_image_url=None, duration_ui=2, progress=gr.Progress(track_tqdm=True)): """Generate video from image(s) and prompt""" # Validate input - at least one image must be provided if input_image_url is None and final_image_url is None: raise gr.Error("Please provide at least one input image (either first frame or last frame)") input_image_filepath = input_image_url final_image_filepath = final_image_url # Set default values negative_prompt = DEFAULT_NEGATIVE_PROMPT seed_ui = DEFAULT_SEED randomize_seed = True ui_guidance_scale = DEFAULT_GUIDANCE_SCALE improve_texture_flag = DEFAULT_IMPROVE_TEXTURE if randomize_seed: seed_ui = random.randint(0, 2**32 - 1) seed_everething(int(seed_ui)) # Calculate target frames target_frames_ideal = duration_ui * FPS target_frames_rounded = max(1, round(target_frames_ideal)) n_val = round((float(target_frames_rounded) - 1.0) / 8.0) actual_num_frames = max(9, min(MAX_NUM_FRAMES, int(n_val * 8 + 1))) # Calculate dimensions based on the provided image(s) if input_image_filepath: try: img = Image.open(input_image_filepath) orig_w, orig_h = img.size actual_height, actual_width = calculate_new_dimensions(orig_w, orig_h) except Exception as e: print(f"Error processing input image: {e}") if final_image_filepath: try: img = Image.open(final_image_filepath) orig_w, orig_h = img.size actual_height, actual_width = calculate_new_dimensions(orig_w, orig_h) except Exception as e: print(f"Error processing final image: {e}") actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE else: actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE elif final_image_filepath: try: img = Image.open(final_image_filepath) orig_w, orig_h = img.size actual_height, actual_width = calculate_new_dimensions(orig_w, orig_h) except Exception as e: print(f"Error processing final image: {e}") actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE else: actual_height, actual_width = TARGET_FIXED_SIDE, TARGET_FIXED_SIDE # Since we're handling all resizing ourselves, we don't need padding height_padded = actual_height width_padded = actual_width num_frames_padded = ((actual_num_frames - 2) // 8 + 1) * 8 + 1 padding_values = calculate_padding(actual_height, actual_width, height_padded, width_padded) # Prepare call arguments call_kwargs = { "prompt": prompt, "negative_prompt": negative_prompt, "height": height_padded, "width": width_padded, "num_frames": num_frames_padded, "frame_rate": int(FPS), "generator": torch.Generator(device="cuda").manual_seed(int(seed_ui)), "output_type": "pt", "conditioning_items": None, "media_items": None, "decode_timestep": PIPELINE_CONFIG_YAML["decode_timestep"], "decode_noise_scale": PIPELINE_CONFIG_YAML["decode_noise_scale"], "stochastic_sampling": PIPELINE_CONFIG_YAML["stochastic_sampling"], "image_cond_noise_scale": 0.15, "is_video": True, "vae_per_channel_normalize": True, "mixed_precision": (PIPELINE_CONFIG_YAML["precision"] == "mixed_precision"), "offload_to_cpu": False, "enhance_prompt": False, } # Set skip layer strategy stg_mode_str = PIPELINE_CONFIG_YAML.get("stg_mode", "attention_values") if stg_mode_str.lower() in ["stg_av", "attention_values"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.AttentionValues elif stg_mode_str.lower() in ["stg_as", "attention_skip"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.AttentionSkip elif stg_mode_str.lower() in ["stg_r", "residual"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.Residual elif stg_mode_str.lower() in ["stg_t", "transformer_block"]: call_kwargs["skip_layer_strategy"] = SkipLayerStrategy.TransformerBlock # Add image conditioning if provided conditioning_items = [] # Add initial frame conditioning if provided if input_image_filepath: try: # First resize and squash the image to the exact dimensions we want resized_image_path = resize_and_squash_image(input_image_filepath, actual_width, actual_height) # Now load this pre-resized image with load_image_to_tensor_with_resize_and_crop # Since it's already the correct size, the "crop" part will be a no-op media_tensor = load_image_to_tensor_with_resize_and_crop( resized_image_path, actual_height, actual_width ) # Clean up temporary file if os.path.exists(resized_image_path): os.remove(resized_image_path) media_tensor = torch.nn.functional.pad(media_tensor, padding_values) conditioning_items.append(ConditioningItem(media_tensor.to("cuda"), 0, 1.0)) except Exception as e: print(f"Error loading initial image: {e}") raise gr.Error(f"Could not load initial image: {e}") # Add final frame conditioning if provided if final_image_filepath: try: # First resize and squash the final image to match the initial image dimensions resized_final_path = resize_and_squash_image( final_image_filepath, actual_width, actual_height ) # Now load this pre-resized image with load_image_to_tensor_with_resize_and_crop # Since it's already the correct size, the "crop" part will be a no-op final_media_tensor = load_image_to_tensor_with_resize_and_crop( resized_final_path, actual_height, actual_width ) # Clean up temporary file if os.path.exists(resized_final_path): os.remove(resized_final_path) final_media_tensor = torch.nn.functional.pad(final_media_tensor, padding_values) conditioning_items.append(ConditioningItem(final_media_tensor.to("cuda"), num_frames_padded - 1, 1.0)) except Exception as e: print(f"Error loading final image: {e}") raise gr.Error(f"Could not load final image: {e}") if conditioning_items: call_kwargs["conditioning_items"] = conditioning_items # Generate video active_latent_upsampler = latent_upsampler_instance if improve_texture_flag else None if improve_texture_flag and active_latent_upsampler: multi_scale_pipeline_obj = LTXMultiScalePipeline(pipeline_instance, active_latent_upsampler) first_pass_args = PIPELINE_CONFIG_YAML.get("first_pass", {}).copy() first_pass_args["guidance_scale"] = float(ui_guidance_scale) first_pass_args.pop("num_inference_steps", None) second_pass_args = PIPELINE_CONFIG_YAML.get("second_pass", {}).copy() second_pass_args["guidance_scale"] = float(ui_guidance_scale) second_pass_args.pop("num_inference_steps", None) multi_scale_call_kwargs = call_kwargs.copy() multi_scale_call_kwargs.update({ "downscale_factor": PIPELINE_CONFIG_YAML["downscale_factor"], "first_pass": first_pass_args, "second_pass": second_pass_args, }) result_images_tensor = multi_scale_pipeline_obj(**multi_scale_call_kwargs).images else: single_pass_call_kwargs = call_kwargs.copy() first_pass_config_from_yaml = PIPELINE_CONFIG_YAML.get("first_pass", {}) single_pass_call_kwargs.update({ "timesteps": first_pass_config_from_yaml.get("timesteps"), "guidance_scale": float(ui_guidance_scale), "stg_scale": first_pass_config_from_yaml.get("stg_scale"), "rescaling_scale": first_pass_config_from_yaml.get("rescaling_scale"), "skip_block_list": first_pass_config_from_yaml.get("skip_block_list") }) for key in ["num_inference_steps", "first_pass", "second_pass", "downscale_factor"]: single_pass_call_kwargs.pop(key, None) result_images_tensor = pipeline_instance(**single_pass_call_kwargs).images if result_images_tensor is None: raise gr.Error("Generation failed.") # Process the result pad_left, pad_right, pad_top, pad_bottom = padding_values slice_h_end = -pad_bottom if pad_bottom > 0 else None slice_w_end = -pad_right if pad_right > 0 else None result_images_tensor = result_images_tensor[ :, :, :actual_num_frames, pad_top:slice_h_end, pad_left:slice_w_end ] video_np = result_images_tensor[0].permute(1, 2, 3, 0).cpu().float().numpy() video_np = np.clip(video_np, 0, 1) video_np = (video_np * 255).astype(np.uint8) # Save video temporarily temp_dir = tempfile.mkdtemp() output_video_path = os.path.join(temp_dir, f"output_{random.randint(10000, 99999)}.mp4") try: with imageio.get_writer(output_video_path, fps=call_kwargs["frame_rate"], macro_block_size=1) as video_writer: for frame_idx in range(video_np.shape[0]): progress(frame_idx / video_np.shape[0], desc="Saving video") video_writer.append_data(video_np[frame_idx]) except Exception as e: print(f"Error saving video: {e}") try: with imageio.get_writer(output_video_path, fps=call_kwargs["frame_rate"], format='FFMPEG', codec='libx264', quality=8) as video_writer: for frame_idx in range(video_np.shape[0]): progress(frame_idx / video_np.shape[0], desc="Saving video (fallback)") video_writer.append_data(video_np[frame_idx]) except Exception as e2: shutil.rmtree(temp_dir, ignore_errors=True) raise gr.Error(f"Failed to save video: {e2}") # Upload video and get URL video_url = video_to_url(output_video_path) shutil.rmtree(temp_dir, ignore_errors=True) return video_url # Initialize models initialize_models() # Gradio interface css = """ #col-container { margin: 0 auto; max-width: 900px; } .gradio-container { background-color: #f5f5f5; } """ with gr.Blocks(css=css) as demo: gr.Markdown("# LTX Video Generator") gr.Markdown("Generate videos from images using AI. Provide at least one input image (first frame or last frame) and a prompt.") with gr.Row(): with gr.Column(): gr.Markdown("### Input Options") input_image_input = gr.Image(label="First Frame Image (Optional)", type="filepath", sources=["upload", "webcam", "clipboard"]) final_image_input = gr.Image(label="Last Frame Image (Optional)", type="filepath", sources=["upload", "webcam", "clipboard"]) prompt_input = gr.Textbox(label="Prompt", value="The creature from the image starts to move", lines=3) duration_input = gr.Slider( label="Video Duration (seconds)", minimum=1, maximum=8, value=2, step=0.1, info="Target video duration (1s to 8s)" ) generate_button = gr.Button("Generate Video", variant="primary") with gr.Column(): gr.Markdown("### Output") video_output = gr.Textbox(label="Generated Video URL", interactive=False) video_preview = gr.Video(label="Video Preview", interactive=False, visible=False) gr.Markdown("**Note:** You must provide at least one input image (either first frame or last frame).") generate_button.click( fn=generate, inputs=[prompt_input, input_image_input, final_image_input, duration_input], outputs=[video_output], api_name="generate_video" ) if __name__ == "__main__": demo.launch()