# Image_editor / app.py
# Workpam's picture
# Update app.py
# 13989eb verified
import gradio as gr
from PIL import Image
import os
import zipfile
import pandas as pd
import requests
import tempfile
import shutil
from io import BytesIO
import threading
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import time
import random
import cloudscraper
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Signals worker threads that processing should abort; cleared at the end of
# each batch in process_and_zip.
stop_event = threading.Event()
# Fall back to 4 workers when the core count cannot be determined.
cpu_cores = os.cpu_count() or 4
# Rotated per request (see random_headers) to look less like a single bot.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "Mozilla/5.0 (Linux; Android 10)",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X)"
]
# Placeholder proxy endpoints for try_download; replace with real credentials.
PROXY_LIST = [
    "http://user:pass@proxy1:port",
    "http://user:pass@proxy2:port",
]
# Tab-separated log of URLs that could not be downloaded; re-read at startup
# by the retry pass near the bottom of this file.
FAILED_LOG_PATH = "failed_downloads.txt"
# Hosts known to stall or block us; download_image skips these outright.
BLOCKED_SITES = ["bigw.com.au"]
def random_headers(referer):
    """Build request headers with a randomly picked User-Agent and the given referer."""
    chosen_agent = random.choice(USER_AGENTS)
    return {"User-Agent": chosen_agent, "Referer": referer}
def try_download(url, headers, proxies):
    """
    Attempt a simple GET through a proxy dict {"http":..., "https":...}.
    Returns the response if status_code==200 and content is image, else None.
    """
    try:
        resp = session.get(url, headers=headers, proxies=proxies, stream=True, timeout=15)
        if resp.status_code == 200 and "image" in resp.headers.get("Content-Type", ""):
            return resp
        # Bad status or not an image: close explicitly, since stream=True keeps
        # the pooled connection open until the body is consumed or closed.
        resp.close()
    except Exception as e:
        # Best-effort probe: report and fall through to the next proxy/strategy.
        print(f"⚠️ Proxy download error ({proxies}): {e}")
    return None
# Shared HTTP session: reuses connections and carries cookies across downloads
# (the Selenium fallback copies browser cookies into it).
session = requests.Session()
# --- Utility Functions ---
def process_url_images(data, fmt, w, h):
    """Process a list of {'url', 'name'} records and bundle the results into a ZIP."""
    return process_and_zip(data, fmt, w, h)
def process_uploaded_images(files, fmt, w, h):
    """Copy uploaded files into a scratch directory and hand them to process_and_zip."""
    staging = tempfile.mkdtemp()
    records = []
    for uploaded in files:
        source = str(uploaded)
        base = os.path.basename(source)
        target = os.path.join(staging, base)
        shutil.copyfile(source, target)
        records.append({"url": target, "name": os.path.splitext(base)[0]})
    return process_and_zip(records, fmt, w, h)
def process_single_url_image(url, fmt, w, h):
    """Process one pasted image URL; returns (files, zip_path, status_message, temp_dir)."""
    cleaned = url.strip()
    if not cleaned:
        return [], None, "No URL provided", None
    return process_and_zip([{"url": cleaned, "name": "single"}], fmt, w, h)
