"""Gradio app that fetches or accepts images, pads/resizes them and zips them.

Inputs are either an Excel workbook of image links, a set of uploaded image
files, or a single image URL. Every image is flattened onto a white
background, fitted inside a ``width x height`` canvas, saved in the chosen
format and bundled into a ZIP archive.
"""
import os
import random
import shutil
import tempfile
import threading
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import BytesIO
from multiprocessing import cpu_count
from urllib.parse import urlparse

import cloudscraper
import gradio as gr
import pandas as pd
import requests
from PIL import Image, ImageFile, ImageSequence
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Let Pillow decode images whose data stream ends early instead of raising.
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Set by the Stop button; checked before each download starts.
stop_event = threading.Event()
cpu_cores = os.cpu_count() or 4

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (Linux; Android 10)",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X)",
]
PROXY_LIST = [
    "http://user:pass@proxy1:port",
    "http://user:pass@proxy2:port",
]
FAILED_LOG_PATH = "failed_downloads.txt"
BLOCKED_SITES = ["bigw.com.au"]

# Emoji prefixes that identify the two radio choices of the input-mode toggle.
WORKBOOK_PREFIX = "📄"
IMAGES_PREFIX = "📤"
MODE_WORKBOOK = f"{WORKBOOK_PREFIX} Upload Workbook"
MODE_IMAGES = f"{IMAGES_PREFIX} Upload Images"

# Format names that must be written with Pillow's JPEG encoder.
_JPEG_ALIASES = frozenset({"JPEG", "JPG", "JFIF"})

# One HTTP session shared by every download so cookies and connections are reused.
session = requests.Session()

# Serialises appends to the failure log; downloads run on a thread pool.
_failed_log_lock = threading.Lock()


# --- Logging / HTTP helpers ---

def log_failure(url, reason):
    """Append one ``timestamp<TAB>url<TAB>reason`` line to FAILED_LOG_PATH.

    The URL is the second tab-separated field, which is the field the
    start-up retry pass reads back.
    """
    stamp = datetime.now().isoformat(timespec="seconds")
    line = f"{stamp}\t{url}\t{reason}\n"
    with _failed_log_lock:
        with open(FAILED_LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(line)


def random_headers(referer):
    """Return a header dict with a randomly chosen User-Agent and *referer*."""
    return {
        "User-Agent": random.choice(USER_AGENTS),
        "Referer": referer,
    }


def _referer_for(url):
    """Return ``scheme://host/`` of *url* for use as a Referer header."""
    parsed = urlparse(url)
    return f"{parsed.scheme}://{parsed.netloc}/"


def _is_image_response(resp):
    """Return True when *resp* is HTTP 200 with an image Content-Type."""
    return (
        resp.status_code == 200
        and "image" in resp.headers.get("Content-Type", "")
    )


def _save_stream(resp, save_path):
    """Stream *resp* to *save_path* and return *save_path*.

    The body is written to a ``.part`` file first and renamed on success, so
    an interrupted transfer never leaves a truncated file that a later call
    would mistake for a cached download. The response is always closed.
    """
    part_path = f"{save_path}.part"
    try:
        with open(part_path, "wb") as fh:
            for chunk in resp.iter_content(8192):
                fh.write(chunk)
        os.replace(part_path, save_path)
    finally:
        resp.close()
        if os.path.exists(part_path):
            os.remove(part_path)
    return save_path


def try_download(url, headers, proxies):
    """GET *url* through *proxies* (``{"http": ..., "https": ...}``).

    Returns the streaming response when it is HTTP 200 with an image
    Content-Type, otherwise None. Request errors are printed, not raised.
    """
    try:
        resp = session.get(
            url, headers=headers, proxies=proxies, stream=True, timeout=15
        )
        if _is_image_response(resp):
            return resp
        resp.close()
    except Exception as e:
        print(f"⚠️ Proxy download error ({proxies}): {e}")
    return None


def fetch_with_retries(url, headers, timeout=15, retries=2, backoff=1):
    """GET *url* up to ``retries + 1`` times.

    Returns the streaming response of the first attempt that yields HTTP 200
    with an image Content-Type, or None when every attempt fails. Between
    attempts the function sleeps ``backoff * attempt_number`` seconds.
    """
    for attempt in range(retries + 1):
        try:
            resp = session.get(
                url, headers=headers, stream=True, timeout=timeout
            )
            if _is_image_response(resp):
                return resp
            # Release the connection held open by stream=True.
            resp.close()
        except Exception as e:
            print(f"⚠️ Attempt {attempt+1} failed for {url}: {e}")
        if attempt < retries:
            # No point in waiting after the final attempt.
            time.sleep(backoff * (attempt + 1))
    return None


# --- Download strategies ---

def _download_via_cloudscraper(url, headers, save_path):
    """Fetch *url* with a cloudscraper session; return *save_path* or None."""
    scraper = cloudscraper.create_scraper(sess=session)
    resp = scraper.get(url, headers=headers, stream=True, timeout=15)
    if _is_image_response(resp):
        return _save_stream(resp, save_path)
    resp.close()
    return None


def _download_via_selenium(url, headers, save_path):
    """Open *url* in headless Chrome, then re-fetch it with the browser cookies.

    Returns *save_path* when the final request yields an image, else None.
    """
    chrome_opts = Options()
    chrome_opts.add_argument("--headless")
    chrome_opts.add_argument("--disable-gpu")
    chrome_opts.add_argument("--no-sandbox")
    chrome_opts.add_argument("--disable-dev-shm-usage")
    chrome_opts.add_argument(f"user-agent={headers['User-Agent']}")
    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=chrome_opts,
    )
    try:
        driver.get(url)
        # NOTE(review): fixed wait, presumably so redirects / challenge
        # scripts can finish before the URL and cookies are read - confirm.
        time.sleep(4)
        final_url = driver.current_url
        cookies = {c["name"]: c["value"] for c in driver.get_cookies()}
    finally:
        # Always terminate the browser, even when navigation raised.
        driver.quit()

    # Copy the cookies into the shared session. The session is deliberately
    # NOT used as a context manager here: that would close it for every
    # other download that shares it.
    for key, value in cookies.items():
        session.cookies.set(key, value)
    resp = session.get(final_url, headers=headers, stream=True, timeout=15)
    if _is_image_response(resp):
        return _save_stream(resp, save_path)
    resp.close()
    return None


