import os
import random
import shutil
import tempfile
import threading
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from io import BytesIO
from urllib.parse import urlparse

import cloudscraper
import gradio as gr
import pandas as pd
import requests
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Set by the UI to ask a running batch to stop; cleared when a batch finishes.
stop_event = threading.Event()
cpu_cores = os.cpu_count() or 4

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (Linux; Android 10)",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X)",
]

# NOTE(review): these look like placeholders ("proxy1", "port"); every proxy
# attempt will fail until real endpoints are configured.
PROXY_LIST = [
    "http://user:pass@proxy1:port",
    "http://user:pass@proxy2:port",
]

FAILED_LOG_PATH = "failed_downloads.txt"
BLOCKED_SITES = ["bigw.com.au"]

session = requests.Session()

# Serialises appends to FAILED_LOG_PATH from the download worker threads.
_log_lock = threading.Lock()

_CHUNK_SIZE = 8192

# Dropdown names that Pillow does not register as save formats.
_FORMAT_ALIASES = {"JPG": "JPEG", "JFIF": "JPEG", "TIF": "TIFF"}


# --- Utility Functions ---

def random_headers(referer=None):
    """Return request headers with a User-Agent picked from USER_AGENTS.

    Args:
        referer: Optional value for the ``Referer`` header.

    Returns:
        A dict that always contains ``"User-Agent"``.
    """
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "image/avif,image/webp,image/*,*/*;q=0.8",
    }
    if referer:
        headers["Referer"] = referer
    return headers


def log_failure(url, reason):
    """Append one tab-separated failure record to FAILED_LOG_PATH.

    Logging is best-effort: a write error is printed, never raised, so it
    cannot mask the download failure that is being reported.
    """
    stamp = datetime.now().isoformat(timespec="seconds")
    try:
        with _log_lock, open(FAILED_LOG_PATH, "a", encoding="utf-8") as fh:
            fh.write(f"{stamp}\t{url}\t{reason}\n")
    except OSError as exc:
        print(f"Could not write failure log: {exc}")


def _is_image_response(resp):
    """Return True for a 200 response whose Content-Type mentions 'image'."""
    return resp.status_code == 200 and "image" in resp.headers.get("Content-Type", "")


def try_download(url, headers, proxies):
    """Fetch *url* through *proxies* using the shared session.

    Returns:
        The open streaming response when it is a 200 image response (the
        caller must consume/close it), otherwise ``None``.
    """
    try:
        resp = session.get(url, headers=headers, proxies=proxies, stream=True, timeout=8)
    except (requests.RequestException, ValueError):
        # ValueError covers malformed proxy URLs rejected by urllib3.
        return None
    if _is_image_response(resp):
        return resp
    resp.close()
    return None


def _save_if_image(resp, save_path):
    """Stream *resp* to *save_path* if it is an image; always close *resp*.

    The body is written to a ``.part`` file and moved into place only when
    complete, so an interrupted transfer cannot be mistaken for a cached file.

    Returns:
        True when the file was written, False when the response was rejected.
    """
    try:
        if not _is_image_response(resp):
            return False
        part_path = f"{save_path}.part"
        try:
            with open(part_path, "wb") as fh:
                for chunk in resp.iter_content(_CHUNK_SIZE):
                    fh.write(chunk)
            os.replace(part_path, save_path)
        except BaseException:
            if os.path.exists(part_path):
                os.remove(part_path)
            raise
        return True
    finally:
        resp.close()


def _uploaded_path(file):
    """Return the filesystem path of an uploaded file.

    Accepts either a plain path string or an object exposing the path as
    ``.name`` (both shapes are used by callers in this module).
    """
    return str(getattr(file, "name", file))


def _pil_format(fmt):
    """Map a user-facing format name to the name Pillow expects for save()."""
    upper = fmt.upper()
    return _FORMAT_ALIASES.get(upper, upper)


def _render_image(img, fmt, w, h, out_path):
    """Flatten *img* on white, fit it into a w x h canvas and save it."""
    rgba = img.convert("RGBA")
    white = Image.new("RGBA", rgba.size, (255, 255, 255, 255))
    flattened = Image.alpha_composite(white, rgba)
    # resize_with_padding always returns an RGB canvas, so no extra
    # conversion is needed for JPEG-family formats.
    canvas = resize_with_padding(flattened, w, h)
    canvas.save(out_path, format=_pil_format(fmt), quality=90)


def process_url_images(data, fmt, w, h):
    """Process a list of ``{"url", "name"}`` items; see process_and_zip."""
    return process_and_zip(data, fmt, w, h)


