"""
Process and transform GuardBench leaderboard data.
"""
import json
import os
from datetime import datetime
from typing import Any, Dict, List, Tuple

import pandas as pd

from src.display.utils import CATEGORIES, TEST_TYPES, METRICS

def load_leaderboard_data(file_path: str) -> Dict:
    """
    Load the leaderboard data from a JSON file.

    If the file does not exist, return an empty leaderboard whose version is
    inferred from the file name (e.g. "..._v1.json" -> "v1").
    """
    if not os.path.exists(file_path):
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        return {"entries": [], "last_updated": datetime.now().isoformat(), "version": version}

    with open(file_path, 'r') as f:
        data = json.load(f)

    # Ensure the version field exists, inferring it from the file name if needed
    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    return data

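# Illustrative usage of load_leaderboard_data (the path below is hypothetical;
# point it at wherever this app stores its leaderboard JSON):
#   data = load_leaderboard_data("data/leaderboard_v1.json")
#   print(data["version"], len(data["entries"]))
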
def save_leaderboard_data(data: Dict, file_path: str) -> None:
    """
    Save the leaderboard data to a JSON file.
    """
    # Ensure the target directory exists (file_path may be a bare file name)
    directory = os.path.dirname(file_path)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Update the last_updated timestamp
    data["last_updated"] = datetime.now().isoformat()

    # Ensure version is set, inferring it from the file name if needed
    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    with open(file_path, 'w') as f:
        json.dump(data, f, indent=2)

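# Illustrative round trip (hypothetical path): load, update, and persist the
# leaderboard. save_leaderboard_data refreshes "last_updated" on every write.
#   data = load_leaderboard_data("data/leaderboard_v1.json")
#   save_leaderboard_data(data, "data/leaderboard_v1.json")
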
def process_submission(submission_data: List[Dict]) -> List[Dict]:
    """
    Process submission data and convert it to leaderboard entries.
    """
    entries = []
    for item in submission_data:
        # Create a new entry for the leaderboard
        entry = {
            "model_name": item.get("model_name", "Unknown Model"),
            "per_category_metrics": {},
            "avg_metrics": {},
            "submission_date": datetime.now().isoformat(),
            "version": item.get("version", "v0"),
        }

        # Copy model metadata
        for key in ["model_type", "base_model", "revision", "precision", "weight_type"]:
            if key in item:
                entry[key] = item[key]

        # Copy per-category metrics
        if "per_category_metrics" in item:
            entry["per_category_metrics"] = item["per_category_metrics"]

        # Copy average metrics
        if "avg_metrics" in item:
            entry["avg_metrics"] = item["avg_metrics"]

        entries.append(entry)

    return entries

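# Illustrative input (field values are invented): each submission item should
# carry at least "model_name"; metadata and metric blocks are copied through
# when present.
#   items = [{"model_name": "org/guard-model", "model_type": "Unknown",
#             "avg_metrics": {}, "per_category_metrics": {}}]
#   entries = process_submission(items)
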
def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame:
    """
    Convert leaderboard data to a pandas DataFrame for display.
    """
    rows = []
    for entry in leaderboard_data.get("entries", []):
        model_name = entry.get("model_name", "Unknown Model")

        # Extract model metadata for the main display
        row = {
            "model_name": model_name,
            "model_type": entry.get("model_type", "Unknown"),
            "submission_date": entry.get("submission_date", ""),
            "version": entry.get("version", "v0"),
            "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower(),
        }

        # Add additional metadata fields if present
        for key in ["base_model", "revision", "precision", "weight_type"]:
            if key in entry:
                row[key] = entry[key]

        # CASE 1: metrics stored flat at the entry root
        for key, value in entry.items():
            if any(test_type in key for test_type in TEST_TYPES) or key in ["average_f1", "average_recall", "average_precision"]:
                row[key] = value

        # CASE 2: metrics nested in the avg_metrics structure
        avg_metrics = entry.get("avg_metrics", {})
        if avg_metrics:
            for test_type in TEST_TYPES:
                if test_type in avg_metrics:
                    metrics = avg_metrics[test_type]
                    for metric in METRICS:
                        if metric in metrics:
                            col_name = f"{test_type}_{metric}"
                            row[col_name] = metrics[metric]
                            # Also add a non-binary alias for F1 scores
                            if metric == "f1_binary":
                                row[f"{test_type}_f1"] = metrics[metric]

        # Calculate averages if not already present
        if "average_f1" not in row:
            f1_values = []
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "f1_binary" in avg_metrics[test_type]:
                    f1_values.append(avg_metrics[test_type]["f1_binary"])
            if f1_values:
                row["average_f1"] = sum(f1_values) / len(f1_values)

        if "average_recall" not in row:
            recall_values = []
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "recall_binary" in avg_metrics[test_type]:
                    recall_values.append(avg_metrics[test_type]["recall_binary"])
            if recall_values:
                row["average_recall"] = sum(recall_values) / len(recall_values)

        if "average_precision" not in row:
            precision_values = []
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "precision_binary" in avg_metrics[test_type]:
                    precision_values.append(avg_metrics[test_type]["precision_binary"])
            if precision_values:
                row["average_precision"] = sum(precision_values) / len(precision_values)

        rows.append(row)

    # Create the DataFrame and sort by average F1 score
    df = pd.DataFrame(rows)

    # Ensure all expected columns exist
    for test_type in TEST_TYPES:
        if f"{test_type}_f1" not in df.columns:
            df[f"{test_type}_f1"] = None
        if f"{test_type}_f1_binary" not in df.columns:
            df[f"{test_type}_f1_binary"] = None
        if f"{test_type}_recall_binary" not in df.columns:
            df[f"{test_type}_recall_binary"] = None
        if f"{test_type}_precision_binary" not in df.columns:
            df[f"{test_type}_precision_binary"] = None

    if not df.empty and "average_f1" in df.columns:
        df = df.sort_values(by="average_f1", ascending=False)

    return df

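# Illustrative usage (hypothetical path): build the display table straight from
# the stored JSON. Aggregate columns such as "average_f1" are computed above
# when they are not already present in the entries.
#   board = load_leaderboard_data("data/leaderboard_v1.json")
#   df = leaderboard_to_dataframe(board)
#   print(df[["model_name", "average_f1"]].head())
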
def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict:
    """
    Add new entries to the leaderboard, replacing any with the same model name and version.
    """
    # Map existing entries by (model name, version)
    existing_entries = {
        (entry["model_name"], entry.get("version", "v0")): i
        for i, entry in enumerate(leaderboard_data.get("entries", []))
    }

    # Process each new entry
    for new_entry in new_entries:
        model_name = new_entry.get("model_name")
        version = new_entry.get("version", "v0")

        if (model_name, version) in existing_entries:
            # Replace the existing entry
            leaderboard_data["entries"][existing_entries[(model_name, version)]] = new_entry
        else:
            # Add a new entry
            if "entries" not in leaderboard_data:
                leaderboard_data["entries"] = []
            leaderboard_data["entries"].append(new_entry)

    # Update the last_updated timestamp
    leaderboard_data["last_updated"] = datetime.now().isoformat()

    return leaderboard_data

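# Illustrative merge (names are placeholders): re-submitting the same
# (model_name, version) pair replaces the old entry instead of duplicating it.
#   board = add_entries_to_leaderboard(board, process_submission(submitted_items))
#   save_leaderboard_data(board, "data/leaderboard_v1.json")
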
def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]:
    """
    Process a JSONL submission file and extract entries.
    """
    entries = []
    try:
        with open(file_path, 'r') as f:
            for line in f:
                # Skip blank lines so a trailing newline does not fail the whole file
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                    entries.append(entry)
                except json.JSONDecodeError as e:
                    return [], f"Invalid JSON in submission file: {e}"

        if not entries:
            return [], "Submission file is empty"

        return entries, "Successfully processed submission"
    except Exception as e:
        return [], f"Error processing submission file: {e}"
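# Illustrative usage (hypothetical upload path): the file is expected to hold
# one JSON object per line.
#   entries, message = process_jsonl_submission("uploads/submission.jsonl")
#   print(message, len(entries))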