import gradio as gr
from PIL import Image
import os
import zipfile
import pandas as pd
import requests
import tempfile
import shutil
from io import BytesIO
import threading
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import time
import random
import cloudscraper
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Signals a user-requested stop to worker code (set by the UI's Stop button).
stop_event = threading.Event()
# Fall back to 4 when os.cpu_count() cannot determine the core count.
cpu_cores = os.cpu_count() or 4
# Rotated per request to look less like a single automated client.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (Linux; Android 10)",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X)"
]
# Placeholder proxy entries -- replace user/pass/host/port before enabling.
PROXY_LIST = [
    "http://user:pass@proxy1:port",
    "http://user:pass@proxy2:port",
]
# Tab-separated log of URLs that failed to download (retried at startup).
FAILED_LOG_PATH = "failed_downloads.txt"
# Hosts that are skipped outright (known slow / scraper-hostile).
BLOCKED_SITES = ["bigw.com.au"]
def random_headers(referer):
    """Build request headers with a randomly rotated User-Agent.

    Args:
        referer: Value to send as the ``Referer`` header.

    Returns:
        dict with ``User-Agent`` and ``Referer`` keys.
    """
    agent = random.choice(USER_AGENTS)
    return {"User-Agent": agent, "Referer": referer}
def try_download(url, headers, proxies):
    """Fetch *url* once through the given proxy mapping.

    Args:
        url: Image URL to request.
        headers: Header dict (User-Agent / Referer).
        proxies: requests-style proxy dict, e.g. {"http": ..., "https": ...}.

    Returns:
        The streaming Response when the server answered 200 with an image
        Content-Type, otherwise None.
    """
    try:
        response = session.get(
            url, headers=headers, proxies=proxies, stream=True, timeout=15
        )
    except Exception as exc:
        print(f"β οΈ Proxy download error ({proxies}): {exc}")
        return None
    content_type = response.headers.get("Content-Type", "")
    if response.status_code == 200 and "image" in content_type:
        return response
    return None
# Shared HTTP session: reuses TCP connections and accumulates cookies
# across every download in the process.
session = requests.Session()
# --- Utility Functions ---
def process_url_images(data, fmt, w, h):
    """Resize and zip every image referenced in *data* ({"url","name"} dicts)."""
    return process_and_zip(data, fmt, w, h)
def process_uploaded_images(files, fmt, w, h):
    """Copy uploaded files into a scratch directory, then resize and zip them."""
    staging = tempfile.mkdtemp()
    items = []
    for upload in files:
        source = str(upload)
        base = os.path.basename(source)
        target = os.path.join(staging, base)
        shutil.copyfile(source, target)
        items.append({"url": target, "name": os.path.splitext(base)[0]})
    return process_and_zip(items, fmt, w, h)
def process_single_url_image(url, fmt, w, h):
    """Resize and zip a single image fetched from *url*.

    Returns the usual (files, zip_path, message, temp_dir) tuple, or an
    empty result when the URL is blank.
    """
    cleaned = url.strip()
    if not cleaned:
        return [], None, "No URL provided", None
    return process_and_zip([{"url": cleaned, "name": "single"}], fmt, w, h)