def process_uploaded_images(files, fmt, w, h):
    """Copy uploaded files to a staging directory and process them.

    Returns:
        The 4-tuple produced by process_and_zip.
    """
    staging = tempfile.mkdtemp()
    try:
        data = []
        for f in files or []:
            src = _uploaded_path(f)
            dst = os.path.join(staging, os.path.basename(src))
            shutil.copyfile(src, dst)
            data.append({"url": dst, "name": os.path.splitext(os.path.basename(dst))[0]})
        return process_and_zip(data, fmt, w, h)
    finally:
        # Images are fully decoded by the time process_and_zip returns, so
        # the staged copies are no longer needed.
        shutil.rmtree(staging, ignore_errors=True)


def process_single_url_image(url, fmt, w, h):
    """Process one image URL; returns an error tuple for a blank URL."""
    if not url or not url.strip():
        return [], None, "No URL provided", None
    return process_and_zip([{"url": url.strip(), "name": "single"}], fmt, w, h)


def handle_process(mode, sd, ups, pu, fmt, w, h):
    """Dispatch to the processor matching the selected input mode.

    Args:
        mode: Selected radio label.
        sd: Items already extracted from a workbook.
        ups: Uploaded image files.
        pu: Pasted image URL (may be None).
        fmt, w, h: Output format and target size.
    """
    if mode.startswith("πŸ“„") and sd:
        return process_url_images(sd, fmt, w, h)
    if mode.startswith("πŸ“€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    return [], None, "⚠️ No valid input provided", None


def download_image(url, save_path):
    """Download *url* to *save_path*, escalating through four strategies.

    Order: direct request, parallel proxy attempts, cloudscraper, and finally
    a headless Chrome session whose cookies are replayed with requests.

    Returns:
        *save_path* on success (or when a cached file larger than 1000 bytes
        already exists), otherwise ``None``. Failures are recorded with
        log_failure.
    """
    if os.path.exists(save_path) and os.path.getsize(save_path) > 1000:
        print(f"βœ… Skipping cached: {save_path}")
        return save_path

    if any(blocked in url for blocked in BLOCKED_SITES):
        log_failure(url, "Skipped known slow site")
        return None

    parsed = urlparse(url)
    referer = f"{parsed.scheme}://{parsed.netloc}/"
    headers = random_headers(referer)

    # 1. Try direct
    try:
        print("πŸš€ Trying direct...")
        resp = session.get(url, headers=headers, stream=True, timeout=8)
        if _save_if_image(resp, save_path):
            print("βœ… Direct worked")
            return save_path
    except Exception as e:
        print(f"⚠️ Direct failed: {e}")

    # 2. Try proxies in parallel
    if PROXY_LIST:
        headers = random_headers(referer)
        candidates = random.sample(PROXY_LIST, min(5, len(PROXY_LIST)))
        winner = None
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {
                executor.submit(try_download, url, headers, {"http": p, "https": p}): p
                for p in candidates
            }
            for future in as_completed(futures):
                try:
                    result = future.result()
                except Exception as e:
                    # One bad proxy must not cancel the remaining fallbacks.
                    print(f"Proxy attempt failed: {e}")
                    continue
                if result is None:
                    continue
                if winner is not None:
                    result.close()  # a faster proxy already delivered the file
                    continue
                try:
                    if _save_if_image(result, save_path):
                        winner = futures[future]
                except Exception as e:
                    print(f"Proxy attempt failed: {e}")
        if winner is not None:
            print("βœ… Proxy worked:", winner)
            return save_path

    # 3. cloudscraper fallback
    try:
        print("🟠 cloudscraper fallback...")
        scraper = cloudscraper.create_scraper(sess=session)
        resp = scraper.get(url, headers=headers, stream=True, timeout=12)
        if _save_if_image(resp, save_path):
            print("βœ… cloudscraper worked")
            return save_path
    except Exception as e:
        print(f"❌ cloudscraper failed: {e}")

    # 4. Final fallback: Selenium
    try:
        print("πŸ§ͺ Headless browser fallback...")
        chrome_opts = Options()
        for arg in (
            "--headless",
            "--disable-gpu",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            f"user-agent={headers['User-Agent']}",
        ):
            chrome_opts.add_argument(arg)
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_opts,
        )
        try:
            driver.get(url)
            time.sleep(4)  # give anti-bot redirects time to settle
            final_url = driver.current_url
            cookies = {c["name"]: c["value"] for c in driver.get_cookies()}
        finally:
            driver.quit()  # never leave a Chrome process behind

        # Use a throwaway session: the shared one is in use by other worker
        # threads and must neither be closed nor inherit these cookies.
        with requests.Session() as browser_session:
            browser_session.cookies.update(cookies)
            r = browser_session.get(final_url, headers=headers, stream=True, timeout=15)
            status = r.status_code
            if _save_if_image(r, save_path):
                print("βœ… Selenium + cookies worked")
                return save_path
        log_failure(url, f"Selenium bad response: {status}")
    except Exception as e:
        print(f"❌ Selenium failed: {e}")
        log_failure(url, f"Selenium exception: {e}")

    # Fail
    log_failure(url, "All methods failed")
    return None


