#!/usr/bin/env python3 # ========================================================== # FILE: ghostpack_gradio_f1.py # ========================================================== import os, sys, time, json, argparse, importlib.util, subprocess, traceback import torch, einops, numpy as np from PIL import Image import io import gradio as gr import asyncio from queue import Queue from threading import Thread, Event import re import logging from diffusers import AutoencoderKLHunyuanVideo from transformers import ( LlamaModel, CLIPTextModel, LlamaTokenizerFast, CLIPTokenizer, SiglipImageProcessor, SiglipVisionModel ) from diffusers_helper.hf_login import login from diffusers_helper.hunyuan import ( encode_prompt_conds, vae_decode, vae_encode, vae_decode_fake ) from diffusers_helper.utils import ( save_bcthw_as_mp4, crop_or_pad_yield_mask, soft_append_bcthw, resize_and_center_crop, generate_timestamp ) from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan from diffusers_helper.memory import ( gpu, get_cuda_free_memory_gb, move_model_to_device_with_memory_preservation, offload_model_from_device_for_memory_preservation, fake_diffusers_current_device, DynamicSwapInstaller, unload_complete_models, load_model_as_complete ) from diffusers_helper.clip_vision import hf_clip_vision_encode from diffusers_helper.bucket_tools import find_nearest_bucket # Set up logging logging.basicConfig(filename='/home/ubuntu/ghostpack/ghostpack.log', level=logging.ERROR, format='%(asctime)s %(levelname)s:%(message)s') # MODIFIED: Added version number VERSION = "1.0.0" # ------------------------- CLI ---------------------------- parser = argparse.ArgumentParser() parser.add_argument('--share', action='store_true') parser.add_argument('--server', type=str, default='0.0.0.0') parser.add_argument('--port', type=int) parser.add_argument('--inbrowser', action='store_true') parser.add_argument('--cli', action='store_true') args = parser.parse_args() # MODIFIED: Global state variables render_progress = 0.0 render_status = "idle" render_times = [] stream = None start_render_time = None BASE = os.path.abspath(os.path.dirname(__file__)) os.environ['HF_HOME'] = os.path.join(BASE, 'hf_download') if args.cli: print("👻 GhostPack F1 Pro CLI\n") print("python ghostpack_gradio_f1.py # launch UI") print("python ghostpack_gradio_f1.py --cli # show help\n") sys.exit(0) # ---------------------- Paths ----------------------------- OUT_BASE = os.path.join('/home/ubuntu/ghostpack', 'outputs') OUT_IMG = os.path.join(OUT_BASE, 'img') OUT_TEMP = os.path.join(OUT_BASE, 'tmp') OUT_VID = os.path.join(OUT_BASE, 'vid') OUT_DATA = os.path.join(OUT_BASE, 'data') PROMPT_LOG = os.path.join(OUT_DATA, 'prompts.txt') SAVED_PROMPTS = os.path.join(OUT_DATA, 'saved_prompts.json') INSTALL_LOG = os.path.join(OUT_DATA, 'install_logs.txt') LAST_CLEANUP_FILE = os.path.join(OUT_DATA, 'last_cleanup.txt') VIDEO_INFO_JSON = os.path.join(OUT_DATA, 'video_info.json') # MODIFIED: Create directories and initialize files with permissions for d in (OUT_BASE, OUT_IMG, OUT_TEMP, OUT_VID, OUT_DATA): try: os.makedirs(d, exist_ok=True) os.chmod(d, 0o775) except Exception as e: logging.error(f"Failed to create/chmod directory {d}: {e}") if not os.path.exists(SAVED_PROMPTS): try: with open(SAVED_PROMPTS, 'w') as f: json.dump([], f) os.chmod(SAVED_PROMPTS, 0o664) except Exception as e: logging.error(f"Failed to create/chmod {SAVED_PROMPTS}: {e}") if not os.path.exists(INSTALL_LOG): try: open(INSTALL_LOG, 'w').close() os.chmod(INSTALL_LOG, 0o664) except Exception as e: logging.error(f"Failed to create/chmod {INSTALL_LOG}: {e}") if not os.path.exists(PROMPT_LOG): try: open(PROMPT_LOG, 'w').close() os.chmod(PROMPT_LOG, 0o664) except Exception as e: logging.error(f"Failed to create/chmod {PROMPT_LOG}: {e}") if not os.path.exists(LAST_CLEANUP_FILE): try: with open(LAST_CLEANUP_FILE, 'w') as f: f.write(str(time.time())) os.chmod(LAST_CLEANUP_FILE, 0o664) except Exception as e: logging.error(f"Failed to create/chmod {LAST_CLEANUP_FILE}: {e}") if not os.path.exists(VIDEO_INFO_JSON): try: with open(VIDEO_INFO_JSON, 'w') as f: json.dump([], f) os.chmod(VIDEO_INFO_JSON, 0o664) except Exception as e: logging.error(f"Failed to create/chmod {VIDEO_INFO_JSON}: {e}") # ---------------- Prompt utils --------------------------- def get_last_prompts(): try: return json.load(open(SAVED_PROMPTS))[-5:][::-1] except Exception as e: logging.error(f"Failed to load prompts from {SAVED_PROMPTS}: {e}") return [] def save_prompt_fn(prompt, n_p): if not prompt: return "❌ No prompt" try: data = json.load(open(SAVED_PROMPTS)) entry = {'prompt': prompt, 'negative': n_p} if entry not in data: data.append(entry) with open(SAVED_PROMPTS, 'w') as f: json.dump(data, f) os.chmod(SAVED_PROMPTS, 0o664) return "✅ Saved" except Exception as e: logging.error(f"Failed to save prompt to {SAVED_PROMPTS}: {e}") return "❌ Save failed" def load_prompt_fn(idx): lst = get_last_prompts() return lst[idx]['prompt'] if idx < len(lst) else "" # ---------------- Cleanup utils -------------------------- def clear_temp_videos(): try: for f in os.listdir(OUT_TEMP): os.remove(os.path.join(OUT_TEMP, f)) return "✅ Temp cleared" except Exception as e: logging.error(f"Failed to clear temp videos in {OUT_TEMP}: {e}") return "❌ Clear failed" def clear_old_files(): cutoff = time.time() - 7 * 24 * 3600 c = 0 try: for d in (OUT_TEMP, OUT_IMG, OUT_VID, OUT_DATA): for f in os.listdir(d): p = os.path.join(d, f) if os.path.isfile(p) and os.path.getmtime(p) < cutoff: os.remove(p) c += 1 with open(LAST_CLEANUP_FILE, 'w') as f: f.write(str(time.time())) os.chmod(LAST_CLEANUP_FILE, 0o664) return f"✅ {c} old files removed" except Exception as e: logging.error(f"Failed to clear old files: {e}") return "❌ Clear failed" def clear_images(): try: for f in os.listdir(OUT_IMG): os.remove(os.path.join(OUT_IMG, f)) return "✅ Images cleared" except Exception as e: logging.error(f"Failed to clear images in {OUT_IMG}: {e}") return "❌ Clear failed" def clear_videos(): try: for f in os.listdir(OUT_VID): os.remove(os.path.join(OUT_VID, f)) return "✅ Videos cleared" except Exception as e: logging.error(f"Failed to clear videos in {OUT_VID}: {e}") return "❌ Clear failed" def check_and_run_weekly_cleanup(): try: with open(LAST_CLEANUP_FILE, 'r') as f: last_cleanup = float(f.read().strip()) except (FileNotFoundError, ValueError): last_cleanup = 0 if time.time() - last_cleanup > 7 * 24 * 3600: return clear_old_files() return "" # ---------------- Gallery helpers ------------------------ def list_images(): return sorted( [os.path.join(OUT_IMG, f) for f in os.listdir(OUT_IMG) if f.lower().endswith(('.png', '.jpg'))], key=os.path.getmtime ) def list_videos(): return sorted( [os.path.join(OUT_VID, f) for f in os.listdir(OUT_VID) if f.lower().endswith('.mp4')], key=os.path.getmtime ) def load_image(sel): imgs = list_images() if sel in [os.path.basename(p) for p in imgs]: pth = imgs[[os.path.basename(p) for p in imgs].index(sel)] return gr.update(value=pth), gr.update(value=os.path.basename(pth)) return gr.update(), gr.update() def load_video(sel): vids = list_videos() if sel in [os.path.basename(p) for p in vids]: pth = vids[[os.path.basename(p) for p in vids].index(sel)] return gr.update(value=pth), gr.update(value=os.path.basename(pth)) return gr.update(), gr.update() def next_image_and_load(sel): imgs = list_images() if not imgs: return gr.update(), gr.update() names = [os.path.basename(i) for i in imgs] idx = (names.index(sel) + 1) % len(names) if sel in names else 0 pth = imgs[idx] return gr.update(value=pth), gr.update(value=os.path.basename(pth)) def next_video_and_load(sel): vids = list_videos() if not vids: return gr.update(), gr.update() names = [os.path.basename(v) for v in vids] idx = (names.index(sel) + 1) % len(names) if sel in names else 0 pth = vids[idx] return gr.update(value=pth), gr.update(value=os.path.basename(pth)) def gallery_image_select(evt: gr.SelectData): imgs = list_images() if evt.index is not None and evt.index < len(imgs): pth = imgs[evt.index] return gr.update(value=pth), gr.update(value=os.path.basename(pth)) return gr.update(), gr.update() def gallery_video_select(evt: gr.SelectData): vids = list_videos() if evt.index is not None and evt.index < len(vids): pth = vids[evt.index] return gr.update(value=pth), gr.update(value=os.path.basename(pth)) return gr.update(), gr.update() # ---------------- Install status ------------------------- def check_mod(n): return importlib.util.find_spec(n) is not None def status_xformers(): return "✅ xformers" if check_mod("xformers") else "❌ xformers" def status_sage(): return "✅ sage-attn" if check_mod("sageattention") else "❌ sage-attn" def status_flash(): return "✅ flash-attn" if check_mod("flash_attn") else "⚠️ flash-attn" def install_pkg(pkg, warn=None): if warn: print(warn) time.sleep(1) try: out = subprocess.check_output( [sys.executable, "-m", "pip", "install", pkg], stderr=subprocess.STDOUT, text=True ) res = f"✅ {pkg}\n{out}\n" except subprocess.CalledProcessError as e: res = f"❌ {pkg}\n{e.output}\n" with open(INSTALL_LOG, 'a') as f: f.write(f"[{pkg}] {res}") return res install_xformers = lambda: install_pkg("xformers") install_sage_attn = lambda: install_pkg("sage-attn") install_flash_attn = lambda: install_pkg("flash-attn", "⚠️ long compile") refresh_logs = lambda: open(INSTALL_LOG).read() clear_logs = lambda: (open(INSTALL_LOG, 'w').close() or "✅ Logs cleared") # ---------------- Model load ----------------------------- free_mem = get_cuda_free_memory_gb(gpu) hv = free_mem > 60 try: text_encoder = LlamaModel.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder', torch_dtype=torch.float16 ).cpu().eval() text_encoder_2 = CLIPTextModel.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='text_encoder_2', torch_dtype=torch.float16 ).cpu().eval() tokenizer = LlamaTokenizerFast.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer' ) tokenizer_2 = CLIPTokenizer.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='tokenizer_2' ) vae = AutoencoderKLHunyuanVideo.from_pretrained( "hunyuanvideo-community/HunyuanVideo", subfolder='vae', torch_dtype=torch.float16 ).cpu().eval() feature_extractor = SiglipImageProcessor.from_pretrained( "lllyasviel/flux_redux_bfl", subfolder='feature_extractor' ) image_encoder = SiglipVisionModel.from_pretrained( "lllyasviel/flux_redux_bfl", subfolder='image_encoder', torch_dtype=torch.float16 ).cpu().eval() transformer = HunyuanVideoTransformer3DModelPacked.from_pretrained( "lllyasviel/FramePack_F1_I2V_HY_20250503", torch_dtype=torch.bfloat16 ).cpu().eval() except Exception as e: logging.error(f"Failed to load models: {e}") raise if not hv: vae.enable_slicing() vae.enable_tiling() transformer.high_quality_fp32_output_for_inference = True transformer.to(dtype=torch.bfloat16) for m in (vae, image_encoder, text_encoder, text_encoder_2): m.to(dtype=torch.float16) for m in (vae, image_encoder, text_encoder, text_encoder_2, transformer): m.requires_grad_(False) if not hv: DynamicSwapInstaller.install_model(transformer, device=gpu) DynamicSwapInstaller.install_model(text_encoder, device=gpu) else: for m in (text_encoder, text_encoder_2, image_encoder, vae, transformer): m.to(gpu) class AsyncStream: def __init__(self): self.input_queue = Queue() self.output_queue = Queue() self.stop_event = Event() def put(self, item): self.output_queue.put(item) def get(self): return self.output_queue.get() def is_stopped(self): return self.stop_event.is_set() def stop(self): self.stop_event.set() self.input_queue.put("end") # ---------------- Worker ------------------------------- @torch.no_grad() def worker(img, prompt, n_p, seed, secs, win, stp, cfg, gsc, rsc, keep, tea, crf, camera_action="Static Camera"): global render_progress, render_status, render_times, start_render_time, stream start_render_time = time.time() render_status = "rendering" render_progress = 0.0 stream = AsyncStream() # Validate prompt for smoothness, stop, and silence, and append camera action if "stop" not in prompt.lower() and secs > 5: prompt += " The subject stops moving after 5 seconds." if "smooth" not in prompt.lower(): prompt = f"Smooth animation: {prompt}" if "silent" not in prompt.lower(): prompt += ", silent" prompt = update_prompt(prompt, camera_action) if len(prompt.split()) > 50: print("Warning: Complex prompt may slow rendering or cause instability.") # Check VRAM availability if get_cuda_free_memory_gb(gpu) < 2: render_status = "error" logging.error("Low VRAM (<2GB). Lower 'kee' or 'win'.") raise Exception("Low VRAM (<2GB). Lower 'kee' or 'win'.") sections = max(round((secs * 30) / (win * 4)), 1) jid = generate_timestamp() try: with open(PROMPT_LOG, 'a') as f: f.write(f"{jid}\t{prompt}\t{n_p}\n") os.chmod(PROMPT_LOG, 0o664) except Exception as e: logging.error(f"Failed to write to {PROMPT_LOG}: {e}") stream.put(('progress', (None, "", ProgressBar().make_progress_bar_html(0, "Start")))) try: if not hv: unload_complete_models(text_encoder, text_encoder_2, image_encoder, vae, transformer) fake_diffusers_current_device(text_encoder, gpu) load_model_as_complete(text_encoder_2, gpu) lv, cp = encode_prompt_conds(prompt, text_encoder, text_encoder_2, tokenizer, tokenizer_2) if cfg == 1: lv_n = torch.zeros_like(lv) cp_n = torch.zeros_like(cp) else: lv_n, cp_n = encode_prompt_conds(n_p, text_encoder, text_encoder_2, tokenizer, tokenizer_2) lv, m = crop_or_pad_yield_mask(lv, 512) lv_n, m_n = crop_or_pad_yield_mask(lv_n, 512) lv, cp, lv_n, cp_n = [x.to(torch.bfloat16) for x in (lv, cp, lv_n, cp_n)] H, W, _ = img.shape h, w = find_nearest_bucket(H, W, 640) img_np = resize_and_center_crop(img, w, h) img_filename = f"{jid}.png" try: Image.fromarray(img_np).save(os.path.join(OUT_IMG, img_filename)) os.chmod(os.path.join(OUT_IMG, img_filename), 0o664) except Exception as e: logging.error(f"Failed to save image {img_filename}: {e}") raise img_pt = (torch.from_numpy(img_np).float() / 127.5 - 1).permute(2, 0, 1)[None, :, None] if not hv: load_model_as_complete(vae, gpu) start_lat = vae_encode(img_pt, vae) if not hv: load_model_as_complete(image_encoder, gpu) img_emb = hf_clip_vision_encode(img_np, feature_extractor, image_encoder).last_hidden_state.to(torch.bfloat16) gen = torch.Generator("cpu").manual_seed(seed) hist_lat = torch.zeros((1, 16, 1 + 2 + 16, h // 8, w // 8), dtype=torch.float32).cpu() hist_px = None total = 0 pad_seq = [3] + [2] * (sections - 3) + [1, 0] if sections > 4 else list(reversed(range(sections))) section_index = 0 for pad in pad_seq: if stream.is_stopped(): render_status = "stopped" stream.put(("stopped", None)) return None last = pad == 0 pad_sz = pad * win idx = torch.arange(0, sum([1, pad_sz, win, 1, 2, 16]))[None] a, b, c, d, e, f = idx.split([1, pad_sz, win, 1, 2, 16], 1) clean_idx = torch.cat([a, d], 1) pre = start_lat.to(hist_lat) post, two, four = hist_lat[:, :, :1 + 2 + 16].split([1, 2, 16], 2) clean = torch.cat([pre, post], 2) if not hv: unload_complete_models() move_model_to_device_with_memory_preservation(transformer, gpu, keep) transformer.initialize_teacache(tea, stp) def cb(d): global render_progress pv = vae_decode_fake(d["denoised"]) pv = (pv * 255).cpu().numpy().clip(0, 255).astype(np.uint8) pv = einops.rearrange(pv, "b c t h w->(b h)(t w)c") cur = d["i"] + 1 render_progress = (cur / stp) * 100 stream.put(('progress', (pv, f"{cur}/{stp}", ProgressBar().make_progress_bar_html(int(100 * cur / stp), f"{cur}/{stp}")))) if stream.is_stopped(): stream.put(("stopped", None)) raise KeyboardInterrupt new_lat = sample_hunyuan( transformer=transformer, sampler="unipc", width=w, height=h, frames=win * 4 - 3, real_guidance_scale=cfg, distilled_guidance_scale=gsc, guidance_rescale=rsc, num_inference_steps=stp, generator=gen, prompt_embeds=lv, prompt_embeds_mask=m, prompt_poolers=cp, negative_prompt_embeds=lv_n, negative_prompt_embeds_mask=m_n, negative_prompt_poolers=cp_n, device=gpu, dtype=torch.bfloat16, image_embeddings=img_emb, latent_indices=c, clean_latents=clean, clean_latent_indices=clean_idx, clean_latents_2x=two, clean_latent_2x_indices=e, clean_latents_4x=four, clean_latent_4x_indices=f, callback=cb ) if last: new_lat = torch.cat([start_lat.to(new_lat), new_lat], 2) total += new_lat.shape[2] hist_lat = torch.cat([new_lat.to(hist_lat), hist_lat], 2) if not hv: offload_model_from_device_for_memory_preservation(transformer, gpu, 8) load_model_as_complete(vae, gpu) real = hist_lat[:, :, :total] if hist_px is None: hist_px = vae_decode(real, vae).cpu() else: overlap = win * 4 - 3 curr = vae_decode(real[:, :, :win * 2], vae).cpu() hist_px = soft_append_bcthw(curr, hist_px, overlap) if not hv: unload_complete_models() tmp_filename = f"{jid}_{total}.mp4" tmp = os.path.join(OUT_TEMP, tmp_filename) try: save_bcthw_as_mp4(hist_px, tmp, fps=30, crf=crf) os.chmod(tmp, 0o664) except Exception as e: logging.error(f"Failed to save video {tmp}: {e}") raise stream.put(('file', tmp)) section_index += 1 if last: fin_filename = f"{jid}_{total}.mp4" fin = os.path.join(OUT_VID, fin_filename) try: os.replace(tmp, fin) os.chmod(fin, 0o664) save_video_info(prompt, n_p, fin_filename, seed, secs, None) stream.put(('complete', fin)) render_status = "complete" end_time = time.time() render_time = end_time - start_render_time render_times.append(render_time) if len(render_times) > 3: render_times.pop(0) return fin except Exception as e: logging.error(f"Failed to finalize video {fin}: {e}") raise except Exception as e: traceback.print_exc() render_status = "error" stream.put(("stopped", str(e))) logging.error(f"Worker failed: {e}") return None finally: render_progress = 0.0 start_render_time = None @torch.no_grad() def process(img, prm, npr, sd, sec, win, stp, cfg, gsc, rsc, kee, tea, crf): global stream if img is None: yield None, None, "Please upload an image to proceed.", "", gr.update(interactive=False), gr.update(interactive=True) return yield None, None, "", "", gr.update(interactive=False), gr.update(interactive=True) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: future = loop.run_in_executor(None, lambda: worker(img, prm, npr, sd, sec, win, stp, cfg, gsc, rsc, kee, tea, crf)) out, log = None, "" while True: try: if stream and not stream.output_queue.empty(): flag, data = stream.get() if flag == "file": out = data yield out, gr.update(), gr.update(), log, gr.update(interactive=False), gr.update(interactive=True) elif flag == "progress": pv, desc, html = data log = desc yield gr.update(), gr.update(visible=True, value=pv), desc, html, gr.update(interactive=False), gr.update(interactive=True) elif flag in ("complete", "stopped", "end"): yield out, gr.update(visible=False), gr.update(), "", gr.update(interactive=True), gr.update(interactive=False) break except Exception as e: logging.error(f"Error in process queue: {e}") yield None, gr.update(visible=False), "Error occurred during processing.", "", gr.update(interactive=True), gr.update(interactive=False) break finally: loop.close() def end_process(): if stream: stream.stop() # ------------------- UI ------------------------------ quick_prompts = [ ["Smooth animation: A character waves for 3 seconds, then stands still for 2 seconds, static camera, silent."], ["Smooth animation: A character moves for 5 seconds, static camera, silent."] ] css = """ .orange-button{background:#ff6200;color:#fff;border-color:#ff6200;} .load-button{background:#4CAF50;color:#fff;border-color:#4CAF50;margin-left:10px;} .big-setting-button{background:#0066cc;color:#fff;border:none;padding:14px 24px;font-size:18px;width:100%;border-radius:6px;margin:8px 0;} .styled-dropdown{width:250px;padding:5px;border-radius:4px;} .viewer-column{width:100%;max-width:900px;margin:0 auto;} .media-preview img,.media-preview video{max-width:100%;height:380px;object-fit:contain;border:1px solid #444;border-radius:6px;} .media-container{display:flex;gap:20px;align-items:flex-start;} .control-box{min-width:220px;} .control-grid{display:grid;grid-template-columns:1fr 1fr;gap:10px;} .image-gallery{display:grid!important;grid-template-columns:repeat(auto-fit,minmax(300px,1fr))!important;gap:10px;padding:10px!important;overflow-y:auto!important;max-height:360px!important;} .image-gallery .gallery-item{padding:10px;height:360px!important;width:300px!important;} .image-gallery img{object-fit:contain;height:360px!important;width:300px!important;} .video-gallery{display:grid!important;grid-template-columns:repeat(auto-fit,minmax(300px,1fr))!important;gap:10px;padding:10px!important;overflow-y:auto!important;max-height:360px!important;} .video-gallery .gallery-item{padding:10px;height:360px!important;width:300px!important;} .video-gallery video{object-fit:contain;height:360px!important;width:300px!important;} .stop-button {background-color: #ff4d4d !important; color: white !important;} .progress-bar { width: 100%; height: 20px; background-color: #444; border-radius: 10px; overflow: hidden; } .progress-bar-fill { height: 100%; background-color: #ff6200; border-radius: 10px; transition: width 0.3s ease-in-out; } """ blk = gr.Blocks(css=css, title="GhostPack F1 Pro").queue() with blk: gr.Markdown("# 👻 GhostPack F1 Pro") with gr.Tabs(): with gr.TabItem("👻 Generate"): with gr.Row(): with gr.Column(): img_in = gr.Image(sources="upload", type="numpy", label="Image", height=320) generate_button = gr.Button("Generate Video", elem_id="generate_button") stop_button = gr.Button("Stop Generation", elem_id="stop_button", elem_classes="stop-button") prm = gr.Textbox( label="Prompt", value="Smooth animation: A female stands with subtle, sensual micro-movements, breathing gently, slight head tilt, static camera, silent", elem_id="prompt_input" ) npr = gr.Textbox( label="Negative Prompt", value="low quality, blurry, speaking, talking, moaning, vocalizing, lip movement, mouth animation, sound, dialogue, speech, whispering, shouting, lip sync, facial animation, expressive face, verbal expression, animated mouth", elem_id="negative_prompt_input" ) save_msg = gr.Markdown("") btn_save = gr.Button("Save Prompt") btn1, btn2, btn3 = gr.Button("Load Most Recent"), gr.Button("Load 2nd Recent"), gr.Button("Load 3rd Recent") ds = gr.Dataset(samples=quick_prompts, label="Quick List", components=[prm]) ds.click(lambda x: x[0], [ds], [prm]) btn_save.click(save_prompt_fn, [prm, npr], [save_msg]) btn1.click(lambda: load_prompt_fn(0), [], [prm]) btn2.click(lambda: load_prompt_fn(1), [], [prm]) btn3.click(lambda: load_prompt_fn(2), [], [prm]) with gr.Column(): pv = gr.Image(label="Next Latents", height=200, visible=False) vid = gr.Video(label="Finished", autoplay=True, height=500, loop=True, show_share_button=False) log_md = gr.Markdown("") bar = gr.HTML("") with gr.Column(): se = gr.Number(label="Seed", value=31337, precision=0, elem_id="seed_input") sec = gr.Slider(label="Video Length (s)", minimum=1, maximum=120, value=5, step=0.1, elem_id="video_length_input") win = gr.Slider(label="Latent Window", minimum=1, maximum=33, value=5, step=1, elem_id="latent_window_input") stp = gr.Slider(label="Steps", minimum=1, maximum=100, value=12, step=1, elem_id="steps_input") cfg = gr.Slider(label="CFG", minimum=1, maximum=32, value=1, step=0.01, elem_id="cfg_input", visible=False) gsc = gr.Slider(label="Distilled CFG", minimum=1, maximum=32, value=7, step=0.1, elem_id="distilled_cfg_input") rsc = gr.Slider(label="CFG Re-Scale", minimum=0, maximum=1, value=0.7, step=0.01, elem_id="cfg_rescale_input") kee = gr.Slider(label="GPU Keep (GB)", minimum=4, maximum=free_mem, value=6, step=0.1, elem_id="gpu_keep_input") crf = gr.Slider(label="MP4 CRF", minimum=0, maximum=100, value=20, step=1, elem_id="mp4_crf_input") tea = gr.Checkbox(label="Use TeaCache", value=True, elem_id="use_teacache_input") generate_button.click( fn=process, inputs=[img_in, prm, npr, se, sec, win, stp, cfg, gsc, rsc, kee, tea, crf], outputs=[vid, pv, log_md, bar, generate_button, stop_button] ) stop_button.click(fn=end_process) gr.Button("Update Progress").click( fn=get_progress, outputs=[log_md, bar] ) with gr.TabItem("🖼️ Image Gallery"): with gr.Row(elem_classes="media-container"): with gr.Column(scale=3): image_preview = gr.Image( label="Viewer", value=(list_images()[0] if list_images() else None), interactive=False, elem_classes="media-preview" ) with gr.Column(elem_classes="control-box"): image_dropdown = gr.Dropdown( choices=[os.path.basename(i) for i in list_images()], value=(os.path.basename(list_images()[0]) if list_images() else None), label="Select", elem_classes="styled-dropdown" ) with gr.Row(elem_classes="control-grid"): load_btn = gr.Button("Load", elem_classes="load-button") next_btn = gr.Button("Next", elem_classes="load-button") with gr.Row(elem_classes="control-grid"): refresh_btn = gr.Button("Refresh") delete_btn = gr.Button("Delete", elem_classes="orange-button") image_gallery = gr.Gallery( value=list_images(), label="Thumbnails", columns=6, height=360, allow_preview=False, type="filepath", elem_classes="image-gallery" ) load_btn.click(load_image, [image_dropdown], [image_preview, image_dropdown]) next_btn.click(next_image_and_load, [image_dropdown], [image_preview, image_dropdown]) refresh_btn.click( lambda: ( gr.update(choices=[os.path.basename(i) for i in list_images()], value=os.path.basename(list_images()[0]) if list_images() else None), gr.update(value=list_images()[0] if list_images() else None), gr.update(value=list_images()) ), [], [image_dropdown, image_preview, image_gallery] ) delete_btn.click( lambda sel: (os.remove(os.path.join(OUT_IMG, sel)) if sel else None) or load_image(""), [image_dropdown], [image_preview, image_dropdown] ) image_gallery.select(gallery_image_select, [], [image_preview, image_dropdown]) with gr.TabItem("🎬 Video Gallery"): with gr.Row(elem_classes="media-container"): with gr.Column(scale=3): video_preview = gr.Video( label="Viewer", value=(list_videos()[0] if list_videos() else None), autoplay=True, loop=True, interactive=False, elem_classes="media-preview" ) with gr.Column(elem_classes="control-box"): video_dropdown = gr.Dropdown( choices=[os.path.basename(v) for v in list_videos()], value=(os.path.basename(list_videos()[0]) if list_videos() else None), label="Select", elem_classes="styled-dropdown" ) with gr.Row(elem_classes="control-grid"): load_vbtn = gr.Button("Load", elem_classes="load-button") next_vbtn = gr.Button("Next", elem_classes="load-button") with gr.Row(elem_classes="control-grid"): refresh_v = gr.Button("Refresh") delete_v = gr.Button("Delete", elem_classes="orange-button") video_gallery = gr.Gallery( value=list_videos(), label="Thumbnails", columns=6, height=360, allow_preview=False, type="filepath", elem_classes="video-gallery" ) load_vbtn.click(load_video, [video_dropdown], [video_preview, video_dropdown]) next_vbtn.click(next_video_and_load, [video_dropdown], [video_preview, video_dropdown]) refresh_v.click( lambda: ( gr.update(choices=[os.path.basename(v) for v in list_videos()], value=os.path.basename(list_videos()[0]) if list_videos() else None), gr.update(value=list_videos()[0] if list_videos() else None), gr.update(value=list_videos()) ), [], [video_dropdown, video_preview, video_gallery] ) delete_v.click( lambda sel: (os.remove(os.path.join(OUT_VID, sel)) if sel else None) or load_video(""), [video_dropdown], [video_preview, video_dropdown] ) video_gallery.select(gallery_video_select, [], [video_preview, video_dropdown]) with gr.TabItem("👻 About"): gr.Markdown("## GhostPack F1 Pro") with gr.Row(): with gr.Column(): gr.Markdown("**🛠️ Description**\nImage-to-Video toolkit powered by HunyuanVideo & FramePack-F1") with gr.Column(): gr.Markdown(f"**📦 Version**\n{VERSION}") with gr.Column(): gr.Markdown("**✍️ Author**\nGhostAI") with gr.Column(): gr.Markdown("**🔗 Repo**\nhttps://huggingface.co/spaces/ghostai1/GhostPack") with gr.TabItem("⚙️ Settings"): ct = gr.Button("Clear Temp", elem_classes="big-setting-button") ctmsg = gr.Markdown("") co = gr.Button("Clear Old", elem_classes="big-setting-button") comsg = gr.Markdown("") ci = gr.Button("Clear Images", elem_classes="big-setting-button") cimg = gr.Markdown("") cv = gr.Button("Clear Videos", elem_classes="big-setting-button") cvid = gr.Markdown("") ct.click(clear_temp_videos, [], ctmsg) co.click(clear_old_files, [], comsg) ci.click(clear_images, [], cimg) cv.click(clear_videos, [], cvid) with gr.TabItem("🛠️ Install"): xs = gr.Textbox(value=status_xformers(), interactive=False, label="xformers") bx = gr.Button("Install xformers", elem_classes="big-setting-button") ss = gr.Textbox(value=status_sage(), interactive=False, label="sage-attn") bs = gr.Button("Install sage-attn", elem_classes="big-setting-button") fs = gr.Textbox(value=status_flash(), interactive=False, label="flash-attn") bf = gr.Button("Install flash-attn", elem_classes="big-setting-button") bx.click(install_xformers, [], xs) bs.click(install_sage_attn, [], ss) bf.click(install_flash_attn, [], fs) with gr.TabItem("📜 Logs"): logs = gr.Textbox(lines=20, interactive=False, label="Install Logs") rl = gr.Button("Refresh", elem_classes="big-setting-button") cl = gr.Button("Clear", elem_classes="big-setting-button") rl.click(refresh_logs, [], logs) cl.click(clear_logs, [], logs) # Force video previews to seek to 2s gr.HTML(""" """) # Camera action update camera_action_input = gr.Dropdown( choices=[ "Static Camera", "Slight Orbit Left", "Slight Orbit Right", "Slight Orbit Up", "Slight Orbit Down", "Top-Down View", "Slight Zoom In", "Slight Zoom Out" ], label="Camera Action", value="Static Camera", elem_id="camera_action_input", info="Select a camera movement to append to the prompt." ) camera_action_input.change( fn=lambda prompt, camera_action: update_prompt(prompt, camera_action), inputs=[prm, camera_action_input], outputs=prm ) def update_prompt(prompt, camera_action): # Remove existing camera action from prompt camera_actions = [ "static camera", "slight camera orbit left", "slight camera orbit right", "slight camera orbit up", "slight camera orbit down", "top-down view", "slight camera zoom in", "slight camera zoom out" ] for action in camera_actions: prompt = re.sub(rf',\s*{re.escape(action)}\b', '', prompt, flags=re.IGNORECASE).strip() # Append selected camera action if camera_action and camera_action != "None": camera_phrase = f", {camera_action.lower()}" if len(prompt.split()) + len(camera_phrase.split()) <= 50: return prompt + camera_phrase else: logging.warning(f"Prompt exceeds 50 words after adding camera action: {prompt}") return prompt def get_progress(): markdown_text = f"Status: {render_status}\nProgress: {render_progress:.1f}%\nLast Render Time: {render_times[-1] if render_times else 0:.1f}s" progress_bar_html = ProgressBar().make_progress_bar_html(int(render_progress), f"{int(render_progress)}%") return markdown_text, progress_bar_html class ProgressBar: def make_progress_bar_css(self): return """ .progress-bar { width: 100%; height: 20px; background-color: #444; border-radius: 10px; overflow: hidden; } .progress-bar-fill { height: 100%; background-color: #ff6200; border-radius: 10px; transition: width 0.3s ease-in-out; } """ def make_progress_bar_html(self, percentage, label): css = self.make_progress_bar_css() fill_width = f"{percentage}%" html = f"""
{label}
""" return html blk.launch( server_name=args.server, server_port=args.port, share=args.share, inbrowser=args.inbrowser )