def download_image(url, save_path):
    """Download *url* to *save_path*, trying progressively heavier methods.

    Order: cached file on disk, plain GET with retries, cloudscraper,
    headless Chrome plus cookies. URLs containing an entry of BLOCKED_SITES
    are skipped. Returns *save_path* on success; on failure the URL is
    written to the failure log and None is returned.
    """
    # Skip cached
    if os.path.exists(save_path) and os.path.getsize(save_path) > 1000:
        return save_path

    # Skip blocked
    if any(blocked in url for blocked in BLOCKED_SITES):
        log_failure(url, "Skipped known slow site")
        return None

    headers = random_headers(_referer_for(url))

    # 1) Direct with retries
    print(f"🚀 Fetching with retries: {url}")
    resp = fetch_with_retries(url, headers, timeout=15, retries=2)
    if resp is not None:
        try:
            _save_stream(resp, save_path)
            print("✅ Direct fetch succeeded")
            return save_path
        except (requests.RequestException, OSError) as e:
            print(f"❌ Direct fetch failed while saving: {e}")
    else:
        print("❌ Direct fetch failed after retries")

    # 2) cloudscraper fallback
    print("🟠 cloudscraper fallback...")
    try:
        if _download_via_cloudscraper(url, headers, save_path):
            print("✅ cloudscraper worked")
            return save_path
    except Exception as e:
        print(f"❌ cloudscraper failed: {e}")

    # 3) Selenium fallback
    try:
        print("🧪 Headless browser fallback...")
        if _download_via_selenium(url, headers, save_path):
            print("✅ Selenium + cookies worked")
            return save_path
    except Exception as e:
        print(f"❌ Selenium failed: {e}")

    # 4) Give up
    log_failure(url, "All methods failed")
    return None


