""" | |
Process and transform GuardBench leaderboard data. | |
""" | |
import json | |
import os | |
import pandas as pd | |
from datetime import datetime | |
from typing import Dict, List, Any, Tuple | |
import numpy as np | |
from src.display.utils import CATEGORIES, TEST_TYPES, METRICS | |
# Constants for the integral score calculation (mirroring the guardbench library).
MAX_PUNISHABLE_RUNTIME_MS = 6000.0
MIN_PUNISHABLE_RUNTIME_MS = 200.0
# Despite the name, this value acts as the floor on the time factor: the score
# is multiplied by at least 0.75, so the largest runtime penalty is
# 1.0 - 0.75 = 0.25. The library uses the same 0.75 value.
MAX_RUNTIME_PENALTY = 0.75
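# Worked example of the runtime penalty (illustrative arithmetic only): a
# 3100 ms average runtime stays inside the [200, 6000] ms window, normalizes
# to (3100 - 200) / (6000 - 200) = 0.5, and yields a time factor of
# 1.0 - (1.0 - 0.75) * 0.5 = 0.875, i.e. a 12.5% score reduction.
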
def calculate_integral_score(row: pd.Series) -> float:
    """
    Calculate the integral score for a model entry row.

    Uses per-test-type accuracy as the primary metric, then applies the micro
    error-ratio and micro runtime penalties. Penalties are skipped when the
    corresponding micro values are missing; if no accuracy metrics are present
    at all, returns 0.0, matching the guardbench library.
    """
    integral_score = 1.0
    metric_count = 0

    # Primary metric: multiply the accuracies of all available test types.
    for test_type in TEST_TYPES:
        metric_col = f"{test_type}_accuracy"
        if metric_col in row and pd.notna(row[metric_col]):
            integral_score *= row[metric_col]
            metric_count += 1

    # The library returns 0.0 when no accuracy metrics were found at all;
    # mirror that here before applying any penalties.
    if metric_count == 0:
        return 0.0

    # Error penalty: the micro error ratio is stored as a percentage,
    # so convert it back to a ratio before applying.
    micro_error_col = "micro_avg_error_ratio"
    if micro_error_col in row and pd.notna(row[micro_error_col]):
        micro_error_ratio = row[micro_error_col] / 100.0
        integral_score *= 1.0 - micro_error_ratio

    # Runtime penalty: applied only when a micro average runtime is available.
    micro_runtime_col = "micro_avg_runtime_ms"
    if micro_runtime_col in row and pd.notna(row[micro_runtime_col]):
        avg_runtime_ms = row[micro_runtime_col]
        # Clamp the runtime into the punishable window.
        runtime = max(
            min(avg_runtime_ms, MAX_PUNISHABLE_RUNTIME_MS),
            MIN_PUNISHABLE_RUNTIME_MS,
        )
        if MAX_PUNISHABLE_RUNTIME_MS > MIN_PUNISHABLE_RUNTIME_MS:
            normalized_time = (runtime - MIN_PUNISHABLE_RUNTIME_MS) / (
                MAX_PUNISHABLE_RUNTIME_MS - MIN_PUNISHABLE_RUNTIME_MS
            )
            # Reference library formula: linear decay from 1.0 down to 0.75.
            time_factor = 1.0 - (1.0 - MAX_RUNTIME_PENALTY) * normalized_time
        else:
            # Degenerate window (max == min): no gradual decay is possible.
            time_factor = 1.0 if runtime <= MIN_PUNISHABLE_RUNTIME_MS else (1.0 - MAX_RUNTIME_PENALTY)
        # Enforce the minimum time factor, as the reference library does.
        time_factor = max(MAX_RUNTIME_PENALTY, time_factor)
        integral_score *= time_factor

    # The reference library takes no root in its summary table calculation.
    return integral_score
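# Usage sketch: a synthetic row illustrating how the score composes. The
# "prompts"/"answers" test types are hypothetical placeholders; the real
# column names come from TEST_TYPES in src.display.utils.
#
#     row = pd.Series({
#         "prompts_accuracy": 0.90,
#         "answers_accuracy": 0.80,
#         "micro_avg_error_ratio": 5.0,    # stored as a percentage
#         "micro_avg_runtime_ms": 3100.0,
#     })
#     calculate_integral_score(row)
#     # 0.90 * 0.80 * (1 - 0.05) * 0.875 ≈ 0.5985
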
def load_leaderboard_data(file_path: str) -> Dict:
    """
    Load the leaderboard data from a JSON file.
    """
    if not os.path.exists(file_path):
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        return {"entries": [], "last_updated": datetime.now().isoformat(), "version": version}

    with open(file_path, "r") as f:
        data = json.load(f)

    # Ensure the version field exists, deriving it from the filename if needed.
    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    return data
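# Usage sketch: version parsing with a hypothetical path. A filename like
# "data/leaderboard_v2.json" splits on "_" and "." to yield "v2"; paths
# without a "_v" suffix fall back to "v0".
#
#     data = load_leaderboard_data("data/leaderboard_v2.json")
#     data["version"]  # "v2" when the file itself carries no version field
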
def save_leaderboard_data(data: Dict, file_path: str) -> None:
    """
    Save the leaderboard data to a JSON file.
    """
    # Ensure the target directory exists.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # Refresh the last_updated timestamp on every save.
    data["last_updated"] = datetime.now().isoformat()

    # Ensure the version field exists, deriving it from the filename if needed.
    if "version" not in data:
        version = "v0"
        if "_v" in file_path:
            version = file_path.split("_")[-1].split(".")[0]
        data["version"] = version

    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)
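# Usage sketch: a load/save round trip with a hypothetical path. Note that
# save_leaderboard_data stamps last_updated on every write, so the timestamp
# always reflects the most recent save.
#
#     data = load_leaderboard_data("data/leaderboard_v1.json")
#     data["entries"].append({"model_name": "my-guard", "version": "v1"})
#     save_leaderboard_data(data, "data/leaderboard_v1.json")
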
def process_submission(submission_data: List[Dict]) -> List[Dict]:
    """
    Process submission data and convert it to leaderboard entries.
    """
    entries = []
    for item in submission_data:
        # Create a new entry for the leaderboard.
        entry = {
            "model_name": item.get("model_name", "Unknown Model"),
            "per_category_metrics": {},
            "avg_metrics": {},
            "submission_date": datetime.now().isoformat(),
            "version": item.get("version", "v0"),
        }

        # Copy model metadata.
        for key in ["model_type", "base_model", "revision", "precision", "weight_type"]:
            if key in item:
                entry[key] = item[key]

        # Copy per-category and average metrics when present.
        if "per_category_metrics" in item:
            entry["per_category_metrics"] = item["per_category_metrics"]
        if "avg_metrics" in item:
            entry["avg_metrics"] = item["avg_metrics"]

        entries.append(entry)
    return entries
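# Usage sketch: a minimal submission item (all field values hypothetical).
# Metadata keys outside the copied list are ignored, and missing metric
# blocks default to empty dicts.
#
#     items = [{
#         "model_name": "my-guard",
#         "model_type": "classifier",
#         "avg_metrics": {"prompts": {"accuracy": 0.91}},
#     }]
#     entries = process_submission(items)
#     entries[0]["model_name"]  # "my-guard", stamped with a submission_date
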
def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame:
    """
    Convert leaderboard data to a pandas DataFrame for display.
    """
    rows = []
    for entry in leaderboard_data.get("entries", []):
        model_name = entry.get("model_name", "Unknown Model")

        # Base row with display metadata.
        row = {
            "model_name": model_name,
            "model_type": entry.get("model_type", "Unknown"),
            "mode": entry.get("mode", "Strict"),
            "submission_date": entry.get("submission_date", ""),
            "version": entry.get("version", "v0"),
            "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower(),
        }

        # Add additional metadata fields if present.
        for key in ["base_model", "revision", "precision", "weight_type"]:
            if key in entry:
                row[key] = entry[key]

        # CASE 1: metrics stored flat at the root of the entry.
        for key, value in entry.items():
            if any(test_type in key for test_type in TEST_TYPES) or key in [
                "average_f1", "average_recall", "average_precision",
                "macro_accuracy", "macro_recall", "total_evals_count",
            ]:
                row[key] = value

        # CASE 2: metrics nested in the avg_metrics structure.
        avg_metrics = entry.get("avg_metrics", {})
        for test_type in TEST_TYPES:
            if test_type in avg_metrics:
                metrics = avg_metrics[test_type]
                for metric in METRICS:
                    if metric in metrics:
                        row[f"{test_type}_{metric}"] = metrics[metric]
                        # Also expose a non-binary alias for F1 scores.
                        if metric == "f1_binary":
                            row[f"{test_type}_f1"] = metrics[metric]

        # Compute macro_accuracy from per-test-type accuracies when absent.
        if "macro_accuracy" not in row:
            accuracy_values = []
            for test_type in TEST_TYPES:
                accuracy_val = None
                # Prefer the avg_metrics structure, falling back to flat columns.
                if test_type in avg_metrics and "accuracy" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["accuracy"]):
                    accuracy_val = avg_metrics[test_type]["accuracy"]
                elif f"{test_type}_accuracy" in row and pd.notna(row[f"{test_type}_accuracy"]):
                    accuracy_val = row[f"{test_type}_accuracy"]
                if accuracy_val is not None:
                    accuracy_values.append(accuracy_val)
            if accuracy_values:
                row["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values)

        # Compute macro_recall from per-test-type recall_binary when absent.
        if "macro_recall" not in row:
            recall_values = []
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "recall_binary" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["recall_binary"]):
                    recall_values.append(avg_metrics[test_type]["recall_binary"])
            if recall_values:
                row["macro_recall"] = sum(recall_values) / len(recall_values)

        # Sum per-test-type sample counts when total_evals_count is absent.
        if "total_evals_count" not in row:
            total_samples = 0
            found_samples = False
            for test_type in TEST_TYPES:
                if test_type in avg_metrics and "sample_count" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["sample_count"]):
                    total_samples += avg_metrics[test_type]["sample_count"]
                    found_samples = True
            if found_samples:
                row["total_evals_count"] = total_samples

        # Micro averages come straight from the entry (as in the guardbench library).
        row["micro_avg_error_ratio"] = entry.get("micro_avg_error_ratio", pd.NA)
        row["micro_avg_runtime_ms"] = entry.get("micro_avg_runtime_ms", pd.NA)
        # Convert the error ratio to a percentage for consistency with the display name.
        if pd.notna(row["micro_avg_error_ratio"]):
            row["micro_avg_error_ratio"] *= 100

        rows.append(row)

    df = pd.DataFrame(rows)

    # Ensure all expected metric columns exist.
    for test_type in TEST_TYPES:
        for metric in METRICS:
            col_name = f"{test_type}_{metric}"
            if col_name not in df.columns:
                df[col_name] = pd.NA  # Use pd.NA for missing numeric data.
            # Mirror the binary F1 column into the plain F1 column if missing.
            if metric == "f1_binary" and f"{test_type}_f1" not in df.columns:
                df[f"{test_type}_f1"] = df[col_name]

    # Calculate the integral score and sort by it, descending.
    if not df.empty:
        df["integral_score"] = df.apply(calculate_integral_score, axis=1)
        df = df.sort_values(by="integral_score", ascending=False, na_position="last")
    else:
        # Add the column even for an empty frame.
        df["integral_score"] = pd.NA

    # Ensure summary columns exist.
    summary_cols = ["macro_accuracy", "macro_recall", "micro_avg_error_ratio", "micro_avg_runtime_ms", "total_evals_count"]
    for col in summary_cols:
        if col not in df.columns:
            df[col] = pd.NA

    # Drop legacy average columns if they somehow snuck in.
    old_avg_cols = ["average_f1", "average_recall", "average_precision"]
    df = df.drop(columns=[col for col in old_avg_cols if col in df.columns])

    return df
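# Usage sketch: converting a small leaderboard dict (hypothetical entry and
# test type) into the display frame, sorted by integral_score descending.
#
#     board = {"entries": [{"model_name": "my-guard",
#                           "avg_metrics": {"prompts": {"accuracy": 0.91}}}]}
#     df = leaderboard_to_dataframe(board)
#     df[["model_name", "integral_score", "macro_accuracy"]].head()
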
def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict:
    """
    Add new entries to the leaderboard, replacing any existing entry with the
    same (model_name, version) pair.
    """
    # Map existing entries by (model name, version) to their list index.
    existing_entries = {
        (entry["model_name"], entry.get("version", "v0")): i
        for i, entry in enumerate(leaderboard_data.get("entries", []))
    }

    for new_entry in new_entries:
        model_name = new_entry.get("model_name")
        version = new_entry.get("version", "v0")
        if (model_name, version) in existing_entries:
            # Replace the existing entry in place.
            leaderboard_data["entries"][existing_entries[(model_name, version)]] = new_entry
        else:
            # Append as a new entry, creating the list if needed.
            leaderboard_data.setdefault("entries", []).append(new_entry)

    # Refresh the last_updated timestamp.
    leaderboard_data["last_updated"] = datetime.now().isoformat()
    return leaderboard_data
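# Usage sketch: entries are keyed by (model_name, version), so resubmitting
# the same pair replaces in place while a new version appends.
#
#     board = {"entries": [{"model_name": "my-guard", "version": "v1"}]}
#     board = add_entries_to_leaderboard(
#         board, [{"model_name": "my-guard", "version": "v1"}]
#     )
#     len(board["entries"])  # still 1: same (name, version) was replaced
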
def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]:
    """
    Process a JSONL submission file and extract its entries.
    """
    entries = []
    try:
        with open(file_path, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # Skip blank lines rather than failing on them.
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError as e:
                    return [], f"Invalid JSON in submission file: {e}"
        if not entries:
            return [], "Submission file is empty"
        return entries, "Successfully processed submission"
    except Exception as e:
        return [], f"Error processing submission file: {e}"