def handle_process(mode, sd, ups, pu, fmt, w, h):
    """Dispatch to the matching processor based on the selected input mode.

    Args mirror the UI widgets: mode string, spreadsheet data `sd`, uploaded
    files `ups`, pasted URL `pu`, plus format/width/height.
    Returns (files, zip_path, status_message, temp_dir) in every branch.

    Fixes over the original lambda: a `def` instead of a lambda assignment
    (PEP 8), and `pu` is guarded against None — the URL textbox is hidden in
    some modes and previously caused AttributeError on `pu.strip()`.
    """
    if mode.startswith("πŸ“„") and sd:
        return process_url_images(sd, fmt, w, h)
    if mode.startswith("πŸ“€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    return [], None, "⚠️ No valid input provided", None
# Let Pillow open images whose data is cut short (common with flaky CDN
# downloads) instead of raising OSError mid-decode.
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
def fetch_with_retries(url, headers, timeout=15, retries=2, backoff=1):
    """
    Try up to retries+1 times to GET url with timeout seconds.
    Returns a streaming requests.Response (status 200, image content) or None.

    Fixes over the original: no pointless sleep after the final attempt, and
    streamed responses that turn out not to be images are closed so their
    pooled connections are released.
    """
    for attempt in range(retries + 1):
        try:
            resp = session.get(url, headers=headers, stream=True, timeout=timeout)
            if resp.status_code == 200 and "image" in resp.headers.get("Content-Type", ""):
                return resp
            resp.close()  # wrong status or not an image: free the connection
        except Exception as e:
            print(f"⚠️ Attempt {attempt+1} failed for {url}: {e}")
        if attempt < retries:
            # Linear backoff between attempts only — not after the last one.
            time.sleep(backoff * (attempt + 1))
    return None
def log_failure(url, reason):
    """Append a failed URL to FAILED_LOG_PATH as 'timestamp<TAB>url<TAB>reason'.

    NOTE(review): this helper was called throughout the file but never
    defined, so any failed download raised NameError. The startup retry pass
    reads the URL from the second tab-separated field, which this format
    satisfies.
    """
    try:
        with open(FAILED_LOG_PATH, "a", encoding="utf-8") as log:
            log.write(f"{datetime.now().isoformat()}\t{url}\t{reason}\n")
    except OSError as e:
        print(f"⚠️ Could not record failure for {url}: {e}")


def download_image(url, save_path):
    """Download url to save_path, escalating through three strategies:
    direct GET with retries, cloudscraper, then a headless browser whose
    cookies are replayed. Returns save_path on success, None on failure.
    """
    # Skip cached: anything over ~1 KB is assumed to be a valid prior download.
    if os.path.exists(save_path) and os.path.getsize(save_path) > 1000:
        return save_path
    # Skip hosts known to stall or block us.
    if any(b in url for b in BLOCKED_SITES):
        log_failure(url, "Skipped known slow site")
        return None
    referer = f"{urlparse(url).scheme}://{urlparse(url).netloc}/"
    headers = random_headers(referer)
    # 1) Direct with retries
    print(f"πŸš€ Fetching with retries: {url}")
    resp = fetch_with_retries(url, headers, timeout=15, retries=2)
    if resp:
        with open(save_path, 'wb') as f:
            for chunk in resp.iter_content(8192):
                f.write(chunk)
        print("βœ… Direct fetch succeeded")
        return save_path
    print("❌ Direct fetch failed after retries")
    # 2) cloudscraper fallback (handles Cloudflare-style JS challenges)
    print("🟠 cloudscraper fallback...")
    try:
        scraper = cloudscraper.create_scraper(sess=session)
        resp = scraper.get(url, headers=headers, stream=True, timeout=15)
        if resp.status_code == 200 and "image" in resp.headers.get("Content-Type", ""):
            with open(save_path, 'wb') as f:
                for chunk in resp.iter_content(8192):
                    f.write(chunk)
            print("βœ… cloudscraper worked")
            return save_path
    except Exception as e:
        print(f"❌ cloudscraper failed: {e}")
    # 3) Selenium fallback: load the page headlessly, then replay its cookies.
    try:
        print("πŸ§ͺ Headless browser fallback...")
        chrome_opts = Options()
        chrome_opts.add_argument("--headless")
        chrome_opts.add_argument("--disable-gpu")
        chrome_opts.add_argument("--no-sandbox")
        chrome_opts.add_argument("--disable-dev-shm-usage")
        chrome_opts.add_argument(f"user-agent={headers['User-Agent']}")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_opts
        )
        try:
            driver.get(url)
            time.sleep(4)  # crude wait for JS challenges/redirects to settle
            final_url = driver.current_url
            cookies = {c['name']: c['value'] for c in driver.get_cookies()}
        finally:
            driver.quit()  # always release the browser, even if the page load failed
        # BUG FIX: the original wrapped this in `with session:`, which *closes*
        # the shared Session on exit and broke every subsequent download.
        # Copying the cookies into the session is all that is needed.
        for k, v in cookies.items():
            session.cookies.set(k, v)
        r = session.get(final_url, headers=headers, stream=True, timeout=15)
        if r.status_code == 200 and "image" in r.headers.get("Content-Type", ""):
            with open(save_path, 'wb') as f:
                for chunk in r.iter_content(8192):
                    f.write(chunk)
            print("βœ… Selenium + cookies worked")
            return save_path
    except Exception as e:
        print(f"❌ Selenium failed: {e}")
    # 4) Give up
    log_failure(url, "All methods failed")
    return None