# NOTE(review): this HTML snippet is empty apart from a newline; its markup
# may have been lost - confirm against the intended header.
lamp_html = """
"""


# --- Image helpers ---

def resize_with_padding(img, target_w, target_h, fill=(255, 255, 255)):
    """Fit *img* inside ``target_w x target_h`` and centre it on a *fill* canvas.

    *img* is shrunk in place (``Image.thumbnail`` never enlarges) and pasted
    onto a new RGB image, which is returned.
    """
    target_w, target_h = int(target_w), int(target_h)
    img.thumbnail((target_w, target_h), Image.LANCZOS)
    canvas = Image.new("RGB", (target_w, target_h), fill)
    offset = ((target_w - img.width) // 2, (target_h - img.height) // 2)
    canvas.paste(img, offset)
    return canvas


def _resolve_format(fmt):
    """Map a user-facing format name to ``(pillow_format, file_extension)``.

    JPG, JPEG and JFIF are all written by Pillow's JPEG encoder; JFIF keeps
    its own extension, the other two use ``jpg``.
    """
    fmt_upper = fmt.upper()
    if fmt_upper in _JPEG_ALIASES:
        return "JPEG", ("jfif" if fmt_upper == "JFIF" else "jpg")
    return fmt_upper, fmt.lower()


def _render_and_save(img, out_path, save_fmt, w, h):
    """Flatten *img* on white, pad it to ``w x h`` and save it to *out_path*."""
    rgba = img.convert("RGBA")
    white = Image.new("RGBA", rgba.size, (255, 255, 255, 255))
    flat = Image.alpha_composite(white, rgba)
    if save_fmt == "JPEG":
        flat = flat.convert("RGB")
    padded = resize_with_padding(flat, w, h)
    padded.save(out_path, format=save_fmt, quality=90)
    return out_path


def threaded_download_and_open(item, temp_dir):
    """Load the image described by *item* (``{"name": ..., "url": ...}``).

    ``item["url"]`` may be an existing local path or a remote URL; remote
    images are downloaded into *temp_dir* first. Returns
    ``(name, frames, None)`` with a list of RGBA frames (one entry for still
    images) or ``(name, None, error_message)``. Never raises.
    """
    name, src = item['name'], item['url']
    if stop_event.is_set():
        return (name, None, "Stopped by user")
    try:
        if os.path.exists(src):
            path = src
        else:
            temp_path = os.path.join(temp_dir, f"{name}.gif")
            path = download_image(src, temp_path)
            if not path:
                return (name, None, 'Download failed or invalid image')

        # convert() returns fully loaded copies, so the file can be closed
        # as soon as the frames have been extracted.
        with Image.open(path) as img:
            if getattr(img, "is_animated", False):
                # Rewind to first frame in case it's not
                img.seek(0)
                frames = [
                    frame.convert("RGBA")
                    for frame in ImageSequence.Iterator(img)
                ]
            else:
                frames = [img.convert("RGBA")]
        return (name, frames, None)
    except Exception as e:
        return (name, None, str(e))


def resize_and_save_worker(args):
    """Process one image given as ``(name, img, fmt, w, h, out_dir)``.

    Returns ``(out_path, None)`` on success or ``(None, "name: error")``.
    """
    name, img, fmt, w, h, out_dir = args
    try:
        save_fmt, ext = _resolve_format(fmt)
        out_path = os.path.join(out_dir, f"{name}.{ext}")
        return _render_and_save(img, out_path, save_fmt, w, h), None
    except Exception as e:
        return None, f"{name}: {e}"


def process_and_zip(items, fmt, w, h):
    """Download, convert and zip every entry of *items*.

    *items* is a list of ``{"url": ..., "name": ...}`` dicts. Returns
    ``(files, zip_path, message, tmp_dir)``; when nothing could be processed
    the working directory is removed and ``(None, None, message, None)`` is
    returned.
    """
    # A Stop pressed while idle must not cancel this run.
    stop_event.clear()

    tmp = tempfile.mkdtemp()
    proc = os.path.join(tmp, 'out')
    os.makedirs(proc, exist_ok=True)
    files, fails = [], []

    # 1) Download & open in parallel
    workers = cpu_cores * 2
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(
            ex.map(lambda it: threaded_download_and_open(it, tmp), items)
        )

    # 2) Process each downloaded image / list of frames
    for name, imgs, err in results:
        if err or not imgs:
            fails.append(f"{name}: {err}")
            continue
        for i, img in enumerate(imgs):
            try:
                save_fmt, ext = _resolve_format(fmt)
                fname = (
                    f"{name}_frame{i+1}.{ext}"
                    if len(imgs) > 1
                    else f"{name}.{ext}"
                )
                out_path = os.path.join(proc, fname)
                _render_and_save(img, out_path, save_fmt, w, h)
                files.append(out_path)
            except Exception as e:
                print(f"⚠️ Save failed for {name}: {e}")
                fails.append(f"{name}_frame{i+1}: {e}")

    # 3) If nothing succeeded, clean up and report (including the reasons)
    if not files:
        stop_event.clear()
        shutil.rmtree(tmp, ignore_errors=True)
        msg_lines = ["No images processed."]
        msg_lines += [f" - {fail}" for fail in fails]
        return None, None, "\n".join(msg_lines), None

    # 4) Create the ZIP in "store" mode (no compression)
    date_str = datetime.now().strftime("%Y-%m-%d")
    zip_path = os.path.join(tmp, f"{date_str}.zip")
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED) as zf:
        for path in files:
            zf.write(path, os.path.basename(path))

    # 5) Build status message
    msg_lines = [f"✅ Processed {len(files)} image(s)."]
    if fails:
        msg_lines.append(f"❌ Failed: {len(fails)} image(s)")
        msg_lines += [f" - {fail}" for fail in fails]

    stop_event.clear()
    return files, zip_path, "\n".join(msg_lines), tmp