# NOTE(review): this header markup is effectively empty (a single newline);
# the original HTML may have been lost - confirm against the intended design.
lamp_html = """
"""


def resize_with_padding(img, target_w, target_h, fill=(255, 255, 255)):
    """Fit *img* inside a target_w x target_h RGB canvas, centred on *fill*.

    The image is only ever shrunk (never enlarged) and the caller's image is
    left untouched.

    Returns:
        A new RGB image of exactly (target_w, target_h).
    """
    fitted = img.copy()
    fitted.thumbnail((target_w, target_h), Image.LANCZOS)
    canvas = Image.new("RGB", (target_w, target_h), fill)
    offset = ((target_w - fitted.width) // 2, (target_h - fitted.height) // 2)
    # Honour transparency so see-through pixels show the fill colour.
    mask = fitted if fitted.mode in ("RGBA", "LA") else None
    canvas.paste(fitted, offset, mask)
    return canvas


def threaded_download_and_open(item, temp_dir):
    """Resolve one item to decoded RGBA frames.

    Args:
        item: Dict with ``"name"`` and ``"url"``; the url may also be an
            existing local path.
        temp_dir: Directory that receives downloaded files.

    Returns:
        ``(name, frames, None)`` on success, where *frames* holds every frame
        of an animated image or a single frame otherwise;
        ``(name, None, error_message)`` on failure.
    """
    from PIL import ImageSequence

    name, src = item["name"], item["url"]
    try:
        if os.path.exists(src):
            path = src
        else:
            path = download_image(src, os.path.join(temp_dir, f"{name}.gif"))
            if not path:
                return (name, None, "Download failed or invalid image")

        # convert() decodes and copies the pixels, so the file handle can be
        # released as soon as the frames are collected.
        with Image.open(path) as img:
            if getattr(img, "is_animated", False):
                img.seek(0)  # rewind in case the loader is not on frame 0
                frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(img)]
            else:
                frames = [img.convert("RGBA")]
        return (name, frames, None)
    except Exception as e:
        return (name, None, str(e))


def resize_and_save_worker(args):
    """Render and save one image.

    Args:
        args: Tuple ``(name, img, fmt, w, h, out_dir)``.

    Returns:
        ``(out_path, None)`` on success or ``(None, "name: error")``.
    """
    name, img, fmt, w, h, out_dir = args
    try:
        out_path = os.path.join(out_dir, f"{name}.{fmt.lower()}")
        _render_image(img, fmt, w, h, out_path)
        return out_path, None
    except Exception as e:
        return None, f"{name}: {e}"