# Decorative fixed "lamp" widget injected into the Blocks layout below:
# clicking the bulb toggles a night theme on <body> via the inline onclick.
lamp_html = """
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
background-color: #D8D2C2;
transition: background 0.4s ease;
}
body.night {
background: radial-gradient(#555, #111);
}
.lamp-fixed {
position: fixed;
top: 0;
right: 0;
width: 100px;
height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
z-index: 9999;
pointer-events: none;
}
.mount {
width: 30px;
height: 5px;
background: #111;
border-radius: 6px;
pointer-events: none;
}
.wire {
width: 4px;
height: 120px;
background: #000;
margin: 0;
pointer-events: none;
}
.blub {
position: relative;
width: 47px;
height: 47px;
background: #444;
border-radius: 50%;
cursor: pointer;
margin-top: -1px;
transition: all 0.3s ease;
z-index: 2;
pointer-events: auto;
}
.blub.night {
background: #ffec99;
box-shadow: 0 0 28px #ffec99, 0 0 56px #ffec99,
0 0 84px #ffec99, 0 0 168px #ffec99;
}
.blub.night::before {
background: #ffec99;
}
.blub.night::after {
content: "";
position: absolute;
top: 50%;
left: 50%;
transform: translate(-60%, -50%);
width: 78px;
height: 78px;
background: #ffec99;
border-radius: 100%;
filter: blur(26px);
z-index: -1;
}
.blub::before {
content: "";
position: absolute;
top: -30px;
left: 11px;
width: 25px;
height: 47px;
background: #444;
border-top: 19px solid #000;
border-radius: 8px;
transition: background 0.3s ease;
}
.blub span:nth-child(1) {
position: absolute;
top: -10px;
left: -2px;
width: 20px;
height: 18px;
background: transparent;
transform: rotate(343deg);
border-bottom-right-radius: 40px;
box-shadow: 12px 12px 0 5px #444;
transition: box-shadow 0.3s ease;
}
.blub.night span:nth-child(1) {
box-shadow: 12px 12px 0 5px #ffec99;
}
.blub span:nth-child(2) {
position: absolute;
top: -10px;
right: -2px;
width: 20px;
height: 18px;
background: transparent;
transform: rotate(18deg);
border-bottom-left-radius: 50px;
box-shadow: -12px 12px 0 5px #444;
transition: box-shadow 0.3s ease;
}
.blub.night span:nth-child(2) {
box-shadow: -12px 12px 0 5px #ffec99;
}
</style>
<div class="lamp-fixed">
<div class="mount"></div>
<div class="wire"></div>
<div class="blub" onclick="this.classList.toggle('night'); document.body.classList.toggle('night')">
<span></span>
<span></span>
</div>
</div>
"""
def resize_with_padding(img, target_w, target_h, fill=(255,255,255)):
    """Shrink img to fit within (target_w, target_h), centered on a fill-colored canvas.

    Aspect ratio is preserved; the image is never enlarged (thumbnail semantics).
    Fix: work on a copy, because Image.thumbnail() mutates in place and the
    original version silently resized the caller's image object.
    """
    scaled = img.copy()
    scaled.thumbnail((target_w, target_h), Image.LANCZOS)
    canvas = Image.new("RGB", (target_w, target_h), fill)
    x = (target_w - scaled.width) // 2
    y = (target_h - scaled.height) // 2
    canvas.paste(scaled, (x, y))
    return canvas