# --- Input adapters ---

def _as_path(file_obj):
    """Return the filesystem path of an uploaded file (object or plain str)."""
    return str(getattr(file_obj, "name", file_obj))


def process_url_images(data, fmt, w, h):
    """Process a prepared list of ``{"url", "name"}`` dicts."""
    return process_and_zip(data, fmt, w, h)


def process_uploaded_images(files, fmt, w, h):
    """Process uploaded image files, naming each output after its file stem.

    The uploads are read from where they already are, so no extra staging
    directory is created (and none is left behind).
    """
    data = []
    for upload in files or []:
        src = _as_path(upload)
        stem = os.path.splitext(os.path.basename(src))[0]
        data.append({"url": src, "name": stem})
    return process_and_zip(data, fmt, w, h)


def process_single_url_image(url, fmt, w, h):
    """Process one image URL; the output file is named ``single``."""
    if not url or not url.strip():
        return [], None, "No URL provided", None
    return process_and_zip([{"url": url.strip(), "name": "single"}], fmt, w, h)


def handle_process(mode, sd, ups, pu, fmt, w, h):
    """Dispatch to the processor matching *mode* and the inputs provided.

    *sd* is the workbook link list, *ups* the uploaded files and *pu* the
    pasted URL.
    """
    if mode.startswith(WORKBOOK_PREFIX) and sd:
        return process_url_images(sd, fmt, w, h)
    if mode.startswith(IMAGES_PREFIX) and ups:
        return process_uploaded_images(ups, fmt, w, h)
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    return [], None, "⚠️ No valid input provided", None


def _normalize_item_code(raw):
    """Return *raw* as text, dropping the decimals of numeric codes (12.0 -> 12)."""
    text = str(raw).strip()
    if text.replace('.', '', 1).isdigit():
        return text.split('.')[0]
    return text


