# Image Processor — Gradio app (Hugging Face Space).
import gradio as gr | |
from PIL import Image | |
import os | |
import zipfile | |
import pandas as pd | |
import requests | |
import tempfile | |
import shutil | |
from io import BytesIO | |
import threading | |
from datetime import datetime | |
from concurrent.futures import ThreadPoolExecutor | |
import time | |
import random | |
import cloudscraper | |
from urllib.parse import urlparse | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
from selenium import webdriver | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.chrome.service import Service | |
from webdriver_manager.chrome import ChromeDriverManager | |
# Cancellation flag for long-running jobs; cleared at the end of each run.
# NOTE(review): nothing in this file ever calls stop_event.set() — the Stop
# button only emits a status message.  Confirm whether cancellation is meant
# to work.
stop_event = threading.Event()
# Worker sizing hint; fall back to 4 when the core count cannot be determined.
cpu_cores = os.cpu_count() or 4
# Rotating pool of User-Agent strings used to look less bot-like.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (Linux; Android 10)",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X)"
]
# Placeholder proxy endpoints — replace user:pass@host:port before use.
PROXY_LIST = [
    "http://user:pass@proxy1:port",
    "http://user:pass@proxy2:port",
]
# Tab-separated log of URLs that could not be downloaded (read back at import
# time by the retry pass near the bottom of this file).
FAILED_LOG_PATH = "failed_downloads.txt"
# Domains known to block/throttle scraping; skipped outright by download_image.
BLOCKED_SITES = ["bigw.com.au"]
def random_headers(referer):
    """Build request headers with a randomly selected User-Agent.

    Args:
        referer: Value to send as the ``Referer`` header.

    Returns:
        dict with ``User-Agent`` and ``Referer`` keys.
    """
    ua = random.choice(USER_AGENTS)
    return {"User-Agent": ua, "Referer": referer}
def try_download(url, headers, proxies):
    """
    Perform one proxied GET of *url*.

    ``proxies`` is a requests-style mapping like {"http": ..., "https": ...}.
    Returns the streaming response when the server answers 200 with an image
    Content-Type; otherwise returns None.
    """
    try:
        response = session.get(
            url, headers=headers, proxies=proxies, stream=True, timeout=15
        )
        if response.status_code == 200 and "image" in response.headers.get("Content-Type", ""):
            return response
    except Exception as e:
        print(f"β οΈ Proxy download error ({proxies}): {e}")
    return None


# Shared HTTP session so connections and cookies are reused across calls.
session = requests.Session()
# --- Utility Functions --- | |
def process_url_images(data, fmt, w, h):
    """Thin wrapper: convert/resize the given URL records and zip the results."""
    return process_and_zip(data, fmt, w, h)
def process_uploaded_images(files, fmt, w, h):
    """Copy uploaded files into a staging dir and run the normal pipeline.

    Each file becomes a {"url": local_path, "name": stem} record so that the
    downstream code can treat uploads and remote URLs uniformly.
    """
    staging = tempfile.mkdtemp()
    records = []
    for uploaded in files:
        source = str(uploaded)
        target = os.path.join(staging, os.path.basename(source))
        shutil.copyfile(source, target)
        stem = os.path.splitext(os.path.basename(target))[0]
        records.append({"url": target, "name": stem})
    return process_and_zip(records, fmt, w, h)
def process_single_url_image(url, fmt, w, h):
    """Process exactly one remote image URL; the output file is named 'single'."""
    cleaned = url.strip()
    if not cleaned:
        return [], None, "No URL provided", None
    return process_and_zip([{"url": cleaned, "name": "single"}], fmt, w, h)