def threaded_download_and_open(item, temp_dir):
    """Fetch one item (local path or URL) and open it as a list of RGBA frames.

    Returns (name, frames, error): frames is a list of PIL images — one per
    animation frame — on success, otherwise None plus an error string.
    """
    from PIL import ImageSequence
    name = item['name']
    src = item['url']
    try:
        if os.path.exists(src):
            path = src
        else:
            # NOTE: downloads always land with a .gif suffix regardless of the
            # actual content type; Pillow sniffs the real format on open.
            path = download_image(src, os.path.join(temp_dir, f"{name}.gif"))
        if not path:
            return (name, None, 'Download failed or invalid image')
        img = Image.open(path)
        if not getattr(img, "is_animated", False):
            return (name, [img.convert("RGBA")], None)
        img.seek(0)  # rewind in case the file handle is mid-animation
        frames = [frame.convert("RGBA").copy() for frame in ImageSequence.Iterator(img)]
        return (name, frames, None)
    except Exception as e:
        return (name, None, str(e))
def resize_and_save_worker(args):
    """Flatten transparency, resize with padding, and save one image.

    args = (name, img, fmt, w, h, out_dir). Returns (out_path, None) on
    success or (None, "name: error") on failure.
    """
    name, img, fmt, w, h, out_dir = args
    try:
        # Composite any alpha channel onto an opaque white background.
        rgba = img.convert('RGBA')
        flattened = Image.alpha_composite(
            Image.new('RGBA', rgba.size, (255, 255, 255, 255)),
            rgba,
        )
        # Map the requested format to Pillow's save name and a file extension.
        fmt_upper = fmt.upper()
        if fmt_upper in ("JPEG", "JPG"):
            save_fmt, ext = "JPEG", "jpg"
        else:
            save_fmt, ext = fmt_upper, fmt.lower()
        if save_fmt == "JPEG":
            flattened = flattened.convert("RGB")  # JPEG cannot carry alpha
        result = resize_with_padding(flattened, w, h)
        out_path = os.path.join(out_dir, f"{name}.{ext}")
        result.save(out_path, format=save_fmt, quality=90)
        return out_path, None
    except Exception as e:
        return None, f"{name}: {e}"
from multiprocessing import cpu_count
import zipfile
def _prepare_frame(img, fmt, w, h):
    """Flatten alpha onto white, convert for the target format, and resize.

    Returns (processed_image, save_format, extension). Extracted so the
    per-frame pipeline is defined once instead of being duplicated inline
    (it previously mirrored resize_and_save_worker line for line).
    """
    rgba = img.convert("RGBA")
    flattened = Image.alpha_composite(
        Image.new("RGBA", rgba.size, (255, 255, 255, 255)), rgba
    )
    fmt_upper = fmt.upper()
    if fmt_upper in ("JPEG", "JPG"):
        save_fmt, ext = "JPEG", "jpg"
    else:
        save_fmt, ext = fmt_upper, fmt.lower()
    if save_fmt == "JPEG":
        flattened = flattened.convert("RGB")  # JPEG cannot carry alpha
    return resize_with_padding(flattened, w, h), save_fmt, ext