def read_uploaded_workbook(file):
    """Extract image links from an uploaded Excel workbook.

    Every sheet except "Cleared Data" is read. Rows need an ``ItemCode``
    column (case-insensitive); every column whose name contains "url",
    "image" or "link" contributes one link. The first link of a row is named
    after the item code, later ones get ``_1``, ``_2``, ... suffixes.

    Returns ``(links, status_message)`` where *links* is a list of
    ``{"url": ..., "name": ...}`` dicts. Never raises.
    """
    if not file:
        return [], "❌ No file uploaded"
    try:
        # read all sheets except "Cleared Data"
        with pd.ExcelFile(_as_path(file), engine="openpyxl") as xls:
            frames = [
                xls.parse(sheet)
                for sheet in xls.sheet_names
                if sheet.lower() != "cleared data"
            ]
        df = pd.concat(frames, ignore_index=True)
        # Header cells are not guaranteed to be strings.
        df.columns = [str(c).strip() for c in df.columns]

        item_col = next(
            (c for c in df.columns if c.lower() == 'itemcode'), None
        )
        if not item_col:
            return [], "❌ Missing 'ItemCode' column"

        url_cols = [
            c for c in df.columns
            if any(k in c.lower() for k in ("url", "image", "link"))
        ]

        data = []
        for _, row in df.iterrows():
            raw = row[item_col]
            if pd.isna(raw):
                continue
            key = _normalize_item_code(raw)
            idx = 0
            for col in url_cols:
                if pd.notna(row[col]):
                    name = f"{key}" if idx == 0 else f"{key}_{idx}"
                    data.append({"url": str(row[col]).strip(), "name": name})
                    idx += 1

        return data, f"✅ Fetched {len(data)} image link(s)"
    except Exception as e:
        return [], f"❌ Error: {e}"


def clear_all(tmp_dir):
    """Delete *tmp_dir* (if any) and return reset values for the UI outputs."""
    # wipe out the temp folder if it exists
    if tmp_dir and os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir, ignore_errors=True)
    return (
        None,                                   # clear the single-file upload
        [],                                     # reset the image_data_state
        [],                                     # clear the multi-file upload
        gr.update(visible=False, value=None),   # hide & clear the hidden ZIP file
        [],                                     # clear the "individual downloads" list
        "Cleared.",                             # status message
        None,                                   # reset temp_dir_state
        "",                                     # clear the URL textbox
    )


def request_stop():
    """Ask running download workers to stop and return the status text."""
    stop_event.set()
    return "🛑 Stop signal sent"


# === CSS ===
css = """
body {
  margin: 0;
  background: #4B352A;
  font-family: 'Segoe UI', sans-serif;
  color: #222;
  display: flex;
  justify-content: center;
  padding: 2rem;
  transition: background 0.3s, color 0.3s;
}
.gradio-container {
  max-width: 1200px; /* You can reduce this (e.g., 600px) */
  width: 100%;
  margin: 0 auto;
}
h1 {
  font-size: 24px !important;
  font-weight: 700 !important;
  margin-bottom: 1rem !important;
  color: #FFFFFF !important;
  text-align: center !important;
}
.panel {
  background: white;
  padding: 1rem;
  border-radius: 6px;
  border: 1px solid #ddd;
  box-shadow: 0 1px 2px rgba(0,0,0,0.05);
}
#clear-btn, #stop-btn {
  margin-top: 1rem;
  font-size: 0.85rem !important;
  font-weight: 600 !important;
  padding: 0.5rem 1rem !important;
  border-radius: 8px !important;
  border: none !important;
  box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important;
  transition: background-color 0.2s ease !important;
  width: 100% !important;
}
.btn-row {
  display: flex;
  justify-content: space-between;
  gap: 0.5rem;
}
#clear-btn {
  background-color: #e74c3c !important;
  color: white !important;
  width: 50% !important;
}
#clear-btn:hover {
  background-color: #c0392b !important;
  cursor: pointer !important;
}
#stop-btn {
  background-color: #CA7842 !important;
  color: white !important;
  width: 50% !important;
}
#stop-btn:hover {
  background-color: #FFA55D !important;
  cursor: pointer !important;
}
.gradio-container .gr-row > div {
  padding: 4px !important;
}
#process-btn-url {
  background-color: #A4B465 !important;
  color: #333 !important;
}
#fetch-btn {
  background-color: #cfbed7 !important;
  color: #333 !important;
}
#status-box {
  background-color: #ffffff;
  border: 2px solid #cfbed7;
  border-radius: 6px;
  padding: 1rem;
  font-size: 0.95rem;
  color: #333;
}
/* Styling the toggle radio buttons */
.gradio-container label.svelte-1ipelgc {
  font-weight: 600 !important;
  border: 2px solid #A4B465 !important;
  padding: 0.4rem 1rem !important;
  border-radius: 12px !important;
  margin: 0.25rem !important;
  background-color: #f5fbe8 !important;
  transition: all 0.3s ease-in-out;
  cursor: pointer;
}
/* Selected option */
input[type="radio"]:checked + label.svelte-1ipelgc {
  background-color: #A4B465 !important;
  color: white !important;
  border-color: #889E46 !important;
}
/* Unselected hover */
.gradio-container input[type="radio"] + label.svelte-1ipelgc:hover {
  background-color: #e0f1c4 !important;
}
"""

