#!/usr/bin/env python3
import os
import glob
import base64
import streamlit as st
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import csv
import time
from dataclasses import dataclass
from typing import Optional
import zipfile
import math
from PIL import Image
import random
import logging
import numpy as np
import cv2
from diffusers import DiffusionPipeline
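
# Assumed runtime dependencies (not pinned here): streamlit, torch, diffusers, pandas,
# numpy, opencv-python, pillow, pytz. transformers is an additional assumption: pipelines
# with a text_encoder/tokenizer generally require it, though it is not imported directly.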

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
log_records = []

class LogCaptureHandler(logging.Handler):
    def emit(self, record):
        log_records.append(record)

logger.addHandler(LogCaptureHandler())
st.set_page_config(
    page_title="SFT Tiny Titans 🚀",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/awacke1',
        'Report a Bug': 'https://huggingface.co/spaces/awacke1',
        'About': "Tiny Titans: Small diffusion models, big CV dreams! 🌌"
    }
)
if 'captured_images' not in st.session_state:
    st.session_state['captured_images'] = []
if 'cv_builder' not in st.session_state:
    st.session_state['cv_builder'] = None
if 'cv_loaded' not in st.session_state:
    st.session_state['cv_loaded'] = False
if 'active_tab' not in st.session_state:
    st.session_state['active_tab'] = "Build Titan 🌱"

@dataclass
class DiffusionConfig:
    """Config for our diffusion heroes 🦸♂️ - Keeps the blueprint snappy!"""
    name: str
    base_model: str
    size: str

    @property
    def model_path(self):
        return f"diffusion_models/{self.name}"

class DiffusionDataset(Dataset):
    """Pixel party platter 🍕 - Images and text for diffusion delight!"""
    def __init__(self, images, texts):
        self.images = images
        self.texts = texts

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return {"image": self.images[idx], "text": self.texts[idx]}

class MicroDiffusionBuilder:
    """Tiny titan of diffusion 🐣 - Small but mighty for quick demos!"""
    def __init__(self):
        self.config = None
        self.pipeline = None
        self.jokes = ["Micro but mighty! 💪", "Small pixels, big dreams! 🌟"]

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        try:
            with st.spinner(f"Loading {model_path}... ⏳ (Tiny titan powering up!)"):
                self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
                if config:
                    self.config = config
            st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Loaded Micro Diffusion: {model_path}")
        except Exception as e:
            st.error(f"Failed to load {model_path}: {str(e)} 💥 (Tiny titan tripped!)")
            logger.error(f"Failed to load {model_path}: {str(e)}")
            raise
        return self

    def fine_tune_sft(self, images, texts, epochs=3):
        try:
            dataset = DiffusionDataset(images, texts)
            # PIL images cannot go through default_collate; hand each sample through as-is.
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda batch: batch[0])
            optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5)
            self.pipeline.unet.train()
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            for epoch in range(epochs):
                with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Micro titan flexing!)"):
                    total_loss = 0
                    for batch in dataloader:
                        optimizer.zero_grad()
                        image = batch["image"]
                        text = batch["text"]
                        # Standard noise-prediction SFT: scale pixels to [-1, 1] as Stable-Diffusion-style
                        # VAEs expect, encode to latents, add noise, and predict the noise.
                        pixels = torch.tensor(np.array(image.convert("RGB")), dtype=torch.float32).permute(2, 0, 1).unsqueeze(0).to(device) / 127.5 - 1.0
                        latents = self.pipeline.vae.encode(pixels).latent_dist.sample()
                        noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, self.pipeline.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device)
                        noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps)
                        text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0]
                        pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                        loss = torch.nn.functional.mse_loss(pred_noise, noise)
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()
                    st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
            st.success(f"Micro Diffusion tuned! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Fine-tuned Micro Diffusion: {self.config.name}")
        except Exception as e:
            st.error(f"Tuning failed: {str(e)} 💥 (Micro snag!)")
            logger.error(f"Tuning failed: {str(e)}")
            raise
        return self

    def save_model(self, path: str):
        try:
            with st.spinner("Saving model... 💾 (Packing tiny pixels!)"):
                os.makedirs(os.path.dirname(path), exist_ok=True)
                self.pipeline.save_pretrained(path)
            st.success(f"Saved at {path}! ✅ Tiny titan secured!")
            logger.info(f"Saved at {path}")
        except Exception as e:
            st.error(f"Save failed: {str(e)} 💥 (Packing mishap!)")
            logger.error(f"Save failed: {str(e)}")
            raise

    def generate(self, prompt: str):
        try:
            return self.pipeline(prompt, num_inference_steps=20).images[0]
        except Exception as e:
            st.error(f"Generation failed: {str(e)} 💥 (Pixel oopsie!)")
            logger.error(f"Generation failed: {str(e)}")
            raise

class LatentDiffusionBuilder:
    """Scaled-down dreamer 🌙 - Latent magic for efficient artistry!"""
    def __init__(self):
        self.config = None
        self.pipeline = None
        self.jokes = ["Latent vibes only! 🌀", "Small scale, big style! 🎨"]

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        try:
            with st.spinner(f"Loading {model_path}... ⏳ (Latent titan rising!)"):
                self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                # Note: truncating the U-Net (e.g. wrapping its first children in nn.Sequential)
                # breaks its (sample, timestep, encoder_hidden_states) call signature, so the
                # full pretrained U-Net is kept here.
                self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
                if config:
                    self.config = config
            st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Loaded Latent Diffusion: {model_path}")
        except Exception as e:
            st.error(f"Failed to load {model_path}: {str(e)} 💥 (Latent hiccup!)")
            logger.error(f"Failed to load {model_path}: {str(e)}")
            raise
        return self

    def fine_tune_sft(self, images, texts, epochs=3):
        try:
            dataset = DiffusionDataset(images, texts)
            # PIL images cannot go through default_collate; hand each sample through as-is.
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda batch: batch[0])
            optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5)
            self.pipeline.unet.train()
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            for epoch in range(epochs):
                with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Latent titan shaping up!)"):
                    total_loss = 0
                    for batch in dataloader:
                        optimizer.zero_grad()
                        image = batch["image"]
                        text = batch["text"]
                        # Scale pixels to [-1, 1], encode to latents, add noise, predict the noise.
                        pixels = torch.tensor(np.array(image.convert("RGB")), dtype=torch.float32).permute(2, 0, 1).unsqueeze(0).to(device) / 127.5 - 1.0
                        latents = self.pipeline.vae.encode(pixels).latent_dist.sample()
                        noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, self.pipeline.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device)
                        noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps)
                        text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0]
                        pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                        loss = torch.nn.functional.mse_loss(pred_noise, noise)
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()
                    st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
            st.success(f"Latent Diffusion tuned! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Fine-tuned Latent Diffusion: {self.config.name}")
        except Exception as e:
            st.error(f"Tuning failed: {str(e)} 💥 (Latent snag!)")
            logger.error(f"Tuning failed: {str(e)}")
            raise
        return self

    def save_model(self, path: str):
        try:
            with st.spinner("Saving model... 💾 (Packing latent dreams!)"):
                os.makedirs(os.path.dirname(path), exist_ok=True)
                self.pipeline.save_pretrained(path)
            st.success(f"Saved at {path}! ✅ Latent titan stashed!")
            logger.info(f"Saved at {path}")
        except Exception as e:
            st.error(f"Save failed: {str(e)} 💥 (Dreamy mishap!)")
            logger.error(f"Save failed: {str(e)}")
            raise

    def generate(self, prompt: str):
        try:
            return self.pipeline(prompt, num_inference_steps=30).images[0]
        except Exception as e:
            st.error(f"Generation failed: {str(e)} 💥 (Latent oopsie!)")
            logger.error(f"Generation failed: {str(e)}")
            raise

class FluxDiffusionBuilder:
    """Distilled dynamo ⚡ - High-quality pixels in a small package!"""
    def __init__(self):
        self.config = None
        self.pipeline = None
        self.jokes = ["Flux-tastic! ✨", "Small size, big wow! 🎇"]

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        try:
            with st.spinner(f"Loading {model_path}... ⏳ (Flux titan charging!)"):
                self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
                if config:
                    self.config = config
            st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Loaded FLUX.1 Distilled: {model_path}")
        except Exception as e:
            st.error(f"Failed to load {model_path}: {str(e)} 💥 (Flux fizzle!)")
            logger.error(f"Failed to load {model_path}: {str(e)}")
            raise
        return self

    def fine_tune_sft(self, images, texts, epochs=3):
        try:
            dataset = DiffusionDataset(images, texts)
            # PIL images cannot go through default_collate; hand each sample through as-is.
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda batch: batch[0])
            # Assumption: this SFT loop targets UNet-based pipelines; FLUX-style pipelines expose
            # a transformer rather than a unet, so these attributes may not exist there.
            optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5)
            self.pipeline.unet.train()
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            for epoch in range(epochs):
                with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Flux titan powering up!)"):
                    total_loss = 0
                    for batch in dataloader:
                        optimizer.zero_grad()
                        image = batch["image"]
                        text = batch["text"]
                        # Scale pixels to [-1, 1], encode to latents, add noise, predict the noise.
                        pixels = torch.tensor(np.array(image.convert("RGB")), dtype=torch.float32).permute(2, 0, 1).unsqueeze(0).to(device) / 127.5 - 1.0
                        latents = self.pipeline.vae.encode(pixels).latent_dist.sample()
                        noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, self.pipeline.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device)
                        noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps)
                        text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0]
                        pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                        loss = torch.nn.functional.mse_loss(pred_noise, noise)
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()
                    st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
            st.success(f"FLUX Diffusion tuned! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Fine-tuned FLUX.1 Distilled: {self.config.name}")
        except Exception as e:
            st.error(f"Tuning failed: {str(e)} 💥 (Flux snag!)")
            logger.error(f"Tuning failed: {str(e)}")
            raise
        return self

    def save_model(self, path: str):
        try:
            with st.spinner("Saving model... 💾 (Packing flux magic!)"):
                os.makedirs(os.path.dirname(path), exist_ok=True)
                self.pipeline.save_pretrained(path)
            st.success(f"Saved at {path}! ✅ Flux titan secured!")
            logger.info(f"Saved at {path}")
        except Exception as e:
            st.error(f"Save failed: {str(e)} 💥 (Fluxy mishap!)")
            logger.error(f"Save failed: {str(e)}")
            raise

    def generate(self, prompt: str):
        try:
            return self.pipeline(prompt, num_inference_steps=50).images[0]
        except Exception as e:
            st.error(f"Generation failed: {str(e)} 💥 (Flux oopsie!)")
            logger.error(f"Generation failed: {str(e)}")
            raise

def generate_filename(sequence, ext="png"):
    """Time-stamped snapshots ⏰ - Keeps our pics organized with cam flair!"""
    from datetime import datetime
    import pytz
    central = pytz.timezone('US/Central')
    dt = datetime.now(central)
    return f"{dt.strftime('%m-%d-%Y-%I-%M-%S-%p')}-{sequence}.{ext}"

def get_download_link(file_path, mime_type="text/plain", label="Download"):
    """Magic link maker 🔗 - Snag your files with a click!"""
    try:
        with open(file_path, 'rb') as f:
            data = f.read()
        b64 = base64.b64encode(data).decode()
        return f'<a href="data:{mime_type};base64,{b64}" download="{os.path.basename(file_path)}">{label} 📥</a>'
    except Exception as e:
        logger.error(f"Failed to generate link for {file_path}: {str(e)}")
        return f"Error: Could not generate link for {file_path}"

def zip_files(files, zip_path):
    """Zip zap zoo 🎒 - Bundle up your goodies!"""
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file in files:
                if os.path.isdir(file):
                    # Saved models are directories; walk them so the archive isn't empty.
                    for root, _, names in os.walk(file):
                        for name in names:
                            full_path = os.path.join(root, name)
                            zipf.write(full_path, os.path.relpath(full_path, os.path.dirname(file)))
                else:
                    zipf.write(file, os.path.basename(file))
        logger.info(f"Created ZIP file: {zip_path}")
    except Exception as e:
        logger.error(f"Failed to create ZIP {zip_path}: {str(e)}")
        raise

def delete_files(files):
    """Trash titan 🗑️ - Clear the stage for new stars!"""
    try:
        for file in files:
            os.remove(file)
            logger.info(f"Deleted file: {file}")
        st.session_state['captured_images'] = [f for f in st.session_state['captured_images'] if f not in files]
    except Exception as e:
        logger.error(f"Failed to delete files: {str(e)}")
        raise

def get_model_files():
    """Model treasure hunt 🗺️ - Find our diffusion gems!"""
    return [d for d in glob.glob("diffusion_models/*") if os.path.isdir(d)]

def get_gallery_files(file_types):
    """Gallery curator 🖼️ - Showcase our pixel masterpieces!"""
    return sorted(list(set(f for ext in file_types for f in glob.glob(f"*.{ext}"))))
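
# Illustrative usage: get_gallery_files(["png", "jpeg"]) returns a sorted, de-duplicated
# list of matching files in the working directory (camera snaps and uploads alike).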

def update_gallery():
    """Gallery refresh 🌟 - Keep the art flowing!"""
    # Relies on the module-level `gallery_size` slider below being set before the first call.
    media_files = get_gallery_files(["png"])
    if media_files:
        cols = st.sidebar.columns(2)
        for idx, file in enumerate(media_files[:gallery_size * 2]):
            with cols[idx % 2]:
                st.image(Image.open(file), caption=file, use_container_width=True)
                st.markdown(get_download_link(file, "image/png", "Download Snap 📸"), unsafe_allow_html=True)

def get_available_video_devices():
    """Camera roll call 🎥 - Who’s ready to shine? Fallback if OpenCV flops!"""
    video_devices = [f"Camera {i} 🎥" for i in range(6)]  # Default to 6 cams
    try:
        detected = []
        for i in range(6):  # Probe at most 6 device indices
            cap = cv2.VideoCapture(i, cv2.CAP_V4L2)
            if not cap.isOpened():
                cap = cv2.VideoCapture(i)
            if cap.isOpened():
                detected.append(f"Camera {i} 🎥")
                logger.info(f"Detected camera at index {i}")
            cap.release()
        if detected:
            video_devices = detected
        else:
            logger.warning("No cameras detected by OpenCV; using defaults")
    except Exception as e:
        logger.error(f"Error detecting cameras: {str(e)} - Falling back to defaults")
    return video_devices
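
# Note: st.camera_input captures through the browser's own camera permissions, so this
# OpenCV probe is informational only; the Camera Snap tab reminds users to switch cameras
# in browser settings.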

st.title("SFT Tiny Titans 🚀 (Small Diffusion Delight!)")

st.sidebar.header("Media Gallery 🎨")
gallery_size = st.sidebar.slider("Gallery Size 📸", 1, 10, 4, help="How many snaps to flaunt? 🌟")
update_gallery()

col1, col2 = st.sidebar.columns(2)
with col1:
    if st.button("Download All 📦"):
        media_files = get_gallery_files(["png"])
        if media_files:
            zip_path = f"snapshot_collection_{int(time.time())}.zip"
            zip_files(media_files, zip_path)
            st.sidebar.markdown(get_download_link(zip_path, "application/zip", "Download All Snaps 📦"), unsafe_allow_html=True)
            st.sidebar.success("Snaps zipped! 🎉 Grab your loot!")
        else:
            st.sidebar.warning("No snaps to zip! 📸 Snap some first!")
with col2:
    if st.button("Delete All 🗑️"):
        media_files = get_gallery_files(["png"])
        if media_files:
            delete_files(media_files)
            st.sidebar.success("Snaps vanquished! 🧹 Gallery cleared!")
            update_gallery()
        else:
            st.sidebar.warning("Nothing to delete! 📸 Snap some pics!")

uploaded_files = st.sidebar.file_uploader("Upload Goodies 🎵🎥🖼️📝📜", type=["mp3", "mp4", "png", "jpeg", "md", "pdf", "docx"], accept_multiple_files=True)
if uploaded_files:
    for uploaded_file in uploaded_files:
        filename = uploaded_file.name
        with open(filename, "wb") as f:
            f.write(uploaded_file.getvalue())
        logger.info(f"Uploaded file: {filename}")
st.sidebar.subheader("Image Gallery 🖼️") | |
image_files = get_gallery_files(["png", "jpeg"]) | |
if image_files: | |
cols = st.sidebar.columns(2) | |
for idx, file in enumerate(image_files[:gallery_size * 2]): | |
with cols[idx % 2]: | |
st.image(Image.open(file), caption=file, use_container_width=True) | |
st.markdown(get_download_link(file, "image/png" if file.endswith(".png") else "image/jpeg", f"Save Pic 🖼️"), unsafe_allow_html=True) | |
st.sidebar.subheader("Model Management 🗂️") | |
model_dirs = get_model_files() | |
selected_model = st.sidebar.selectbox("Select Saved Model", ["None"] + model_dirs) | |
model_type = st.sidebar.selectbox("Diffusion Type", ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"]) | |
if selected_model != "None" and st.sidebar.button("Load Model 📂"): | |
builder = { | |
"Micro Diffusion": MicroDiffusionBuilder, | |
"Latent Diffusion": LatentDiffusionBuilder, | |
"FLUX.1 Distilled": FluxDiffusionBuilder | |
}[model_type]() | |
config = DiffusionConfig(name=os.path.basename(selected_model), base_model="unknown", size="small") | |
try: | |
builder.load_model(selected_model, config) | |
st.session_state['cv_builder'] = builder | |
st.session_state['cv_loaded'] = True | |
st.rerun() | |
except Exception as e: | |
st.error(f"Model load failed: {str(e)} 💥 (Check logs for details!)") | |
st.sidebar.subheader("Model Status 🚦") | |
st.sidebar.write(f"**CV Model**: {'Loaded' if st.session_state['cv_loaded'] else 'Not Loaded'} {'(Active)' if st.session_state['cv_loaded'] and isinstance(st.session_state.get('cv_builder'), (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)) else ''}") | |

tabs = ["Build Titan 🌱", "Camera Snap 📷", "Fine-Tune Titan (CV) 🔧", "Test Titan (CV) 🧪", "Agentic RAG Party (CV) 🌐"]
tab1, tab2, tab3, tab4, tab5 = st.tabs(tabs)
# Best-effort tab tracking: Streamlit does not report which tab was clicked, so this only
# logs transitions inferred from session-state flags.
for i, tab in enumerate(tabs):
    if st.session_state['active_tab'] != tab and st.session_state.get(f'tab{i}_active', False):
        logger.info(f"Switched to tab: {tab}")
        st.session_state['active_tab'] = tab
    st.session_state[f'tab{i}_active'] = (st.session_state['active_tab'] == tab)

with tab1:
    st.header("Build Titan 🌱")
    model_type = st.selectbox("Diffusion Type", ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"], key="build_type")
    default_base = {
        "Micro Diffusion": "CompVis/ldm-text2im-large-256",
        "Latent Diffusion": "runwayml/stable-diffusion-v1-5",
        "FLUX.1 Distilled": "black-forest-labs/flux.1-distilled"
    }[model_type]
    base_model = st.selectbox("Select Tiny Model", [default_base])
    model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}")
    if st.button("Download Model ⬇️"):
        config = DiffusionConfig(name=model_name, base_model=base_model, size="small")
        builder = {
            "Micro Diffusion": MicroDiffusionBuilder,
            "Latent Diffusion": LatentDiffusionBuilder,
            "FLUX.1 Distilled": FluxDiffusionBuilder
        }[model_type]()
        try:
            builder.load_model(base_model, config)
            builder.save_model(config.model_path)
            st.session_state['cv_builder'] = builder
            st.session_state['cv_loaded'] = True
            st.rerun()
        except Exception as e:
            st.error(f"Model build failed: {str(e)} 💥 (Check logs for details!)")

with tab2:
    st.header("Camera Snap 📷 (Dual Capture Fiesta!)")
    video_devices = get_available_video_devices()
    st.write(f"🎉 Detected Cameras: {', '.join(video_devices)}")
    st.info("Switch cams in your browser settings (e.g., Chrome > Privacy > Camera) since I’m a browser star! 🌟")
    st.subheader("Camera 0 🎬 - Lights, Camera, Action!")
    cam0_cols = st.columns(4)
    with cam0_cols[0]:
        cam0_device = st.selectbox("Cam 📷", video_devices, index=0, key="cam0_device", help="Pick your star cam! 🌟")
    with cam0_cols[1]:
        cam0_label = st.text_input("Tag 🏷️", "Cam 0 Snap", key="cam0_label", help="Name your masterpiece! 🎨")
    with cam0_cols[2]:
        cam0_help = st.text_input("Hint 💡", "Snap a heroic moment! 🦸♂️", key="cam0_help", help="Give a fun tip!")
    with cam0_cols[3]:
        cam0_vis = st.selectbox("Show 🖼️", ["visible", "hidden", "collapsed"], index=0, key="cam0_vis", help="Label vibes: Visible, Sneaky, or Gone!")
    st.subheader("Camera 1 🎥 - Roll the Film!")
    cam1_cols = st.columns(4)
    with cam1_cols[0]:
        cam1_device = st.selectbox("Cam 📷", video_devices, index=1 if len(video_devices) > 1 else 0, key="cam1_device", help="Choose your blockbuster cam! 🎬")
    with cam1_cols[1]:
        cam1_label = st.text_input("Tag 🏷️", "Cam 1 Snap", key="cam1_label", help="Title your epic shot! 🌠")
    with cam1_cols[2]:
        cam1_help = st.text_input("Hint 💡", "Grab an epic frame! 🌟", key="cam1_help", help="Drop a cheeky hint!")
    with cam1_cols[3]:
        cam1_vis = st.selectbox("Show 🖼️", ["visible", "hidden", "collapsed"], index=0, key="cam1_vis", help="Label style: Show it, Hide it, Poof!")
    cols = st.columns(2)
    with cols[0]:
        st.subheader(f"Camera 0 ({cam0_device}) 🎬")
        cam0_img = st.camera_input(
            label=cam0_label,
            key="cam0",
            help=cam0_help,
            disabled=False,
            label_visibility=cam0_vis
        )
        if cam0_img:
            filename = generate_filename("cam0")
            with open(filename, "wb") as f:
                f.write(cam0_img.getvalue())
            st.image(Image.open(filename), caption=filename, use_container_width=True)
            logger.info(f"Saved snapshot from Camera 0: {filename}")
            st.session_state['captured_images'].append(filename)
            update_gallery()
        st.info("🚨 One snap at a time—your Titan’s too cool for bursts! 😎")
    with cols[1]:
        st.subheader(f"Camera 1 ({cam1_device}) 🎥")
        cam1_img = st.camera_input(
            label=cam1_label,
            key="cam1",
            help=cam1_help,
            disabled=False,
            label_visibility=cam1_vis
        )
        if cam1_img:
            filename = generate_filename("cam1")
            with open(filename, "wb") as f:
                f.write(cam1_img.getvalue())
            st.image(Image.open(filename), caption=filename, use_container_width=True)
            logger.info(f"Saved snapshot from Camera 1: {filename}")
            st.session_state['captured_images'].append(filename)
            update_gallery()
        st.info("🚨 Single shots only—craft your masterpiece! 🎨")

with tab3:
    st.header("Fine-Tune Titan (CV) 🔧 (Sculpt Your Pixel Prodigy!)")
    if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
        st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no canvas!)")
    else:
        captured_images = get_gallery_files(["png"])
        if len(captured_images) >= 2:
            st.subheader("Use Case 1: Denoise Snapshots 🌟")
            denoising_data = [{"image": img, "text": f"Denoised {os.path.basename(img).split('-')[4]} snap"} for img in captured_images[:min(len(captured_images), 10)]]
            denoising_edited = st.data_editor(pd.DataFrame(denoising_data), num_rows="dynamic", help="Craft denoising pairs! 🌟")
            if st.button("Fine-Tune Denoising 🔄"):
                images = [Image.open(row["image"]) for _, row in denoising_edited.iterrows()]
                texts = [row["text"] for _, row in denoising_edited.iterrows()]
                new_model_name = f"{st.session_state['cv_builder'].config.name}-denoise-{int(time.time())}"
                new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
                st.session_state['cv_builder'].config = new_config
                with st.status("Fine-tuning for denoising... ⏳ (Polishing pixels!)", expanded=True) as status:
                    st.session_state['cv_builder'].fine_tune_sft(images, texts)
                    st.session_state['cv_builder'].save_model(new_config.model_path)
                    status.update(label="Denoising tuned! 🎉 (Pixel shine unleashed!)", state="complete")
                zip_path = f"{new_config.model_path}.zip"
                zip_files([new_config.model_path], zip_path)
                st.markdown(get_download_link(zip_path, "application/zip", "Download Denoised Titan 📦"), unsafe_allow_html=True)
                denoising_csv = f"denoise_dataset_{int(time.time())}.csv"
                with open(denoising_csv, "w", newline="") as f:
                    writer = csv.writer(f)
                    writer.writerow(["image", "text"])
                    for _, row in denoising_edited.iterrows():
                        writer.writerow([row["image"], row["text"]])
                st.markdown(get_download_link(denoising_csv, "text/csv", "Download Denoising CSV 📜"), unsafe_allow_html=True)
            st.subheader("Use Case 2: Stylize Snapshots 🎨")
            stylize_data = [{"image": img, "text": f"Neon {os.path.basename(img).split('-')[4]} style"} for img in captured_images[:min(len(captured_images), 10)]]
            stylize_edited = st.data_editor(pd.DataFrame(stylize_data), num_rows="dynamic", help="Craft stylized pairs! 🎨")
            if st.button("Fine-Tune Stylization 🔄"):
                images = [Image.open(row["image"]) for _, row in stylize_edited.iterrows()]
                texts = [row["text"] for _, row in stylize_edited.iterrows()]
                new_model_name = f"{st.session_state['cv_builder'].config.name}-stylize-{int(time.time())}"
                new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
                st.session_state['cv_builder'].config = new_config
                with st.status("Fine-tuning for stylization... ⏳ (Painting pixels!)", expanded=True) as status:
                    st.session_state['cv_builder'].fine_tune_sft(images, texts)
                    st.session_state['cv_builder'].save_model(new_config.model_path)
                    status.update(label="Stylization tuned! 🎉 (Pixel art unleashed!)", state="complete")
                zip_path = f"{new_config.model_path}.zip"
                zip_files([new_config.model_path], zip_path)
                st.markdown(get_download_link(zip_path, "application/zip", "Download Stylized Titan 📦"), unsafe_allow_html=True)
                stylize_md = f"stylize_dataset_{int(time.time())}.md"
                with open(stylize_md, "w") as f:
                    f.write("# Stylization Dataset\n\n")
                    for _, row in stylize_edited.iterrows():
                        f.write(f"- `{row['image']}`: {row['text']}\n")
                st.markdown(get_download_link(stylize_md, "text/markdown", "Download Stylization MD 📝"), unsafe_allow_html=True)
            st.subheader("Use Case 3: Multi-Angle Snapshots 🌐")
            multiangle_data = [{"image": img, "text": f"View from {os.path.basename(img).split('-')[4]}"} for img in captured_images[:min(len(captured_images), 10)]]
            multiangle_edited = st.data_editor(pd.DataFrame(multiangle_data), num_rows="dynamic", help="Craft multi-angle pairs! 🌐")
            if st.button("Fine-Tune Multi-Angle 🔄"):
                images = [Image.open(row["image"]) for _, row in multiangle_edited.iterrows()]
                texts = [row["text"] for _, row in multiangle_edited.iterrows()]
                new_model_name = f"{st.session_state['cv_builder'].config.name}-multiangle-{int(time.time())}"
                new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
                st.session_state['cv_builder'].config = new_config
                with st.status("Fine-tuning for multi-angle... ⏳ (Spinning pixels!)", expanded=True) as status:
                    st.session_state['cv_builder'].fine_tune_sft(images, texts)
                    st.session_state['cv_builder'].save_model(new_config.model_path)
                    status.update(label="Multi-angle tuned! 🎉 (Pixel views unleashed!)", state="complete")
                zip_path = f"{new_config.model_path}.zip"
                zip_files([new_config.model_path], zip_path)
                st.markdown(get_download_link(zip_path, "application/zip", "Download Multi-Angle Titan 📦"), unsafe_allow_html=True)
                multiangle_csv = f"multiangle_dataset_{int(time.time())}.csv"
                with open(multiangle_csv, "w", newline="") as f:
                    writer = csv.writer(f)
                    writer.writerow(["image", "text"])
                    for _, row in multiangle_edited.iterrows():
                        writer.writerow([row["image"], row["text"]])
                st.markdown(get_download_link(multiangle_csv, "text/csv", "Download Multi-Angle CSV 📜"), unsafe_allow_html=True)

with tab4:
    st.header("Test Titan (CV) 🧪 (Unleash Your Pixel Power!)")
    if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
        st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no masterpiece!)")
    else:
        st.subheader("Test Your Titan 🎨")
        test_prompt = st.text_area("Prompt 🎤", "Neon glow from cam0", help="Dream up a wild image—your Titan’s ready to paint! 🖌️")
        if st.button("Generate ▶️"):
            with st.spinner("Crafting your masterpiece... ⏳ (Titan’s mixing pixels!)"):
                image = st.session_state['cv_builder'].generate(test_prompt)
                st.image(image, caption=f"Generated: {test_prompt}", use_container_width=True)

with tab5:
    st.header("Agentic RAG Party (CV) 🌐 (Pixel Party Extravaganza!)")
    st.write("Generate superhero party vibes from your tuned Titan! 🎉")
    if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
        st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no party!)")
    else:
        if st.button("Run RAG Demo 🎉"):
            with st.spinner("Loading your pixel party titan... ⏳ (Titan’s grabbing its brush!)"):
                class CVPartyAgent:
                    def __init__(self, pipeline):
                        self.pipeline = pipeline

                    def generate(self, prompt: str) -> Image.Image:
                        return self.pipeline(prompt, num_inference_steps=50).images[0]

                    def plan_party(self):
                        prompts = [
                            "Gold-plated Batman statue from cam0",
                            "VR superhero battle scene from cam1",
                            "Neon-lit Avengers tower from cam2"
                        ]
                        data = [{"Theme": f"Scene {i+1}", "Image Idea": prompt} for i, prompt in enumerate(prompts)]
                        return pd.DataFrame(data)

                agent = CVPartyAgent(st.session_state['cv_builder'].pipeline)
                st.write("Party agent ready! 🎨 (Time to paint an epic bash!)")
            with st.spinner("Crafting superhero party visuals... ⏳ (Pixels assemble!)"):
                try:
                    plan_df = agent.plan_party()
                    st.dataframe(plan_df)
                    for _, row in plan_df.iterrows():
                        image = agent.generate(row["Image Idea"])
                        st.image(image, caption=f"{row['Theme']} - {row['Image Idea']}", use_container_width=True)
                except Exception as e:
                    st.error(f"Party crashed: {str(e)} 💥 (Pixel oopsie!)")
                    logger.error(f"RAG demo failed: {str(e)}")
st.sidebar.subheader("Action Logs 📜") | |
log_container = st.sidebar.empty() | |
with log_container: | |
for record in log_records: | |
st.write(f"{record.asctime} - {record.levelname} - {record.message}") | |
update_gallery() |