import os, uuid, datetime, traceback from pathlib import Path import html as _py_html import pandas as pd import gradio as gr from huggingface_hub import hf_hub_download from urllib.parse import unquote # add at top from huggingface_hub.errors import EntryNotFoundError, RepositoryNotFoundError # ----------- HF DATASET CONFIG ----------- HF_DATASET_REPO = "akazemian/audio-html" # <-- change if needed INDEX_FILENAME = "index.csv" # ----------------------------------------- DB_PATH = "library.csv" ALLOWED_EXTS = {".html"} # Columns in DB EXTRA_COLS = ["category", "dataset", "hf_path"] # <-- add hf_path here BASE_COLS = ["id","filename","path","tags","keywords","notes","uploaded_at"] ALL_DB_COLS = BASE_COLS + EXTRA_COLS # Columns shown in the table (don't show hf_path) TABLE_COLS = ["id","filename","category","dataset", "tags","keywords","notes","uploaded_at"] # ---------- Load HF index ---------- # ----------- HF DATASET CONFIG ----------- HF_DATASET_REPO = "akazemian/audio-html" # dataset holding the HTMLs + index.csv INDEX_FILENAME = "index.csv" # ----------------------------------------- # Always read index.csv from the DATASET (not the Space) HF_INDEX_REPO_ID = HF_DATASET_REPO HF_INDEX_REPO_TYPE = "dataset" # at the top import os from huggingface_hub import login def _refresh_hf_path_from_index(db_row_id: str, filename: str, current_model_hint: str | None = None) -> str | None: """ Look up correct relpath for `filename` in index.csv. If `current_model_hint` is given, only consider rows whose relpath starts with that model (handles shard dirs). Persist the fixed hf_path back into library.csv for row `db_row_id`. """ try: idx = _load_hf_index() except Exception: return None df = idx.copy() df["filename"] = df["filename"].astype(str) df["relpath"] = df["relpath"].astype(str) if current_model_hint: model = unquote(str(current_model_hint).strip()) df = df[df["relpath"].str.startswith(f"{model}/")] hits = df[df["filename"] == str(filename)] if hits.empty: return None relpath = unquote(str(hits.iloc[0]["relpath"])) # includes shard subdir new_hf_path = f"hf://{HF_DATASET_REPO}/{relpath}" # persist the fix db = _load_db() i = db.index[db["id"] == db_row_id] if len(i): db.at[i[0], "hf_path"] = new_hf_path _save_db(db) return new_hf_path # --- add near top --- def parse_hf_uri(uri: str): """ hf://// -> (repo_id 'user/repo', relpath) """ assert uri.startswith("hf://") rest = uri[len("hf://"):] parts = rest.split("/", 2) if len(parts) < 3: raise ValueError(f"Bad hf:// uri: {uri}") user, repo, relpath = parts[0], parts[1], parts[2] return f"{user}/{repo}", relpath # ---------- DB helpers ---------- def _load_db() -> pd.DataFrame: if os.path.exists(DB_PATH): df = pd.read_csv(DB_PATH) for c in ALL_DB_COLS: if c not in df.columns: df[c] = "" for c in ["tags","keywords","notes","category","dataset","hf_path","path","filename","id","uploaded_at"]: df[c] = df[c].fillna("").astype(str) return df[ALL_DB_COLS] return pd.DataFrame(columns=ALL_DB_COLS) def _save_db(df: pd.DataFrame): df.to_csv(DB_PATH, index=False) # ---------- Table normalizer ---------- def _df_from_table_value(table_value): cols = TABLE_COLS if isinstance(table_value, pd.DataFrame): for c in cols: if c not in table_value.columns: table_value[c] = "" return table_value[cols] if isinstance(table_value, list): if not table_value: return pd.DataFrame(columns=cols) first = table_value[0] if isinstance(first, dict): df = pd.DataFrame(table_value) for c in cols: if c not in df.columns: df[c] = "" return df[cols] else: return pd.DataFrame(table_value, columns=cols) return pd.DataFrame(columns=cols) def _load_hf_index() -> pd.DataFrame: """ Download + read index.csv from the HF *dataset* repo. Required columns: id, filename, relpath, category, dataset, tags, keywords, notes, uploaded_at """ local = hf_hub_download( repo_id=HF_INDEX_REPO_ID, # = HF_DATASET_REPO repo_type=HF_INDEX_REPO_TYPE, # = "dataset" filename=INDEX_FILENAME, ) # Optional: log where we loaded from (shows in Space logs) print(f"[index] loaded from {HF_INDEX_REPO_TYPE}:{HF_INDEX_REPO_ID}/{INDEX_FILENAME} -> {local}") df = pd.read_csv(local) for c in ["id","filename","relpath","category","dataset","tags","keywords","notes","uploaded_at"]: if c not in df.columns: df[c] = "" for c in ["id","filename","relpath","category","dataset","tags","keywords","notes","uploaded_at"]: df[c] = df[c].fillna("").astype(str) return df # ---------- Sync by model (prefix inside HF dataset) ---------- from urllib.parse import unquote # ensure this import exists at top def sync_model(model_name: str): raw = (model_name or "").strip() if not raw: return gr.Info("Please enter a model name."), None, None, None, "", "" try: idx = _load_hf_index() # must have columns: id,filename,relpath,category,dataset,tags,keywords,notes,uploaded_at except Exception as e: traceback.print_exc() return gr.Info(f"Failed to load index from HF: {e}"), None, None, None, "", "" # accept both '=' and '%3D' in the model folder name decoded = unquote(raw) rel = idx["relpath"].astype(str) sub = idx[rel.str.startswith(f"{raw}/") | rel.str.startswith(f"{decoded}/")].copy() if sub.empty: return gr.Info( f"No HTML files found for model '{raw}'. " "Tip: if you copied from the URL, use '=' instead of '%3D'." ), None, None, None, "", "" # make the hf_path we'll store locally sub["hf_path"] = sub["relpath"].apply(lambda rp: f"hf://{HF_DATASET_REPO}/{rp}") # keep only the columns we want to pull from index into library cols_from_idx = ["filename","hf_path","category","dataset","tags","keywords","notes","uploaded_at"] sub = sub[cols_from_idx].copy() db = _load_db() # 1) update existing rows matched by filename if not db.empty: # merge to bring in new metadata merged = db.merge(sub, on="filename", how="left", suffixes=("", "_idx")) # for each field, prefer index value if present; otherwise keep existing for col in ["hf_path","category","dataset","tags","keywords","notes","uploaded_at"]: idx_col = f"{col}_idx" if idx_col in merged.columns: merged[col] = merged[idx_col].where(merged[idx_col].astype(str) != "", merged[col]) merged.drop(columns=[idx_col], inplace=True) db = merged # 2) add truly new rows (filenames not present yet) existing_fns = set(db["filename"].astype(str)) if not db.empty else set() new_rows = sub[~sub["filename"].astype(str).isin(existing_fns)].copy() if not new_rows.empty: # build missing base columns new_rows["id"] = [uuid.uuid4().hex[:8] for _ in range(len(new_rows))] new_rows["path"] = "" # unknown locally in HF mode # reorder to library schema new_rows = new_rows[["id","filename","path","tags","keywords","notes","uploaded_at","category","dataset","hf_path"]] db = pd.concat([db, new_rows], ignore_index=True) _save_db(db) # show only this model in the UI current_model = decoded return refresh_view("", [], "", "", current_model) + (current_model,) # ---------- Search / filters ---------- from urllib.parse import unquote # ensure at top of file from urllib.parse import unquote # at top def refresh_view(query, tag_filters, category_filter, dataset_filter, current_model): df = _load_db() # Ensure required columns exist (robust against old CSVs) for col in ["id","filename","category","dataset","tags","keywords","notes","uploaded_at","path","hf_path"]: if col not in df.columns: df[col] = "" # Scope to current model prefix (HF sharded paths still start with model/) if current_model: model = unquote(str(current_model).strip()) hf_prefix = f"hf://{HF_DATASET_REPO}/{model}/" mask = ( df["hf_path"].astype(str).str.startswith(hf_prefix) | df["path"].astype(str).str.startswith(hf_prefix) # for legacy rows | df["path"].astype(str).str.contains(f"/{model}/") # local fallback ) df = df[mask] # Free-text search (across common text fields) if query: q = str(query).lower() mask = ( df["filename"].astype(str).str.lower().str.contains(q, na=False) | df["tags"].astype(str).str.lower().str.contains(q, na=False) | df["keywords"].astype(str).str.lower().str.contains(q, na=False) | df["notes"].astype(str).str.lower().str.contains(q, na=False) | df["category"].astype(str).str.lower().str.contains(q, na=False) | df["dataset"].astype(str).str.lower().str.contains(q, na=False) ) df = df[mask] # Tag filter (AND semantics) for t in (tag_filters or []): t = str(t).strip() if t: df = df[df["tags"].astype(str).apply( lambda s: t in [x.strip() for x in str(s).split(",") if x.strip()] )] # Dropdown filters (exact match) if category_filter: df = df[df["category"] == category_filter] if dataset_filter: df = df[df["dataset"] == dataset_filter] # Build vocab choicesโ€”ALWAYS define these, even if df is empty if not df.empty: all_tags = sorted({t.strip() for s in df["tags"].astype(str).tolist() for t in s.split(",") if t.strip()}) all_cats = sorted([c for c in df["category"].astype(str).unique() if c]) all_sets = sorted([c for c in df["dataset"].astype(str).unique() if c]) else: all_tags, all_cats, all_sets = [], [], [] # Compose the table view safely df = df.sort_values("uploaded_at", ascending=False, na_position="last").reset_index(drop=True) view = pd.DataFrame(columns=TABLE_COLS) if not df.empty: # Ensure all TABLE_COLS exist before projection for c in TABLE_COLS: if c not in df.columns: df[c] = "" view = df[TABLE_COLS].copy() count_text = f"**Showing {len(view)} file(s)**" return ( view, gr.update(choices=all_tags), gr.update(choices=[""] + all_cats, value=category_filter or ""), gr.update(choices=[""] + all_sets, value=dataset_filter or ""), count_text ) # ---------- Preview ---------- def _iframe_from_html_string(raw_html: str, height_px: int = 720) -> str: srcdoc = raw_html.replace("&", "&").replace('"', """) return f'' def select_row(evt: gr.SelectData, table_value, source_mode, current_model): try: view = _df_from_table_value(table_value) if view.empty: return "No rows.", "" # --- resolve row_idx robustly --- row_idx = None ix = getattr(evt, "index", None) if isinstance(ix, int): row_idx = ix elif isinstance(ix, (list, tuple)) and ix and isinstance(ix[0], int): row_idx = ix[0] if row_idx is None: val = getattr(evt, "value", None) if isinstance(val, dict) and "id" in val: hits = view.index[view["id"] == val["id"]].tolist() if hits: row_idx = hits[0] elif isinstance(val, list) and len(val) >= 1: hits = view.index[view["id"] == val[0]].tolist() if hits: row_idx = hits[0] if row_idx is None: row_idx = 0 if not (0 <= row_idx < len(view)): return "Invalid selection.", "" row = view.iloc[row_idx] sel_id = row["id"] # --- look up the full record from DB --- db = _load_db() rec = db[db["id"] == sel_id] if rec.empty: return "Could not find file for this row.", "" # --- choose source: HF vs Local --- use_hf = (str(source_mode).upper() == "HF") path_str = rec["hf_path"].values[0] if use_hf else rec["path"].values[0] path_str = str(path_str or "") if not path_str: return "No path available for this source.", f"๐Ÿ“„ {row['filename']}" # HF dataset URI โ†’ lazy download then iframe from raw HTML if path_str.startswith("hf://"): repo_id, relpath = parse_hf_uri(path_str) relpath = unquote(relpath) try: # first attempt: whatever is saved in library.csv local_path = hf_hub_download( repo_id=repo_id, repo_type="dataset", filename=relpath, local_files_only=False, ) except EntryNotFoundError: # fix relpath from index.csv, **scoped by active model** (handles shard dir) fixed_hf_path = _refresh_hf_path_from_index( sel_id, row["filename"], current_model_hint=current_model ) if not fixed_hf_path: msg = ( f"Entry not found for {_py_html.escape(relpath)}, " f"and no matching row for {_py_html.escape(str(row['filename']))} " f"under the current model in index.csv." ) return msg, f"๐Ÿ“„ {row['filename']}" repo_id, relpath = parse_hf_uri(fixed_hf_path) relpath = unquote(relpath) # single retry with corrected shard-aware relpath local_path = hf_hub_download( repo_id=repo_id, repo_type="dataset", filename=relpath, local_files_only=False, ) except RepositoryNotFoundError: return ( "Dataset repo not accessible. If it's private, set an HF token in Space Secrets " "as HF_TOKEN (or HUGGINGFACE_HUB_TOKEN) and restart.", f"๐Ÿ“„ {row['filename']}", ) # success raw_html = Path(local_path).read_text(encoding="utf-8") iframe = _iframe_from_html_string(raw_html, height_px=720) return iframe, f"๐Ÿ“„ {row['filename']}" # Direct HTTP URL (CDN) if path_str.startswith("http"): iframe = f'' return iframe, f"๐Ÿ“„ {row['filename']}" # Local file fallback p = Path(path_str) if not p.exists(): return f"File not found: {_py_html.escape(str(p))}", f"๐Ÿ“„ {row['filename']}" raw_html = p.read_text(encoding="utf-8") iframe = _iframe_from_html_string(raw_html, height_px=720) return iframe, f"๐Ÿ“„ {row['filename']}" except EntryNotFoundError: # just in case one bubbles up return "File not found in dataset even after shard fix.", "" except Exception as e: traceback.print_exc() return f"
Failed to render (see terminal):\n{_py_html.escape(str(e))}
", "" # ---------- Save edits ---------- def save_edits(edited_table, current_model): if edited_table is None or not len(edited_table): return gr.Info("Nothing to save.") df_db = _load_db() editable_cols = ["category","dataset","tags","keywords","notes"] for c in editable_cols: edited_table[c] = edited_table[c].fillna("").astype(str) for _, row in edited_table.iterrows(): i = df_db.index[df_db["id"] == row["id"]] if len(i): for c in editable_cols: df_db.at[i[0], c] = row[c] _save_db(df_db) # return refreshed table only (respect current_model scope) return refresh_view("", [], "", "", current_model)[0] # -------------------- UI -------------------- # CSS that targets only the three buttons via elem_id custom_css = """ /* scope styles to only these 3 components */ #sync-btn button, #refresh-btn button, #save-btn button, #sync-btn .gr-button, #refresh-btn .gr-button, #save-btn .gr-button, #sync-btn [role="button"], #refresh-btn [role="button"], #save-btn [role="button"] { background: #f97316 !important; /* orange-500 */ border-color: #f97316 !important; color: #fff !important; } /* hover/active */ #sync-btn button:hover, #refresh-btn button:hover, #save-btn button:hover, #sync-btn .gr-button:hover, #refresh-btn .gr-button:hover, #save-btn .gr-button:hover, #sync-btn [role="button"]:hover, #refresh-btn [role="button"]:hover, #save-btn [role="button"]:hover { background: #ea580c !important; /* orange-600 */ border-color: #ea580c !important; } /* (optional) also set CSS vars in case theme uses them */ #sync-btn, #refresh-btn, #save-btn { --button-primary-background-fill: #f97316; --button-primary-background-fill-hover: #ea580c; --button-text-color: #fff; } """ with gr.Blocks(title="Audio HTML Library", css=custom_css) as demo: gr.Markdown("## ๐ŸŽง Audio Reconstruction Reports โ€” sync โ€ข search โ€ข view") current_model = gr.State("") # remembers active model prefix inside HF repo source_mode = gr.State("HF") # default with gr.Row(): with gr.Column(scale=1): # Choose model & sync gr.Markdown(f"**Model prefix on HF dataset:** `{HF_DATASET_REPO}//...`") model_in = gr.Textbox(label="Model name", placeholder="e.g., WavCochV8192") sync_btn = gr.Button("Sync this model", elem_id="sync-btn") # Search & filters gr.Markdown("---\n**Search & filter**") query = gr.Textbox(label="Keyword search (filename/tags/notes/category/dataset)", placeholder="type to searchโ€ฆ") tag_filter = gr.CheckboxGroup(choices=[], label="Filter by tags (AND)") category_filter = gr.Dropdown(choices=[], label="Category") dataset_filter = gr.Dropdown(choices=[], label="Dataset") # ๐Ÿ”ฝ Step 5: Source toggle (HF vs Local) mode_radio = gr.Radio( choices=["HF", "Local"], value="HF", label="Source", info="Preview from HF dataset or local disk" ) refresh_btn = gr.Button("Refresh", elem_id="refresh-btn") with gr.Column(scale=2): # Count of current view count_md = gr.Markdown("**Showing 0 file(s)**") gr.Markdown("**Library** (click a row to preview; edit cells and Save)") table = gr.Dataframe( headers=TABLE_COLS, datatype=["str"] * len(TABLE_COLS), interactive=True, wrap=True, row_count=(0, "dynamic"), col_count=(len(TABLE_COLS), "fixed") ) with gr.Row(): save_btn = gr.Button("Save Edits", elem_id="save-btn") preview_label = gr.Markdown("") preview_html = gr.HTML("") # wiring: sync (also sets current_model) sync_btn.click( sync_model, [model_in], [table, tag_filter, category_filter, dataset_filter, count_md, current_model] ) # wiring: refresh + live filters (respect current_model) refresh_btn.click( refresh_view, [query, tag_filter, category_filter, dataset_filter, current_model], [table, tag_filter, category_filter, dataset_filter, count_md] ) # Trigger refresh when any filter OR source mode changes for comp in (query, tag_filter, category_filter, dataset_filter, mode_radio): comp.change( refresh_view, [query, tag_filter, category_filter, dataset_filter, current_model], [table, tag_filter, category_filter, dataset_filter, count_md] ) # Keep source_mode state in sync with the radio mode_radio.change(lambda x: x, [mode_radio], [source_mode]) # Pass source_mode into select_row so it can choose hf_path vs path table.select(select_row, [table, source_mode, current_model], [preview_html, preview_label]) save_btn.click(save_edits, [table, current_model], [table]) # initial load (no model yet) demo.load( refresh_view, [query, tag_filter, category_filter, dataset_filter, current_model], [table, tag_filter, category_filter, dataset_filter, count_md] ) if __name__ == "__main__": demo.launch(share=True) # auth optional