def handle_process(mode, sd, ups, pu, fmt, w, h):
    """Dispatch to the right processing path for the selected input mode.

    Converted from a lambda assigned to a name (PEP 8 E731) to a proper
    ``def`` with identical dispatch semantics.

    Args:
        mode: Selected input-method label (prefix emoji encodes the mode).
        sd:   Spreadsheet-derived records ({"url", "name"} dicts).
        ups:  Uploaded file objects.
        pu:   Pasted single image URL.
        fmt, w, h: Output format and target canvas size.

    Returns:
        (files, zip_path, status_message, temp_dir) 4-tuple.
    """
    if mode.startswith("π") and sd:
        return process_url_images(sd, fmt, w, h)
    if mode.startswith("π€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    if pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    return [], None, "β οΈ No valid input provided", None
# Allow Pillow to decode images whose data stream is cut short instead of
# raising — downloads over flaky connections are often truncated.
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
def fetch_with_retries(url, headers, timeout=15, retries=2, backoff=1, http=None):
    """
    GET *url* up to ``retries + 1`` times with a linear back-off between tries.

    Args:
        url: Image URL to fetch.
        headers: Header dict (User-Agent / Referer).
        timeout: Per-request timeout in seconds.
        retries: Extra attempts after the first failure.
        backoff: Base sleep in seconds, multiplied by the attempt number.
        http: Optional session-like object exposing ``.get``; defaults to the
            module-level ``session``.  (New, backward-compatible parameter —
            makes the function testable without network access.)

    Returns:
        The streaming response when the status is 200 and the Content-Type
        contains "image"; otherwise None.
    """
    client = session if http is None else http
    for attempt in range(retries + 1):
        try:
            resp = client.get(url, headers=headers, stream=True, timeout=timeout)
            if resp.status_code == 200 and "image" in resp.headers.get("Content-Type", ""):
                return resp
        except Exception as e:
            print(f"β οΈ Attempt {attempt+1} failed for {url}: {e}")
        # Fix: the original slept unconditionally, including after the final
        # failed attempt, delaying the caller for nothing.
        if attempt < retries:
            time.sleep(backoff * (attempt + 1))
    return None
def download_image(url, save_path):
    """
    Download *url* to *save_path*, escalating through three strategies:

    1. Plain GET with retries (``fetch_with_retries``).
    2. ``cloudscraper`` (handles Cloudflare-style challenges).
    3. Headless Chrome to harvest cookies, then a cookie-authenticated GET.

    Returns ``save_path`` on success, ``None`` on failure.  Failures are
    recorded via ``log_failure`` (NOTE(review): ``log_failure`` is not
    defined in this file — confirm it exists elsewhere in the project).
    """
    # Cache hit: anything over ~1 KB is assumed to be a previously good file.
    if os.path.exists(save_path) and os.path.getsize(save_path) > 1000:
        return save_path
    # Hard-skip domains known to block scrapers.
    if any(b in url for b in BLOCKED_SITES):
        log_failure(url, "Skipped known slow site")
        return None
    parsed = urlparse(url)
    referer = f"{parsed.scheme}://{parsed.netloc}/"
    headers = random_headers(referer)
    # 1) Direct with retries
    print(f"π Fetching with retries: {url}")
    resp = fetch_with_retries(url, headers, timeout=15, retries=2)
    if resp:
        _write_stream(resp, save_path)
        print("β Direct fetch succeeded")
        return save_path
    print("β Direct fetch failed after retries")
    # 2) cloudscraper fallback
    print("π cloudscraper fallback...")
    try:
        scraper = cloudscraper.create_scraper(sess=session)
        resp = scraper.get(url, headers=headers, stream=True, timeout=15)
        if resp.status_code == 200 and "image" in resp.headers.get("Content-Type", ""):
            _write_stream(resp, save_path)
            print("β cloudscraper worked")
            return save_path
    except Exception as e:
        print(f"β cloudscraper failed: {e}")
    # 3) Selenium fallback: load the page, harvest cookies, retry the GET.
    try:
        print("π§ͺ Headless browser fallback...")
        chrome_opts = Options()
        for arg in ("--headless", "--disable-gpu", "--no-sandbox",
                    "--disable-dev-shm-usage"):
            chrome_opts.add_argument(arg)
        chrome_opts.add_argument(f"user-agent={headers['User-Agent']}")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_opts
        )
        try:
            driver.get(url)
            time.sleep(4)  # crude wait for any JS challenge to settle
            final_url = driver.current_url
            cookies = {c['name']: c['value'] for c in driver.get_cookies()}
        finally:
            # Fix: always release the browser — previously a failure before
            # quit() leaked the Chrome process.
            driver.quit()
        # BUG FIX: the original wrapped this in ``with session:``, which
        # CLOSED the shared module-level session on exit and silently broke
        # every subsequent download.  Cookies are now set without closing it.
        for k, v in cookies.items():
            session.cookies.set(k, v)
        r = session.get(final_url, headers=headers, stream=True, timeout=15)
        if r.status_code == 200 and "image" in r.headers.get("Content-Type", ""):
            _write_stream(r, save_path)
            print("β Selenium + cookies worked")
            return save_path
    except Exception as e:
        print(f"β Selenium failed: {e}")
    # 4) Give up
    log_failure(url, "All methods failed")
    return None


