# Image_editor / app.py
# Page header captured together with this file (not part of the program):
# Workpam's picture; "Update app.py"; commit be43e95 (verified);
# raw; history blame; 22 kB
import gradio as gr
from PIL import Image
import os
import zipfile
import pandas as pd
import requests
import tempfile
import shutil
from io import BytesIO
import threading
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
import time
import random
import cloudscraper
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Cooperative cancellation flag: process_and_zip() checks it before handling
# each downloaded item and clears it when a run ends.
stop_event = threading.Event()
# Logical CPU count, falling back to 4 when os.cpu_count() returns None.
# NOTE(review): not referenced anywhere in the visible part of this file.
cpu_cores = os.cpu_count() or 4
# Pool of User-Agent strings.
# NOTE(review): presumably consumed by random_headers(), which is not defined
# in the visible part of this file -- confirm.
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"Mozilla/5.0 (Linux; Android 10)",
"Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X)"
]
# Proxies tried in parallel by download_image() after the direct request fails.
# NOTE(review): these entries look like placeholders ("proxy1", "port");
# replace them with real endpoints or empty the list to skip the proxy step.
PROXY_LIST = [
"http://user:pass@proxy1:port",
"http://user:pass@proxy2:port",
]
# NOTE(review): presumably the file log_failure() writes to; neither this name
# nor log_failure()'s definition is used/visible in this part of the file.
FAILED_LOG_PATH = "failed_downloads.txt"
# Substrings matched against a URL; download_image() skips any URL containing one.
BLOCKED_SITES = ["bigw.com.au"]
# Shared HTTP session reused by every download_image() call.
session = requests.Session()
# --- Utility Functions ---
def process_url_images(data, fmt, w, h):
    """Process a list of ``{"url": ..., "name": ...}`` items.

    Thin wrapper that forwards its arguments unchanged to ``process_and_zip``
    and returns that function's result.
    """
    outcome = process_and_zip(data, fmt, w, h)
    return outcome
def process_uploaded_images(files, fmt, w, h):
    """Stage uploaded image files in a scratch directory and process them.

    Args:
        files: Iterable of uploaded files. Each entry may be a path string or
            an object exposing its path as ``.name``.
        fmt: Output format name, passed through to ``process_and_zip``.
        w: Target width in pixels.
        h: Target height in pixels.

    Returns:
        Whatever ``process_and_zip`` returns for the staged files.
    """
    staging_dir = tempfile.mkdtemp()
    try:
        items = []
        for upload in files:
            # Path strings have no usable ``.name``; wrappers do. Support both.
            src = str(getattr(upload, "name", upload))
            dst = os.path.join(staging_dir, os.path.basename(src))
            shutil.copyfile(src, dst)
            stem = os.path.splitext(os.path.basename(dst))[0]
            items.append({"url": dst, "name": stem})
        return process_and_zip(items, fmt, w, h)
    finally:
        # process_and_zip writes its results into its own directory, so the
        # staged copies are no longer needed once it returns (or raises).
        shutil.rmtree(staging_dir, ignore_errors=True)
def process_single_url_image(url, fmt, w, h):
    """Process one image given by URL.

    Args:
        url: Image URL; ``None``, empty and whitespace-only values are treated
            as "no URL".
        fmt: Output format name, passed through to ``process_and_zip``.
        w: Target width in pixels.
        h: Target height in pixels.

    Returns:
        ``([], None, "No URL provided", None)`` when no URL was given,
        otherwise the result of ``process_and_zip`` for a single item named
        ``"single"``.
    """
    # ``url`` may be None when the textbox was never filled in.
    cleaned = (url or "").strip()
    if not cleaned:
        return [], None, "No URL provided", None
    return process_and_zip([{"url": cleaned, "name": "single"}], fmt, w, h)