# NOTE(review): component created outside any gr.Blocks context with an
# (apparently stripped) empty body; kept as in the original - confirm intent.
gr.HTML("""
""")


def process_all(mode, workbook_file, ups, pu, fmt, w, h):
    """Run the pipeline for whichever input the UI currently provides.

    Returns ``(files, zip_path, status_message, tmp_dir)`` for the four
    outputs wired to the Process button.
    """
    # 1) Workbook mode: read links from Excel
    if mode.startswith(WORKBOOK_PREFIX):
        data, msg = read_uploaded_workbook(workbook_file)
        print("💡 DEBUG – links extracted:", data)  # <<< DEBUG
        if not data:
            # no links or error reading
            return [], None, msg, None
        # we have a list of {"url": ..., "name": ...}
        files, zip_path, proc_msg, tmp = process_and_zip(data, fmt, w, h)
        return files, zip_path, f"{msg}\n{proc_msg}", tmp

    # 2) "Upload Images" mode
    if mode.startswith(IMAGES_PREFIX) and ups:
        return process_uploaded_images(ups, fmt, w, h)

    # 3) Single-URL mode
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)

    # 4) Nothing valid
    return [], None, "⚠️ No valid input provided", None


def retry_failed_downloads(log_path=FAILED_LOG_PATH, out_dir="retries"):
    """Re-attempt every URL recorded in *log_path*, saving hits to *out_dir*.

    Malformed log lines, duplicate URLs and URLs on BLOCKED_SITES are
    skipped. Does nothing when the log does not exist.
    """
    if not os.path.exists(log_path):
        return
    urls = []
    with open(log_path, encoding="utf-8") as fh:
        for line in fh:
            fields = line.rstrip("\n").split("\t")
            if len(fields) >= 2 and fields[1].strip():
                urls.append(fields[1].strip())
    # De-duplicate (keeping order) and leave deliberately skipped sites alone.
    failed = [
        url for url in dict.fromkeys(urls)
        if not any(blocked in url for blocked in BLOCKED_SITES)
    ]
    if not failed:
        return

    print(f"🔄 Retrying {len(failed)} failed URLs…")
    os.makedirs(out_dir, exist_ok=True)

    def retry_one(url):
        """Download one URL into *out_dir*; return True on success."""
        base = url.split("/")[-1].split("?")[0] or "download"
        fname = os.path.join(out_dir, base)
        resp = fetch_with_retries(
            url, random_headers(_referer_for(url)), timeout=15, retries=2
        )
        if resp is None:
            return False
        try:
            _save_stream(resp, fname)
        except (requests.RequestException, OSError) as e:
            print(f"⚠️ Retry save failed for {url}: {e}")
            return False
        return True

    with ThreadPoolExecutor(max_workers=8) as ex:
        results = list(ex.map(retry_one, failed))
    print(f"✅ Retried successes: {sum(results)} / {len(failed)}")


# Runs once at start-up, as the original module-level code did.
retry_failed_downloads()


def _toggle_inputs(choice):
    """Show the workbook uploader in workbook mode, else the image/URL inputs."""
    workbook_mode = choice.startswith(WORKBOOK_PREFIX)
    return (
        gr.update(visible=workbook_mode),
        gr.update(visible=not workbook_mode),
        gr.update(visible=not workbook_mode),
    )


def _describe_uploads(files):
    """Return the status text shown after the image upload box changes."""
    return f"{len(files)} files ready." if files else "No files selected"