def _write_stream(resp, save_path):
    """Stream a requests response body to disk in 8 KiB chunks."""
    with open(save_path, 'wb') as f:
        for chunk in resp.iter_content(8192):
            f.write(chunk)
# Decorative fixed-position "lamp" widget (pure HTML/CSS, no JS framework).
# Clicking the bulb toggles a .night class on itself and on <body>, switching
# the page into a dark "lit lamp" theme.  Injected into the UI below via
# gr.HTML(lamp_html).
lamp_html = """
<style>
  * {
    margin: 0;
    padding: 0;
    box-sizing: border-box;
  }
  body {
    background-color: #D8D2C2;
    transition: background 0.4s ease;
  }
  body.night {
    background: radial-gradient(#555, #111);
  }
  .lamp-fixed {
    position: fixed;
    top: 0;
    right: 0;
    width: 100px;
    height: 100vh;
    display: flex;
    flex-direction: column;
    align-items: center;
    z-index: 9999;
    pointer-events: none;
  }
  .mount {
    width: 30px;
    height: 5px;
    background: #111;
    border-radius: 6px;
    pointer-events: none;
  }
  .wire {
    width: 4px;
    height: 120px;
    background: #000;
    margin: 0;
    pointer-events: none;
  }
  .blub {
    position: relative;
    width: 47px;
    height: 47px;
    background: #444;
    border-radius: 50%;
    cursor: pointer;
    margin-top: -1px;
    transition: all 0.3s ease;
    z-index: 2;
    pointer-events: auto;
  }
  .blub.night {
    background: #ffec99;
    box-shadow: 0 0 28px #ffec99, 0 0 56px #ffec99,
                0 0 84px #ffec99, 0 0 168px #ffec99;
  }
  .blub.night::before {
    background: #ffec99;
  }
  .blub.night::after {
    content: "";
    position: absolute;
    top: 50%;
    left: 50%;
    transform: translate(-60%, -50%);
    width: 78px;
    height: 78px;
    background: #ffec99;
    border-radius: 100%;
    filter: blur(26px);
    z-index: -1;
  }
  .blub::before {
    content: "";
    position: absolute;
    top: -30px;
    left: 11px;
    width: 25px;
    height: 47px;
    background: #444;
    border-top: 19px solid #000;
    border-radius: 8px;
    transition: background 0.3s ease;
  }
  .blub span:nth-child(1) {
    position: absolute;
    top: -10px;
    left: -2px;
    width: 20px;
    height: 18px;
    background: transparent;
    transform: rotate(343deg);
    border-bottom-right-radius: 40px;
    box-shadow: 12px 12px 0 5px #444;
    transition: box-shadow 0.3s ease;
  }
  .blub.night span:nth-child(1) {
    box-shadow: 12px 12px 0 5px #ffec99;
  }
  .blub span:nth-child(2) {
    position: absolute;
    top: -10px;
    right: -2px;
    width: 20px;
    height: 18px;
    background: transparent;
    transform: rotate(18deg);
    border-bottom-left-radius: 50px;
    box-shadow: -12px 12px 0 5px #444;
    transition: box-shadow 0.3s ease;
  }
  .blub.night span:nth-child(2) {
    box-shadow: -12px 12px 0 5px #ffec99;
  }
</style>
<div class="lamp-fixed">
  <div class="mount"></div>
  <div class="wire"></div>
  <div class="blub" onclick="this.classList.toggle('night'); document.body.classList.toggle('night')">
    <span></span>
    <span></span>
  </div>
</div>
"""
def resize_with_padding(img, target_w, target_h, fill=(255,255,255)):
    """Fit *img* inside a target_w x target_h canvas, centred on *fill*.

    ``thumbnail`` only ever shrinks, so smaller images are padded but never
    upscaled.  Returns a new RGB image of exactly the target size.
    """
    img.thumbnail((target_w, target_h), Image.LANCZOS)
    canvas = Image.new("RGB", (target_w, target_h), fill)
    offset = ((target_w - img.width) // 2, (target_h - img.height) // 2)
    canvas.paste(img, offset)
    return canvas
def threaded_download_and_open(item, temp_dir):
    """Fetch one item and decode it into a list of RGBA frames.

    Local paths are opened directly; anything else is downloaded into
    *temp_dir* first.  Animated images yield one frame per animation frame.

    Returns:
        (name, frames_or_None, error_or_None) — exactly one of the last two
        is None.
    """
    from PIL import ImageSequence
    name = item['name']
    src = item['url']
    try:
        if os.path.exists(src):
            path = src
        else:
            path = download_image(src, os.path.join(temp_dir, f"{name}.gif"))
            if not path:
                return (name, None, 'Download failed or invalid image')
        img = Image.open(path)
        if not getattr(img, "is_animated", False):
            return (name, [img.convert("RGBA")], None)
        img.seek(0)  # make sure iteration starts at the first frame
        frames = [fr.convert("RGBA").copy() for fr in ImageSequence.Iterator(img)]
        return (name, frames, None)
    except Exception as e:
        return (name, None, str(e))
def resize_and_save_worker(args):
    """Flatten, resize and save one image.

    ``args`` is the tuple (name, img, fmt, w, h, out_dir).

    Returns:
        (out_path, None) on success or (None, "name: error") on failure.
    """
    name, img, fmt, w, h, out_dir = args
    try:
        # Flatten any alpha channel onto a white background.
        rgba = img.convert('RGBA')
        white = Image.new('RGBA', rgba.size, (255, 255, 255, 255))
        flat = Image.alpha_composite(white, rgba)
        # Normalise the requested format / extension.
        fmt_upper = fmt.upper()
        if fmt_upper in ("JPEG", "JPG"):
            save_fmt, ext = "JPEG", "jpg"
        else:
            save_fmt, ext = fmt_upper, fmt.lower()
        # JPEG has no alpha channel.
        if save_fmt == "JPEG":
            flat = flat.convert("RGB")
        flat = resize_with_padding(flat, w, h)
        out_path = os.path.join(out_dir, f"{name}.{ext}")
        flat.save(out_path, format=save_fmt, quality=90)
        return out_path, None
    except Exception as e:
        return None, f"{name}: {e}"
from multiprocessing import cpu_count | |
import zipfile | |
def process_and_zip(items, fmt, w, h):
    """
    Download, convert, resize and zip a batch of images.

    Args:
        items: list of {"url": ..., "name": ...} records (url may be local).
        fmt: output format name, e.g. "JPG" / "PNG".
        w, h: target canvas size in pixels.

    Returns:
        (file_paths, zip_path, status_message, temp_dir), or
        (None, None, "No images processed.", None) when nothing succeeded.
    """
    tmp = tempfile.mkdtemp()
    proc = os.path.join(tmp, 'out')
    os.makedirs(proc, exist_ok=True)
    files, fails = [], []
    # 1) Download & decode in parallel (I/O bound -> threads).
    workers = cpu_count() * 2
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(lambda it: threaded_download_and_open(it, tmp), items))
    # Normalise the requested format once, outside the loops (hoisted from
    # the per-frame body — it is loop-invariant).
    fmt_upper = fmt.upper()
    save_fmt = "JPEG" if fmt_upper in ("JPEG", "JPG") else fmt_upper
    ext = "jpg" if fmt_upper in ("JPEG", "JPG") else fmt.lower()
    # 2) Flatten, resize and save every decoded frame.
    for name, imgs, err in results:
        if err or not imgs:
            fails.append(f"{name}: {err}")
            continue
        multi = len(imgs) > 1
        for i, img in enumerate(imgs):
            fname = f"{name}_frame{i+1}.{ext}" if multi else f"{name}.{ext}"
            try:
                # Composite any alpha channel over white.
                img = img.convert("RGBA")
                bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
                img = Image.alpha_composite(bg, img)
                if save_fmt == "JPEG":
                    img = img.convert("RGB")
                img = resize_with_padding(img, w, h)
                out_path = os.path.join(proc, fname)
                img.save(out_path, format=save_fmt, quality=90)
                files.append(out_path)
            except Exception as e:
                print(f"β οΈ Save failed for {name}: {e}")
                # FIX: report the actual output filename — the original always
                # appended "_frameN" even for single-frame images.
                fails.append(f"{fname}: {e}")
    # 3) Nothing succeeded: clean up and report.
    if not files:
        stop_event.clear()
        shutil.rmtree(tmp)
        return None, None, "No images processed.", None
    # 4) ZIP in "store" mode — the images are already compressed.
    date_str = datetime.now().strftime("%Y-%m-%d")
    zip_path = os.path.join(tmp, f"{date_str}.zip")
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED) as zf:
        for f in files:
            zf.write(f, os.path.basename(f))
    # 5) Human-readable status.
    msg_lines = [f"β Processed {len(files)} image(s)."]
    if fails:
        msg_lines.append(f"β Failed: {len(fails)} image(s)")
        msg_lines += [f" - {fail}" for fail in fails]
    stop_event.clear()
    return files, zip_path, "\n".join(msg_lines), tmp