def process_and_zip(items, fmt, w, h):
    """Download, convert, and resize every item, then bundle outputs into a ZIP.

    items: list of {"url": ..., "name": ...} (url may be a local path).
    Returns (files, zip_path, status_message, temp_dir) on success, or
    (None, None, message, None) when nothing could be processed. The caller
    owns temp_dir and is expected to remove it (see clear_all).
    """
    tmp = tempfile.mkdtemp()
    proc = os.path.join(tmp, 'out')
    os.makedirs(proc, exist_ok=True)
    files, fails = [], []
    # 1) Download & open in parallel — I/O-bound, so oversubscribe the cores.
    workers = cpu_count() * 2
    with ThreadPoolExecutor(max_workers=workers) as ex:
        results = list(ex.map(lambda it: threaded_download_and_open(it, tmp), items))
    # 2) Convert and save every frame of every downloaded image.
    for name, imgs, err in results:
        if err or not imgs:
            fails.append(f"{name}: {err}")
            continue
        for i, img in enumerate(imgs):
            try:
                processed, save_fmt, ext = _prepare_frame(img, fmt, w, h)
                # Animated sources get one numbered file per frame.
                fname = f"{name}_frame{i+1}.{ext}" if len(imgs) > 1 else f"{name}.{ext}"
                out_path = os.path.join(proc, fname)
                processed.save(out_path, format=save_fmt, quality=90)
                files.append(out_path)
            except Exception as e:
                print(f"⚠️ Save failed for {name}: {e}")
                fails.append(f"{name}_frame{i+1}: {e}")
    # 3) Nothing succeeded: clean up and surface the per-item failure reasons
    #    (previously they were silently dropped from the message).
    if not files:
        stop_event.clear()
        shutil.rmtree(tmp)
        detail = "\n".join(f" - {fail}" for fail in fails)
        msg = "No images processed." + (f"\n{detail}" if detail else "")
        return None, None, msg, None
    # 4) Create ZIP in "store" mode — the images are already compressed.
    date_str = datetime.now().strftime("%Y-%m-%d")
    zip_path = os.path.join(tmp, f"{date_str}.zip")
    with zipfile.ZipFile(zip_path, 'w', compression=zipfile.ZIP_STORED) as zf:
        for f in files:
            zf.write(f, os.path.basename(f))
    # 5) Build the status summary.
    msg_lines = [f"βœ… Processed {len(files)} image(s)."]
    if fails:
        msg_lines.append(f"❌ Failed: {len(fails)} image(s)")
        msg_lines += [f" - {fail}" for fail in fails]
    stop_event.clear()
    return files, zip_path, "\n".join(msg_lines), tmp
def read_uploaded_workbook(file):
    """Extract {url, name} records from every sheet of an uploaded Excel workbook.

    The sheet named "Cleared Data" (any case) is skipped. The 'ItemCode'
    column supplies the base name; each non-empty URL/image/link column adds
    a record, suffixed _1, _2, ... after the first.
    Returns (data, status_message); data is [] on any error.
    """
    if not file:
        return [], "❌ No file uploaded"
    try:
        # Parse each sheet from the already-open workbook instead of
        # re-reading the file from disk once per sheet as before.
        xls = pd.ExcelFile(file.name, engine="openpyxl")
        sheets = [s for s in xls.sheet_names if s.lower() != "cleared data"]
        df = pd.concat([xls.parse(s) for s in sheets], ignore_index=True)
        df.columns = [c.strip() for c in df.columns]
        item_col = next((c for c in df.columns if c.lower() == 'itemcode'), None)
        if not item_col:
            return [], "❌ Missing 'ItemCode' column"
        url_cols = [c for c in df.columns if any(k in c.lower() for k in ["url", "image", "link"])]
        data = []
        for _, row in df.iterrows():
            raw = row[item_col]
            if pd.isna(raw):
                continue
            text = str(raw).strip()
            # Numeric item codes come back from Excel as e.g. "12345.0": drop ".0".
            key = text.split('.')[0] if text.replace('.', '', 1).isdigit() else text
            idx = 0
            for col in url_cols:
                if pd.notna(row[col]):
                    name = key if idx == 0 else f"{key}_{idx}"
                    data.append({"url": str(row[col]).strip(), "name": name})
                    idx += 1
        return data, f"βœ… Fetched {len(data)} image link(s)"
    except Exception as e:
        return [], f"❌ Error: {e}"