def process_and_zip(items, fmt, w, h):
    """Download/open, resize and save every item, then zip the results.

    Args:
        items: Iterable of ``{"url", "name"}`` dicts.
        fmt: Output format name (also used, lower-cased, as the extension).
        w, h: Target canvas size in pixels.

    Returns:
        ``(files, zip_path, message, tmp_dir)``; when nothing was produced
        the tuple is ``(None, None, message, None)``.
    """
    try:
        w, h = int(w), int(h)
    except (TypeError, ValueError):
        return None, None, "Invalid width/height.", None
    if w <= 0 or h <= 0:
        return None, None, "Invalid width/height.", None

    tmp = tempfile.mkdtemp()
    proc = os.path.join(tmp, "out")
    os.makedirs(proc, exist_ok=True)
    files, fails = [], []
    ext = fmt.lower()

    def fetch(item):
        # Skip downloads that have not started once a stop was requested.
        if stop_event.is_set():
            return (item["name"], None, "Stopped")
        return threaded_download_and_open(item, tmp)

    with ThreadPoolExecutor(max_workers=8) as ex:
        results = list(ex.map(fetch, items))

    for name, imgs, err in results:
        if stop_event.is_set():
            print("πŸ›‘ Stopped before processing.")
            break
        if err or imgs is None:
            fails.append(f"{name}: {err}")
            continue
        multi_frame = len(imgs) > 1
        for i, img in enumerate(imgs, start=1):
            fname = f"{name}_frame{i}.{ext}" if multi_frame else f"{name}.{ext}"
            path = os.path.join(proc, fname)
            try:
                _render_image(img, fmt, w, h, path)
                files.append(path)
            except Exception as e:
                fails.append(f"{name}_frame{i}: {e}")

    if not files:
        stop_event.clear()
        shutil.rmtree(tmp, ignore_errors=True)
        # Keep the reasons so the user can see why nothing was produced.
        msg_lines = ["No images processed."] + [f" - {fail}" for fail in fails]
        return None, None, "\n".join(msg_lines), None

    date_str = datetime.now().strftime("%Y-%m-%d")
    zip_path = os.path.join(tmp, f"{date_str}.zip")
    with zipfile.ZipFile(zip_path, "w") as zf:
        for f in files:
            zf.write(f, os.path.basename(f))

    msg_lines = [f"βœ… Processed {len(files)} image(s)."]
    if fails:
        msg_lines.append(f"❌ Failed: {len(fails)} image(s)")
        msg_lines += [f" - {fail}" for fail in fails]

    stop_event.clear()
    return files, zip_path, "\n".join(msg_lines), tmp


def _normalise_item_code(raw):
    """Return the item code as text, dropping a purely-zero decimal part.

    Numeric cells read by pandas arrive as e.g. ``123.0``; that becomes
    ``"123"``. Codes with a real fractional part are kept verbatim so they
    cannot collide with a different integer code.
    """
    text = str(raw).strip()
    whole, dot, frac = text.partition(".")
    if dot and whole.isdigit() and frac.strip("0") == "":
        return whole
    return text


def read_uploaded_workbook(file):
    """Extract image links from every sheet except "Cleared Data".

    A row contributes one item per non-blank cell in any column whose header
    contains "url", "image" or "link"; items are named after the row's
    ``ItemCode`` (``code``, ``code_1``, ``code_2``, ...).

    Returns:
        ``(items, status_message)``; *items* is empty on any error.
    """
    if not file:
        return [], "❌ No file uploaded"

    try:
        path = _uploaded_path(file)
        # Open the workbook once and parse each sheet from the same handle.
        with pd.ExcelFile(path, engine="openpyxl") as xls:
            frames = [
                xls.parse(sheet)
                for sheet in xls.sheet_names
                if sheet.lower() != "cleared data"
            ]
        if not frames:
            return [], "❌ Error: No objects to concatenate"

        # Normalise headers per sheet first so "ItemCode" and "ItemCode "
        # merge into one column instead of producing duplicates.
        for frame in frames:
            frame.columns = [str(c).strip() for c in frame.columns]
        df = pd.concat(frames, ignore_index=True)

        item_col = next((c for c in df.columns if c.lower() == "itemcode"), None)
        if not item_col:
            return [], "❌ Missing 'ItemCode' column"

        url_cols = [
            c for c in df.columns
            if any(k in c.lower() for k in ("url", "image", "link"))
        ]

        data = []
        for _, row in df.iterrows():
            raw = row[item_col]
            if pd.isna(raw):
                continue
            key = _normalise_item_code(raw)
            idx = 0
            for col in url_cols:
                cell = row[col]
                if pd.isna(cell):
                    continue
                link = str(cell).strip()
                if not link:
                    continue
                name = key if idx == 0 else f"{key}_{idx}"
                data.append({"url": link, "name": name})
                idx += 1

        return data, f"βœ… Fetched {len(data)} image link(s)"
    except Exception as e:
        return [], f"❌ Error: {e}"
def clear_all(tmp_dir):
    """Delete the batch's temp directory and reset every UI component.

    Args:
        tmp_dir: Directory returned by the last processing run, or None.

    Returns:
        One value per output wired to the Clear button, in this order:
        workbook upload, image-data state, image upload, hidden zip file,
        individual files, status text, temp-dir state, URL textbox.
    """
    if tmp_dir and os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir, ignore_errors=True)
    return (
        None,  # file components are cleared with None, not ""
        [],
        None,
        # Drop the stale path too: the zip was just deleted with tmp_dir.
        gr.update(value=None, visible=False),
        None,
        "Cleared.",
        None,
        "",
    )