def read_uploaded_workbook(file):
    """
    Extract {"url", "name"} records from every sheet of an uploaded workbook.

    Sheets named "Cleared Data" (any case) are skipped.  Rows are keyed by an
    'ItemCode' column; every column whose header contains "url", "image" or
    "link" contributes one image URL.  The first URL for an item is named
    ``<code>``, subsequent ones ``<code>_1``, ``<code>_2``, ...

    Returns:
        (records, status_message).
    """
    if not file:
        return [], "β No file uploaded"
    try:
        xls = pd.ExcelFile(file.name, engine="openpyxl")
        sheets = [s for s in xls.sheet_names if s.lower() != "cleared data"]
        # FIX: parse via the already-open ExcelFile instead of re-reading the
        # workbook from disk once per sheet as the original did.
        df = pd.concat([xls.parse(s) for s in sheets], ignore_index=True)
        df.columns = [c.strip() for c in df.columns]
        item_col = next((c for c in df.columns if c.lower() == 'itemcode'), None)
        if not item_col:
            return [], "β Missing 'ItemCode' column"
        url_cols = [c for c in df.columns if any(k in c.lower() for k in ["url", "image", "link"])]
        data = []
        for _, row in df.iterrows():
            raw = row[item_col]
            if pd.isna(raw):
                continue
            text = str(raw).strip()
            # Numeric codes come back as floats ("123.0"): drop the ".0".
            key = text.split('.')[0] if text.replace('.', '', 1).isdigit() else text
            idx = 0
            for col in url_cols:
                if pd.notna(row[col]):
                    name = key if idx == 0 else f"{key}_{idx}"
                    data.append({"url": str(row[col]).strip(), "name": name})
                    idx += 1
        return data, f"β Fetched {len(data)} image link(s)"
    except Exception as e:
        return [], f"β Error: {e}"