def handle_process(mode, sd, ups, pu, fmt, w, h):
    """Dispatch to the matching processor for the selected input mode.

    Falls through workbook -> uploads -> pasted URL, returning the standard
    (files, zip_path, message, temp_dir) tuple.
    """
    if mode.startswith("π") and sd:
        return process_url_images(sd, fmt, w, h)
    if mode.startswith("π€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    if pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    return ([], None, "β οΈ No valid input provided", None)
from PIL import ImageFile
# Let Pillow decode images whose files were cut short instead of raising.
ImageFile.LOAD_TRUNCATED_IMAGES = True
def fetch_with_retries(url, headers, timeout=15, retries=2, backoff=1):
    """GET *url* up to ``retries + 1`` times via the shared session.

    Args:
        url: URL to fetch.
        headers: Header dict to send.
        timeout: Per-attempt timeout in seconds.
        retries: Additional attempts after the first one.
        backoff: Base delay; the wait grows linearly with the attempt number.

    Returns:
        The streaming Response on HTTP 200 with an image Content-Type,
        otherwise None once every attempt has failed.
    """
    for attempt in range(retries + 1):
        try:
            resp = session.get(url, headers=headers, stream=True, timeout=timeout)
            if resp.status_code == 200 and "image" in resp.headers.get("Content-Type", ""):
                return resp
            # Bad status or non-image body: release the pooled connection
            # before retrying (the original leaked it and retried instantly).
            resp.close()
        except Exception as e:
            print(f"β οΈ Attempt {attempt+1} failed for {url}: {e}")
        # Linear backoff between attempts; no pointless sleep after the last.
        if attempt < retries:
            time.sleep(backoff * (attempt + 1))
    return None
def _log_failure(url, reason):
    """Record a failed URL as 'timestamp<TAB>url<TAB>reason' in FAILED_LOG_PATH.

    The original code called an undefined ``log_failure`` and crashed with a
    NameError on every failure path; this helper supplies the missing
    implementation in the tab-separated format the startup retry loop reads.
    """
    try:
        with open(FAILED_LOG_PATH, "a", encoding="utf-8") as log:
            log.write(f"{datetime.now().isoformat()}\t{url}\t{reason}\n")
    except OSError as exc:
        print(f"β οΈ Could not write failure log: {exc}")

def download_image(url, save_path):
    """Download *url* to *save_path*, escalating through three strategies.

    1. Plain GET with retries, 2. cloudscraper (Cloudflare bypass),
    3. headless Chrome to harvest cookies, then re-GET with those cookies.

    Returns:
        save_path on success, or None when every strategy fails (the URL is
        then recorded via _log_failure).
    """
    # Cached from an earlier run -- anything over ~1 KB is assumed valid.
    if os.path.exists(save_path) and os.path.getsize(save_path) > 1000:
        return save_path
    # Skip hosts we know are slow or hostile.
    if any(b in url for b in BLOCKED_SITES):
        _log_failure(url, "Skipped known slow site")
        return None
    parsed = urlparse(url)
    referer = f"{parsed.scheme}://{parsed.netloc}/"
    headers = random_headers(referer)

    # 1) Direct fetch with retries.
    print(f"π Fetching with retries: {url}")
    resp = fetch_with_retries(url, headers, timeout=15, retries=2)
    if resp:
        with open(save_path, 'wb') as f:
            for chunk in resp.iter_content(8192):
                f.write(chunk)
        print("✅ Direct fetch succeeded")
        return save_path
    print("β Direct fetch failed after retries")

    # 2) cloudscraper fallback (handles Cloudflare anti-bot challenges).
    print("π cloudscraper fallback...")
    try:
        scraper = cloudscraper.create_scraper(sess=session)
        resp = scraper.get(url, headers=headers, stream=True, timeout=15)
        if resp.status_code == 200 and "image" in resp.headers.get("Content-Type", ""):
            with open(save_path, 'wb') as f:
                for chunk in resp.iter_content(8192):
                    f.write(chunk)
            print("✅ cloudscraper worked")
            return save_path
    except Exception as e:
        print(f"β cloudscraper failed: {e}")

    # 3) Selenium fallback: load the page in headless Chrome, copy its
    # cookies into the shared session, then retry the final URL.
    try:
        print("π§ͺ Headless browser fallback...")
        chrome_opts = Options()
        chrome_opts.add_argument("--headless")
        chrome_opts.add_argument("--disable-gpu")
        chrome_opts.add_argument("--no-sandbox")
        chrome_opts.add_argument("--disable-dev-shm-usage")
        chrome_opts.add_argument(f"user-agent={headers['User-Agent']}")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_opts
        )
        try:
            driver.get(url)
            time.sleep(4)  # crude wait for any JS challenge to settle
            final_url = driver.current_url
            cookies = {c['name']: c['value'] for c in driver.get_cookies()}
        finally:
            driver.quit()
        # BUG FIX: the original wrapped this in `with session:`, which closed
        # the module-wide Session on exit and broke every later download.
        for k, v in cookies.items():
            session.cookies.set(k, v)
        r = session.get(final_url, headers=headers, stream=True, timeout=15)
        if r.status_code == 200 and "image" in r.headers.get("Content-Type", ""):
            with open(save_path, 'wb') as f:
                for chunk in r.iter_content(8192):
                    f.write(chunk)
            print("✅ Selenium + cookies worked")
            return save_path
    except Exception as e:
        print(f"β Selenium failed: {e}")

    # 4) Give up and record the failure.
    _log_failure(url, "All methods failed")
    return None