# === CSS ===
# The `.gradio-container` and `#status-box` rules are closed explicitly so
# the rules that follow them are top-level instead of nested inside them.
css = """
body {
    margin: 0;
    background: #4B352A;
    font-family: 'Segoe UI', sans-serif;
    color: #222;
    display: flex;
    justify-content: center;
    padding: 2rem;
    transition: background 0.3s, color 0.3s;
}
.gradio-container {
    max-width: 1200px; /* You can reduce this (e.g., 600px) */
    width: 100%;
    margin: 0 auto;
}
h1 {
    font-size: 24px !important;
    font-weight: 700 !important;
    margin-bottom: 1rem !important;
    color: #FFFFFF !important;
    text-align: center !important;
}
.panel {
    background: white;
    padding: 1rem;
    border-radius: 6px;
    border: 1px solid #ddd;
    box-shadow: 0 1px 2px rgba(0,0,0,0.05);
}
#clear-btn, #stop-btn {
    margin-top: 1rem;
    font-size: 0.85rem !important;
    font-weight: 600 !important;
    padding: 0.5rem 1rem !important;
    border-radius: 8px !important;
    border: none !important;
    box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important;
    transition: background-color 0.2s ease !important;
    width: 100% !important;
}
.btn-row {
    display: flex;
    justify-content: space-between;
    gap: 0.5rem;
}
#clear-btn {
    background-color: #e74c3c !important;
    color: white !important;
    width: 50% !important;
}
#clear-btn:hover {
    background-color: #c0392b !important;
    cursor: pointer !important;
}
#stop-btn {
    background-color: #CA7842 !important;
    color: white !important;
    width: 50% !important;
}
#stop-btn:hover {
    background-color: #FFA55D !important;
    cursor: pointer !important;
}
.gradio-container .gr-row > div {
    padding: 4px !important;
}
#process-btn-url {
    background-color: #A4B465 !important;
    color: #333 !important;
}
#fetch-btn {
    background-color: #cfbed7 !important;
    color: #333 !important;
}
#status-box {
    background-color: #ffffff;
    border: 2px solid #cfbed7;
    border-radius: 6px;
    padding: 1rem;
    font-size: 0.95rem;
    color: #333;
}
/* Styling the toggle radio buttons */
.gradio-container label.svelte-1ipelgc {
    font-weight: 600 !important;
    border: 2px solid #A4B465 !important;
    padding: 0.4rem 1rem !important;
    border-radius: 12px !important;
    margin: 0.25rem !important;
    background-color: #f5fbe8 !important;
    transition: all 0.3s ease-in-out;
    cursor: pointer;
}
/* Selected option */
input[type="radio"]:checked + label.svelte-1ipelgc {
    background-color: #A4B465 !important;
    color: white !important;
    border-color: #889E46 !important;
}
/* Unselected hover */
.gradio-container input[type="radio"] + label.svelte-1ipelgc:hover {
    background-color: #e0f1c4 !important;
}
"""

# NOTE(review): this component is created outside any Blocks context and its
# markup is just a newline; it looks like leftover scaffolding - confirm
# whether it can be removed.
gr.HTML("""
""")


def process_all(mode, workbook_file, ups, pu, fmt, w, h):
    """Run the processor that matches the selected input mode.

    Args:
        mode: Selected radio label.
        workbook_file: Uploaded workbook (workbook mode).
        ups: Uploaded image files (upload mode).
        pu: Pasted image URL, may be None.
        fmt, w, h: Output format and target size.

    Returns:
        ``(files, zip_path, status_message, tmp_dir)`` for the UI outputs.
    """
    # A Stop pressed while idle must not cancel this fresh run.
    stop_event.clear()

    # Workbook mode: extract the links, then process them.
    if mode.startswith("πŸ“„"):
        data, msg = read_uploaded_workbook(workbook_file)
        if not data:
            # Nothing to process; surface the extraction message instead.
            return [], None, msg, None
        return process_and_zip(data, fmt, w, h)

    # Direct image files
    if mode.startswith("πŸ“€") and ups:
        return process_uploaded_images(ups, fmt, w, h)

    # Single URL
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)

    # Nothing valid
    return [], None, "⚠️ No valid input provided", None


def _request_stop():
    """Signal the running batch to stop and report it in the status box."""
    stop_event.set()
    return "πŸ›‘ Stop signal sent"


def _toggle_inputs(choice):
    """Show the workbook upload in workbook mode, else image upload + URL."""
    workbook_mode = choice.startswith("πŸ“„")
    return (
        gr.update(visible=workbook_mode),
        gr.update(visible=not workbook_mode),
        gr.update(visible=not workbook_mode),
    )