def handle_process(mode, sd, ups, pu, fmt, w, h):
    """Dispatch a processing request to the handler matching the input mode.

    Args:
        mode: Label of the selected input method; only its prefix is checked.
        sd: Item list forwarded to ``process_url_images``.
        ups: Uploaded files forwarded to ``process_uploaded_images``.
        pu: Single image URL (may be ``None`` or empty).
        fmt: Output format name.
        w: Target width in pixels.
        h: Target height in pixels.

    Returns:
        The selected handler's result, or a 4-tuple carrying a warning message
        when no usable input was supplied.
    """
    if mode.startswith("πŸ“„") and sd:
        return process_url_images(sd, fmt, w, h)
    if mode.startswith("πŸ“€") and ups:
        return process_uploaded_images(ups, fmt, w, h)
    # Guard against None before stripping: the URL field is optional.
    if pu and pu.strip():
        return process_single_url_image(pu, fmt, w, h)
    return [], None, "⚠️ No valid input provided", None
import cloudscraper
def _is_image_response(resp):
    """Return True when *resp* is an HTTP 200 whose Content-Type mentions "image"."""
    return resp.status_code == 200 and "image" in resp.headers.get("Content-Type", "")


def _stream_to_file(resp, save_path):
    """Stream *resp* into *save_path* through a ``.part`` file, then close *resp*.

    Writing to a side file first means an interrupted transfer never leaves a
    truncated file at *save_path* that a later call could mistake for a cached
    download.
    """
    part_path = f"{save_path}.part"
    try:
        with open(part_path, "wb") as out:
            for chunk in resp.iter_content(8192):
                out.write(chunk)
        os.replace(part_path, save_path)
    except Exception:
        if os.path.exists(part_path):
            os.remove(part_path)
        raise
    finally:
        resp.close()


def download_image(url, save_path):
    """Download *url* to *save_path*, trying progressively heavier strategies.

    Order of attempts: existing file at *save_path* (reused when larger than
    1000 bytes), direct request through the shared session, the configured
    proxies in parallel, cloudscraper, and finally a headless Chrome visit
    whose cookies are copied into the shared session before retrying.

    NOTE(review): ``random_headers``, ``try_download`` and ``log_failure`` are
    called here but are not defined in the visible part of this file --
    confirm they exist elsewhere.

    Args:
        url: Address of the image.
        save_path: Destination file path.

    Returns:
        *save_path* on success, ``None`` when the URL is on the block list or
        every strategy failed.
    """
    if os.path.exists(save_path) and os.path.getsize(save_path) > 1000:
        print(f"βœ… Skipping cached: {save_path}")
        return save_path
    if any(blocked in url for blocked in BLOCKED_SITES):
        log_failure(url, "Skipped known slow site")
        return None

    parsed = urlparse(url)
    referer = f"{parsed.scheme}://{parsed.netloc}/"
    headers = random_headers(referer)

    # 1. Try direct
    try:
        print("πŸš€ Trying direct...")
        resp = session.get(url, headers=headers, stream=True, timeout=8)
        if _is_image_response(resp):
            _stream_to_file(resp, save_path)
            print("βœ… Direct worked")
            return save_path
        resp.close()
    except Exception as e:
        print(f"⚠️ Direct failed: {e}")

    # 2. Try proxies in parallel
    if PROXY_LIST:
        headers = random_headers(referer)
        candidates = random.sample(PROXY_LIST, min(5, len(PROXY_LIST)))
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = {
                executor.submit(try_download, url, headers, {"http": p, "https": p}): p
                for p in candidates
            }
            for future in as_completed(futures):
                # One failing proxy must not abort the remaining proxies or
                # the later fallbacks.
                try:
                    result = future.result()
                    if not result:
                        continue
                    _stream_to_file(result, save_path)
                except Exception as e:
                    print(f"Proxy failed ({futures[future]}): {e}")
                    continue
                print("βœ… Proxy worked:", futures[future])
                return save_path

    # 3. cloudscraper fallback
    try:
        print("🟠 cloudscraper fallback...")
        scraper = cloudscraper.create_scraper(sess=session)
        resp = scraper.get(url, headers=headers, stream=True, timeout=12)
        if _is_image_response(resp):
            _stream_to_file(resp, save_path)
            print("βœ… cloudscraper worked")
            return save_path
        resp.close()
    except Exception as e:
        print(f"❌ cloudscraper failed: {e}")

    # 4. Final fallback: Selenium
    try:
        print("πŸ§ͺ Headless browser fallback...")
        chrome_opts = Options()
        chrome_opts.add_argument("--headless")
        chrome_opts.add_argument("--disable-gpu")
        chrome_opts.add_argument("--no-sandbox")
        chrome_opts.add_argument("--disable-dev-shm-usage")
        chrome_opts.add_argument(f"user-agent={headers['User-Agent']}")
        driver = webdriver.Chrome(
            service=Service(ChromeDriverManager().install()),
            options=chrome_opts,
        )
        try:
            driver.get(url)
            # Give challenge pages / redirects time to settle.
            time.sleep(4)
            final_url = driver.current_url
            cookies = {c['name']: c['value'] for c in driver.get_cookies()}
        finally:
            # Always shut the browser down, even when navigation fails.
            driver.quit()
        # The shared session is deliberately NOT used as a context manager:
        # leaving such a block closes it for every other concurrent download.
        for k, v in cookies.items():
            session.cookies.set(k, v)
        r = session.get(final_url, headers=headers, stream=True, timeout=15)
        if _is_image_response(r):
            _stream_to_file(r, save_path)
            print("βœ… Selenium + cookies worked")
            return save_path
        log_failure(url, f"Selenium bad response: {r.status_code}")
        r.close()
    except Exception as e:
        print(f"❌ Selenium failed: {e}")
        log_failure(url, f"Selenium exception: {e}")

    # Fail
    log_failure(url, "All methods failed")
    return None
