#!/usr/bin/env python3
# ---------------------------------------------------------------------------
# RELEASE – GhostPack Video Generator (📸 Showcase Tab Polished)
# ---------------------------------------------------------------------------
import os
import sys
import argparse
import random
import traceback
import json
import logging

import numpy as np
import torch
import einops
import gradio as gr
from PIL import Image
from diffusers import AutoencoderKLHunyuanVideo
from transformers import (
    LlamaModel,
    CLIPTextModel,
    LlamaTokenizerFast,
    CLIPTokenizer,
    SiglipImageProcessor,
    SiglipVisionModel,
)

from diffusers_helper.hunyuan import (
    encode_prompt_conds,
    vae_encode,
    vae_decode,
    vae_decode_fake,
)
from diffusers_helper.utils import (
    save_bcthw_as_mp4,
    crop_or_pad_yield_mask,
    soft_append_bcthw,
    resize_and_center_crop,
    generate_timestamp,
)
from diffusers_helper.models.hunyuan_video_packed import HunyuanVideoTransformer3DModelPacked
from diffusers_helper.pipelines.k_diffusion_hunyuan import sample_hunyuan
from diffusers_helper.memory import (
    gpu,
    get_cuda_free_memory_gb,
    DynamicSwapInstaller,
    unload_complete_models,
    load_model_as_complete,
    fake_diffusers_current_device,
    move_model_to_device_with_memory_preservation,
    offload_model_from_device_for_memory_preservation,
)
from diffusers_helper.thread_utils import AsyncStream, async_run
from diffusers_helper.gradio.progress_bar import make_progress_bar_css, make_progress_bar_html
from diffusers_helper.clip_vision import hf_clip_vision_encode
from diffusers_helper.bucket_tools import find_nearest_bucket

# ---------------------------------------------------------------------------
# SETUP LOGGING & TCMALLOC DETECTION
# ---------------------------------------------------------------------------
BASE = os.path.abspath(os.path.dirname(__file__))
LOG_PATH = os.path.join(BASE, "ghostpack.log")
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s - %(message)s',
    handlers=[
        # Explicit encoding so non-ASCII text in messages/tracebacks cannot
        # fail on platforms whose default encoding is not UTF-8.
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)


def detect_tcmalloc():
    """Return True if the string 'tcmalloc' appears in /proc/self/maps.

    Best-effort probe: returns False when the file cannot be read (for
    example on platforms without /proc) or cannot be decoded.
    """
    try:
        with open('/proc/self/maps', 'r') as f:
            return 'tcmalloc' in f.read()
    except (OSError, ValueError):
        # OSError: file missing/unreadable; ValueError: undecodable bytes.
        return False


logger.info("TCMalloc loaded: %s", detect_tcmalloc())

# ---------------------------------------------------------------------------
# SETTINGS
# ---------------------------------------------------------------------------
SETTINGS_PATH = os.path.join(BASE, "settings.json")
default_settings = {
    "latent_window": 9,
    "cfg_scale": 1.0,
    "cfg_rescale": 0.0,
    "gpu_keep": 6.0,
    "enable_teacache": True,
    "mp4_crf": 16,
}