def clear_all(tmp_dir):
    """Delete the session's temp directory and reset every UI output to blank."""
    if tmp_dir and os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    hidden_zip_reset = gr.update(visible=False, value=None)
    return (
        None,              # workbook upload
        [],                # image_data_state
        [],                # multi-file upload
        hidden_zip_reset,  # hide & empty the hidden ZIP component
        [],                # individual-downloads list
        "Cleared.",        # status message
        None,              # temp_dir_state
        "",                # URL textbox
    )
# === CSS ===
# Global stylesheet passed to gr.Blocks(css=...).
# NOTE(review): the `.gradio-container` rule and the `#status-box` rule are
# never closed before the selectors that follow them, so later rules are
# parsed as (invalid) nested rules. Browsers error-recover, but this nesting
# looks unintentional — confirm and balance the braces.
css = """
body {
margin: 0;
background: #4B352A;
font-family: 'Segoe UI', sans-serif;
color: #222;
display: flex;
justify-content: center;
padding: 2rem;
transition: background 0.3s, color 0.3s;
}
.gradio-container {
max-width: 1200px; /* You can reduce this (e.g., 600px) */
width: 100%;
margin: 0 auto;
h1 {
font-size: 24px !important;
font-weight: 700 !important;
margin-bottom: 1rem !important;
color: #FFFFFF !important;
text-align: center !important;
}
.panel {
background: white;
padding: 1rem;
border-radius: 6px;
border: 1px solid #ddd;
box-shadow: 0 1px 2px rgba(0,0,0,0.05);
}
#clear-btn, #stop-btn {
margin-top: 1rem;
font-size: 0.85rem !important;
font-weight: 600 !important;
padding: 0.5rem 1rem !important;
border-radius: 8px !important;
border: none !important;
box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important;
transition: background-color 0.2s ease !important;
width: 100% !important;
}
.btn-row {
display: flex;
justify-content: space-between;
gap: 0.5rem;
}
#clear-btn {
background-color: #e74c3c !important;
color: white !important;
width: 50% !important;
}
#clear-btn:hover {
background-color: #c0392b !important;
cursor: pointer !important;
}
#stop-btn {
background-color: #CA7842 !important;
color: white !important;
width: 50% !important;
}
#stop-btn:hover {
background-color: #FFA55D !important;
cursor: pointer !important;
}
.gradio-container .gr-row > div {
padding: 4px !important;
}
#process-btn-url {
background-color: #A4B465 !important;
color: #333 !important;
}
#fetch-btn {
background-color: #cfbed7 !important;
color: #333 !important;
}
#status-box {
background-color: #ffffff;
border: 2px solid #cfbed7;
border-radius: 6px;
padding: 1rem;
font-size: 0.95rem;
color: #333;
/* Styling the toggle radio buttons */
.gradio-container label.svelte-1ipelgc {
font-weight: 600 !important;
border: 2px solid #A4B465 !important;
padding: 0.4rem 1rem !important;
border-radius: 12px !important;
margin: 0.25rem !important;
background-color: #f5fbe8 !important;
transition: all 0.3s ease-in-out;
cursor: pointer;
}
/* Selected option */
input[type="radio"]:checked + label.svelte-1ipelgc {
background-color: #A4B465 !important;
color: white !important;
border-color: #889E46 !important;
}
/* Unselected hover */
.gradio-container input[type="radio"] + label.svelte-1ipelgc:hover {
background-color: #e0f1c4 !important;
}
}
"""
# NOTE(review): this gr.HTML(...) runs at import time, OUTSIDE any
# gr.Blocks() context, so the component is never attached to a layout and
# this markup never renders — presumably a leftover from an earlier design;
# confirm before removing.
gr.HTML("""
<div class="wrapper">
<div class="toggle-icon-group">
<input type="radio" id="spreadsheet" name="mode" checked>
<label for="spreadsheet"><span>πŸ“„</span> Spreadsheet</label>
<input type="radio" id="upload" name="mode">
<label for="upload"><span>πŸ“</span> Upload</label>
</div>
</div>
""")
def _process_workbook(workbook_file, fmt, w, h):
    """Workbook branch: read the link list from Excel, then process and zip it."""
    data, msg = read_uploaded_workbook(workbook_file)
    print("πŸ’‘ DEBUG – links extracted:", data)  # <<< DEBUG
    if not data:
        # No links found, or the workbook could not be read.
        return [], None, msg, None
    files, zip_path, proc_msg, tmp = process_and_zip(data, fmt, w, h)
    return files, zip_path, f"{msg}\n{proc_msg}", tmp


