import gradio as gr import os import json import pandas as pd from huggingface_hub import HfApi, hf_hub_download from datasets import load_dataset import requests TOKEN = os.environ.get("HF_TOKEN") OWNER = os.environ.get("OWNER") RESULTS_COMMUNITY = f"{OWNER}/benchmark_results" api = HfApi() URL = os.environ.get("URL") def load_data(source): if source == "core": with open("data.json", "r") as f: data = json.load(f) else: ds = load_dataset(RESULTS_COMMUNITY, split='train') data = [] for entry in ds: data.append(entry) return data def build_table(source): data = load_data(source) entries = [] if source == "core": headers = ["Benchmark", "Category", "Pile-train Dirty (%)", "DCLM-baseline Dirty (%)", "CC-2025-05 Dirty (%)"] else: headers = ["Benchmark", "Contributor", "Pile-train Dirty (%)", "DCLM-baseline Dirty (%)", "CC-2025-05 Dirty (%)"] html = """ """ for col in headers: html += f'' html += '\n\n' for entry in data: name = entry.get("Benchmark", "") url = entry.get("URL", "#") hyperlink = f'{name}' row = { "Benchmark": hyperlink, "Pile-train Dirty (%)": entry.get("Pile Dirty", -1), "DCLM-baseline Dirty (%)": entry.get("DCLM Dirty", -1), "CC-2025-05 Dirty (%)": entry.get("CC202505 Dirty", -1), } if source == "core": row["Category"] = entry.get("Category", "") elif source == "community": row["Contributor"] = entry.get("Contributor", "") html += "" for col in headers: val = row.get(col, "") if isinstance(val, float) and val >= 0: val = f"{val:5.1f}" elif isinstance(val, float): val = "N/A" html += f'' html += "\n" html += "
{col}
{val}
" html += """ """ html += """ """ return html def record_submission(benchmark_name, contributor, jsonl_file, hf_path, hf_split, field_name): if not benchmark_name or not benchmark_name.strip(): return "❌ Please provide a benchmark name." if not field_name or not field_name.strip(): return "❌ Please provide a field name." has_jsonl = jsonl_file is not None has_hf = hf_path and hf_path.strip() if not has_jsonl and not has_hf: return "❌ Please provide either a .jsonl file or a HuggingFace dataset path." if has_jsonl: try: with open(jsonl_file.name, 'r', encoding='utf-8') as f: line_count = 0 for line in f: line_count += 1 if line_count > 5: break try: entry = json.loads(line.strip()) if field_name.strip() not in entry: available_fields = list(entry.keys()) return f"❌ Field '{field_name.strip()}' not found in JSONL file. Available fields: {', '.join(available_fields)}" except json.JSONDecodeError as e: return f"❌ Invalid JSON format in line {line_count}: {str(e)}" if line_count == 0: return "❌ The uploaded file is empty." except Exception as e: return f"❌ Error reading file: {str(e)}" elif has_hf: if not hf_split or not hf_split.strip(): return "❌ Please provide a dataset split for the HuggingFace dataset." try: dataset_info = load_dataset(hf_path.strip(), split=hf_split.strip(), streaming=True, trust_remote_code=True) first_item = next(iter(dataset_info)) if field_name.strip() not in first_item: available_fields = list(first_item.keys()) return f"❌ Field '{field_name.strip()}' not found in dataset. Available fields: {', '.join(available_fields)}" except Exception as e: return f"❌ Could not access HuggingFace dataset: {str(e)}" try: data = { 'name': benchmark_name.strip(), 'contributor': contributor.strip(), 'type': 'jsonl' if has_jsonl else 'hf', 'split': hf_split.strip() if has_hf else '', 'field_name': field_name.strip(), 'hf_path': hf_path.strip() if has_hf else '' } print(json.dumps(data)) files = {} if has_jsonl: files['file'] = (benchmark_name.strip() + '.jsonl', open(jsonl_file.name, 'rb'), 'application/json') response = requests.post(f"{URL}/", data={"payload": json.dumps(data)}, files=files, timeout=30) if files: files['file'][1].close() if response.status_code == 200: result = response.json() if result.get("status") == "success": message = result.get('message', 'Submission successful!') full_message = f"{message}" return full_message elif result.get("status") == "info": return f"❌ {result.get('message', 'Submission already exists')}" else: return f"❌ {result.get('message', 'Unknown error occurred')}" else: return f"❌ Server error: {response.status_code} - {response.text}" except Exception as e: return f"❌ Error submitting benchmark: {str(e)}" with gr.Blocks() as interface: gr.Markdown("# 📖 Benchmark Contamination Bulletin") with gr.Tabs(): with gr.Tab(label="Bulletin"): source_radio = gr.Radio( choices=["core", "community"], label="Select Benchmark Source", value="core" ) leaderboard_html = gr.HTML(build_table("core")) def update_table(source): return build_table(source) source_radio.change( fn=update_table, inputs=source_radio, outputs=leaderboard_html ) refresh_button = gr.Button("Refresh") refresh_button.click( fn=update_table, inputs=source_radio, outputs=leaderboard_html ) with gr.Tab(label="Add New Benchmarks"): gr.Markdown("## Add Your Own Benchmarks for Contamination Checking") with gr.Row(): benchmark_name_input = gr.Textbox(label="Benchmark Name") contributor_input = gr.Textbox(label="Contributor") with gr.Row(): jsonl_input = gr.File(label="Upload .jsonl File", file_types=[".jsonl"]) with gr.Column(): hf_path_input = gr.Textbox(label="HuggingFace Dataset Path") hf_split_input = gr.Textbox(label="Dataset split (only if providing HuggingFace Dataset)", placeholder="e.g., validation, test") field_name_input = gr.Textbox(label="Context or Question Field Name", placeholder="e.g., context, question, ...") submit_button = gr.Button("Submit for Contamination Check") result_output = gr.Textbox(label="Submission Status", interactive=False) submit_button.click( fn=record_submission, inputs=[benchmark_name_input, contributor_input, jsonl_input, hf_path_input, hf_split_input, field_name_input], outputs=result_output ) interface.launch()