def _load_settings():
    """Read SETTINGS_PATH and return a settings dict with every default key.

    Falls back to a copy of ``default_settings`` when the file is missing,
    unreadable, not valid JSON, or does not contain a JSON object. Keys
    absent from the file are filled from ``default_settings`` so later
    ``settings[...]`` lookups cannot raise KeyError on a partial file.
    """
    try:
        with open(SETTINGS_PATH, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: no settings saved yet.
        return dict(default_settings)
    except (OSError, ValueError) as err:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logger.warning("Could not read %s (%s); using default settings.", SETTINGS_PATH, err)
        return dict(default_settings)
    if not isinstance(loaded, dict):
        logger.warning("%s does not contain a JSON object; using default settings.", SETTINGS_PATH)
        return dict(default_settings)
    return {**default_settings, **loaded}


settings = _load_settings()


def save_settings(lw, cs, cr, gk, teac, crf):
    """Write the given values to SETTINGS_PATH as JSON.

    Args:
        lw: Stored under "latent_window".
        cs: Stored under "cfg_scale".
        cr: Stored under "cfg_rescale".
        gk: Stored under "gpu_keep".
        teac: Stored under "enable_teacache".
        crf: Stored under "mp4_crf".

    Returns:
        A status string for display.

    Raises:
        OSError: If the settings file cannot be written.
    """
    s = {
        "latent_window": lw,
        "cfg_scale": cs,
        "cfg_rescale": cr,
        "gpu_keep": gk,
        "enable_teacache": teac,
        "mp4_crf": crf,
    }
    with open(SETTINGS_PATH, "w", encoding="utf-8") as f:
        json.dump(s, f, indent=2)
    return "✅ Settings saved!"


def load_logs():
    """Return the full text of the log file, or a placeholder if it is missing."""
    try:
        # errors="replace": a log written earlier in another encoding should
        # still be displayable rather than raising.
        with open(LOG_PATH, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    except FileNotFoundError:
        return "No logs found."
# ---------------------------------------------------------------------------
# ENV / CACHE
# ---------------------------------------------------------------------------
# NOTE(review): these variables are assigned after transformers/diffusers have
# already been imported at the top of the file. If those libraries resolve
# their cache paths at import time, the assignments below may have no effect
# on where models are downloaded — confirm, and move them above the imports
# if the local "hf_download" cache is really intended.
CACHE = os.path.join(BASE, "hf_download")
os.makedirs(CACHE, exist_ok=True)
for _v in ("HF_HOME", "TRANSFORMERS_CACHE", "HF_DATASETS_CACHE"):
    os.environ[_v] = CACHE
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"

# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
ap = argparse.ArgumentParser()
ap.add_argument("--share", action="store_true")
ap.add_argument("--server", default="0.0.0.0")
ap.add_argument("--port", type=int, default=7860)
ap.add_argument("--inbrowser", action="store_true")
args = ap.parse_args()

# ---------------------------------------------------------------------------
# VRAM
# ---------------------------------------------------------------------------
free_gb = get_cuda_free_memory_gb(gpu)
# Above this threshold all models are moved to the GPU up front; otherwise
# they are loaded/unloaded on demand inside worker().
hi_vram = free_gb > 60
logger.info("[GhostPack] Free VRAM: %.1f GB | High-VRAM: %s", free_gb, hi_vram)


# ---------------------------------------------------------------------------
# MODEL LOAD
# ---------------------------------------------------------------------------
def llm(sf):
    """Load the Llama text encoder from subfolder *sf* as fp16, on CPU, in eval mode."""
    return LlamaModel.from_pretrained(
        "hunyuanvideo-community/HunyuanVideo",
        subfolder=sf,
        torch_dtype=torch.float16,
    ).cpu().eval()


def clip(sf):
    """Load the CLIP text encoder from subfolder *sf* as fp16, on CPU, in eval mode."""
    return CLIPTextModel.from_pretrained(
        "hunyuanvideo-community/HunyuanVideo",
        subfolder=sf,
        torch_dtype=torch.float16,
    ).cpu().eval()


text_enc = llm("text_encoder")
text_enc2 = clip("text_encoder_2")
tok = LlamaTokenizerFast.from_pretrained(
    "hunyuanvideo-community/HunyuanVideo", subfolder="tokenizer"
)
tok2 = CLIPTokenizer.from_pretrained(
    "hunyuanvideo-community/HunyuanVideo", subfolder="tokenizer_2"
)
vae = AutoencoderKLHunyuanVideo.from_pretrained(
    "hunyuanvideo-community/HunyuanVideo",
    subfolder="vae",
    torch_dtype=torch.float16,
).cpu().eval()
feat_ext = SiglipImageProcessor.from_pretrained(
    "lllyasviel/flux_redux_bfl", subfolder="feature_extractor"
)
img_enc = SiglipVisionModel.from_pretrained(
    "lllyasviel/flux_redux_bfl",
    subfolder="image_encoder",
    torch_dtype=torch.float16,
).cpu().eval()
trans = HunyuanVideoTransformer3DModelPacked.from_pretrained(
    "lllyasviel/FramePackI2V_HY", torch_dtype=torch.bfloat16
).cpu().eval()
trans.high_quality_fp32_output_for_inference = True

if not hi_vram:
    vae.enable_slicing()
    vae.enable_tiling()
else:
    for m in (text_enc, text_enc2, img_enc, vae, trans):
        m.to(gpu)

trans.to(dtype=torch.bfloat16)
for m in (vae, img_enc, text_enc, text_enc2):
    m.to(dtype=torch.float16)
for m in (vae, img_enc, text_enc, text_enc2, trans):
    m.requires_grad_(False)

if not hi_vram:
    DynamicSwapInstaller.install_model(trans, device=gpu)
    DynamicSwapInstaller.install_model(text_enc, device=gpu)

OUT = os.path.join(BASE, "outputs")
os.makedirs(OUT, exist_ok=True)
# Replaced with a fresh AsyncStream by wrapper_logic() at the start of each job.
stream = AsyncStream()

# ---------------------------------------------------------------------------
# DEMO IMAGE
# ---------------------------------------------------------------------------
demo_path = os.path.join(BASE, "img", "demo.png")
if os.path.exists(demo_path):
    # Context manager so the image file handle is closed after conversion.
    with Image.open(demo_path) as _demo_img:
        demo_np = np.array(_demo_img.convert("RGB"))
else:
    demo_np = None


# ---------------------------------------------------------------------------
# WORKER & WRAPPER
# ---------------------------------------------------------------------------
def _unload_models_after_abort():
    """Unload every model when running in low-VRAM mode; no-op otherwise."""
    if not hi_vram:
        unload_complete_models(text_enc, text_enc2, img_enc, vae, trans)


@torch.no_grad()
def worker(img, p, n_p, sd, secs, win, stp, cfg, gsc, rsc, keep, tea, crf):
    """Generate a video from *img* and report progress through ``stream``.

    Messages pushed to ``stream.output_queue``:
      * ``("progress", (preview, description, html))`` during sampling,
      * ``("file", mp4_path)`` after each section has been decoded and saved,
      * ``("end", None)`` exactly once when the job finishes, fails, or is
        cancelled (cancellation is requested by ``"end"`` on
        ``stream.input_queue``).

    Args:
        img: Input image as an H x W x C array (its ``.shape`` is unpacked
            into three values).
        p: Positive prompt.
        n_p: Negative prompt; only encoded when ``cfg != 1``.
        sd: Seed; ``-1`` or ``None`` selects a random seed.
        secs: Requested length; the number of sections is
            ``round(secs * 30 / (win * 4))``, at least 1.
        win: Latent window size.
        stp: Number of inference steps.
        cfg: Passed to the sampler as ``real_guidance_scale``.
        gsc: Passed to the sampler as ``distilled_guidance_scale``.
        rsc: Passed to the sampler as ``guidance_rescale``.
        keep: Passed to ``move_model_to_device_with_memory_preservation``.
        tea: Passed to ``trans.initialize_teacache`` as the enable flag.
        crf: Passed to ``save_bcthw_as_mp4`` as ``crf``.
    """
    if sd is None or sd == -1:
        sd = random.randint(0, 2**31 - 1)
    # UI widgets may deliver numeric values as floats; these are used as
    # tensor split sizes / step counts / the RNG seed and must be ints.
    sd, win, stp = int(sd), int(win), int(stp)

    sections = max(round((secs * 30) / (win * 4)), 1)
    job = generate_timestamp()
    stream.output_queue.push(("progress", (None, "", make_progress_bar_html(0, "Start"))))

    try:
        # --- text conditioning -------------------------------------------
        if not hi_vram:
            unload_complete_models(text_enc, text_enc2, img_enc, vae, trans)
            fake_diffusers_current_device(text_enc, gpu)
            load_model_as_complete(text_enc2, gpu)

        lv, cp = encode_prompt_conds(p, text_enc, text_enc2, tok, tok2)
        if cfg == 1:
            # With cfg == 1 the negative prompt is not encoded; zeros are used.
            lv_n, cp_n = torch.zeros_like(lv), torch.zeros_like(cp)
        else:
            lv_n, cp_n = encode_prompt_conds(n_p, text_enc, text_enc2, tok, tok2)
        lv, m = crop_or_pad_yield_mask(lv, 512)
        lv_n, m_n = crop_or_pad_yield_mask(lv_n, 512)

        # --- input image -------------------------------------------------
        H, W, _ = img.shape
        h, w = find_nearest_bucket(H, W, 640)
        img_np = resize_and_center_crop(img, w, h)
        Image.fromarray(img_np).save(os.path.join(OUT, f"{job}.png"))
        # Scale pixel values from [0, 255] to [-1, 1], then reorder HWC -> CHW
        # and add two singleton axes (positions 0 and 2).
        img_pt = torch.from_numpy(img_np).float() / 127.5 - 1
        img_pt = img_pt.permute(2, 0, 1)[None, :, None]

        if not hi_vram:
            load_model_as_complete(vae, gpu)
        start_lat = vae_encode(img_pt, vae)

        if not hi_vram:
            load_model_as_complete(img_enc, gpu)
        img_hidden = hf_clip_vision_encode(img_np, feat_ext, img_enc).last_hidden_state

        to = trans.dtype
        lv, lv_n, cp, cp_n, img_hidden = [x.to(to) for x in (lv, lv_n, cp, cp_n, img_hidden)]

        # --- sampling loop -----------------------------------------------
        gen = torch.Generator("cpu").manual_seed(sd)
        frames = win * 4 - 3
        hist_lat = torch.zeros((1, 16, 19, h // 8, w // 8), dtype=torch.float32).cpu()
        hist_px, total = None, 0

        if sections > 4:
            pad_seq = [3] + [2] * (sections - 3) + [1, 0]
        else:
            pad_seq = list(reversed(range(sections)))

        for pad in pad_seq:
            last = pad == 0
            if stream.input_queue.top() == "end":
                # Cancel requested between sections; "end" is pushed in finally.
                break

            pad_sz = pad * win
            # One index row covering 1 + pad_sz + win + 1 + 2 + 16 positions,
            # split into the six groups consumed by the sampler below.
            idx = torch.arange(0, sum([1, pad_sz, win, 1, 2, 16])).unsqueeze(0)
            a, b, c, d, e, f = idx.split([1, pad_sz, win, 1, 2, 16], 1)
            clean_idx = torch.cat([a, d], 1)
            pre = start_lat.to(hist_lat)
            post, two, four = hist_lat[:, :, :19].split([1, 2, 16], 2)
            clean = torch.cat([pre, post], 2)

            if not hi_vram:
                unload_complete_models()
                move_model_to_device_with_memory_preservation(trans, gpu, keep)
            trans.initialize_teacache(tea, stp)

            def cb(d):
                """Per-step sampler callback: push a preview and honour cancel."""
                pv = vae_decode_fake(d["denoised"])
                pv = (pv * 255).cpu().numpy().clip(0, 255).astype(np.uint8)
                pv = einops.rearrange(pv, "b c t h w->(b h)(t w)c")
                cur = d["i"] + 1
                # Clamp so the label never shows a negative frame count
                # before the first section has completed (total == 0).
                done = max(0, total * 4 - 3)
                stream.output_queue.push((
                    "progress",
                    (pv, f"{done}f", make_progress_bar_html(int(100 * cur / stp), f"{cur}/{stp}")),
                ))
                if stream.input_queue.top() == "end":
                    # KeyboardInterrupt is not an Exception subclass; it is
                    # caught explicitly in worker() below.
                    raise KeyboardInterrupt

            new_lat = sample_hunyuan(
                transformer=trans,
                sampler="unipc",
                width=w,
                height=h,
                frames=frames,
                real_guidance_scale=cfg,
                distilled_guidance_scale=gsc,
                guidance_rescale=rsc,
                num_inference_steps=stp,
                generator=gen,
                prompt_embeds=lv,
                prompt_embeds_mask=m,
                prompt_poolers=cp,
                negative_prompt_embeds=lv_n,
                negative_prompt_embeds_mask=m_n,
                negative_prompt_poolers=cp_n,
                device=gpu,
                dtype=torch.bfloat16,
                image_embeddings=img_hidden,
                latent_indices=c,
                clean_latents=clean,
                clean_latent_indices=clean_idx,
                clean_latents_2x=two,
                clean_latent_2x_indices=e,
                clean_latents_4x=four,
                clean_latent_4x_indices=f,
                callback=cb,
            )

            if last:
                # The final section (pad == 0) gets the start latent prepended.
                new_lat = torch.cat([start_lat.to(new_lat), new_lat], 2)
            total += new_lat.shape[2]
            hist_lat = torch.cat([new_lat.to(hist_lat), hist_lat], 2)

            # --- decode and save -----------------------------------------
            if not hi_vram:
                offload_model_from_device_for_memory_preservation(trans, gpu, 8)
                load_model_as_complete(vae, gpu)

            real = hist_lat[:, :, :total]
            if hist_px is None:
                hist_px = vae_decode(real, vae).cpu()
            else:
                sec_lat = win * 2 + 1 if last else win * 2
                cur_px = vae_decode(real[:, :, :sec_lat], vae).cpu()
                hist_px = soft_append_bcthw(cur_px, hist_px, win * 4 - 3)

            if not hi_vram:
                unload_complete_models()

            mp4 = os.path.join(OUT, f"{job}_{total}.mp4")
            save_bcthw_as_mp4(hist_px, mp4, fps=30, crf=crf)
            stream.output_queue.push(("file", mp4))

            if last:
                break
    except KeyboardInterrupt:
        # Raised by cb() when the user pressed "End" during sampling.
        logger.info("[GhostPack] Generation cancelled by user.")
        _unload_models_after_abort()
    except Exception:
        # Goes through the logger so the traceback lands in the log file too.
        logger.exception("[GhostPack] Generation failed.")
        _unload_models_after_abort()
    finally:
        # Always signal completion, otherwise wrapper_logic() would block forever.
        stream.output_queue.push(("end", None))


def wrapper(*args):
    """Gradio entry point: forward all positional inputs to wrapper_logic()."""
    yield from wrapper_logic(*args)


def wrapper_logic(img, p, n_p, sd, secs, win, stp, cfg, gsc, rsc, keep, tea, crf):
    """Start worker() in the background and relay its messages to the UI.

    Yields 7-tuples matching the ``outputs`` list wired in run_ui():
    (video, preview image, description, progress HTML, log, start button,
    end button).

    Raises:
        gr.Error: If no input image was supplied.
    """
    global stream
    if img is None:
        raise gr.Error("Upload an image.")

    busy = (gr.update(interactive=False), gr.update(interactive=True))
    idle = (gr.update(interactive=True), gr.update(interactive=False))

    # Reset outputs and flip the buttons before any work starts.
    yield (None, None, "", "", "", *busy)

    stream = AsyncStream()
    async_run(worker, img, p, n_p, sd, secs, win, stp, cfg, gsc, rsc, keep, tea, crf)

    out, log = None, ""
    while True:
        flag, data = stream.output_queue.next()
        if flag == "file":
            out = data
            yield (out, gr.update(), gr.update(), gr.update(), log, *busy)
        if flag == "progress":
            pv, desc, html = data
            log = desc
            yield (gr.update(), gr.update(visible=True, value=pv), desc, html, log, *busy)
        if flag == "end":
            yield (out, gr.update(visible=False), gr.update(), "", log, *idle)
            break


# ---------------------------------------------------------------------------
# UI
# ---------------------------------------------------------------------------
def run_ui():
    """Build the Gradio Blocks interface and launch the server (blocking)."""
    # Adjacent literals concatenate to a single space-separated rule list.
    css = make_progress_bar_css() + (
        " body,.gradio-container{background:#111821;color:#eee;font-family:Roboto,Arial,sans-serif}"
        " .gr-button,.gr-button-primary{background:#006dff;border:#006dff;font-size:16px;padding:10px 22px}"
        " .gr-button-primary.save{background:#28a745;border:#28a745}"
        " .gr-button-primary.save:hover{background:#3ec06d;border:#3ec06d}"
        " .gr-button:hover,.gr-button-primary:hover{background:#0099ff;border:#0099ff}"
        " input,textarea,.gr-input,.gr-textbox,.gr-slider,.gr-number{background:#1e1e1e;color:#eee;border-color:#006dff}"
        " .info-grid{display:grid;grid-template-columns:repeat(auto-fit,minmax(320px,1fr));gap:24px;margin-top:20px}"
        " .info-card{background:#0d1b2a;padding:28px;border:2px solid #0099ff;border-radius:14px;box-shadow:0 0 18px #000a}"
        " .info-card h2{margin:0 0 16px 0;color:#00e6ff;font-size:26px}"
        " .info-card p{margin:0 0 18px 0;line-height:1.6}"
        " .info-card ul,ol{margin:0;padding-left:22px}"
        " .info-card li{margin-bottom:12px}"
        " .info-card a{color:#37c4ff;text-decoration:none;font-weight:600}"
        " .info-card a:hover{text-decoration:underline;color:#6fe0ff}"
        " "
    )
    positive_examples = [
        ["A neon specter gliding through a cyberpunk cityscape at dusk."],
        ["An astronaut exploring an alien jungle on Mars."],
        ["A brave knight facing a dragon in a stormy courtyard."],
        ["A futuristic city skyline at night with flying cars."],
    ]
    negative_examples = [
        ["low quality, blurry, watermark, distorted, deformed"],
    ]

    blk = gr.Blocks(css=css).queue()
    with blk:
        gr.Markdown("## 👻 **GhostPack – Phantom Float Math Edition**")
        with gr.Tabs():
            with gr.TabItem("🎛️ Generator"):
                with gr.Row():
                    with gr.Column(scale=6):
                        img = gr.Image(sources=["upload"], type="numpy", label="Input Image", height=360, value=demo_np)
                        prm = gr.Textbox(label="Prompt", placeholder="Enter positive prompt")
                        ds_pos = gr.Dataset(samples=positive_examples, label="🎨 Quick Prompts", components=[prm])
                        ds_pos.click(lambda x: x[0], inputs=[ds_pos], outputs=[prm])
                        npr = gr.Textbox(label="Negative Prompt", placeholder="Enter negative prompt")
                        ds_neg = gr.Dataset(samples=negative_examples, label="🚫 Neg Prompts", components=[npr])
                        ds_neg.click(lambda x: x[0], inputs=[ds_neg], outputs=[npr])
                        se = gr.Number(label="Seed", value=-1, precision=0, minimum=-1)
                        sec = gr.Slider(label="Length (s)", minimum=1, maximum=120, value=5, step=0.1)
                        win = gr.Slider(label="Latent Window", minimum=1, maximum=33, value=settings["latent_window"], step=1)
                        stp = gr.Slider(label="Steps", minimum=1, maximum=100, value=25, step=1)
                        cfg = gr.Slider(label="CFG Scale", minimum=1, maximum=32, value=settings["cfg_scale"], step=0.01)
                        # NOTE(review): the distilled-CFG default reuses the
                        # "cfg_scale" setting; there is no separate settings key
                        # for it — confirm this is intended.
                        gsc = gr.Slider(label="Distilled CFG", minimum=1, maximum=32, value=settings["cfg_scale"], step=0.01)
                        rsc = gr.Slider(label="Guidance Rescale", minimum=0, maximum=1, value=settings["cfg_rescale"], step=0.01)
                        keep = gr.Slider(label="GPU Keep (GB)", minimum=6, maximum=128, value=settings["gpu_keep"], step=0.1)
                        tea = gr.Checkbox(label="Use TeaCache", value=settings["enable_teacache"])
                        crf = gr.Slider(label="MP4 CRF", minimum=0, maximum=100, value=settings["mp4_crf"], step=1)
                        b_go, b_end = gr.Button("Start 🚀"), gr.Button("End", interactive=False)
                    with gr.Column(scale=5):
                        vid = gr.Video(label="Output Video", autoplay=True, height=540, loop=True)
                        log = gr.Markdown()
                        # Named progress widgets (previously created anonymously
                        # inside the outputs list of the click handler).
                        # NOTE(review): the original layout position of these
                        # three widgets is not recoverable from the collapsed
                        # source; placed in the output column — confirm.
                        preview = gr.Image(visible=False)
                        progress_desc = gr.Markdown()
                        progress_bar = gr.HTML()
                inputs = [img, prm, npr, se, sec, win, stp, cfg, gsc, rsc, keep, tea, crf]
                b_go.click(
                    fn=wrapper,
                    inputs=inputs,
                    outputs=[vid, preview, progress_desc, progress_bar, log, b_go, b_end],
                )
                # Looks up the module-level stream at click time, so it targets
                # the stream created for the currently running job.
                b_end.click(fn=lambda: stream.input_queue.push("end"))

            with gr.TabItem("⚙️ Settings"):
                gr.Markdown("### ⚙️ Advanced Settings & Save")
                lw = gr.Slider(label="Latent Window", minimum=1, maximum=33, value=settings["latent_window"], step=1)
                cs = gr.Slider(label="CFG Scale", minimum=1, maximum=32, value=settings["cfg_scale"], step=0.01)
                cr = gr.Slider(label="CFG Rescale", minimum=0, maximum=1, value=settings["cfg_rescale"], step=0.01)
                gk = gr.Slider(label="GPU Keep (GB)", minimum=6, maximum=128, value=settings["gpu_keep"], step=0.1)
                te_s = gr.Checkbox(label="Enable TeaCache", value=settings["enable_teacache"])
                crf_s = gr.Slider(label="MP4 CRF", minimum=0, maximum=100, value=settings["mp4_crf"], step=1)
                save_btn = gr.Button("Save Settings ✅", elem_classes="save")
                save_status = gr.Markdown("")
                save_btn.click(fn=save_settings, inputs=[lw, cs, cr, gk, te_s, crf_s], outputs=[save_status])

            with gr.TabItem("📝 Logs"):
                gr.Markdown("### 📝 GhostPack Logs")
                log_area = gr.Textbox(label="Logs", lines=20, interactive=False, value=load_logs())
                refresh_btn = gr.Button("🔄 Refresh Logs")
                refresh_btn.click(fn=load_logs, inputs=None, outputs=[log_area])

            # NOTE(review): the two gr.HTML payloads below contain plain text
            # only; if HTML tags (cards, links, images) were lost upstream,
            # restore them from the original file — confirm.
            with gr.TabItem("ℹ️ About Me"):
                gr.HTML("""

👻 GhostAI: AI Media Innovator

I'm a DevOps AI engineer specializing in autonomous media pipelines. My passion is crafting cutting-edge AI tools for video, audio, and automation.

Mission: Empower creators with fast, innovative AI solutions.
Projects: GhostPack Video Generator, GhostAI Music Generator.
Vision: Redefine media creation with AI-driven precision.

🔗 HuggingFace Profile
✉️ Contact Me

""")

            with gr.TabItem("📸 Showcase"):
                gr.HTML("""

✨ GhostPack Showcase

Experience cinematic AI video creation with GhostPack, powered by advanced neural networks for fluid motion and stunning visuals. From cyberpunk cityscapes to alien jungles, GhostPack brings your imagination to life. Inspired by the success of the GhostAI Music Generator, this tool is your gateway to next-gen media creation.

GhostPack Animated Banner

GhostPack in Action: Dynamic video generation with phantom-like precision.

GhostPack Demo GIF

Demo: Create videos from a single image and prompt.

Main Interface

Main Interface: Intuitive controls for video generation.

Advanced Settings

Settings: Fine-tune for optimal performance.

Logs Display

Logs: Monitor VRAM and generation progress.

""")

    blk.launch(
        server_name=args.server,
        server_port=args.port,
        share=args.share,
        inbrowser=args.inbrowser,
    )


if __name__ == "__main__":
    run_ui()