with gr.Blocks(css=css) as demo:
    # --- States ---
    image_data_state = gr.State([])
    temp_dir_state = gr.State(None)

    # --- Header ---
    gr.HTML(lamp_html)
    with gr.Row():
        # NOTE(review): the surrounding markup of this title appears to have
        # been lost; only the visible text is reproduced - confirm.
        gr.Markdown("\n\n 🖼️ Image Processor\n\n")

    # --- Main layout ---
    with gr.Row():
        # -- LEFT PANEL --
        with gr.Column(scale=3, min_width=500):
            with gr.Group(elem_classes="panel"):
                mode_toggle = gr.Radio(
                    [MODE_WORKBOOK, MODE_IMAGES],
                    value=MODE_WORKBOOK,
                    label="Select Input Method",
                )
                workbook_upload = gr.File(
                    label="📂 Upload .xlsx/.xlsm Workbook",
                    file_types=['.xlsx', '.xlsm'],
                    visible=True,
                )
                upload_box = gr.File(
                    label="📁 Upload Images",
                    file_count="multiple",
                    visible=False,
                )
                image_url_input = gr.Textbox(
                    label="🌐 Paste Image URL",
                    visible=False,
                )
                process_btn = gr.Button("⚙️ Process", elem_id="process-btn-url")

            with gr.Group(elem_classes="panel"):
                with gr.Row():
                    format_choice = gr.Dropdown(
                        ["JPG", "PNG", "WEBP", "TIFF", "GIF", "JFIF", "AVIF"],
                        label="🖼️ Format",
                        value="JPG",
                    )
                    width = gr.Number(label="Width (px)", value=1000, precision=0)
                    height = gr.Number(label="Height (px)", value=1000, precision=0)

        # -- RIGHT PANEL --
        with gr.Column(scale=2, min_width=400):
            with gr.Group(elem_classes="panel"):
                status = gr.Textbox(
                    label="📣 Status",
                    lines=6,
                    interactive=False,
                    elem_id="status-box",
                )
                zip_download_btn = gr.Button("📦 Download ZIP")
                zip_file_hidden = gr.File(visible=False)
                with gr.Accordion("🧷 Individual Files", open=False):
                    single_downloads = gr.File(label="Files", file_count="multiple")
            with gr.Row(elem_classes="btn-row"):
                stop_btn = gr.Button("Stop", elem_id="stop-btn")
                clear_btn = gr.Button("Clear", elem_id="clear-btn")

    # --- Footer ---
    # NOTE(review): wrapper markup appears to have been lost here as well.
    gr.HTML(
        "\n"
        "Created with 💜 by Vishakha"
        "\n"
    )

    # --- EVENT HOOKS ---
    mode_toggle.change(
        fn=_toggle_inputs,
        inputs=[mode_toggle],
        outputs=[workbook_upload, upload_box, image_url_input],
    )
    workbook_upload.change(
        fn=read_uploaded_workbook,
        inputs=[workbook_upload],
        outputs=[image_data_state, status],
    )
    upload_box.change(
        fn=_describe_uploads,
        inputs=[upload_box],
        outputs=[status],
    )
    process_btn.click(
        fn=process_all,
        inputs=[
            mode_toggle,
            workbook_upload,
            upload_box,
            image_url_input,
            format_choice,
            width,
            height,
        ],
        outputs=[
            single_downloads,
            zip_file_hidden,
            status,
            temp_dir_state,
        ],
    )
    zip_download_btn.click(
        None,
        inputs=[zip_file_hidden],
        js="(file) => file && window.open(file.url, '_blank')",
    )
    clear_btn.click(
        fn=clear_all,
        inputs=[temp_dir_state],
        outputs=[
            workbook_upload,
            image_data_state,
            upload_box,
            zip_file_hidden,
            single_downloads,
            status,
            temp_dir_state,
            image_url_input,
        ],
    )
    # Actually raise the stop flag instead of only reporting that it was sent.
    stop_btn.click(
        fn=request_stop,
        outputs=[status],
    )

if __name__ == "__main__":
    demo.queue().launch(debug=True)