Spaces:
Sleeping
Sleeping
""" | |
Populate the GuardBench leaderboard from HuggingFace datasets.
""" | |
import json | |
import os | |
import pandas as pd | |
import tempfile | |
from typing import Dict, List, Optional | |
from datetime import datetime | |
import numpy as np | |
from huggingface_hub import hf_hub_download, HfApi | |
from datasets import load_dataset | |
from src.display.utils import GUARDBENCH_COLUMN, DISPLAY_COLS, CATEGORIES | |
from src.envs import RESULTS_DATASET_ID, TOKEN, CACHE_PATH | |
from src.leaderboard.processor import leaderboard_to_dataframe | |
def get_latest_leaderboard(version="v0") -> Optional[Dict]:
    """
    Get the latest leaderboard data from HuggingFace dataset.

    Downloads ``leaderboards/leaderboard_{version}.json`` from the results
    dataset repository and parses it as JSON.

    Args:
        version: Version tag used to build the leaderboard file name.

    Returns:
        The parsed JSON content, or ``None`` when the download or the
        parsing fails for any reason (the error is printed, not raised).
    """
    try:
        # Try to download the leaderboard file
        leaderboard_path = hf_hub_download(
            repo_id=RESULTS_DATASET_ID,
            filename=f"leaderboards/leaderboard_{version}.json",
            repo_type="dataset",
            token=TOKEN,
        )
        # Read explicitly as UTF-8 so the result does not depend on the
        # platform's locale default encoding.
        with open(leaderboard_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        # Deliberate best-effort: callers treat None as "no leaderboard yet".
        print(f"Error downloading leaderboard: {e}")
        return None
def get_model_entry(model_name: str, version="v0") -> Optional[Dict]:
    """
    Get a specific model's entry from the entries folder.

    The file name is derived from the model name by replacing ``/`` and
    spaces with ``_``: ``entries/entry_{safe_name}_{version}.json``.

    Args:
        model_name: Model name as submitted (may contain ``/`` or spaces).
        version: Version tag used to build the entry file name.

    Returns:
        The parsed JSON content, or ``None`` when the download or the
        parsing fails for any reason (the error is printed, not raised).
    """
    try:
        model_name_safe = model_name.replace("/", "_").replace(" ", "_")
        entry_path = hf_hub_download(
            repo_id=RESULTS_DATASET_ID,
            filename=f"entries/entry_{model_name_safe}_{version}.json",
            repo_type="dataset",
            token=TOKEN,
        )
        # Read explicitly as UTF-8 so the result does not depend on the
        # platform's locale default encoding.
        with open(entry_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        # Deliberate best-effort: a missing entry is reported and mapped to None.
        print(f"Error downloading model entry: {e}")
        return None
def get_all_entries(version="v0") -> List[Dict]:
    """
    Get all model entries from the entries folder.

    Lists the files of the results dataset repository, keeps those under
    ``entries/`` whose name ends with ``_{version}.json``, and downloads and
    parses each of them.

    Args:
        version: Version tag used to select entry files.

    Returns:
        The parsed entries that could be loaded. A file that fails to
        download or parse is reported and skipped; if listing the repository
        itself fails, an empty list is returned.
    """
    try:
        api = HfApi(token=TOKEN)
        files = api.list_repo_files(repo_id=RESULTS_DATASET_ID, repo_type="dataset")
        suffix = f"_{version}.json"
        entry_files = [
            f for f in files if f.startswith("entries/") and f.endswith(suffix)
        ]

        entries = []
        for entry_file in entry_files:
            # Per-file try so that one broken entry does not discard the rest.
            try:
                entry_path = hf_hub_download(
                    repo_id=RESULTS_DATASET_ID,
                    filename=entry_file,
                    repo_type="dataset",
                    token=TOKEN,
                )
                # Read explicitly as UTF-8 so the result does not depend on
                # the platform's locale default encoding.
                with open(entry_path, "r", encoding="utf-8") as f:
                    entries.append(json.load(f))
            except Exception as e:
                print(f"Error loading entry {entry_file}: {e}")
        return entries
    except Exception as e:
        print(f"Error listing entries: {e}")
        return []
def get_leaderboard_df(version="v0") -> pd.DataFrame:
    """
    Get the leaderboard data as a DataFrame.

    Uses the published leaderboard file when it is available. Otherwise the
    leaderboard is rebuilt from the individual entry files; when neither
    source yields data, an empty DataFrame with the display columns is
    returned.
    """
    published = get_latest_leaderboard(version)
    if published:
        return leaderboard_to_dataframe(published)

    # No published leaderboard: fall back to the individual entries.
    model_entries = get_all_entries(version)
    if not model_entries:
        # Nothing to show at all.
        return pd.DataFrame(columns=DISPLAY_COLS)

    rebuilt = {
        "entries": model_entries,
        "last_updated": datetime.now().isoformat(),
        "version": version,
    }
    return leaderboard_to_dataframe(rebuilt)
# Test types whose metrics contribute to the per-category averages.
_CATEGORY_TEST_TYPES = (
    "default_prompts",
    "jailbreaked_prompts",
    "default_answers",
    "jailbreaked_answers",
)


def _mean_or_nan(values):
    """Return the arithmetic mean of *values*, or NaN when *values* is empty."""
    return sum(values) / len(values) if values else np.nan


def _collect_metric(category_metrics: Dict, metric: str) -> List:
    """Collect the non-missing values of *metric* across the averaged test types."""
    collected = []
    for test_type in _CATEGORY_TEST_TYPES:
        test_metrics = category_metrics.get(test_type)
        if not isinstance(test_metrics, dict):
            continue
        if metric in test_metrics and pd.notna(test_metrics[metric]):
            collected.append(test_metrics[metric])
    return collected


def _build_category_entry(entry: Dict, category: str, version: str) -> Optional[Dict]:
    """Flatten one leaderboard entry to its metrics for *category*, or None if it has none."""
    per_category = entry.get("per_category_metrics")
    if not isinstance(per_category, dict) or category not in per_category:
        return None
    category_metrics = per_category[category]
    if not isinstance(category_metrics, dict):
        # Malformed category payload: treat it as "no data" instead of crashing.
        return None

    # Copy all base fields
    flat = {
        "model_name": entry.get("model_name", "Unknown Model"),
        "model_type": entry.get("model_type", "Unknown"),
        "guard_model_type": entry.get("guard_model_type", "Unknown"),
        "submission_date": entry.get("submission_date", ""),
        "version": entry.get("version", version),
        "base_model": entry.get("base_model", ""),
        "revision": entry.get("revision", ""),
        "precision": entry.get("precision", ""),
        "weight_type": entry.get("weight_type", ""),
    }

    # Add all metrics for each test type as "<test_type>_<metric>" columns.
    for test_type, metrics in category_metrics.items():
        if not isinstance(metrics, dict):
            continue
        for metric, value in metrics.items():
            flat[f"{test_type}_{metric}"] = value
            # Also add the non-binary version for F1 scores
            if metric == "f1_binary":
                flat[f"{test_type}_f1"] = value

    f1_values = _collect_metric(category_metrics, "f1_binary")
    recall_values = _collect_metric(category_metrics, "recall_binary")
    precision_values = _collect_metric(category_metrics, "precision_binary")
    accuracy_values = _collect_metric(category_metrics, "accuracy")
    total_samples = sum(_collect_metric(category_metrics, "sample_count"))

    # Overall averages are only added when at least one value exists.
    if f1_values:
        flat["average_f1"] = _mean_or_nan(f1_values)
    if recall_values:
        flat["average_recall"] = _mean_or_nan(recall_values)
    if precision_values:
        flat["average_precision"] = _mean_or_nan(precision_values)

    # The standard macro metric keys are always present; NaN marks "no data".
    flat["macro_accuracy"] = _mean_or_nan(accuracy_values)
    flat["macro_recall"] = _mean_or_nan(recall_values)
    flat["total_evals_count"] = total_samples if total_samples > 0 else np.nan
    return flat


def get_category_leaderboard_df(category: str, version="v0") -> pd.DataFrame:
    """
    Get the leaderboard data filtered by a specific category.

    Loads the published leaderboard (falling back to the individual entry
    files), keeps the entries that carry ``per_category_metrics`` for
    ``category``, flattens their per-test-type metrics into columns, and
    adds averages over the default/jailbreaked prompt and answer test types.

    Args:
        category: Key to look up inside each entry's ``per_category_metrics``.
        version: Leaderboard version tag.

    Returns:
        The DataFrame produced by ``leaderboard_to_dataframe`` for the
        filtered entries, or an empty DataFrame with the display columns
        when no leaderboard data is available at all.
    """
    # Get latest leaderboard data
    leaderboard_data = get_latest_leaderboard(version)
    if not leaderboard_data:
        # If no leaderboard exists, try to build it from entries
        entries = get_all_entries(version)
        if not entries:
            # Return empty DataFrame if no data available
            return pd.DataFrame(columns=DISPLAY_COLS)
        leaderboard_data = {
            "entries": entries,
            "last_updated": datetime.now().isoformat(),
            "version": version,
        }

    # Keep only entries that have data for the requested category.
    filtered_entries = []
    for entry in leaderboard_data.get("entries", []):
        flat_entry = _build_category_entry(entry, category, version)
        if flat_entry is not None:
            filtered_entries.append(flat_entry)

    # Create a new leaderboard data structure with the filtered entries
    filtered_leaderboard = {
        "entries": filtered_entries,
        "last_updated": leaderboard_data.get("last_updated", datetime.now().isoformat()),
        "version": version,
    }
    return leaderboard_to_dataframe(filtered_leaderboard)
def get_detailed_model_data(model_name: str, version="v0") -> Dict:
    """
    Get detailed data for a specific model.

    The model's own entry file is preferred. When it cannot be loaded, the
    first leaderboard entry whose ``model_name`` equals the requested name is
    returned instead; an empty dict signals that nothing was found.
    """
    direct_entry = get_model_entry(model_name, version)
    if direct_entry:
        return direct_entry

    # No dedicated entry file: search the published leaderboard.
    board = get_latest_leaderboard(version)
    if not board:
        return {}

    matching = (
        candidate
        for candidate in board.get("entries", [])
        if candidate.get("model_name") == model_name
    )
    return next(matching, {})