lamp_html = """
"""
def resize_with_padding(img, target_w, target_h, fill=(255, 255, 255)):
    """Fit *img* inside a target_w x target_h canvas, centred with padding.

    The image is shrunk in place (aspect ratio preserved) and pasted onto a
    solid RGB background of the given fill colour.
    """
    img.thumbnail((target_w, target_h), Image.LANCZOS)
    canvas = Image.new("RGB", (target_w, target_h), fill)
    offset = ((target_w - img.width) // 2, (target_h - img.height) // 2)
    canvas.paste(img, offset)
    return canvas
def threaded_download_and_open(item, temp_dir):
    """Fetch one work item and decode it into a list of RGBA frames.

    Args:
        item: {"url": ..., "name": ...} dict; "url" may be a local path.
        temp_dir: Directory that downloaded files are written into.

    Returns:
        (name, frames, error): frames is a list of PIL images (several for
        an animated file, one otherwise); error is a message string when
        fetching or decoding failed, in which case frames is None.
    """
    from PIL import ImageSequence
    name = item['name']
    src = item['url']
    try:
        if os.path.exists(src):
            path = src
        else:
            path = download_image(src, os.path.join(temp_dir, f"{name}.gif"))
        if not path:
            return (name, None, 'Download failed or invalid image')
        img = Image.open(path)
        if not getattr(img, "is_animated", False):
            return (name, [img.convert("RGBA")], None)
        img.seek(0)  # ensure we start at the first frame
        frames = [frame.convert("RGBA").copy()
                  for frame in ImageSequence.Iterator(img)]
        return (name, frames, None)
    except Exception as e:
        return (name, None, str(e))
def resize_and_save_worker(args):
    """Composite, resize, and save one image; pool-worker entry point.

    Args:
        args: (name, img, fmt, w, h, out_dir) tuple.

    Returns:
        (saved_path, None) on success, or (None, "name: error") on failure.
    """
    name, img, fmt, w, h, out_dir = args
    try:
        # 1) Flatten any transparency onto a white background.
        img = img.convert('RGBA')
        bg = Image.new('RGBA', img.size, (255, 255, 255, 255))
        img = Image.alpha_composite(bg, img)
        # 2) Map the UI format choice to a Pillow save format + extension.
        #    BUG FIX: Pillow has no "JFIF" writer (JFIF is a JPEG container),
        #    so JFIF must be saved with format="JPEG" (keeping a .jfif name).
        fmt_upper = fmt.upper()
        if fmt_upper in ("JPEG", "JPG"):
            save_fmt, ext = "JPEG", "jpg"
        elif fmt_upper == "JFIF":
            save_fmt, ext = "JPEG", "jfif"
        else:
            # NOTE(review): AVIF needs a Pillow plugin (pillow-avif-plugin);
            # without it this save raises and is reported as a failure.
            save_fmt, ext = fmt_upper, fmt.lower()
        # 3) JPEG cannot store an alpha channel.
        if save_fmt == "JPEG":
            img = img.convert("RGB")
        # 4) Fit within the requested box, padding with white.
        img = resize_with_padding(img, w, h)
        # 5) Build the filename and save.
        out_path = os.path.join(out_dir, f"{name}.{ext}")
        img.save(out_path, format=save_fmt, quality=90)
        return out_path, None
    except Exception as e:
        return None, f"{name}: {e}"
from multiprocessing import cpu_count
import zipfile
def process_and_zip(items, fmt, w, h):
    """Download every item, resize all frames, and bundle them into a ZIP.

    Args:
        items: list of {"url": ..., "name": ...} dicts ("url" may be a path).
        fmt: output format name from the UI dropdown (e.g. "JPG", "PNG").
        w, h: target canvas size in pixels.

    Returns:
        (files, zip_path, status_message, temp_dir); files and zip_path are
        None when nothing could be processed.
    """
    tmp = tempfile.mkdtemp()
    proc = os.path.join(tmp, 'out')
    os.makedirs(proc, exist_ok=True)
    files, fails = [], []

    # 1) Download & decode in parallel (I/O bound, so oversubscribe cores).
    workers = cpu_count() * 2
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(lambda it: threaded_download_and_open(it, tmp), items))

    # 2) Map the format choice once, outside the loop.
    #    BUG FIX: Pillow has no "JFIF" writer (JFIF is a JPEG container),
    #    so JFIF is saved with format="JPEG" and a .jfif extension.
    fmt_upper = fmt.upper()
    if fmt_upper in ("JPEG", "JPG"):
        save_fmt, ext = "JPEG", "jpg"
    elif fmt_upper == "JFIF":
        save_fmt, ext = "JPEG", "jfif"
    else:
        save_fmt, ext = fmt_upper, fmt.lower()

    # 3) Composite, resize, and save every frame of every image.
    for name, imgs, err in results:
        if err or not imgs:
            fails.append(f"{name}: {err}")
            continue
        for i, img in enumerate(imgs):
            try:
                img = img.convert("RGBA")
                bg = Image.new("RGBA", img.size, (255, 255, 255, 255))
                img = Image.alpha_composite(bg, img)
                if save_fmt == "JPEG":
                    img = img.convert("RGB")  # JPEG cannot store alpha
                img = resize_with_padding(img, w, h)
                fname = (f"{name}_frame{i+1}.{ext}" if len(imgs) > 1
                         else f"{name}.{ext}")
                out_path = os.path.join(proc, fname)
                img.save(out_path, format=save_fmt, quality=90)
                files.append(out_path)
            except Exception as e:
                print(f"β οΈ Save failed for {name}: {e}")
                fails.append(f"{name}_frame{i+1}: {e}")

    # 4) Nothing succeeded: clean up and report.
    if not files:
        stop_event.clear()
        shutil.rmtree(tmp)
        return None, None, "No images processed.", None

    # 5) ZIP in "store" mode -- the images are already compressed.
    date_str = datetime.now().strftime("%Y-%m-%d")
    zip_path = os.path.join(tmp, f"{date_str}.zip")
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED) as zf:
        for f in files:
            zf.write(f, os.path.basename(f))

    # 6) Status message for the UI (broken mojibake literal restored).
    msg_lines = [f"✅ Processed {len(files)} image(s)."]
    if fails:
        msg_lines.append(f"❌ Failed: {len(fails)} image(s)")
        msg_lines += [f" - {fail}" for fail in fails]
    stop_event.clear()
    return files, zip_path, "\n".join(msg_lines), tmp
