import gradio as gr
import os
import json
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download
from datasets import load_dataset
import requests
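
# Configuration from environment: HF token, the owner namespace of the community
# results dataset, and the URL of the backend service that runs contamination checks.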
TOKEN = os.environ.get("HF_TOKEN")
OWNER = os.environ.get("OWNER")
RESULTS_COMMUNITY = f"{OWNER}/benchmark_results"
api = HfApi()
URL = os.environ.get("URL")

def load_data(source):
    """Load benchmark entries from the bundled core file or the community results dataset."""
    if source == "core":
        with open("data.json", "r") as f:
            data = json.load(f)
    else:
        ds = load_dataset(RESULTS_COMMUNITY, split="train")
        data = [entry for entry in ds]
    return data

def build_table(source):
    """Render the contamination results for the selected source as an HTML table."""
    data = load_data(source)
    if source == "core":
        headers = ["Benchmark", "Category", "Pile-train Dirty (%)", "DCLM-baseline Dirty (%)", "CC-2025-05 Dirty (%)"]
    else:
        headers = ["Benchmark", "Contributor", "Pile-train Dirty (%)", "DCLM-baseline Dirty (%)", "CC-2025-05 Dirty (%)"]

    # Header row (plain table markup; any original inline styling is not preserved here).
    html = "<table>\n<thead>\n<tr>"
    for col in headers:
        html += f"<th>{col}</th>"
    html += "</tr>\n</thead>\n<tbody>\n"

    # One row per benchmark; the benchmark name links to its source URL.
    for entry in data:
        name = entry.get("Benchmark", "")
        url = entry.get("URL", "#")
        hyperlink = f'<a href="{url}">{name}</a>'
        row = {
            "Benchmark": hyperlink,
            "Pile-train Dirty (%)": entry.get("Pile Dirty", -1),
            "DCLM-baseline Dirty (%)": entry.get("DCLM Dirty", -1),
            "CC-2025-05 Dirty (%)": entry.get("CC202505 Dirty", -1),
        }
        if source == "core":
            row["Category"] = entry.get("Category", "")
        elif source == "community":
            row["Contributor"] = entry.get("Contributor", "")

        html += "<tr>"
        for col in headers:
            val = row.get(col, "")
            # Negative values are the "missing result" sentinel and render as N/A.
            if isinstance(val, (int, float)):
                val = f"{val:5.1f}" if val >= 0 else "N/A"
            html += f"<td>{val}</td>"
        html += "</tr>\n"

    html += "</tbody>\n</table>"
    return html
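
# Handles the "Add New Benchmarks" form: validates the submission locally
# (uploaded JSONL or HuggingFace dataset), then forwards it to the backend at URL.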
def record_submission(benchmark_name, contributor, jsonl_file, hf_path, hf_split, field_name):
    """Validate a community benchmark submission and forward it to the contamination-check backend."""
    if not benchmark_name or not benchmark_name.strip():
        return "❌ Please provide a benchmark name."
    if not field_name or not field_name.strip():
        return "❌ Please provide a field name."

    has_jsonl = jsonl_file is not None
    has_hf = hf_path and hf_path.strip()
    if not has_jsonl and not has_hf:
        return "❌ Please provide either a .jsonl file or a HuggingFace dataset path."

    if has_jsonl:
        # Spot-check the first few lines of the uploaded JSONL file.
        try:
            with open(jsonl_file.name, 'r', encoding='utf-8') as f:
                line_count = 0
                for line in f:
                    line_count += 1
                    if line_count > 5:
                        break
                    try:
                        entry = json.loads(line.strip())
                        if field_name.strip() not in entry:
                            available_fields = list(entry.keys())
                            return f"❌ Field '{field_name.strip()}' not found in JSONL file. Available fields: {', '.join(available_fields)}"
                    except json.JSONDecodeError as e:
                        return f"❌ Invalid JSON format in line {line_count}: {str(e)}"
                if line_count == 0:
                    return "❌ The uploaded file is empty."
        except Exception as e:
            return f"❌ Error reading file: {str(e)}"
    elif has_hf:
        # Stream the dataset to confirm it is reachable and exposes the requested field.
        if not hf_split or not hf_split.strip():
            return "❌ Please provide a dataset split for the HuggingFace dataset."
        try:
            dataset_info = load_dataset(hf_path.strip(), split=hf_split.strip(), streaming=True, trust_remote_code=True)
            first_item = next(iter(dataset_info))
            if field_name.strip() not in first_item:
                available_fields = list(first_item.keys())
                return f"❌ Field '{field_name.strip()}' not found in dataset. Available fields: {', '.join(available_fields)}"
        except Exception as e:
            return f"❌ Could not access HuggingFace dataset: {str(e)}"

    try:
        data = {
            'name': benchmark_name.strip(),
            'contributor': contributor.strip(),
            'type': 'jsonl' if has_jsonl else 'hf',
            'split': hf_split.strip() if has_hf else '',
            'field_name': field_name.strip(),
            'hf_path': hf_path.strip() if has_hf else ''
        }
        print(json.dumps(data))

        files = {}
        file_handle = None
        if has_jsonl:
            file_handle = open(jsonl_file.name, 'rb')
            files['file'] = (benchmark_name.strip() + '.jsonl', file_handle, 'application/json')
        try:
            response = requests.post(f"{URL}/", data={"payload": json.dumps(data)}, files=files, timeout=30)
        finally:
            # Close the upload handle even if the request raises.
            if file_handle is not None:
                file_handle.close()

        if response.status_code == 200:
            result = response.json()
            if result.get("status") == "success":
                return result.get('message', 'Submission successful!')
            elif result.get("status") == "info":
                return f"❌ {result.get('message', 'Submission already exists')}"
            else:
                return f"❌ {result.get('message', 'Unknown error occurred')}"
        else:
            return f"❌ Server error: {response.status_code} - {response.text}"
    except Exception as e:
        return f"❌ Error submitting benchmark: {str(e)}"
with gr.Blocks() as interface:
    gr.Markdown("# 📖 Benchmark Contamination Bulletin")
    with gr.Tabs():
        with gr.Tab(label="Bulletin"):
            source_radio = gr.Radio(
                choices=["core", "community"],
                label="Select Benchmark Source",
                value="core"
            )
            leaderboard_html = gr.HTML(build_table("core"))

            def update_table(source):
                return build_table(source)

            source_radio.change(
                fn=update_table,
                inputs=source_radio,
                outputs=leaderboard_html
            )
            refresh_button = gr.Button("Refresh")
            refresh_button.click(
                fn=update_table,
                inputs=source_radio,
                outputs=leaderboard_html
            )
        with gr.Tab(label="Add New Benchmarks"):
            gr.Markdown("## Add Your Own Benchmarks for Contamination Checking")
            with gr.Row():
                benchmark_name_input = gr.Textbox(label="Benchmark Name")
                contributor_input = gr.Textbox(label="Contributor")
            with gr.Row():
                jsonl_input = gr.File(label="Upload .jsonl File", file_types=[".jsonl"])
                with gr.Column():
                    hf_path_input = gr.Textbox(label="HuggingFace Dataset Path")
                    hf_split_input = gr.Textbox(label="Dataset split (only if providing HuggingFace Dataset)", placeholder="e.g., validation, test")
            field_name_input = gr.Textbox(label="Context or Question Field Name", placeholder="e.g., context, question, ...")
            submit_button = gr.Button("Submit for Contamination Check")
            result_output = gr.Textbox(label="Submission Status", interactive=False)
            submit_button.click(
                fn=record_submission,
                inputs=[benchmark_name_input, contributor_input, jsonl_input, hf_path_input, hf_split_input, field_name_input],
                outputs=result_output
            )

interface.launch()