Spaces:
Running
Running
import pandas as pd | |
import json | |
import numpy as np | |
# Define game order | |
GAME_ORDER = [ | |
# "Super Mario Bros", # Commented out | |
"Super Mario Bros", | |
"Sokoban", | |
"2048", | |
"Candy Crush", | |
# "Tetris (complete)", # Commented out | |
"Tetris", | |
"Ace Attorney" | |
] | |
def get_organization(model_name): | |
m = model_name.lower() | |
if "claude" in m: | |
return "anthropic" | |
elif "gemini" in m: | |
return "google" | |
elif "o1" in m or "gpt" in m or "o3" in m or "o4" in m: | |
return "openai" | |
elif "deepseek" in m: | |
return "deepseek" | |
elif "llama" in m: | |
return "meta" | |
elif "grok" in m: | |
return "xai" | |
else: | |
return "unknown" | |
def get_sokoban_leaderboard(rank_data, limit_to_top_n=None): | |
data = rank_data.get("Sokoban", {}).get("results", []) | |
df = pd.DataFrame(data) | |
df = df.rename(columns={ | |
"model": "Player", | |
"score": "Score", | |
"steps": "Steps", | |
"detail_box_on_target": "Detail Box On Target", | |
"cracked_levels": "Levels Cracked" | |
}) | |
df["Organization"] = df["Player"].apply(get_organization) | |
# Define columns to keep, ensuring 'Score' is present | |
columns_to_keep = ["Player", "Organization", "Score", "Levels Cracked", "Detail Box On Target", "Steps"] | |
# Filter to only columns that actually exist in the DataFrame after renaming | |
df_columns = [col for col in columns_to_keep if col in df.columns] | |
df = df[df_columns] | |
if "Score" in df.columns: | |
df["Score"] = pd.to_numeric(df["Score"], errors='coerce') | |
df = df.sort_values("Score", ascending=False) | |
# Apply limit if specified | |
if limit_to_top_n is not None: | |
df = df.head(limit_to_top_n) | |
return df | |
def get_2048_leaderboard(rank_data, limit_to_top_n=None): | |
data = rank_data.get("2048", {}).get("results", []) | |
# --- Diagnostic Print Removed --- | |
# if data and isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict): | |
# print(f"DEBUG_UTILS: Keys in first item of raw data for 2048: {list(data[0].keys())}") | |
# elif not data: | |
# print("DEBUG_UTILS: Raw data for 2048 is empty.") | |
# else: | |
# print("DEBUG_UTILS: Raw data for 2048 is not in the expected list of dicts format.") | |
# --- End Diagnostic Print Removed --- | |
df = pd.DataFrame(data) | |
# print(f"DEBUG_UTILS: Columns after pd.DataFrame(data): {df.columns.tolist()}") # REMOVED | |
df = df.rename(columns={ | |
"model": "Player", | |
"score": "Score", # From new JSON structure | |
"details": "Details", # From new JSON structure | |
"highest_tail": "Highest Tail" # Added new column | |
# Old fields like "steps", "time", "rank" are removed | |
}) | |
# print(f"DEBUG_UTILS: Columns after rename: {df.columns.tolist()}") # REMOVED | |
# Ensure 'Player' column exists before applying get_organization | |
if "Player" in df.columns: | |
df["Organization"] = df["Player"].apply(get_organization) | |
else: | |
# Handle case where 'Player' column might be missing after rename (should not happen with current logic) | |
# print("DEBUG_UTILS: 'Player' column not found after rename, skipping Organization.") # REMOVED | |
df["Organization"] = "unknown" # Fallback | |
columns_to_keep = ["Player", "Organization", "Score", "Highest Tail", "Details"] # Added "Highest Tail" | |
# Defensive check for 'Highest Tail' before filtering - REMOVED | |
# if 'highest_tail' in df.columns and 'Highest Tail' not in df.columns: | |
# print("DEBUG_UTILS: 'highest_tail' (lowercase) found, but 'Highest Tail' (capitalized) not. This indicates a rename issue.") | |
# elif 'Highest Tail' not in df.columns and 'highest_tail' not in df.columns: | |
# print("DEBUG_UTILS: Neither 'Highest Tail' nor 'highest_tail' found in columns before filtering.") | |
# df_columns = [col for col in columns_to_keep if col in df.columns] # REMOVED logic that used df_columns | |
# print(f"DEBUG_UTILS: df_columns selected (columns that are in columns_to_keep AND in df.columns): {df_columns}") # REMOVED | |
# Ensure all columns in columns_to_keep exist in df, fill with np.nan if not | |
for col_k in columns_to_keep: | |
if col_k not in df.columns: | |
# print(f"DEBUG_UTILS: Column '{col_k}' from columns_to_keep not found in DataFrame. Adding it with NaN values.") # REMOVED | |
df[col_k] = np.nan # Or some other default like 'n/a' if appropriate | |
df = df[columns_to_keep] # Use columns_to_keep directly after ensuring they exist | |
# print(f"DEBUG_UTILS: Columns after final selection: {df.columns.tolist()}") # REMOVED | |
if "Score" in df.columns: | |
df["Score"] = pd.to_numeric(df["Score"], errors='coerce') | |
df = df.sort_values("Score", ascending=False) | |
# Apply limit if specified | |
if limit_to_top_n is not None: | |
df = df.head(limit_to_top_n) | |
return df | |
def get_candy_leaderboard(rank_data, limit_to_top_n=None): | |
data = rank_data.get("Candy Crush", {}).get("results", []) | |
df = pd.DataFrame(data) | |
df = df.rename(columns={ | |
"model": "Player", | |
"score": "Score", | |
"details": "Details" | |
}) | |
df["Organization"] = df["Player"].apply(get_organization) | |
columns_to_keep = ["Player", "Organization", "Score", "Details"] | |
df_columns = [col for col in columns_to_keep if col in df.columns] | |
df = df[df_columns] | |
if "Score" in df.columns: | |
df["Score"] = pd.to_numeric(df["Score"], errors='coerce') | |
df = df.sort_values("Score", ascending=False) | |
# Apply limit if specified | |
if limit_to_top_n is not None: | |
df = df.head(limit_to_top_n) | |
return df | |
def get_tetris_planning_leaderboard(rank_data, limit_to_top_n=None): | |
data = rank_data.get("Tetris", {}).get("results", []) | |
df = pd.DataFrame(data) | |
df = df.rename(columns={ | |
"model": "Player", | |
"score": "Score", # From new JSON structure | |
"details": "Details" # From new JSON structure | |
# Old fields like "steps_blocks", "rank" are removed | |
}) | |
df["Organization"] = df["Player"].apply(get_organization) | |
columns_to_keep = ["Player", "Organization", "Score", "Details"] | |
df_columns = [col for col in columns_to_keep if col in df.columns] | |
df = df[df_columns] | |
if "Score" in df.columns: | |
df["Score"] = pd.to_numeric(df["Score"], errors='coerce') | |
df = df.sort_values("Score", ascending=False) | |
# Apply limit if specified | |
if limit_to_top_n is not None: | |
df = df.head(limit_to_top_n) | |
return df | |
def get_ace_attorney_leaderboard(rank_data, limit_to_top_n=None): | |
data = rank_data.get("Ace Attorney", {}).get("results", []) | |
df = pd.DataFrame(data) | |
df = df.rename(columns={ | |
"model": "Player", | |
"score": "Score", | |
"progress": "Progress" | |
}) | |
df["Organization"] = df["Player"].apply(get_organization) | |
# Define columns to keep | |
columns_to_keep = ["Player", "Organization", "Score", "Progress"] | |
# Filter to only columns that actually exist in the DataFrame after renaming | |
df_columns = [col for col in columns_to_keep if col in df.columns] | |
df = df[df_columns] | |
if "Score" in df.columns: | |
df["Score"] = pd.to_numeric(df["Score"], errors='coerce') | |
df = df.sort_values("Score", ascending=False) # Higher score is better | |
# Apply limit if specified | |
if limit_to_top_n is not None: | |
df = df.head(limit_to_top_n) | |
return df | |
def get_mario_planning_leaderboard(rank_data, limit_to_top_n=None): | |
data = rank_data.get("Super Mario Bros", {}).get("results", []) | |
df = pd.DataFrame(data) | |
df = df.rename(columns={ | |
"model": "Player", | |
"score": "Score", | |
"detail_data": "Detail Data", | |
"progress": "Progress" | |
}) | |
df["Organization"] = df["Player"].apply(get_organization) | |
# Define columns to keep | |
columns_to_keep = ["Player", "Organization", "Score", "Progress", "Detail Data"] | |
df_columns = [col for col in columns_to_keep if col in df.columns] | |
df = df[df_columns] | |
if "Score" in df.columns: | |
df["Score"] = pd.to_numeric(df["Score"], errors='coerce') | |
df = df.sort_values("Score", ascending=False) | |
# Apply limit if specified | |
if limit_to_top_n is not None: | |
df = df.head(limit_to_top_n) | |
return df | |
def calculate_rank_and_completeness(rank_data, selected_games): | |
# Dictionary to store DataFrames for each game | |
game_dfs = {} | |
# Get DataFrames for selected games | |
# if selected_games.get("Super Mario Bros"): # Commented out | |
# game_dfs["Super Mario Bros"] = get_mario_leaderboard(rank_data) | |
if selected_games.get("Super Mario Bros"): | |
game_dfs["Super Mario Bros"] = get_mario_planning_leaderboard(rank_data) | |
if selected_games.get("Sokoban"): | |
game_dfs["Sokoban"] = get_sokoban_leaderboard(rank_data) | |
if selected_games.get("2048"): | |
game_dfs["2048"] = get_2048_leaderboard(rank_data) | |
if selected_games.get("Candy Crush"): | |
game_dfs["Candy Crush"] = get_candy_leaderboard(rank_data) | |
# if selected_games.get("Tetris (complete)"): # Commented out | |
# game_dfs["Tetris (complete)"] = get_tetris_leaderboard(rank_data) | |
if selected_games.get("Tetris"): | |
game_dfs["Tetris"] = get_tetris_planning_leaderboard(rank_data) | |
if selected_games.get("Ace Attorney"): | |
game_dfs["Ace Attorney"] = get_ace_attorney_leaderboard(rank_data) | |
# Get all unique players | |
all_players = set() | |
for df in game_dfs.values(): | |
all_players.update(df["Player"].unique()) | |
all_players = sorted(list(all_players)) | |
# Create results DataFrame | |
results = [] | |
for player in all_players: | |
player_data = { | |
"Player": player, | |
"Organization": get_organization(player) | |
} | |
ranks = [] | |
games_played = 0 | |
# Calculate rank and completeness for each game | |
for game in GAME_ORDER: | |
if game in game_dfs: | |
df = game_dfs[game] | |
if player in df["Player"].values: | |
games_played += 1 | |
# Get player's score based on game type | |
# if game == "Super Mario Bros": # Commented out | |
# player_score = df[df["Player"] == player]["Score"].iloc[0] | |
# rank = len(df[df["Score"] > player_score]) + 1 | |
if game == "Super Mario Bros": | |
player_score = df[df["Player"] == player]["Score"].iloc[0] | |
rank = len(df[df["Score"] > player_score]) + 1 | |
elif game == "Sokoban": | |
player_score = df[df["Player"] == player]["Score"].iloc[0] | |
rank = len(df[df["Score"] > player_score]) + 1 | |
elif game == "2048": | |
player_score = df[df["Player"] == player]["Score"].iloc[0] | |
rank = len(df[df["Score"] > player_score]) + 1 | |
elif game == "Candy Crush": | |
player_score = df[df["Player"] == player]["Score"].iloc[0] | |
rank = len(df[df["Score"] > player_score]) + 1 | |
elif game in ["Tetris"]: | |
player_score = df[df["Player"] == player]["Score"].iloc[0] | |
rank = len(df[df["Score"] > player_score]) + 1 | |
elif game == "Ace Attorney": | |
player_score = df[df["Player"] == player]["Score"].iloc[0] | |
rank = len(df[df["Score"] > player_score]) + 1 | |
ranks.append(rank) | |
player_data[f"{game} Score"] = player_score | |
else: | |
player_data[f"{game} Score"] = 'n/a' | |
# Calculate average rank and completeness for sorting | |
if ranks: | |
player_data["Average Rank"] = round(np.mean(ranks), 2) | |
player_data["Games Played"] = games_played | |
else: | |
player_data["Average Rank"] = float('inf') | |
player_data["Games Played"] = 0 | |
results.append(player_data) | |
# Create DataFrame and sort by average rank and completeness | |
df_results = pd.DataFrame(results) | |
if not df_results.empty: | |
# Sort by average rank (ascending) and games played (descending) | |
df_results = df_results.sort_values( | |
by=["Average Rank", "Games Played"], | |
ascending=[True, False] | |
) | |
# Drop the sorting columns | |
df_results = df_results.drop(["Average Rank", "Games Played"], axis=1) | |
return df_results | |
def get_combined_leaderboard(rank_data, selected_games, limit_to_top_n=None): | |
""" | |
Get combined leaderboard for selected games | |
Args: | |
rank_data (dict): Dictionary containing rank data | |
selected_games (dict): Dictionary of game names and their selection status | |
limit_to_top_n (int, optional): Limit results to top N entries. None means no limit. | |
Returns: | |
pd.DataFrame: Combined leaderboard DataFrame | |
""" | |
# Dictionary to store DataFrames for each game | |
game_dfs = {} | |
# Get DataFrames for selected games | |
# if selected_games.get("Super Mario Bros"): # Commented out | |
# game_dfs["Super Mario Bros"] = get_mario_leaderboard(rank_data) | |
if selected_games.get("Super Mario Bros"): | |
game_dfs["Super Mario Bros"] = get_mario_planning_leaderboard(rank_data) | |
if selected_games.get("Sokoban"): | |
game_dfs["Sokoban"] = get_sokoban_leaderboard(rank_data) | |
if selected_games.get("2048"): | |
game_dfs["2048"] = get_2048_leaderboard(rank_data) | |
if selected_games.get("Candy Crush"): | |
game_dfs["Candy Crush"] = get_candy_leaderboard(rank_data) | |
# if selected_games.get("Tetris (complete)"): # Commented out | |
# game_dfs["Tetris (complete)"] = get_tetris_leaderboard(rank_data) | |
if selected_games.get("Tetris"): | |
game_dfs["Tetris"] = get_tetris_planning_leaderboard(rank_data) | |
if selected_games.get("Ace Attorney"): | |
game_dfs["Ace Attorney"] = get_ace_attorney_leaderboard(rank_data) | |
# Get all unique players | |
all_players = set() | |
for df in game_dfs.values(): | |
all_players.update(df["Player"].unique()) | |
all_players = sorted(list(all_players)) | |
# Create results DataFrame | |
results = [] | |
for player in all_players: | |
player_data = { | |
"Player": player, | |
"Organization": get_organization(player) | |
} | |
# Add scores for each game | |
for game in GAME_ORDER: | |
if game in game_dfs: | |
df = game_dfs[game] | |
if player in df["Player"].values: | |
# if game == "Super Mario Bros": # Commented out | |
# player_data[f"{game} Score"] = df[df["Player"] == player]["Score"].iloc[0] | |
if game == "Super Mario Bros": | |
player_data[f"{game} Score"] = df[df["Player"] == player]["Score"].iloc[0] | |
elif game == "Sokoban": | |
player_data[f"{game} Score"] = df[df["Player"] == player]["Score"].iloc[0] | |
elif game == "2048": | |
player_data[f"{game} Score"] = df[df["Player"] == player]["Score"].iloc[0] | |
elif game == "Candy Crush": | |
player_data[f"{game} Score"] = df[df["Player"] == player]["Score"].iloc[0] | |
elif game in ["Tetris"]: | |
player_data[f"{game} Score"] = df[df["Player"] == player]["Score"].iloc[0] | |
elif game == "Ace Attorney": | |
player_data[f"{game} Score"] = df[df["Player"] == player]["Score"].iloc[0] | |
else: | |
player_data[f"{game} Score"] = 'n/a' | |
results.append(player_data) | |
# Create DataFrame | |
df_results = pd.DataFrame(results) | |
# Calculate normalized scores and average normalized score | |
if not df_results.empty: | |
# Import the normalize_values function from data_visualization | |
from data_visualization import normalize_values | |
# Calculate normalized scores for each game | |
game_score_columns = [] | |
for game in GAME_ORDER: | |
score_col = f"{game} Score" | |
if score_col in df_results.columns: | |
game_score_columns.append(score_col) | |
# Get numeric values, replacing 'n/a' with NaN | |
# Use where() to avoid FutureWarning about downcasting in replace() | |
series = df_results[score_col].copy() | |
series = series.where(series != 'n/a', np.nan) | |
numeric_scores = pd.to_numeric(series, errors='coerce') | |
# Skip games where all scores are NaN or 0 | |
valid_scores = numeric_scores.dropna() | |
if len(valid_scores) > 0 and valid_scores.sum() > 0: | |
mean = valid_scores.mean() | |
std = valid_scores.std() if len(valid_scores) > 1 else 0 | |
# Calculate normalized scores for all players | |
normalized_scores = [] | |
for _, row in df_results.iterrows(): | |
score = row[score_col] | |
if score == 'n/a' or pd.isna(score): | |
normalized_scores.append(0) | |
else: | |
normalized_scores.append(normalize_values([float(score)], mean, std)[0]) | |
df_results[f"norm_{score_col}"] = normalized_scores | |
else: | |
# If no valid scores, set all normalized scores to 0 | |
df_results[f"norm_{score_col}"] = 0 | |
# Calculate average normalized score across games | |
normalized_columns = [f"norm_{col}" for col in game_score_columns if f"norm_{col}" in df_results.columns] | |
if normalized_columns: | |
df_results["Avg Normalized Score"] = df_results[normalized_columns].mean(axis=1).round(2) | |
else: | |
df_results["Avg Normalized Score"] = 0.0 | |
# Reorder columns to put Avg Normalized Score after Organization | |
base_columns = ["Player", "Organization", "Avg Normalized Score"] | |
game_columns = [col for col in df_results.columns if col.endswith(" Score") and not col.startswith("norm_") and col != "Avg Normalized Score"] | |
other_columns = [col for col in df_results.columns if col not in base_columns + game_columns and not col.startswith("norm_")] | |
# Create final column order | |
final_columns = base_columns + game_columns + other_columns | |
df_results = df_results[final_columns] | |
# Sort by average normalized score in descending order | |
df_results = df_results.sort_values("Avg Normalized Score", ascending=False) | |
# Apply limit if specified | |
if limit_to_top_n is not None: | |
df_results = df_results.head(limit_to_top_n) | |
return df_results | |