def clear_all(tmp_dir):
    """Reset every UI component and delete the run's working temp directory.

    Returns one value per output component wired to the Clear button, in the
    order declared in ``clear_btn.click``.
    """
    # Wipe the temp folder from the previous run, if any.
    if tmp_dir and os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    hidden_zip = gr.update(visible=False, value=None)
    return (
        None,        # single-file (workbook) upload
        [],          # image_data_state
        [],          # multi-file upload
        hidden_zip,  # hidden ZIP file component (hide & clear)
        [],          # "individual downloads" list
        "Cleared.",  # status message
        None,        # temp_dir_state
        ""           # URL textbox
    )
# === CSS ===
# Global stylesheet injected via gr.Blocks(css=css).
# NOTE(review): the braces below are unbalanced — `.gradio-container {` and
# `#status-box {` are never closed, and several rules sit nested inside them.
# Nested CSS only renders in browsers with native CSS Nesting support;
# confirm this is intentional or flatten the rules.
css = """
body {
  margin: 0;
  background: #4B352A;
  font-family: 'Segoe UI', sans-serif;
  color: #222;
  display: flex;
  justify-content: center;
  padding: 2rem;
  transition: background 0.3s, color 0.3s;
}
.gradio-container {
  max-width: 1200px; /* You can reduce this (e.g., 600px) */
  width: 100%;
  margin: 0 auto;
  h1 {
    font-size: 24px !important;
    font-weight: 700 !important;
    margin-bottom: 1rem !important;
    color: #FFFFFF !important;
    text-align: center !important;
  }
  .panel {
    background: white;
    padding: 1rem;
    border-radius: 6px;
    border: 1px solid #ddd;
    box-shadow: 0 1px 2px rgba(0,0,0,0.05);
  }
  #clear-btn, #stop-btn {
    margin-top: 1rem;
    font-size: 0.85rem !important;
    font-weight: 600 !important;
    padding: 0.5rem 1rem !important;
    border-radius: 8px !important;
    border: none !important;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important;
    transition: background-color 0.2s ease !important;
    width: 100% !important;
  }
  .btn-row {
    display: flex;
    justify-content: space-between;
    gap: 0.5rem;
  }
  #clear-btn {
    background-color: #e74c3c !important;
    color: white !important;
    width: 50% !important;
  }
  #clear-btn:hover {
    background-color: #c0392b !important;
    cursor: pointer !important;
  }
  #stop-btn {
    background-color: #CA7842 !important;
    color: white !important;
    width: 50% !important;
  }
  #stop-btn:hover {
    background-color: #FFA55D !important;
    cursor: pointer !important;
  }
  .gradio-container .gr-row > div {
    padding: 4px !important;
  }
  #process-btn-url {
    background-color: #A4B465 !important;
    color: #333 !important;
  }
  #fetch-btn {
    background-color: #cfbed7 !important;
    color: #333 !important;
  }
  #status-box {
    background-color: #ffffff;
    border: 2px solid #cfbed7;
    border-radius: 6px;
    padding: 1rem;
    font-size: 0.95rem;
    color: #333;
    /* Styling the toggle radio buttons */
    .gradio-container label.svelte-1ipelgc {
      font-weight: 600 !important;
      border: 2px solid #A4B465 !important;
      padding: 0.4rem 1rem !important;
      border-radius: 12px !important;
      margin: 0.25rem !important;
      background-color: #f5fbe8 !important;
      transition: all 0.3s ease-in-out;
      cursor: pointer;
    }
    /* Selected option */
    input[type="radio"]:checked + label.svelte-1ipelgc {
      background-color: #A4B465 !important;
      color: white !important;
      border-color: #889E46 !important;
    }
    /* Unselected hover */
    .gradio-container input[type="radio"] + label.svelte-1ipelgc:hover {
      background-color: #e0f1c4 !important;
    }
}
"""
# NOTE(review): this gr.HTML(...) executes at module import time, OUTSIDE the
# gr.Blocks context defined below, so the component is never attached to any
# rendered UI.  Looks like a leftover from an earlier toggle design — confirm
# and remove if unused.
gr.HTML("""
<div class="wrapper">
  <div class="toggle-icon-group">
    <input type="radio" id="spreadsheet" name="mode" checked>
    <label for="spreadsheet"><span>π</span> Spreadsheet</label>
    <input type="radio" id="upload" name="mode">
    <label for="upload"><span>π</span> Upload</label>
  </div>
</div>
""")
def process_all(mode, workbook_file, ups, pu, fmt, w, h):
    """Single entry point for the Process button.

    Dispatches on the selected input mode: workbook mode reads image links
    from the uploaded Excel file and batch-processes them; upload mode
    processes the uploaded image files; otherwise the pasted textbox value
    is treated as a single image URL.

    Returns:
        (files, zip_path, status_message, temp_dir)
    """
    # Workbook mode: pull {"url", "name"} records out of the spreadsheet.
    if mode.startswith("π"):
        data, msg = read_uploaded_workbook(workbook_file)
        print("π‘ DEBUG β links extracted:", data)  # <<< DEBUG
        if not data:
            # No links found, or the workbook could not be read.
            return [], None, msg, None
        files, zip_path, proc_msg, tmp = process_and_zip(data, fmt, w, h)
        return files, zip_path, f"{msg}\n{proc_msg}", tmp
    # Upload-images mode.
    if mode.startswith("π€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    # Single-URL mode.
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    # Nothing usable was supplied.
    return [], None, "β οΈ No valid input provided", None
# --- One-off retry pass for previously failed downloads (runs at import) ---
if os.path.exists(FAILED_LOG_PATH):
    with open(FAILED_LOG_PATH) as f:
        # Log lines are tab-separated with the URL in the second field
        # (presumably written by log_failure — TODO confirm the format).
        # FIX: skip malformed lines instead of raising IndexError on any
        # line that lacks a tab.
        failed = [
            parts[1]
            for parts in (line.split("\t") for line in f if line.strip())
            if len(parts) > 1
        ]
    if failed:
        print(f"π Retrying {len(failed)} failed URLsβ¦")
        os.makedirs("retries", exist_ok=True)

        def retry_one(url):
            """Re-download one URL into ./retries; returns True on success."""
            fname = os.path.join("retries", url.split("/")[-1].split("?")[0])
            headers = random_headers(f"{urlparse(url).scheme}://{urlparse(url).netloc}/")
            r = fetch_with_retries(url, headers, timeout=15, retries=2)
            if not r:
                return False
            with open(fname, 'wb') as f:
                for c in r.iter_content(8192):
                    f.write(c)
            return True

        # ThreadPoolExecutor is already imported at the top of the file; the
        # original re-imported it (and an unused as_completed) here.
        with ThreadPoolExecutor(max_workers=8) as ex:
            results = list(ex.map(retry_one, failed))
        print(f"β Retried successes: {sum(results)} / {len(failed)}")
# ─── Gradio UI: layout and event wiring ───
with gr.Blocks(css=css) as demo:
    # βββ States βββ
    # image_data_state: {"url", "name"} records parsed from the workbook.
    image_data_state = gr.State([])
    # temp_dir_state: working temp dir of the last run, consumed by Clear.
    temp_dir_state = gr.State(None)
    # βββ Header βββ
    gr.HTML(lamp_html)
    with gr.Row():
        gr.Markdown("<h1>πΌοΈ Image Processor</h1>")
    # βββ Main layout βββ
    with gr.Row():
        # ββ LEFT PANEL ββ  input selection + format settings
        with gr.Column(scale=3, min_width=500):
            with gr.Group(elem_classes="panel"):
                mode_toggle = gr.Radio(
                    ["π Upload Workbook", "π€ Upload Images"],
                    value="π Upload Workbook",
                    label="Select Input Method"
                )
                workbook_upload = gr.File(
                    label="π Upload .xlsx/.xlsm Workbook",
                    file_types=['.xlsx', '.xlsm'],
                    visible=True
                )
                upload_box = gr.File(
                    label="π Upload Images",
                    file_count="multiple",
                    visible=False
                )
                image_url_input = gr.Textbox(
                    label="π Paste Image URL",
                    visible=False
                )
                process_btn = gr.Button("βοΈ Process", elem_id="process-btn-url")
            with gr.Group(elem_classes="panel"):
                with gr.Row():
                    format_choice = gr.Dropdown(
                        ["JPG","PNG","WEBP","TIFF","GIF","JFIF","AVIF"],
                        label="πΌοΈ Format",
                        value="JPG"
                    )
                    width = gr.Number(label="Width (px)", value=1000, precision=0)
                    height = gr.Number(label="Height (px)", value=1000, precision=0)
        # ββ RIGHT PANEL ββ  status, downloads and run controls
        with gr.Column(scale=2, min_width=400):
            with gr.Group(elem_classes="panel"):
                status = gr.Textbox(
                    label="π£ Status",
                    lines=6,
                    interactive=False,
                    elem_id="status-box"
                )
                zip_download_btn = gr.Button("π¦ Download ZIP")
                # Hidden holder for the ZIP path; the button's js opens it.
                zip_file_hidden = gr.File(visible=False)
                with gr.Accordion("π§· Individual Files", open=False):
                    single_downloads = gr.File(label="Files", file_count="multiple")
                with gr.Row(elem_classes="btn-row"):
                    stop_btn = gr.Button("Stop", elem_id="stop-btn")
                    clear_btn = gr.Button("Clear", elem_id="clear-btn")
    # βββ Footer βββ
    gr.HTML(
        "<div style='width:100%; text-align:center; margin-top:1rem; color:white;'>"
        "Created with π by Vishakha"
        "</div>"
    )
    # βββ EVENT HOOKS βββ
    # Toggle which input widget is visible.  NOTE(review): in non-workbook
    # mode BOTH the image upload box and the URL textbox become visible —
    # confirm that is intended.
    mode_toggle.change(
        fn=lambda choice: (
            gr.update(visible=choice.startswith("π")),
            gr.update(visible=not choice.startswith("π")),
            gr.update(visible=not choice.startswith("π"))
        ),
        inputs=[mode_toggle],
        outputs=[workbook_upload, upload_box, image_url_input]
    )
    # Eagerly parse the workbook on upload so the link count shows in status.
    workbook_upload.change(
        fn=read_uploaded_workbook,
        inputs=[workbook_upload],
        outputs=[image_data_state, status]
    )
    # Show a simple ready-count when image files are selected.
    upload_box.change(
        fn=lambda files: f"{len(files)} files ready." if files else "No files selected",
        inputs=[upload_box],
        outputs=[status]
    )
    # Main pipeline: dispatches on the selected mode inside process_all.
    process_btn.click(
        fn=process_all,
        inputs=[
            mode_toggle,
            workbook_upload,
            upload_box,
            image_url_input,
            format_choice,
            width,
            height
        ],
        outputs=[
            single_downloads,
            zip_file_hidden,
            status,
            temp_dir_state
        ]
    )
    # Client-side only: open the hidden ZIP file's URL in a new tab.
    zip_download_btn.click(
        None,
        inputs=[zip_file_hidden],
        js="(file) => file && window.open(file.url, '_blank')"
    )
    # Reset all components and delete the temp dir (see clear_all).
    clear_btn.click(
        fn=clear_all,
        inputs=[temp_dir_state],
        outputs=[
            workbook_upload, image_data_state, upload_box,
            zip_file_hidden, single_downloads, status,
            temp_dir_state, image_url_input
        ]
    )
    # NOTE(review): this only updates the status text — it never calls
    # stop_event.set(), so running jobs are NOT actually interrupted.
    stop_btn.click(
        fn=lambda: "π Stop signal sent",
        outputs=[status]
    )
if __name__ == "__main__":
    # queue() enables request queuing for concurrent users; debug=True
    # surfaces server tracebacks in the console.
    demo.queue().launch(debug=True)