# Static HTML/CSS snippet for a decorative hanging lamp fixed to the right edge
# of the page. Clicking the bulb element (class "blub") toggles a "night" class
# on both the bulb and <body>, which switches the styles defined below.
# It is inserted into the page with gr.HTML(lamp_html) in the Blocks layout.
lamp_html = """
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
background-color: #D8D2C2;
transition: background 0.4s ease;
}
body.night {
background: radial-gradient(#555, #111);
}
.lamp-fixed {
position: fixed;
top: 0;
right: 0;
width: 100px;
height: 100vh;
display: flex;
flex-direction: column;
align-items: center;
z-index: 9999;
pointer-events: none;
}
.mount {
width: 30px;
height: 5px;
background: #111;
border-radius: 6px;
pointer-events: none;
}
.wire {
width: 4px;
height: 120px;
background: #000;
margin: 0;
pointer-events: none;
}
.blub {
position: relative;
width: 47px;
height: 47px;
background: #444;
border-radius: 50%;
cursor: pointer;
margin-top: -1px;
transition: all 0.3s ease;
z-index: 2;
pointer-events: auto;
}
.blub.night {
background: #ffec99;
box-shadow: 0 0 28px #ffec99, 0 0 56px #ffec99,
0 0 84px #ffec99, 0 0 168px #ffec99;
}
.blub.night::before {
background: #ffec99;
}
.blub.night::after {
content: "";
position: absolute;
top: 50%;
left: 50%;
transform: translate(-60%, -50%);
width: 78px;
height: 78px;
background: #ffec99;
border-radius: 100%;
filter: blur(26px);
z-index: -1;
}
.blub::before {
content: "";
position: absolute;
top: -30px;
left: 11px;
width: 25px;
height: 47px;
background: #444;
border-top: 19px solid #000;
border-radius: 8px;
transition: background 0.3s ease;
}
.blub span:nth-child(1) {
position: absolute;
top: -10px;
left: -2px;
width: 20px;
height: 18px;
background: transparent;
transform: rotate(343deg);
border-bottom-right-radius: 40px;
box-shadow: 12px 12px 0 5px #444;
transition: box-shadow 0.3s ease;
}
.blub.night span:nth-child(1) {
box-shadow: 12px 12px 0 5px #ffec99;
}
.blub span:nth-child(2) {
position: absolute;
top: -10px;
right: -2px;
width: 20px;
height: 18px;
background: transparent;
transform: rotate(18deg);
border-bottom-left-radius: 50px;
box-shadow: -12px 12px 0 5px #444;
transition: box-shadow 0.3s ease;
}
.blub.night span:nth-child(2) {
box-shadow: -12px 12px 0 5px #ffec99;
}
</style>
<div class="lamp-fixed">
<div class="mount"></div>
<div class="wire"></div>
<div class="blub" onclick="this.classList.toggle('night'); document.body.classList.toggle('night')">
<span></span>
<span></span>
</div>
</div>
"""
def resize_with_padding(img, target_w, target_h, fill=(255,255,255)):
    """Fit *img* inside a ``target_w`` x ``target_h`` canvas, centred on *fill*.

    The image is scaled down (aspect ratio preserved) so that it fits within
    the target size, then pasted in the middle of a new RGB canvas. Because
    ``Image.thumbnail`` never enlarges, images smaller than the target keep
    their size and simply receive more padding.

    Args:
        img: Source PIL image. It is not modified.
        target_w: Canvas width in pixels.
        target_h: Canvas height in pixels.
        fill: RGB colour used for the padding.

    Returns:
        A new RGB image of exactly ``(target_w, target_h)``.
    """
    # Sizes may arrive as whole-number floats from numeric UI inputs.
    target_w, target_h = int(target_w), int(target_h)
    # thumbnail() resizes in place, so work on a copy to leave the caller's
    # image untouched.
    fitted = img.copy()
    fitted.thumbnail((target_w, target_h), Image.LANCZOS)
    canvas = Image.new("RGB", (target_w, target_h), fill)
    offset = ((target_w - fitted.width) // 2, (target_h - fitted.height) // 2)
    canvas.paste(fitted, offset)
    return canvas
def threaded_download_and_open(item, temp_dir):
    """Fetch (if needed) and decode one item into a list of RGBA frames.

    Args:
        item: Mapping with ``"name"`` and ``"url"`` keys. ``"url"`` may also be
            a path to an existing local file, in which case nothing is
            downloaded.
        temp_dir: Directory where downloaded files are stored as
            ``<name>.gif``.

    Returns:
        ``(name, frames, None)`` on success, where *frames* is a list of RGBA
        images (one per frame for animated images), or ``(name, None, error)``
        when the download or decoding failed.
    """
    from PIL import ImageSequence
    name, src = item['name'], item['url']
    try:
        if os.path.exists(src):
            path = src
        else:
            temp_path = os.path.join(temp_dir, f"{name}.gif")
            path = download_image(src, temp_path)
            if not path:
                return (name, None, 'Download failed or invalid image')
        # The context manager releases the underlying file handle; every frame
        # returned below is an independent converted copy, so it stays valid.
        with Image.open(path) as img:
            if getattr(img, "is_animated", False):
                # Rewind to the first frame in case the decoder is not there.
                img.seek(0)
                frames = [
                    frame.convert("RGBA").copy()
                    for frame in ImageSequence.Iterator(img)
                ]
                return (name, frames, None)
            return (name, [img.convert("RGBA")], None)
    except Exception as e:
        return (name, None, str(e))
def resize_and_save_worker(args):
    """Flatten, resize and save one image.

    Args:
        args: Tuple ``(name, img, fmt, w, h, out_dir)``: output base name,
            PIL image, output format name, target width, target height and
            destination directory.

    Returns:
        ``(out_path, None)`` on success or ``(None, "<name>: <error>")`` on
        failure.
    """
    name, img, fmt, w, h, out_dir = args
    try:
        # Pillow registers no "JFIF" writer; .jfif files are JPEG streams, so
        # save them with the JPEG encoder while keeping the requested extension.
        pil_format = "JPEG" if fmt.upper() == "JFIF" else fmt.upper()
        # Composite onto white so transparent areas do not turn black.
        img = img.convert('RGBA')
        bg = Image.new('RGBA', img.size, (255,255,255,255))
        img = Image.alpha_composite(bg, img)
        if pil_format == 'JPEG':
            img = img.convert('RGB')
        img = resize_with_padding(img, w, h)
        out_name = f"{name}.{fmt.lower()}"
        out_path = os.path.join(out_dir, out_name)
        img.save(out_path, format=pil_format, quality=90)
        return out_path, None
    except Exception as e:
        return None, f"{name}: {e}"
def process_and_zip(items, fmt, w, h):
    """Download/open every item, resize it, save it and bundle the results.

    Args:
        items: Iterable of ``{"url": ..., "name": ...}`` mappings.
        fmt: Output format name (e.g. ``"JPEG"``); also used, lower-cased, as
            the file extension.
        w: Target width in pixels.
        h: Target height in pixels.

    Returns:
        ``(files, zip_path, message, tmp_dir)`` when at least one image was
        written, otherwise ``(None, None, message, None)``. *message* lists the
        per-item failures, if any.
    """
    # Discard a stop request left over from before this run started.
    stop_event.clear()
    tmp = tempfile.mkdtemp()
    proc = os.path.join(tmp, 'out')
    os.makedirs(proc, exist_ok=True)
    files, fails = [], []
    with ThreadPoolExecutor(max_workers=8) as ex:
        results = list(ex.map(lambda it: threaded_download_and_open(it, tmp), items))
    for name, imgs, err in results:
        if stop_event.is_set():
            print("πŸ›‘ Stopped before processing.")
            break
        if err or imgs is None:
            fails.append(f"{name}: {err}")
            continue
        for i, img in enumerate(imgs):
            try:
                # Pillow registers no "JFIF" writer; .jfif files are JPEG
                # streams, so use the JPEG encoder but keep the extension.
                pil_format = "JPEG" if fmt.upper() == "JFIF" else fmt.upper()
                # Composite onto white so transparent areas do not turn black.
                img = img.convert('RGBA')
                bg = Image.new('RGBA', img.size, (255, 255, 255, 255))
                img = Image.alpha_composite(bg, img)
                if pil_format == 'JPEG':
                    img = img.convert('RGB')
                img = resize_with_padding(img, w, h)
                # Multi-frame inputs get one output file per frame.
                fname = f"{name}_frame{i+1}.{fmt.lower()}" if len(imgs) > 1 else f"{name}.{fmt.lower()}"
                path = os.path.join(proc, fname)
                img.save(path, format=pil_format, quality=90)
                files.append(path)
            except Exception as e:
                fails.append(f"{name}_frame{i+1}: {e}")
    if not files:
        stop_event.clear()
        shutil.rmtree(tmp)
        # Surface the reasons instead of dropping them silently.
        summary = ["No images processed."] + [f" - {fail}" for fail in fails]
        return None, None, "\n".join(summary), None
    date_str = datetime.now().strftime("%Y-%m-%d")
    zip_name = f"{date_str}.zip"
    zip_path = os.path.join(tmp, zip_name)
    with zipfile.ZipFile(zip_path, 'w') as zf:
        for f in files:
            zf.write(f, os.path.basename(f))
    msg_lines = [f"βœ… Processed {len(files)} image(s)."]
    if fails:
        msg_lines.append(f"❌ Failed: {len(fails)} image(s)")
        msg_lines += [f" - {fail}" for fail in fails]
    stop_event.clear()
    return files, zip_path, "\n".join(msg_lines), tmp
def read_uploaded_workbook(file):
    """Extract image links from every sheet of an uploaded Excel workbook.

    All sheets except one named "Cleared Data" (case-insensitive) are
    concatenated. Rows need a non-empty ``ItemCode`` column (matched
    case-insensitively); every column whose header contains "url", "image" or
    "link" is treated as a link column.

    Args:
        file: Uploaded workbook, either a path string or an object exposing
            its path as ``.name``. Falsy values mean "nothing uploaded".

    Returns:
        ``(items, message)`` where *items* is a list of
        ``{"url": ..., "name": ...}`` dicts (empty on error). The first link
        of a row is named after the item code, further links get a
        ``_<index>`` suffix.
    """
    if not file:
        return [], "❌ No file uploaded"
    try:
        path = getattr(file, "name", file)
        # Open the workbook once and close it deterministically instead of
        # re-reading the file for every sheet.
        with pd.ExcelFile(path, engine="openpyxl") as xls:
            sheets = [s for s in xls.sheet_names if s.lower() != "cleared data"]
            df_list = [xls.parse(s) for s in sheets]
        df = pd.concat(df_list, ignore_index=True)
        # Headers can be numbers or dates; normalise them to stripped strings.
        df.columns = [str(c).strip() for c in df.columns]
        item_col = next((c for c in df.columns if c.lower() == 'itemcode'), None)
        if not item_col:
            return [], "❌ Missing 'ItemCode' column"
        url_cols = [c for c in df.columns if any(k in c.lower() for k in ["url", "image", "link"])]
        data = []
        for _, row in df.iterrows():
            raw = row[item_col]
            if pd.isna(raw):
                continue
            text = str(raw).strip()
            # Numeric codes read from Excel come back as floats ("123.0");
            # keep only the integer part for those.
            key = text.split('.')[0] if text.replace('.', '', 1).isdigit() else text
            links = [str(row[col]).strip() for col in url_cols if pd.notna(row[col])]
            for idx, link in enumerate(links):
                name = key if idx == 0 else f"{key}_{idx}"
                data.append({"url": link, "name": name})
        return data, f"βœ… Fetched {len(data)} image link(s)"
    except Exception as e:
        return [], f"❌ Error: {e}"
def clear_all(tmp_dir):
    """Delete the previous run's scratch directory and reset the UI outputs.

    Args:
        tmp_dir: Path of the scratch directory to remove; falsy or missing
            paths are ignored.

    Returns:
        An 8-tuple of reset values, in the order of the ``outputs`` list wired
        to the Clear button.
    """
    has_scratch_dir = bool(tmp_dir) and os.path.exists(tmp_dir)
    if has_scratch_dir:
        shutil.rmtree(tmp_dir)
    reset_values = (
        "",
        [],
        [],
        gr.update(visible=False),
        [],
        "Cleared.",
        None,
        "",
    )
    return reset_values
# === CSS ===
# Custom stylesheet passed to gr.Blocks(css=...).
# The ".gradio-container" and "#status-box" rules are each closed right after
# their own declarations so that the rules following them are top-level rules
# rather than accidentally nested ones, and the braces are balanced.
css = """
body {
margin: 0;
background: #4B352A;
font-family: 'Segoe UI', sans-serif;
color: #222;
display: flex;
justify-content: center;
padding: 2rem;
transition: background 0.3s, color 0.3s;
}
.gradio-container {
max-width: 1200px; /* You can reduce this (e.g., 600px) */
width: 100%;
margin: 0 auto;
}
h1 {
font-size: 24px !important;
font-weight: 700 !important;
margin-bottom: 1rem !important;
color: #FFFFFF !important;
text-align: center !important;
}
.panel {
background: white;
padding: 1rem;
border-radius: 6px;
border: 1px solid #ddd;
box-shadow: 0 1px 2px rgba(0,0,0,0.05);
}
#clear-btn, #stop-btn {
margin-top: 1rem;
font-size: 0.85rem !important;
font-weight: 600 !important;
padding: 0.5rem 1rem !important;
border-radius: 8px !important;
border: none !important;
box-shadow: 0 2px 4px rgba(0,0,0,0.1) !important;
transition: background-color 0.2s ease !important;
width: 100% !important;
}
.btn-row {
display: flex;
justify-content: space-between;
gap: 0.5rem;
}
#clear-btn {
background-color: #e74c3c !important;
color: white !important;
width: 50% !important;
}
#clear-btn:hover {
background-color: #c0392b !important;
cursor: pointer !important;
}
#stop-btn {
background-color: #CA7842 !important;
color: white !important;
width: 50% !important;
}
#stop-btn:hover {
background-color: #FFA55D !important;
cursor: pointer !important;
}
.gradio-container .gr-row > div {
padding: 4px !important;
}
#process-btn-url {
background-color: #A4B465 !important;
color: #333 !important;
}
#fetch-btn {
background-color: #cfbed7 !important;
color: #333 !important;
}
#status-box {
background-color: #ffffff;
border: 2px solid #cfbed7;
border-radius: 6px;
padding: 1rem;
font-size: 0.95rem;
color: #333;
}
/* Styling the toggle radio buttons */
.gradio-container label.svelte-1ipelgc {
font-weight: 600 !important;
border: 2px solid #A4B465 !important;
padding: 0.4rem 1rem !important;
border-radius: 12px !important;
margin: 0.25rem !important;
background-color: #f5fbe8 !important;
transition: all 0.3s ease-in-out;
cursor: pointer;
}
/* Selected option */
input[type="radio"]:checked + label.svelte-1ipelgc {
background-color: #A4B465 !important;
color: white !important;
border-color: #889E46 !important;
}
/* Unselected hover */
.gradio-container input[type="radio"] + label.svelte-1ipelgc:hover {
background-color: #e0f1c4 !important;
}
"""
# Markup for a two-option radio toggle (spreadsheet / upload).
# NOTE(review): this component is created at module level, before the
# `with gr.Blocks(...)` context is opened, and its return value is discarded,
# so it does not appear to be rendered anywhere -- confirm whether this is
# leftover code that can be removed.
gr.HTML("""
<div class="wrapper">
<div class="toggle-icon-group">
<input type="radio" id="spreadsheet" name="mode" checked>
<label for="spreadsheet"><span>πŸ“„</span> Spreadsheet</label>
<input type="radio" id="upload" name="mode">
<label for="upload"><span>πŸ“</span> Upload</label>
</div>
</div>
""")
def process_all(mode, workbook_file, ups, pu, fmt, w, h):
    """Run the processing pipeline for whichever input the user supplied.

    Args:
        mode: Label of the selected input method; only its prefix is checked.
        workbook_file: Uploaded workbook (used in workbook mode).
        ups: Uploaded image files (used in image-upload mode).
        pu: Single image URL, used when no files apply.
        fmt: Output format name.
        w: Target width in pixels.
        h: Target height in pixels.

    Returns:
        A 4-tuple ``(files, zip_path, message, tmp_dir)`` as produced by the
        selected handler, or a tuple carrying only a message when there was
        nothing to process.
    """
    workbook_mode = mode.startswith("πŸ“„")
    if workbook_mode:
        links, fetch_msg = read_uploaded_workbook(workbook_file)
        if links:
            # The workbook yielded a list of item dicts: process them.
            return process_and_zip(links, fmt, w, h)
        # No links were found: report the message from the workbook reader.
        return [], None, fetch_msg, None

    images_mode = mode.startswith("πŸ“€")
    if images_mode and ups:
        # Directly uploaded image files.
        return process_uploaded_images(ups, fmt, w, h)

    has_url = bool(pu) and bool(pu.strip())
    if has_url:
        # Single pasted URL.
        return process_single_url_image(pu, fmt, w, h)

    # Nothing usable was supplied.
    return [], None, "⚠️ No valid input provided", None
def _request_stop():
    """Set the shared stop flag and return the status text for the UI."""
    # process_and_zip() polls this flag; without setting it the Stop button
    # would only change the status text.
    stop_event.set()
    return "πŸ›‘ Stop signal sent"


# NOTE(review): the nesting of the layout containers below was reconstructed
# from the section comments and component order -- confirm it matches the
# intended layout.
with gr.Blocks(css=css) as demo:
    # --- States ---
    image_data_state = gr.State([])
    temp_dir_state = gr.State(None)

    # --- Header ---
    gr.HTML(lamp_html)
    with gr.Row():
        gr.Markdown("<h1>πŸ–ΌοΈ Image Processor</h1>")

    # --- Main layout ---
    with gr.Row():
        # -- Left panel: inputs and output settings --
        with gr.Column(scale=3, min_width=500):
            with gr.Group(elem_classes="panel"):
                mode_toggle = gr.Radio(
                    ["πŸ“„ Upload Workbook", "πŸ“€ Upload Images"],
                    value="πŸ“„ Upload Workbook",
                    label="Select Input Method"
                )
                workbook_upload = gr.File(
                    label="πŸ“‚ Upload .xlsx/.xlsm Workbook",
                    file_types=['.xlsx', '.xlsm'],
                    visible=True
                )
                upload_box = gr.File(
                    label="πŸ“ Upload Images",
                    file_count="multiple",
                    visible=False
                )
                image_url_input = gr.Textbox(
                    label="🌐 Paste Image URL",
                    visible=False
                )
                process_btn = gr.Button("βš™οΈ Process", elem_id="process-btn-url")
            with gr.Group(elem_classes="panel"):
                with gr.Row():
                    format_choice = gr.Dropdown(
                        ["JPEG","PNG","WEBP","TIFF","GIF","JFIF","AVIF"],
                        label="πŸ–ΌοΈ Format",
                        value="JPEG"
                    )
                    width = gr.Number(label="Width (px)", value=1000, precision=0)
                    height = gr.Number(label="Height (px)", value=1000, precision=0)

        # -- Right panel: status and downloads --
        with gr.Column(scale=2, min_width=400):
            with gr.Group(elem_classes="panel"):
                status = gr.Textbox(
                    label="πŸ“£ Status",
                    lines=6,
                    interactive=False,
                    elem_id="status-box"
                )
                zip_download_btn = gr.Button("πŸ“¦ Download ZIP")
                zip_file_hidden = gr.File(visible=False)
                with gr.Accordion("🧷 Individual Files", open=False):
                    single_downloads = gr.File(label="Files", file_count="multiple")
            with gr.Row(elem_classes="btn-row"):
                stop_btn = gr.Button("Stop", elem_id="stop-btn")
                clear_btn = gr.Button("Clear", elem_id="clear-btn")

    # --- Footer ---
    gr.Markdown("<center style='margin-top:1rem;color:white'>Created with πŸ’œ by Vishakha</center>")

    # --- Event hooks ---
    # Show the workbook uploader in workbook mode, otherwise the image
    # uploader and the URL box.
    mode_toggle.change(
        fn=lambda choice: (
            gr.update(visible=choice.startswith("πŸ“„")),
            gr.update(visible=not choice.startswith("πŸ“„")),
            gr.update(visible=not choice.startswith("πŸ“„"))
        ),
        inputs=[mode_toggle],
        outputs=[workbook_upload, upload_box, image_url_input]
    )
    workbook_upload.change(
        fn=read_uploaded_workbook,
        inputs=[workbook_upload],
        outputs=[image_data_state, status]
    )
    upload_box.change(
        fn=lambda files: f"{len(files)} files ready." if files else "No files selected",
        inputs=[upload_box],
        outputs=[status]
    )
    process_btn.click(
        fn=process_all,
        inputs=[
            mode_toggle,
            workbook_upload,
            upload_box,
            image_url_input,
            format_choice,
            width,
            height
        ],
        outputs=[
            single_downloads,
            zip_file_hidden,
            status,
            temp_dir_state
        ]
    )
    # Client-side only: open the hidden file's URL in a new tab.
    zip_download_btn.click(
        None,
        inputs=[zip_file_hidden],
        js="(file) => file && window.open(file.url, '_blank')"
    )
    # The outputs order must match the tuple returned by clear_all().
    clear_btn.click(
        fn=clear_all,
        inputs=[temp_dir_state],
        outputs=[
            workbook_upload, image_data_state, upload_box,
            zip_file_hidden, single_downloads, status,
            temp_dir_state, image_url_input
        ]
    )
    stop_btn.click(
        fn=_request_stop,
        outputs=[status]
    )
# When run as a script, enable Gradio's request queue and start the app
# with debug output turned on.
if __name__ == "__main__":
    demo.queue().launch(debug=True)