def process_all(mode, workbook_file, ups, pu, fmt, w, h):
    """Route the Process click to workbook, uploaded-files, or single-URL handling.

    Returns (files, zip_path, status_message, temp_dir) in every branch.
    """
    if mode.startswith("πŸ“„"):
        return _process_workbook(workbook_file, fmt, w, h)
    if mode.startswith("πŸ“€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    return [], None, "⚠️ No valid input provided", None
# On startup, retry any URLs recorded in the failure log by earlier runs.
if os.path.exists(FAILED_LOG_PATH):
    with open(FAILED_LOG_PATH, encoding="utf-8") as f:
        failed = []
        for line in f:
            # Log lines are "timestamp<TAB>url<TAB>reason". Skip malformed or
            # blank lines instead of crashing with IndexError as before.
            parts = line.rstrip("\n").split("\t")
            if len(parts) > 1 and parts[1]:
                failed.append(parts[1])
    if failed:
        print(f"πŸ”„ Retrying {len(failed)} failed URLs…")
        os.makedirs("retries", exist_ok=True)

        def retry_one(url):
            """Re-download one URL into ./retries; returns True on success."""
            # Derive a filename from the last path segment, minus any query string.
            fname = os.path.join("retries", url.split("/")[-1].split("?")[0])
            referer = f"{urlparse(url).scheme}://{urlparse(url).netloc}/"
            r = fetch_with_retries(url, random_headers(referer), timeout=15, retries=2)
            if not r:
                return False
            with open(fname, 'wb') as out:
                for c in r.iter_content(8192):
                    out.write(c)
            return True

        # ThreadPoolExecutor is already imported at the top of the file.
        with ThreadPoolExecutor(max_workers=8) as ex:
            results = list(ex.map(retry_one, failed))
        print(f"βœ… Retried successes: {sum(results)} / {len(failed)}")
# ─── UI: declarative Gradio layout and event wiring ───
with gr.Blocks(css=css) as demo:
    # ─── States ───
    image_data_state = gr.State([])  # list of {"url", "name"} records from the workbook
    temp_dir_state = gr.State(None)  # temp dir of the last run; removed by Clear
    # ─── Header ───
    gr.HTML(lamp_html)  # decorative lamp / night-mode toggle
    with gr.Row():
        gr.Markdown("<h1>πŸ–ΌοΈ Image Processor</h1>")
    # ─── Main layout ───
    with gr.Row():
        # ── LEFT PANEL ──
        with gr.Column(scale=3, min_width=500):
            with gr.Group(elem_classes="panel"):
                mode_toggle = gr.Radio(
                    ["πŸ“„ Upload Workbook", "πŸ“€ Upload Images"],
                    value="πŸ“„ Upload Workbook",
                    label="Select Input Method"
                )
                workbook_upload = gr.File(
                    label="πŸ“‚ Upload .xlsx/.xlsm Workbook",
                    file_types=['.xlsx', '.xlsm'],
                    visible=True
                )
                upload_box = gr.File(
                    label="πŸ“ Upload Images",
                    file_count="multiple",
                    visible=False
                )
                image_url_input = gr.Textbox(
                    label="🌐 Paste Image URL",
                    visible=False
                )
                process_btn = gr.Button("βš™οΈ Process", elem_id="process-btn-url")
            with gr.Group(elem_classes="panel"):
                with gr.Row():
                    format_choice = gr.Dropdown(
                        ["JPG","PNG","WEBP","TIFF","GIF","JFIF","AVIF"],
                        label="πŸ–ΌοΈ Format",
                        value="JPG"
                    )
                    width = gr.Number(label="Width (px)", value=1000, precision=0)
                    height = gr.Number(label="Height (px)", value=1000, precision=0)
        # ── RIGHT PANEL ──
        with gr.Column(scale=2, min_width=400):
            with gr.Group(elem_classes="panel"):
                status = gr.Textbox(
                    label="πŸ“£ Status",
                    lines=6,
                    interactive=False,
                    elem_id="status-box"
                )
                zip_download_btn = gr.Button("πŸ“¦ Download ZIP")
                zip_file_hidden = gr.File(visible=False)  # holds the ZIP for the JS download
                with gr.Accordion("🧷 Individual Files", open=False):
                    single_downloads = gr.File(label="Files", file_count="multiple")
                with gr.Row(elem_classes="btn-row"):
                    stop_btn = gr.Button("Stop", elem_id="stop-btn")
                    clear_btn = gr.Button("Clear", elem_id="clear-btn")
    # ─── Footer ───
    gr.HTML(
        "<div style='width:100%; text-align:center; margin-top:1rem; color:white;'>"
        "Created with πŸ’œ by Vishakha"
        "</div>"
    )
    # ─── EVENT HOOKS ───
    # Show the workbook uploader in workbook mode; otherwise show file/URL inputs.
    mode_toggle.change(
        fn=lambda choice: (
            gr.update(visible=choice.startswith("πŸ“„")),
            gr.update(visible=not choice.startswith("πŸ“„")),
            gr.update(visible=not choice.startswith("πŸ“„"))
        ),
        inputs=[mode_toggle],
        outputs=[workbook_upload, upload_box, image_url_input]
    )
    # Parse the workbook as soon as it is uploaded and report the link count.
    workbook_upload.change(
        fn=read_uploaded_workbook,
        inputs=[workbook_upload],
        outputs=[image_data_state, status]
    )
    upload_box.change(
        fn=lambda files: f"{len(files)} files ready." if files else "No files selected",
        inputs=[upload_box],
        outputs=[status]
    )
    process_btn.click(
        fn=process_all,
        inputs=[
            mode_toggle,
            workbook_upload,
            upload_box,
            image_url_input,
            format_choice,
            width,
            height
        ],
        outputs=[
            single_downloads,
            zip_file_hidden,
            status,
            temp_dir_state
        ]
    )
    # Client-side only: open the hidden ZIP file's URL in a new browser tab.
    zip_download_btn.click(
        None,
        inputs=[zip_file_hidden],
        js="(file) => file && window.open(file.url, '_blank')"
    )
    clear_btn.click(
        fn=clear_all,
        inputs=[temp_dir_state],
        outputs=[
            workbook_upload, image_data_state, upload_box,
            zip_file_hidden, single_downloads, status,
            temp_dir_state, image_url_input
        ]
    )
    # NOTE(review): this only updates the status text — nothing here sets
    # stop_event, so in-flight downloads are not actually interrupted; confirm intent.
    stop_btn.click(
        fn=lambda: "πŸ›‘ Stop signal sent",
        outputs=[status]
    )

if __name__ == "__main__":
    # queue() enables request queuing so long-running batches don't block the UI.
    demo.queue().launch(debug=True)