# NOTE(review): the layout nesting below was reconstructed from the statement
# order because the original indentation was lost - confirm against the
# intended design.
with gr.Blocks(css=css) as demo:
    # --- States ---
    image_data_state = gr.State([])
    temp_dir_state = gr.State(None)

    # --- Header ---
    gr.HTML(lamp_html)
    with gr.Row():
        # NOTE(review): the CSS defines an h1 rule, so this title presumably
        # had heading markup that was lost - confirm.
        gr.Markdown("\n\nπŸ–ΌοΈ Image Processor\n\n")

    # --- Main layout ---
    with gr.Row():
        # -- Left panel: inputs and output settings --
        with gr.Column(scale=3, min_width=500):
            with gr.Group(elem_classes="panel"):
                mode_toggle = gr.Radio(
                    ["πŸ“„ Upload Workbook", "πŸ“€ Upload Images"],
                    value="πŸ“„ Upload Workbook",
                    label="Select Input Method",
                )
                workbook_upload = gr.File(
                    label="πŸ“‚ Upload .xlsx/.xlsm Workbook",
                    file_types=['.xlsx', '.xlsm'],
                    visible=True,
                )
                upload_box = gr.File(
                    label="πŸ“ Upload Images",
                    file_count="multiple",
                    visible=False,
                )
                image_url_input = gr.Textbox(
                    label="🌐 Paste Image URL",
                    visible=False,
                )
                process_btn = gr.Button("βš™οΈ Process", elem_id="process-btn-url")

            with gr.Group(elem_classes="panel"):
                with gr.Row():
                    format_choice = gr.Dropdown(
                        ["JPEG", "PNG", "WEBP", "TIFF", "GIF", "JFIF", "AVIF"],
                        label="πŸ–ΌοΈ Format",
                        value="JPEG",
                    )
                    width = gr.Number(label="Width (px)", value=1000, precision=0)
                    height = gr.Number(label="Height (px)", value=1000, precision=0)

        # -- Right panel: status and downloads --
        with gr.Column(scale=2, min_width=400):
            with gr.Group(elem_classes="panel"):
                status = gr.Textbox(
                    label="πŸ“£ Status",
                    lines=6,
                    interactive=False,
                    elem_id="status-box",
                )
                zip_download_btn = gr.Button("πŸ“¦ Download ZIP")
                zip_file_hidden = gr.File(visible=False)
                with gr.Accordion("🧷 Individual Files", open=False):
                    single_downloads = gr.File(label="Files", file_count="multiple")
                with gr.Row(elem_classes="btn-row"):
                    stop_btn = gr.Button("Stop", elem_id="stop-btn")
                    clear_btn = gr.Button("Clear", elem_id="clear-btn")

    # --- Footer ---
    gr.Markdown("\nCreated with πŸ’œ by Vishakha\n")

    # --- Event hooks ---
    mode_toggle.change(
        fn=_toggle_inputs,
        inputs=[mode_toggle],
        outputs=[workbook_upload, upload_box, image_url_input],
    )

    workbook_upload.change(
        fn=read_uploaded_workbook,
        inputs=[workbook_upload],
        outputs=[image_data_state, status],
    )

    upload_box.change(
        fn=lambda files: f"{len(files)} files ready." if files else "No files selected",
        inputs=[upload_box],
        outputs=[status],
    )

    process_btn.click(
        fn=process_all,
        inputs=[
            mode_toggle,
            workbook_upload,
            upload_box,
            image_url_input,
            format_choice,
            width,
            height,
        ],
        outputs=[
            single_downloads,
            zip_file_hidden,
            status,
            temp_dir_state,
        ],
    )

    # Client-side only: open the generated zip in a new tab.
    zip_download_btn.click(
        None,
        inputs=[zip_file_hidden],
        js="(file) => file && window.open(file.url, '_blank')",
    )

    clear_btn.click(
        fn=clear_all,
        inputs=[temp_dir_state],
        outputs=[
            workbook_upload,
            image_data_state,
            upload_box,
            zip_file_hidden,
            single_downloads,
            status,
            temp_dir_state,
            image_url_input,
        ],
    )

    # The handler must actually set stop_event; returning only the status
    # text would leave the running batch unaffected.
    stop_btn.click(
        fn=_request_stop,
        outputs=[status],
    )

if __name__ == "__main__":
    demo.queue().launch(debug=True)