def read_uploaded_workbook(file):
    """Extract image-download items from an uploaded Excel workbook.

    Every sheet except "Cleared Data" is read. Rows need an 'ItemCode'
    column; any column whose header contains url/image/link contributes one
    download item per non-empty cell.

    Args:
        file: Gradio file object exposing a ``.name`` path, or None.

    Returns:
        (items, status_message) where items is a list of
        {"url": ..., "name": ...} dicts.
    """
    if not file:
        return [], "❌ No file uploaded"
    try:
        # Read all sheets except "Cleared Data" and stack them.
        xls = pd.ExcelFile(file.name)
        sheets = [s for s in xls.sheet_names if s.lower() != "cleared data"]
        df_list = [pd.read_excel(file.name, sheet_name=s, engine="openpyxl") for s in sheets]
        df = pd.concat(df_list, ignore_index=True)
        df.columns = [c.strip() for c in df.columns]
        item_col = next((c for c in df.columns if c.lower() == 'itemcode'), None)
        if not item_col:
            return [], "❌ Missing 'ItemCode' column"
        url_cols = [c for c in df.columns if any(k in c.lower() for k in ["url", "image", "link"])]
        data = []
        for _, row in df.iterrows():
            raw = row[item_col]
            if pd.isna(raw):
                continue
            text = str(raw).strip()
            # Numeric codes round-trip through floats ("123.0"); keep the
            # integer part as the item key.
            key = text.split('.')[0] if text.replace('.', '', 1).isdigit() else text
            idx = 0
            for col in url_cols:
                if pd.notna(row[col]):
                    name = key if idx == 0 else f"{key}_{idx}"
                    data.append({"url": str(row[col]).strip(), "name": name})
                    idx += 1
        # BUG FIX: the original success literal was split across lines
        # (mojibake'd checkmark) -- restored as a single valid string.
        return data, f"✅ Fetched {len(data)} image link(s)"
    except Exception as e:
        return [], f"❌ Error: {e}"
def clear_all(tmp_dir):
    """Delete the working temp directory and return reset values for the UI.

    The returned tuple maps positionally onto the components wired up in the
    clear button's ``outputs`` list.
    """
    if tmp_dir and os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    hidden_zip_reset = gr.update(visible=False, value=None)
    return (
        None,              # single-file upload
        [],                # image_data_state
        [],                # multi-file upload
        hidden_zip_reset,  # hidden ZIP file component
        [],                # individual-downloads list
        "Cleared.",        # status message
        None,              # temp_dir_state
        "",                # URL textbox
    )
# === CSS ===
# BUG FIX: the original stylesheet had unbalanced braces -- `.gradio-container`
# and `#status-box` were never closed, so every later rule was parsed as part
# of the wrong block and silently dropped by the browser.
css = """
body {
    margin: 0;
    background: #4B352A;
    font-family: 'Segoe UI', sans-serif;
    color: #222;
    display: flex;
    justify-content: center;
    padding: 2rem;
    transition: background 0.3s, color 0.3s;
}
.gradio-container {
    max-width: 1200px; /* You can reduce this (e.g., 600px) */
    width: 100%;
    margin: 0 auto;
}
h1 {
    font-size: 24px !important;
    font-weight: 700 !important;
    margin-bottom: 1rem !important;
    color: #FFFFFF !important;
    text-align: center !important;
}
.panel {
    background: white;
    padding: 1rem;
    border-radius: 6px;
    border: 1px solid #ddd;
    box-shadow: 0 1px 2px rgba(0,0,0,0.05);
}
#clear-btn, #stop-btn {
    margin-top: 1rem;
    font-size: 0.85rem !important;
    font-weight: 600 !important;
    padding: 0.5rem 1rem !important;
    border-radius: 8px !important;
    border: none !important;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important;
    transition: background-color 0.2s ease !important;
    width: 100% !important;
}
.btn-row {
    display: flex;
    justify-content: space-between;
    gap: 0.5rem;
}
#clear-btn {
    background-color: #e74c3c !important;
    color: white !important;
    width: 50% !important;
}
#clear-btn:hover {
    background-color: #c0392b !important;
    cursor: pointer !important;
}
#stop-btn {
    background-color: #CA7842 !important;
    color: white !important;
    width: 50% !important;
}
#stop-btn:hover {
    background-color: #FFA55D !important;
    cursor: pointer !important;
}
.gradio-container .gr-row > div {
    padding: 4px !important;
}
#process-btn-url {
    background-color: #A4B465 !important;
    color: #333 !important;
}
#fetch-btn {
    background-color: #cfbed7 !important;
    color: #333 !important;
}
#status-box {
    background-color: #ffffff;
    border: 2px solid #cfbed7;
    border-radius: 6px;
    padding: 1rem;
    font-size: 0.95rem;
    color: #333;
}
/* Styling the toggle radio buttons */
.gradio-container label.svelte-1ipelgc {
    font-weight: 600 !important;
    border: 2px solid #A4B465 !important;
    padding: 0.4rem 1rem !important;
    border-radius: 12px !important;
    margin: 0.25rem !important;
    background-color: #f5fbe8 !important;
    transition: all 0.3s ease-in-out;
    cursor: pointer;
}
/* Selected option */
input[type="radio"]:checked + label.svelte-1ipelgc {
    background-color: #A4B465 !important;
    color: white !important;
    border-color: #889E46 !important;
}
/* Unselected hover */
.gradio-container input[type="radio"] + label.svelte-1ipelgc:hover {
    background-color: #e0f1c4 !important;
}
"""
gr.HTML("""
""")
def process_all(mode, workbook_file, ups, pu, fmt, w, h):
    """Top-level handler for the Process button.

    Dispatches on the selected input mode (workbook, direct uploads, or a
    pasted URL) and returns the (files, zip_path, status_message, temp_dir)
    tuple the UI outputs expect.
    """
    # 1) Workbook mode: extract image links from the uploaded Excel file.
    if mode.startswith("π"):
        data, msg = read_uploaded_workbook(workbook_file)
        print("π‘ DEBUG β links extracted:", data)  # <<< DEBUG
        if not data:
            # No links found, or the workbook could not be read.
            return [], None, msg, None
        # data is a list of {"url": ..., "name": ...} dicts.
        files, zip_path, proc_msg, tmp = process_and_zip(data, fmt, w, h)
        return files, zip_path, f"{msg}\n{proc_msg}", tmp
    # 2) Direct image-upload mode.
    if mode.startswith("π€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    # 3) Single pasted URL.
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    # 4) Nothing valid was provided.
    return [], None, "β οΈ No valid input provided", None
# On startup, retry any URLs recorded in the failure log by earlier runs.
# Log lines are tab-separated (timestamp, url, reason).
if os.path.exists(FAILED_LOG_PATH):
    failed = []
    with open(FAILED_LOG_PATH) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            # BUG FIX: the original did line.split("\t")[1] unconditionally,
            # raising IndexError on any line without a tab.
            parts = line.split("\t")
            failed.append(parts[1] if len(parts) > 1 else parts[0])
    if failed:
        print(f"π Retrying {len(failed)} failed URLsβ¦")
        os.makedirs("retries", exist_ok=True)

        def retry_one(url):
            """Download one previously-failed URL into the retries/ folder."""
            fname = os.path.join("retries", url.split("/")[-1].split("?")[0])
            referer = f"{urlparse(url).scheme}://{urlparse(url).netloc}/"
            r = fetch_with_retries(url, random_headers(referer), timeout=15, retries=2)
            if not r:
                return False
            with open(fname, 'wb') as out:
                for c in r.iter_content(8192):
                    out.write(c)
            return True

        with ThreadPoolExecutor(max_workers=8) as ex:
            results = list(ex.map(retry_one, failed))
        print(f"✅ Retried successes: {sum(results)} / {len(failed)}")
# === UI definition ===
with gr.Blocks(css=css) as demo:
    # βββ States βββ
    image_data_state = gr.State([])   # parsed {"url","name"} items from the workbook
    temp_dir_state = gr.State(None)   # working temp dir; removed by Clear
    # βββ Header βββ
    gr.HTML(lamp_html)
    with gr.Row():
        gr.Markdown("πΌοΈ Image Processor")
    # βββ Main layout βββ
    with gr.Row():
        # ββ LEFT PANEL: input selection + output options ββ
        with gr.Column(scale=3, min_width=500):
            with gr.Group(elem_classes="panel"):
                mode_toggle = gr.Radio(
                    ["π Upload Workbook", "π€ Upload Images"],
                    value="π Upload Workbook",
                    label="Select Input Method"
                )
                workbook_upload = gr.File(
                    label="π Upload .xlsx/.xlsm Workbook",
                    file_types=['.xlsx', '.xlsm'],
                    visible=True
                )
                upload_box = gr.File(
                    label="π Upload Images",
                    file_count="multiple",
                    visible=False
                )
                image_url_input = gr.Textbox(
                    label="π Paste Image URL",
                    visible=False
                )
                process_btn = gr.Button("βοΈ Process", elem_id="process-btn-url")
            with gr.Group(elem_classes="panel"):
                with gr.Row():
                    format_choice = gr.Dropdown(
                        ["JPG","PNG","WEBP","TIFF","GIF","JFIF","AVIF"],
                        label="πΌοΈ Format",
                        value="JPG"
                    )
                    width = gr.Number(label="Width (px)", value=1000, precision=0)
                    height = gr.Number(label="Height (px)", value=1000, precision=0)
        # ββ RIGHT PANEL: status + downloads ββ
        with gr.Column(scale=2, min_width=400):
            with gr.Group(elem_classes="panel"):
                status = gr.Textbox(
                    label="π£ Status",
                    lines=6,
                    interactive=False,
                    elem_id="status-box"
                )
                zip_download_btn = gr.Button("π¦ Download ZIP")
                # Hidden file component that holds the generated ZIP; the
                # button above opens its URL via a JS callback.
                zip_file_hidden = gr.File(visible=False)
                with gr.Accordion("π§· Individual Files", open=False):
                    single_downloads = gr.File(label="Files", file_count="multiple")
                with gr.Row(elem_classes="btn-row"):
                    stop_btn = gr.Button("Stop", elem_id="stop-btn")
                    clear_btn = gr.Button("Clear", elem_id="clear-btn")
    # βββ Footer βββ
    gr.HTML(
        ""
        "Created with π by Vishakha"
        ""
    )
    # βββ EVENT HOOKS βββ
    # Toggle which input widget is visible based on the selected mode.
    mode_toggle.change(
        fn=lambda choice: (
            gr.update(visible=choice.startswith("π")),
            gr.update(visible=not choice.startswith("π")),
            gr.update(visible=not choice.startswith("π"))
        ),
        inputs=[mode_toggle],
        outputs=[workbook_upload, upload_box, image_url_input]
    )
    # Parse the workbook as soon as it is uploaded.
    workbook_upload.change(
        fn=read_uploaded_workbook,
        inputs=[workbook_upload],
        outputs=[image_data_state, status]
    )
    # Show a quick count of selected files in the status box.
    upload_box.change(
        fn=lambda files: f"{len(files)} files ready." if files else "No files selected",
        inputs=[upload_box],
        outputs=[status]
    )
    # Main processing pipeline.
    process_btn.click(
        fn=process_all,
        inputs=[
            mode_toggle,
            workbook_upload,
            upload_box,
            image_url_input,
            format_choice,
            width,
            height
        ],
        outputs=[
            single_downloads,
            zip_file_hidden,
            status,
            temp_dir_state
        ]
    )
    # Open the hidden ZIP file's URL in a new tab (client-side only).
    zip_download_btn.click(
        None,
        inputs=[zip_file_hidden],
        js="(file) => file && window.open(file.url, '_blank')"
    )
    # Reset every component and wipe the temp directory.
    clear_btn.click(
        fn=clear_all,
        inputs=[temp_dir_state],
        outputs=[
            workbook_upload, image_data_state, upload_box,
            zip_file_hidden, single_downloads, status,
            temp_dir_state, image_url_input
        ]
    )
    # NOTE(review): this only updates the status text -- stop_event is never
    # set, so in-flight processing is not actually interrupted. Confirm intent.
    stop_btn.click(
        fn=lambda: "π Stop signal sent",
        outputs=[status]
    )
if __name__ == "__main__":
demo.queue().launch(debug=True)