#!/usr/bin/env python3
"""
Interactive Benchmark Explorer
A comprehensive web application for exploring OpenThoughts benchmark correlations and model performance
"""
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import pearsonr, kendalltau
from scipy.optimize import minimize
import ast
import io
import base64
from itertools import combinations
import warnings
import time
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.cluster import KMeans
from sklearn.metrics import r2_score, mean_absolute_error
from scipy.optimize import curve_fit
import re
warnings.filterwarnings('ignore')
# Configure page
st.set_page_config(
page_title="OpenThoughts Evalchemy Benchmark Explorer",
page_icon="๐",
layout="wide",
initial_sidebar_state="expanded"
)
# Custom CSS for better styling
st.markdown("""
""", unsafe_allow_html=True)
@st.cache_data
def load_trusted_models():
"""Load and parse trusted models from CSV file"""
try:
df = pd.read_csv('trusted_models.csv')
trusted_models = []
experiment_codes = []
for idx, row in df.iterrows():
model_name = str(row['Model Name']).strip()
if model_name and model_name != 'nan' and model_name not in ['LOW PRIORITY EVALS', 'Experiment Name']:
trusted_models.append(model_name)
# Also track experiment codes (patterns like a1_, b2_, etc.)
if re.match(r'^[a-z]\d+_', model_name):
experiment_codes.append(model_name)
print(f"Loaded {len(trusted_models)} trusted models ({len(experiment_codes)} experiment codes)")
return trusted_models, experiment_codes
except Exception as e:
print(f"Error loading trusted models: {e}")
return [], []
def extract_experiment_pattern(model_name):
"""Extract experiment pattern from mlfoundations-dev model names"""
if not model_name.startswith('mlfoundations-dev/'):
return None
suffix = model_name[len('mlfoundations-dev/'):]
# Look for patterns like b2_math_difficulty_0.3k_eval_636d
match = re.match(r'^([a-z]\d+_[^_]+(?:_[^_]+)*)', suffix)
if match:
return match.group(1)
# Look for direct experiment code matches
match = re.match(r'^([a-z]\d+_[a-zA-Z_]+)', suffix)
if match:
return match.group(1)
return None
def filter_trusted_models(df, trusted_models_data):
"""Filter dataframe to only include trusted models with enhanced experiment matching"""
if not trusted_models_data:
return df
# Unpack the data
if isinstance(trusted_models_data, tuple):
trusted_models, experiment_codes = trusted_models_data
else:
# Backward compatibility
trusted_models = trusted_models_data
experiment_codes = [m for m in trusted_models if re.match(r'^[a-z]\d+_', m)]
trusted_set = set(trusted_models)
experiment_set = set(experiment_codes)
def is_trusted_model(model_name):
# Direct exact match
if model_name in trusted_set:
return True
# Handle format conversion: __ in trusted models vs / in dataset
# Convert model_name format to match trusted models format
model_name_converted = model_name.replace('/', '__')
if model_name_converted in trusted_set:
return True
# Convert trusted models format to match dataset format
for trusted in trusted_models:
trusted_converted = trusted.replace('__', '/')
if model_name == trusted_converted:
return True
# Case-insensitive partial matching for regular models
model_lower = model_name.lower()
for trusted in trusted_models:
trusted_lower = trusted.lower()
# Also check converted formats in case-insensitive matching
trusted_converted_lower = trusted.replace('__', '/').lower()
model_converted_lower = model_name.replace('/', '__').lower()
# Flexible matching with minimum length requirements
if len(trusted_lower) >= 5 and trusted_lower in model_lower:
return True
if len(model_lower) >= 5 and model_lower in trusted_lower:
return True
if len(trusted_converted_lower) >= 5 and trusted_converted_lower in model_lower:
return True
if len(model_converted_lower) >= 5 and model_converted_lower in trusted_lower:
return True
# Check core model names (after / or __)
if ('/' in model_name or '__' in model_name) and ('/' in trusted or '__' in trusted):
# Extract core names using both separators
model_core = model_name.replace('__', '/').split('/')[-1].lower()
trusted_core = trusted.replace('__', '/').split('/')[-1].lower()
if len(model_core) >= 3 and len(trusted_core) >= 3:
if model_core in trusted_core or trusted_core in model_core:
return True
# Experiment code matching for mlfoundations-dev models
if model_name.startswith('mlfoundations-dev/'):
pattern = extract_experiment_pattern(model_name)
if pattern:
# Try exact match first
if pattern in experiment_set:
return True
# Try partial matches with experiment codes
for exp_code in experiment_codes:
if pattern.startswith(exp_code) or exp_code.startswith(pattern):
return True
return False
# Filter models
trusted_indices = [idx for idx in df.index if is_trusted_model(idx)]
filtered_df = df.loc[trusted_indices]
return filtered_df if len(filtered_df) > 0 else df
@st.cache_data
def load_comprehensive_data(use_trusted_filter=True):
"""Load and clean the comprehensive benchmark data."""
try:
# Use explicit encoding and error handling
df = pd.read_csv("max_comprehensive_benchmark_scores.csv", index_col=0, encoding='utf-8')
# Clean the data - handle list-like values stored as strings
# Process in batches for better performance with large datasets
total_cols = len(df.columns)
if total_cols > 20:
# Show progress for large datasets
progress_text = st.empty()
progress_bar = st.progress(0)
for i, col in enumerate(df.columns):
if total_cols > 20:
progress_text.text(f"Processing column {i+1}/{total_cols}: {col}")
progress_bar.progress((i+1) / total_cols)
def extract_value(x):
if pd.isna(x):
return np.nan
if isinstance(x, str) and x.startswith('['):
try:
parsed = ast.literal_eval(x)
if isinstance(parsed, list) and len(parsed) > 0:
return float(parsed[0]) # Ensure float type
else:
return np.nan
except (ValueError, SyntaxError):
return np.nan
try:
return float(x) # Ensure numeric values are float
except (ValueError, TypeError):
return np.nan
df[col] = df[col].apply(extract_value)
df[col] = pd.to_numeric(df[col], errors='coerce')
if total_cols > 20:
progress_text.empty()
progress_bar.empty()
# Filter to trusted models only if requested
if use_trusted_filter:
trusted_models_data = load_trusted_models()
df = filter_trusted_models(df, trusted_models_data)
# Filter to only models that have data for at least a few benchmarks
min_benchmarks = 3
df_filtered = df.dropna(thresh=min_benchmarks, axis=0)
# Ensure we have some data
if len(df_filtered) == 0:
st.error("No models found with sufficient benchmark data.")
return pd.DataFrame()
return df_filtered
except FileNotFoundError:
st.error("Could not find max_comprehensive_benchmark_scores.csv. Please ensure the data file exists.")
return pd.DataFrame()
except Exception as e:
st.error(f"Error loading data: {str(e)}")
return pd.DataFrame()
@st.cache_data
def load_stderr_data(use_trusted_filter=True):
"""Load and clean standard error data."""
try:
stderr_df = pd.read_csv("max_benchmark_standard_errors.csv", index_col=0, encoding='utf-8')
# Clean the data
for col in stderr_df.columns:
def extract_value(x):
if pd.isna(x):
return np.nan
if isinstance(x, str) and x.startswith('['):
try:
parsed = ast.literal_eval(x)
if isinstance(parsed, list) and len(parsed) > 0:
return float(parsed[0]) # Ensure float type
else:
return np.nan
except (ValueError, SyntaxError):
return np.nan
try:
return float(x) # Ensure numeric values are float
except (ValueError, TypeError):
return np.nan
stderr_df[col] = stderr_df[col].apply(extract_value)
stderr_df[col] = pd.to_numeric(stderr_df[col], errors='coerce')
# Filter to trusted models only if requested
if use_trusted_filter:
trusted_models_data = load_trusted_models()
stderr_df = filter_trusted_models(stderr_df, trusted_models_data)
return stderr_df
except FileNotFoundError:
st.warning("Could not find max_benchmark_standard_errors.csv. Standard error analysis will be limited.")
return pd.DataFrame()
except Exception as e:
st.warning(f"Error loading standard error data: {str(e)}")
return pd.DataFrame()
def clean_benchmark_name(name):
"""Clean benchmark names for consistent display."""
return (name.replace("LiveCodeBench_accuracy_avg", "LiveCodeBenchv2")
.replace('_accuracy_avg', '')
.replace('_accuracy', '')
.replace('LiveCodeBench', 'LCB')
.replace('GPQADiamond', 'GPQAD')
)
def get_focused_benchmark_mapping():
"""Define the target benchmarks and categories."""
target_benchmarks = {
# Math benchmarks
'AIME24': 'AIME24_accuracy_avg',
'AIME25': 'AIME25_accuracy_avg',
'AMC23': 'AMC23_accuracy_avg',
'MATH500': 'MATH500_accuracy',
# Code benchmarks
'CodeElo': 'CodeElo_accuracy_avg',
'CodeForces': 'CodeForces_accuracy_avg',
'LCBv2': 'LiveCodeBench_accuracy_avg',
'LCBv5': 'LiveCodeBenchv5_accuracy_avg',
# Science benchmarks
'GPQADiamond': 'GPQADiamond_accuracy_avg',
'JEEBench': 'JEEBench_accuracy_avg',
# General benchmarks
'MMLUPro': 'MMLUPro_accuracy_avg',
'HLE': 'HLE_accuracy_avg'
}
benchmark_categories = {
'Math': ['AIME24', 'AIME25', 'AMC23', 'MATH500'],
'Code': ['CodeElo', 'CodeForces', 'LCBv2', 'LCBv5'],
'Science': ['GPQADiamond', 'JEEBench'],
'General': ['MMLUPro', 'HLE']
}
colors = {'Math': '#d73027', 'Code': '#1f78b4', 'Science': '#33a02c', 'General': '#ff7f00'}
# Create reverse mapping
col_to_category = {}
for category, bench_list in benchmark_categories.items():
for bench_name in bench_list:
actual_name = target_benchmarks.get(bench_name)
if actual_name:
col_to_category[actual_name] = category
return target_benchmarks, benchmark_categories, colors, col_to_category
def compute_correlations(df, method='kendall'):
"""Compute correlation matrix using specified method."""
if method == 'pearson':
return df.corr(method='pearson')
elif method == 'kendall':
return df.corr(method='kendall')
else:
raise ValueError(f"Unsupported correlation method: {method}")
def create_interactive_heatmap(corr_matrix, title="Correlation Heatmap"):
"""Create an interactive correlation heatmap using Plotly."""
target_benchmarks, benchmark_categories, colors, col_to_category = get_focused_benchmark_mapping()
# Get clean names for display
clean_names = [clean_benchmark_name(name) for name in corr_matrix.columns]
# Convert to percentages for display
corr_matrix_pct = (corr_matrix * 100).round(1)
# Create hover text
hover_text = []
for i, bench1 in enumerate(corr_matrix.columns):
hover_row = []
for j, bench2 in enumerate(corr_matrix.columns):
if i == j:
hover_row.append(f"{clean_names[i]}
Correlation: 100%")
else:
corr_val = corr_matrix_pct.iloc[i, j]
if pd.isna(corr_val):
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
No data")
else:
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
Correlation: {corr_val:.1f}%")
hover_text.append(hover_row)
# Create the heatmap
fig = go.Figure(data=go.Heatmap(
z=corr_matrix.values,
x=clean_names,
y=clean_names,
colorscale='RdBu_r',
zmid=0,
text=corr_matrix_pct.values,
texttemplate="%{text}",
textfont={"size": 10},
hoverinfo='text',
hovertext=hover_text,
colorbar=dict(title="Correlation", tickformat=".2f")
))
# Update layout
fig.update_layout(
title=title,
xaxis_title="",
yaxis_title="",
width=800,
height=800,
font=dict(size=12)
)
# Color the axis labels by category
for i, bench in enumerate(corr_matrix.columns):
category = col_to_category.get(bench, 'Unknown')
color = colors.get(category, 'black')
return fig
def create_scatter_plot(df, x_bench, y_bench, stderr_df=None):
"""Create an interactive scatter plot between two benchmarks."""
if x_bench not in df.columns or y_bench not in df.columns:
return None
# Get common data
common_data = df[[x_bench, y_bench]].dropna()
if len(common_data) < 3:
return None
x_vals = common_data[x_bench]
y_vals = common_data[y_bench]
# Calculate correlation
corr, p_val = pearsonr(x_vals, y_vals)
# Create figure
fig = go.Figure()
# Add scatter points
fig.add_trace(go.Scatter(
x=x_vals,
y=y_vals,
mode='markers',
text=common_data.index,
hovertemplate=(
"%{text}
" +
f"{clean_benchmark_name(x_bench)}: %{{x:.3f}}
" +
f"{clean_benchmark_name(y_bench)}: %{{y:.3f}}
" +
""
),
marker=dict(size=8, opacity=0.7, color='steelblue')
))
# Add regression line
z = np.polyfit(x_vals, y_vals, 1)
p = np.poly1d(z)
x_line = np.linspace(x_vals.min(), x_vals.max(), 100)
# Format p-value appropriately
if p_val < 0.001:
p_str = f"p < 0.001"
else:
p_str = f"p = {p_val:.3f}"
fig.add_trace(go.Scatter(
x=x_line,
y=p(x_line),
mode='lines',
name=f'r = {corr:.3f}, {p_str}',
line=dict(color='red', dash='dash')
))
# Update layout
fig.update_layout(
title=f"{clean_benchmark_name(y_bench)} vs {clean_benchmark_name(x_bench)}",
xaxis_title=clean_benchmark_name(x_bench),
yaxis_title=clean_benchmark_name(y_bench),
showlegend=True,
width=600,
height=500
)
return fig
def filter_target_benchmarks(df):
"""Filter dataframe to only include target benchmarks."""
target_benchmarks, _, _, _ = get_focused_benchmark_mapping()
available_benchmarks = []
for display_name, actual_name in target_benchmarks.items():
if actual_name in df.columns:
available_benchmarks.append(actual_name)
return df[available_benchmarks].copy()
@st.cache_data
def estimate_missing_ranks(df, method='kendall', min_corr=0.1, min_benchmarks=2, _version="v2_fixed_ranking"):
"""
Estimate missing ranks using rank correlation.
Now ensures ALL missing values are filled.
Parameters:
-----------
df: DataFrame
Input data with missing values
method: Rank correlation method ('kendall')
min_corr: float
Minimum correlation threshold for using a benchmark (lowered to 0.1)
min_benchmarks: int
Minimum number of benchmarks needed for estimation (lowered to 2)
_version: str
Version parameter to force cache invalidation when ranking logic changes
"""
# Convert to ranks (higher scores get better/lower ranks)
df_ranks = df.rank(ascending=False)
# Compute rank correlation matrix
if method == 'kendall':
rank_corr_matrix = df_ranks.corr(method='kendall')
else:
raise ValueError(f"Unsupported correlation method: {method}")
# Pre-compute correlation thresholds to avoid repeated calculations
valid_correlations = {}
for benchmark in df.columns:
valid_correlations[benchmark] = []
for other_bench in df.columns:
if benchmark != other_bench:
corr_val = rank_corr_matrix.loc[benchmark, other_bench]
if not pd.isna(corr_val) and abs(corr_val) >= min_corr:
valid_correlations[benchmark].append((other_bench, abs(corr_val)))
# Sort by correlation strength for better prediction
valid_correlations[benchmark].sort(key=lambda x: x[1], reverse=True)
# For each model and benchmark combination with missing data
missing_count = 0
total_missing = df_ranks.isna().sum().sum()
for model_idx in df.index:
available_benchmarks = df_ranks.columns[df_ranks.loc[model_idx].notna()].tolist()
if len(available_benchmarks) >= min_benchmarks:
for benchmark in df.columns:
if pd.isna(df_ranks.loc[model_idx, benchmark]):
# Get pre-computed valid correlations for this benchmark
valid_pairs = valid_correlations[benchmark]
correlations = []
ranks = []
# First try: use correlations above min_corr threshold
for other_bench, corr_strength in valid_pairs:
if other_bench in available_benchmarks:
correlations.append(corr_strength)
ranks.append(df_ranks.loc[model_idx, other_bench])
# Use more benchmarks for better estimation
if len(correlations) >= 8:
break
# If no good correlations found, use ANY available benchmarks
if len(correlations) == 0:
for other_bench in available_benchmarks:
if other_bench != benchmark:
corr_val = rank_corr_matrix.loc[benchmark, other_bench]
if not pd.isna(corr_val):
correlations.append(max(0.01, abs(corr_val))) # Minimum weight
ranks.append(df_ranks.loc[model_idx, other_bench])
# If still no correlations, use simple average of available ranks
if len(correlations) == 0:
available_ranks = [df_ranks.loc[model_idx, bench] for bench in available_benchmarks]
if available_ranks:
estimated_rank = np.mean(available_ranks)
df_ranks.loc[model_idx, benchmark] = estimated_rank
missing_count += 1
else:
# Weighted average of ranks using correlations as weights
correlations = np.array(correlations)
ranks = np.array(ranks)
# Normalize weights
weights = correlations / correlations.sum()
estimated_rank = np.average(ranks, weights=weights)
df_ranks.loc[model_idx, benchmark] = estimated_rank
missing_count += 1
return df_ranks
@st.cache_data
def create_consensus_ranking(df, method='kendall', use_rank_imputation=True, min_benchmarks_for_ranking=6):
"""
Create a consensus ranking using rank correlation-based estimation.
Parameters:
-----------
df: DataFrame
Input data with models as rows and benchmarks as columns
method: str
Correlation method for rank imputation ('kendall')
use_rank_imputation: bool
Whether to use rank imputation for missing values
min_benchmarks_for_ranking: int
Minimum number of original benchmarks required for a model to be included in ranking
Returns:
tuple: (ranking_df, rank_matrix, metadata)
"""
# Filter models to only include those with sufficient benchmark coverage
original_coverage = df.notna().sum(axis=1)
models_with_sufficient_data = original_coverage[original_coverage >= min_benchmarks_for_ranking].index
if len(models_with_sufficient_data) == 0:
# If no models meet the criteria, lower the threshold
min_benchmarks_for_ranking = max(1, original_coverage.max() // 2)
models_with_sufficient_data = original_coverage[original_coverage >= min_benchmarks_for_ranking].index
# Filter dataframe to only include models with sufficient data
df_filtered = df.loc[models_with_sufficient_data]
if use_rank_imputation:
# Estimate missing ranks
df_ranks = estimate_missing_ranks(df_filtered, method)
# Calculate consensus rank for each model (median rank across all benchmarks)
consensus_ranks = df_ranks.median(axis=1, skipna=True)
# Calculate coverage and estimation statistics
original_coverage_filtered = df_filtered.notna().sum(axis=1)
imputed_coverage = df_ranks.notna().sum(axis=1)
estimated_count = imputed_coverage - original_coverage_filtered
# Create ranking dataframe
ranking_data = []
for model in df_filtered.index:
ranking_data.append({
'Model': model.split('/')[-1] if '/' in model else model,
'Full_Model_Name': model,
'Consensus_Rank': float(consensus_ranks[model]),
'Original_Benchmarks': int(original_coverage_filtered[model]),
'Total_Benchmarks': len(df_filtered.columns), # Always 12 since we fill all missing values
'Estimated_Ranks': int(estimated_count[model]),
'Coverage_Pct': float(original_coverage_filtered[model] / len(df_filtered.columns) * 100)
})
ranking_df = pd.DataFrame(ranking_data).sort_values('Consensus_Rank', ascending=True) # Lower rank = better
metadata = {
'method': method,
'imputation_used': True,
'total_estimates': int(estimated_count.sum()),
'models_with_estimates': int((estimated_count > 0).sum()),
'ranking_method': 'consensus_rank',
'min_benchmarks_required': min_benchmarks_for_ranking,
'models_filtered_out': len(df) - len(df_filtered),
'total_benchmarks': len(df_filtered.columns)
}
else:
# Simple ranking based on available data only
df_ranks = df_filtered.rank(method='min', ascending=False, na_option='keep')
median_ranks = df_ranks.median(axis=1, skipna=True)
ranking_data = []
for model in df_filtered.index:
ranking_data.append({
'Model': model.split('/')[-1] if '/' in model else model,
'Full_Model_Name': model,
'Consensus_Rank': float(median_ranks[model]),
'Original_Benchmarks': int(df_filtered.notna().sum(axis=1)[model]),
'Total_Benchmarks': int(df_filtered.notna().sum(axis=1)[model]),
'Estimated_Ranks': 0,
'Coverage_Pct': float(df_filtered.notna().sum(axis=1)[model] / len(df_filtered.columns) * 100)
})
ranking_df = pd.DataFrame(ranking_data).sort_values('Consensus_Rank', ascending=True)
metadata = {
'method': 'none',
'imputation_used': False,
'total_estimates': 0,
'models_with_estimates': 0,
'ranking_method': 'median_rank',
'min_benchmarks_required': min_benchmarks_for_ranking,
'models_filtered_out': len(df) - len(df_filtered),
'total_benchmarks': len(df_filtered.columns)
}
return ranking_df, df_ranks, metadata
@st.cache_data
def create_optimized_radar_chart(df_display, selected_models, selected_benchmarks_for_radar):
"""Create an optimized radar chart for the selected models and benchmarks."""
if not selected_benchmarks_for_radar or not selected_models:
return None
# Pre-filter data to only what we need
filtered_data = df_display.loc[selected_models, selected_benchmarks_for_radar]
clean_benchmark_names = [clean_benchmark_name(b) for b in selected_benchmarks_for_radar]
# Define colors for different models
colors_list = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd',
'#8c564b', '#e377c2', '#7f7f7f', '#bcbd22', '#17becf']
fig = go.Figure()
# Calculate dynamic range for better visualization
all_values = filtered_data.values.flatten()
all_values = all_values[~pd.isna(all_values)]
if len(all_values) > 0:
min_val = float(np.min(all_values))
max_val = float(np.max(all_values))
# Add some padding
range_padding = (max_val - min_val) * 0.1
radar_min = max(0, min_val - range_padding)
radar_max = min(1, max_val + range_padding)
else:
radar_min, radar_max = 0, 1
for i, model in enumerate(selected_models):
# Get model data for selected benchmarks only
model_scores = []
for benchmark in selected_benchmarks_for_radar:
score = filtered_data.loc[model, benchmark]
# Convert to float, use 0.0 for any remaining NaN values
model_scores.append(0.0 if pd.isna(score) else float(score))
# Close the radar chart by adding the first value at the end
radar_values = model_scores + [model_scores[0]]
radar_benchmarks = clean_benchmark_names + [clean_benchmark_names[0]]
# Create model name for legend (remove path prefix if present)
model_display_name = model.split('/')[-1] if '/' in model else model
# Use color from list, cycling if needed
model_color = colors_list[i % len(colors_list)]
fig.add_trace(go.Scatterpolar(
r=radar_values,
theta=radar_benchmarks,
fill='toself',
name=model_display_name,
line_color=model_color,
hovertemplate='%{theta}
Score: %{r:.3f}'
))
# Adjust chart size based on number of models
chart_height = 600 if len(selected_models) <= 3 else 700
fig.update_layout(
polar=dict(
radialaxis=dict(
visible=True,
range=[radar_min, radar_max],
tickformat='.2f'
)),
showlegend=True,
title=f"Model Performance Radar Chart ({len(selected_benchmarks_for_radar)} benchmarks, {len(selected_models)} models)",
width=700,
height=chart_height
)
return fig
def weighted_correlation(x, y, weights):
"""Compute weighted Pearson correlation coefficient."""
# Remove NaN values
valid_mask = ~(np.isnan(x) | np.isnan(y) | np.isnan(weights))
if valid_mask.sum() < 3:
return np.nan, np.nan
x_clean = x[valid_mask]
y_clean = y[valid_mask]
w_clean = weights[valid_mask]
# Weighted means
x_mean = np.average(x_clean, weights=w_clean)
y_mean = np.average(y_clean, weights=w_clean)
# Weighted covariance and variances
cov = np.average((x_clean - x_mean) * (y_clean - y_mean), weights=w_clean)
var_x = np.average((x_clean - x_mean)**2, weights=w_clean)
var_y = np.average((y_clean - y_mean)**2, weights=w_clean)
# Weighted correlation
if var_x == 0 or var_y == 0:
return np.nan, np.nan
corr = cov / np.sqrt(var_x * var_y)
# Approximate degrees of freedom for weighted data
# Using effective sample size approximation
sum_w = np.sum(w_clean)
sum_w2 = np.sum(w_clean**2)
eff_n = sum_w**2 / sum_w2
# Standard error of correlation (approximate)
if eff_n > 3:
from scipy.stats import t
se_corr = np.sqrt((1 - corr**2) / (eff_n - 2))
t_stat = corr / se_corr
p_value = 2 * (1 - t.cdf(abs(t_stat), eff_n - 2))
else:
p_value = np.nan
return corr, p_value
def match_scores_with_stderr(scores_df, stderr_df, target_benchmarks):
"""Match score columns with their corresponding stderr columns."""
target_benchmarks_dict, benchmark_categories, colors, col_to_category = get_focused_benchmark_mapping()
score_to_stderr_mapping = {}
# Look for stderr matches with various naming patterns
for col in target_benchmarks:
stderr_col = None
# Try different naming patterns in order of preference
potential_stderr_names = [
f"{col}_std_err", # Direct match
f"{col.replace('_accuracy', '_accuracy_std_err')}", # Handle _accuracy vs _accuracy_avg
f"{col.replace('_accuracy_avg', '_accuracy_std_err')}", # Handle _accuracy_avg
]
# Special handling for MATH500 and other variations
if col == 'MATH500_accuracy':
potential_stderr_names.extend([
'MATH500x2_accuracy_std_err',
'MATH500_accuracy_std_err'
])
# Add 'x2' variants for all benchmarks (in case there are other x2 versions)
base_name = col.replace('_accuracy_avg', '').replace('_accuracy', '')
potential_stderr_names.extend([
f"{base_name}x2_accuracy_std_err",
f"{base_name}_accuracy_std_err"
])
# Find the first matching column with sufficient data
for stderr_name in potential_stderr_names:
if stderr_name in stderr_df.columns:
# Check if there's sufficient data (at least 10 models)
non_null_count = stderr_df[stderr_name].notna().sum()
if non_null_count >= 10:
stderr_col = stderr_name
break
if stderr_col:
score_to_stderr_mapping[col] = stderr_col
return score_to_stderr_mapping
def create_uncertainty_aware_correlation_matrix(scores_df, stderr_df, score_to_stderr_mapping):
"""Create correlation matrix accounting for measurement uncertainties."""
target_benchmarks, benchmark_categories, colors, col_to_category = get_focused_benchmark_mapping()
benchmarks = list(score_to_stderr_mapping.keys())
n_benchmarks = len(benchmarks)
# Initialize matrices
corr_matrix = np.full((n_benchmarks, n_benchmarks), np.nan)
pvalue_matrix = np.full((n_benchmarks, n_benchmarks), np.nan)
weighted_corr_matrix = np.full((n_benchmarks, n_benchmarks), np.nan)
weighted_pvalue_matrix = np.full((n_benchmarks, n_benchmarks), np.nan)
for i, bench1 in enumerate(benchmarks):
for j, bench2 in enumerate(benchmarks):
if i == j:
# Diagonal: compute reliability coefficient
stderr_col = score_to_stderr_mapping[bench1]
# Has actual stderr data
# reliability = 1 - (measurement_error_variance / total_variance)
scores = scores_df[bench1].dropna()
stderrs = stderr_df[stderr_col].dropna()
# Align data
common_idx = scores.index.intersection(stderrs.index)
if len(common_idx) >= 3:
aligned_scores = scores.loc[common_idx]
aligned_stderrs = stderrs.loc[common_idx]
# Total variance in observed scores
total_variance = aligned_scores.var()
# Mean measurement error variance
mean_error_variance = (aligned_stderrs**2).mean()
# Reliability = proportion of total variance that is "true" variance
if total_variance > 0:
reliability = max(0, 1 - (mean_error_variance / total_variance))
# For regular correlation, we still use 1.0 (mathematical definition)
corr_matrix[i, j] = 1.0
pvalue_matrix[i, j] = 0.0
# For weighted correlation, use reliability coefficient
weighted_corr_matrix[i, j] = reliability
weighted_pvalue_matrix[i, j] = 0.0
else:
corr_matrix[i, j] = 1.0
weighted_corr_matrix[i, j] = 0.0
pvalue_matrix[i, j] = 0.0
weighted_pvalue_matrix[i, j] = 0.0
else:
# Insufficient data
corr_matrix[i, j] = 1.0
weighted_corr_matrix[i, j] = np.nan
pvalue_matrix[i, j] = 0.0
weighted_pvalue_matrix[i, j] = np.nan
continue
# Get common valid data
# First, align the dataframes by common index
common_idx = scores_df.index.intersection(stderr_df.index)
x = scores_df.loc[common_idx, bench1].values
y = scores_df.loc[common_idx, bench2].values
# Get standard errors
stderr1_col = score_to_stderr_mapping[bench1]
stderr2_col = score_to_stderr_mapping[bench2]
# Standard (unweighted) correlation
valid_mask = ~(np.isnan(x) | np.isnan(y))
if valid_mask.sum() >= 3:
corr, p_val = pearsonr(x[valid_mask], y[valid_mask])
corr_matrix[i, j] = corr
pvalue_matrix[i, j] = p_val
# Weighted correlation
stderr1 = stderr_df.loc[common_idx, stderr1_col].values
stderr2 = stderr_df.loc[common_idx, stderr2_col].values
# Weighted correlation using inverse variance weighting
# Weight = 1 / (stderr1^2 + stderr2^2) - accounting for error in both variables
valid_stderr_mask = ~(np.isnan(stderr1) | np.isnan(stderr2)) & valid_mask
if valid_stderr_mask.sum() >= 3:
combined_variance = stderr1[valid_stderr_mask]**2 + stderr2[valid_stderr_mask]**2
# Avoid division by zero
weights = np.where(combined_variance > 0, 1.0 / combined_variance, 0)
if weights.sum() > 0:
w_corr, w_p_val = weighted_correlation(
x[valid_stderr_mask],
y[valid_stderr_mask],
weights
)
weighted_corr_matrix[i, j] = w_corr
weighted_pvalue_matrix[i, j] = w_p_val
else:
# Use regular correlation for weighted matrix too
if valid_mask.sum() >= 3:
weighted_corr_matrix[i, j] = corr_matrix[i, j]
weighted_pvalue_matrix[i, j] = pvalue_matrix[i, j]
# Convert to DataFrames
corr_df = pd.DataFrame(corr_matrix, index=benchmarks, columns=benchmarks)
pvalue_df = pd.DataFrame(pvalue_matrix, index=benchmarks, columns=benchmarks)
weighted_corr_df = pd.DataFrame(weighted_corr_matrix, index=benchmarks, columns=benchmarks)
weighted_pvalue_df = pd.DataFrame(weighted_pvalue_matrix, index=benchmarks, columns=benchmarks)
return corr_df, pvalue_df, weighted_corr_df, weighted_pvalue_df
def create_uncertainty_weighted_heatmap_plotly(weighted_corr_df, title_prefix="Uncertainty-Weighted Correlation Analysis"):
"""Create a single uncertainty-weighted heatmap using Plotly."""
target_benchmarks, benchmark_categories, colors, col_to_category = get_focused_benchmark_mapping()
# Get clean names for display
clean_names = [clean_benchmark_name(name) for name in weighted_corr_df.columns]
# Weighted correlation heatmap
weighted_corr_pct = (weighted_corr_df * 100).round(1)
# Create hover text for weighted correlations
hover_text_weighted = []
for i, bench1 in enumerate(weighted_corr_df.columns):
hover_row = []
for j, bench2 in enumerate(weighted_corr_df.columns):
if i == j:
reliability = weighted_corr_df.iloc[i, j]
if pd.isna(reliability):
hover_row.append(f"{clean_names[i]}
Reliability: Unknown")
else:
hover_row.append(f"{clean_names[i]}
Reliability: {reliability*100:.1f}%")
else:
corr_val = weighted_corr_pct.iloc[i, j]
if pd.isna(corr_val):
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
No weighted data")
else:
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
Weighted correlation: {corr_val:.1f}%")
hover_text_weighted.append(hover_row)
# Create the heatmap
fig = go.Figure(data=go.Heatmap(
z=weighted_corr_df.values,
x=clean_names,
y=clean_names,
colorscale='RdBu_r',
zmid=0,
text=weighted_corr_pct.values,
texttemplate="%{text}",
textfont={"size": 10},
hoverinfo='text',
hovertext=hover_text_weighted,
colorbar=dict(title="Correlation")
))
# Update layout
fig.update_layout(
title=f"{title_prefix}
Diagonal shows reliability coefficients (signal-to-noise ratios)",
width=800,
height=700,
font=dict(size=12),
xaxis=dict(tickangle=45),
yaxis=dict(tickangle=0)
)
return fig
def create_uncertainty_aware_heatmap_plotly(corr_df, weighted_corr_df, title_prefix="Correlation Analysis"):
"""Create side-by-side interactive heatmaps comparing regular vs weighted correlations using Plotly."""
target_benchmarks, benchmark_categories, colors, col_to_category = get_focused_benchmark_mapping()
# Get clean names for display
clean_names = [clean_benchmark_name(name) for name in corr_df.columns]
# Create subplots
fig = make_subplots(
rows=1, cols=2,
subplot_titles=('Regular Correlation Matrix
(Equal weighting)',
'Uncertainty-Weighted Correlation Matrix
(Inverse variance weighting)'),
horizontal_spacing=0.15
)
# Regular correlation heatmap
corr_matrix_pct = (corr_df * 100).round(1)
# Create hover text for regular correlations
hover_text_regular = []
for i, bench1 in enumerate(corr_df.columns):
hover_row = []
for j, bench2 in enumerate(corr_df.columns):
if i == j:
hover_row.append(f"{clean_names[i]}
Self-correlation: 100%")
else:
corr_val = corr_matrix_pct.iloc[i, j]
if pd.isna(corr_val):
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
No data")
else:
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
Correlation: {corr_val:.1f}%")
hover_text_regular.append(hover_row)
fig.add_trace(go.Heatmap(
z=corr_df.values,
x=clean_names,
y=clean_names,
colorscale='RdBu_r',
zmid=0,
text=corr_matrix_pct.values,
texttemplate="%{text}",
textfont={"size": 8},
hoverinfo='text',
hovertext=hover_text_regular,
showscale=False,
name="Regular"
), row=1, col=1)
# Weighted correlation heatmap
weighted_corr_pct = (weighted_corr_df * 100).round(1)
# Create hover text for weighted correlations
hover_text_weighted = []
for i, bench1 in enumerate(weighted_corr_df.columns):
hover_row = []
for j, bench2 in enumerate(weighted_corr_df.columns):
if i == j:
reliability = weighted_corr_df.iloc[i, j]
if pd.isna(reliability):
hover_row.append(f"{clean_names[i]}
Reliability: Unknown")
else:
hover_row.append(f"{clean_names[i]}
Reliability: {reliability*100:.1f}%")
else:
corr_val = weighted_corr_pct.iloc[i, j]
if pd.isna(corr_val):
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
No weighted data")
else:
hover_row.append(f"{clean_names[i]} vs {clean_names[j]}
Weighted correlation: {corr_val:.1f}%")
hover_text_weighted.append(hover_row)
fig.add_trace(go.Heatmap(
z=weighted_corr_df.values,
x=clean_names,
y=clean_names,
colorscale='RdBu_r',
zmid=0,
text=weighted_corr_pct.values,
texttemplate="%{text}",
textfont={"size": 8},
hoverinfo='text',
hovertext=hover_text_weighted,
showscale=True,
colorbar=dict(title="Correlation", x=1.02),
name="Weighted"
), row=1, col=2)
# Update layout
fig.update_layout(
title=f"{title_prefix}
Diagonal shows reliability coefficients for weighted matrix",
width=1400,
height=700,
font=dict(size=12)
)
# Update axes
fig.update_xaxes(tickangle=45, row=1, col=1)
fig.update_xaxes(tickangle=45, row=1, col=2)
fig.update_yaxes(tickangle=0, row=1, col=1)
fig.update_yaxes(tickangle=0, row=1, col=2)
return fig
def main():
"""Main application."""
# Initialize session state for persistent selections
if 'analysis_mode' not in st.session_state:
st.session_state.analysis_mode = "๐ Overview Dashboard"
if 'use_verified_models' not in st.session_state:
st.session_state.use_verified_models = True
if 'selected_categories' not in st.session_state:
st.session_state.selected_categories = []
if 'filter_zeros' not in st.session_state:
st.session_state.filter_zeros = True
if 'min_models' not in st.session_state:
st.session_state.min_models = 10
st.markdown('
OpenThoughts Evalchemy Benchmark Explorer
',
unsafe_allow_html=True)
# Sidebar
st.sidebar.header("๐๏ธ Controls")
# Analysis mode selection - using session state
analysis_mode = st.sidebar.selectbox(
"Choose Analysis Mode",
["๐ Overview Dashboard", "๐ฅ Correlation Heatmap", "๐ Scatter Plot Explorer",
"๐ฏ Model Performance", "๐ฌ Uncertainty Analysis"],
index=["๐ Overview Dashboard", "๐ฅ Correlation Heatmap", "๐ Scatter Plot Explorer",
"๐ฏ Model Performance", "๐ฌ Uncertainty Analysis"].index(st.session_state.analysis_mode) if st.session_state.analysis_mode != "๐ Statistical Summary" else 0,
key="analysis_mode"
)
# Data filtering options
st.sidebar.subheader("Data Filters")
# Verified models filter - using session state
use_verified_models = st.sidebar.checkbox(
"Include only verified models",
value=st.session_state.use_verified_models,
key="use_verified_models"
)
# Load data with timing (after checkbox is defined)
start_time = time.time()
df = load_comprehensive_data(use_verified_models)
stderr_df = load_stderr_data(use_verified_models)
load_time = time.time() - start_time
# Debug information (hidden in an expander)
# with st.expander("๐ง Debug Information", expanded=False):
# st.write(f"**Data Shape:** {df.shape if not df.empty else 'No data'}")
# st.write(f"**Columns:** {len(df.columns) if not df.empty else 0}")
# st.write(f"**Models:** {len(df.index) if not df.empty else 0}")
# if not df.empty:
# st.write(f"**Sample columns:** {list(df.columns[:5])}")
# st.write(f"**Data types:** {df.dtypes.value_counts().to_dict()}")
# st.write(f"**Missing values per column:** {df.isnull().sum().sum()}")
# st.write(f"**StdErr data available:** {'Yes' if stderr_df is not None else 'No'}")
if df.empty:
st.error("No data available. Please check that the CSV files are properly uploaded and accessible.")
return
# Filter to target benchmarks
df = filter_target_benchmarks(df)
target_benchmarks, benchmark_categories, colors, col_to_category = get_focused_benchmark_mapping()
# Initialize selected categories if empty
if not st.session_state.selected_categories:
st.session_state.selected_categories = list(benchmark_categories.keys())
# Category filter - using session state
selected_categories = st.sidebar.multiselect(
"Select Benchmark Categories",
list(benchmark_categories.keys()),
default=st.session_state.selected_categories,
key="selected_categories"
)
# Filter benchmarks based on selected categories
filtered_benchmarks = []
for category in selected_categories:
for bench_name in benchmark_categories[category]:
actual_name = target_benchmarks.get(bench_name)
if actual_name in df.columns:
filtered_benchmarks.append(actual_name)
if filtered_benchmarks:
df_display = df[filtered_benchmarks].copy()
else:
df_display = df.copy()
# Zero filtering - using session state
filter_zeros = st.sidebar.checkbox(
"Filter out zero/near-zero values",
value=st.session_state.filter_zeros,
key="filter_zeros"
)
if filter_zeros:
for col in df_display.columns:
df_display.loc[(df_display[col] == 0) | (df_display[col] < 0.01), col] = np.nan
# Minimum data points filter
coverage_counts = [df_display[col].notna().sum() for col in df_display.columns]
if coverage_counts:
min_coverage = min(coverage_counts)
max_coverage = max(coverage_counts)
default_min = max(10, min_coverage) # Default to at least 10 or minimum available
# Update session state min_models if it's out of range, but allow minimum of 0
if st.session_state.min_models > max_coverage:
st.session_state.min_models = default_min
min_models = st.sidebar.slider(
"Minimum models per benchmark",
min_value=0, # Always allow 0 minimum to include all benchmarks
max_value=max_coverage,
value=st.session_state.min_models,
help=f"Current range: {min_coverage} to {max_coverage} models. Set to 0 to include all benchmarks.",
key="min_models"
)
else:
min_models = 10
# Apply the minimum models filter
valid_benchmarks = []
for col in df_display.columns:
if df_display[col].notna().sum() >= min_models:
valid_benchmarks.append(col)
df_display = df_display[valid_benchmarks]
# Performance info
# st.sidebar.markdown("---")
# st.sidebar.subheader("โก Performance")
# if load_time > 0:
# st.sidebar.metric("Data Load Time", f"{load_time:.2f}s")
# st.sidebar.metric("Dataset Size", f"{len(df_display)} ร {len(df_display.columns)}")
# if not df_display.empty:
# data_coverage = (df_display.notna().sum().sum() / (len(df_display) * len(df_display.columns))) * 100
# st.sidebar.metric("Data Coverage", f"{data_coverage:.1f}%")
# Main content based on analysis mode
if analysis_mode == "๐ Overview Dashboard":
show_overview_dashboard(df_display, stderr_df)
elif analysis_mode == "๐ฅ Correlation Heatmap":
show_interactive_heatmap(df_display, stderr_df)
elif analysis_mode == "๐ Scatter Plot Explorer":
show_scatter_explorer(df_display, stderr_df)
elif analysis_mode == "๐ฏ Model Performance":
show_model_performance(df_display)
elif analysis_mode == "๐ฌ Uncertainty Analysis":
show_uncertainty_analysis(df_display, stderr_df)
def show_overview_dashboard(df, stderr_df):
"""Show the overview dashboard."""
st.header("๐ Overview Dashboard")
# Key metrics
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Models", len(df))
with col2:
st.metric("Benchmarks", len(df.columns))
with col3:
total_evals = df.notna().sum().sum()
st.metric("Total Evaluations", f"{total_evals:,}")
with col4:
avg_coverage = (df.notna().sum() / len(df)).mean() * 100
st.metric("Avg Coverage", f"{avg_coverage:.1f}%")
# Benchmark coverage chart
st.subheader("Benchmark Coverage")
coverage_data = []
target_benchmarks, benchmark_categories, colors, col_to_category = get_focused_benchmark_mapping()
for col in df.columns:
coverage = int(df[col].notna().sum()) # Ensure integer type
category = col_to_category.get(col, 'Unknown')
clean_name = clean_benchmark_name(col)
# Ensure we have valid data
if coverage >= 0: # Only include valid coverage counts
coverage_data.append({
'Benchmark': str(clean_name), # Ensure string type
'Coverage': coverage,
'Percentage': float(coverage / len(df) * 100), # Ensure float type
'Category': str(category) # Ensure string type
})
if coverage_data: # Only create plot if we have data
coverage_df = pd.DataFrame(coverage_data).sort_values('Coverage', ascending=True)
# Ensure data types are correct
coverage_df['Coverage'] = coverage_df['Coverage'].astype(int)
coverage_df['Percentage'] = coverage_df['Percentage'].astype(float)
coverage_df['Benchmark'] = coverage_df['Benchmark'].astype(str)
coverage_df['Category'] = coverage_df['Category'].astype(str)
# Create bar plot with explicit parameters
fig = px.bar(coverage_df,
x='Coverage',
y='Benchmark',
color='Category',
color_discrete_map=colors,
title="Model Coverage by Benchmark",
labels={'Coverage': 'Number of Models'},
orientation='h',
text='Coverage') # Add text labels to bars
# Update layout for better visibility
fig.update_traces(texttemplate='%{text}', textposition='outside')
fig.update_layout(
height=max(400, len(coverage_df) * 25), # Dynamic height based on data
showlegend=True,
xaxis_title="Number of Models",
yaxis_title="Benchmark"
)
st.plotly_chart(fig, use_container_width=True)
else:
st.warning("No coverage data available to display.")
# Quick correlation insights
st.subheader("Quick Correlation Insights")
corr_matrix = compute_correlations(df, 'kendall')
# Get top correlations
pairs = []
for i, bench1 in enumerate(corr_matrix.columns):
for j, bench2 in enumerate(corr_matrix.columns[i+1:], i+1):
if not pd.isna(corr_matrix.iloc[i, j]):
cat1 = col_to_category.get(bench1, 'Unknown')
cat2 = col_to_category.get(bench2, 'Unknown')
pairs.append((bench1, bench2, corr_matrix.iloc[i, j], cat1, cat2))
pairs.sort(key=lambda x: abs(x[2]), reverse=True)
col1, col2 = st.columns(2)
with col1:
st.markdown("**๐ฅ Top 5 Highest Correlations**")
for i, (bench1, bench2, corr, cat1, cat2) in enumerate(pairs[:5]):
st.write(f"{i+1}. {clean_benchmark_name(bench1)} โ {clean_benchmark_name(bench2)} r = {corr:.3f}")
with col2:
st.markdown("**๐ Category Analysis**")
within_cat = [p[2] for p in pairs if p[3] == p[4]]
across_cat = [p[2] for p in pairs if p[3] != p[4]]
if within_cat:
st.write(f"Within-category avg: {np.mean(within_cat):.3f}")
if across_cat:
st.write(f"Across-category avg: {np.mean(across_cat):.3f}")
st.write(f"Total pairs analyzed: {len(pairs)}")
def show_interactive_heatmap(df, stderr_df):
"""Display interactive correlation heatmap with various options."""
st.header("๐ฅ Correlation Heatmap")
col1, col2, col3 = st.columns(3)
with col1:
# Check if stderr data is available for the uncertainty-aware checkbox
stderr_available = stderr_df is not None
uncertainty_aware = False
if stderr_available:
uncertainty_aware = st.checkbox(
"๐ฌ Uncertainty-Aware Analysis",
value=False,
help="Use measurement uncertainties to weight correlations (requires standard error data)"
)
# Adjust method selector based on uncertainty-aware mode
if uncertainty_aware:
st.selectbox(
"Correlation Method",
["pearson"],
index=0,
disabled=True,
help="**Uncertainty-aware analysis uses Pearson correlations only**\n\nWeighted correlations require parametric methods to properly account for measurement uncertainties."
)
method = "pearson" # Force Pearson for uncertainty-aware analysis
else:
method = st.selectbox(
"Correlation Method",
["kendall", "pearson"],
help="Pearson: Measures linear relationships\nKendall: Measures ordinal relationships"
)
# Additional options
if uncertainty_aware and stderr_df is not None:
st.info("๐ฌ **Uncertainty-Aware Mode**: Correlations are weighted by inverse measurement variance. "
"Diagonal shows reliability coefficients (proportion of variance that is 'true signal' vs measurement error).")
# Match scores with stderr data
available_benchmarks = list(df.columns)
score_to_stderr_mapping = match_scores_with_stderr(df, stderr_df, available_benchmarks)
if len(score_to_stderr_mapping) == 0:
st.warning("No matching standard error data found for the selected benchmarks. "
"Falling back to regular correlation analysis.")
uncertainty_aware = False
else:
# Filter to benchmarks with stderr data
benchmarks_with_stderr = list(score_to_stderr_mapping.keys())
df_stderr = df[benchmarks_with_stderr].copy()
st.success(f"Found standard error data for {len(score_to_stderr_mapping)} benchmarks: "
f"{', '.join([clean_benchmark_name(b) for b in benchmarks_with_stderr])}")
# Align dataframes
common_models = df_stderr.index.intersection(stderr_df.index)
df_aligned = df_stderr.loc[common_models]
stderr_aligned = stderr_df.loc[common_models]
st.write(f"**Analysis scope**: {len(common_models)} models with both scores and standard errors")
# Compute uncertainty-aware correlations
with st.spinner("Computing uncertainty-weighted correlations..."):
corr_df, pvalue_df, weighted_corr_df, weighted_pvalue_df = create_uncertainty_aware_correlation_matrix(
df_aligned, stderr_aligned, score_to_stderr_mapping
)
# Create and display uncertainty-aware heatmap
fig = create_uncertainty_weighted_heatmap_plotly(
weighted_corr_df,
title_prefix=f"Uncertainty-Weighted {method.capitalize()} Correlations"
)
st.plotly_chart(fig, use_container_width=True)
# Show reliability statistics
with st.expander("๐ Reliability Statistics", expanded=False):
st.write("**Benchmark Reliability Coefficients** (proportion of variance that is true signal):")
reliability_data = []
for bench in weighted_corr_df.columns:
diag_val = weighted_corr_df.loc[bench, bench]
if not pd.isna(diag_val):
reliability_data.append({
'Benchmark': clean_benchmark_name(bench),
'Reliability': f"{diag_val*100:.1f}%",
'Category': next((cat for cat, benchs in get_focused_benchmark_mapping()[1].items()
for b in benchs if get_focused_benchmark_mapping()[0].get(b) == bench), 'Unknown')
})
if reliability_data:
reliability_df = pd.DataFrame(reliability_data)
st.dataframe(reliability_df, use_container_width=True)
avg_reliability = pd.to_numeric([d['Reliability'].rstrip('%') for d in reliability_data]).mean() / 100
st.metric("Average Reliability", f"{avg_reliability:.3f} ({avg_reliability*100:.1f}%)")
# Show correlation differences
with st.expander("๐ Impact of Uncertainty Weighting", expanded=False):
st.write("**Correlation Changes** (Weighted - Regular):")
diff_data = []
for i, bench1 in enumerate(corr_df.columns):
for j, bench2 in enumerate(corr_df.columns):
if i < j: # Only upper triangle
regular_corr = corr_df.iloc[i, j]
weighted_corr = weighted_corr_df.iloc[i, j]
if not (pd.isna(regular_corr) or pd.isna(weighted_corr)):
diff = weighted_corr - regular_corr
diff_data.append({
'Benchmark Pair': f"{clean_benchmark_name(bench1)} vs {clean_benchmark_name(bench2)}",
'Regular': f"{regular_corr:.3f}",
'Weighted': f"{weighted_corr:.3f}",
'Difference': f"{diff:+.3f}",
'Abs Difference': abs(diff)
})
if diff_data:
diff_df = pd.DataFrame(diff_data)
# Sort by absolute difference
diff_df_sorted = diff_df.sort_values('Abs Difference', ascending=False)
st.dataframe(diff_df_sorted.drop('Abs Difference', axis=1), use_container_width=True)
# Summary stats
diffs = [float(d['Difference']) for d in diff_data]
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Mean Change", f"{np.mean(diffs):+.4f}")
with col2:
st.metric("Max |Change|", f"{max(abs(d) for d in diffs):.4f}")
with col3:
st.metric("Large Changes (|ฮ| > 0.1)", f"{sum(1 for d in diffs if abs(d) > 0.1)}")
# Regular correlation analysis (original functionality restored)
if df.empty:
st.error("No data available.")
return
# Compute correlation matrix
corr_matrix = compute_correlations(df, method)
if corr_matrix.empty:
st.error("Unable to compute correlations.")
return
# Create and display regular heatmap (original way)
fig = create_interactive_heatmap(corr_matrix, f"{method.capitalize()} Correlation Matrix")
st.plotly_chart(fig, use_container_width=True)
# Correlation statistics (original)
st.subheader("Correlation Statistics")
# Get all off-diagonal correlations (original method)
mask = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1)
corr_values = corr_matrix.where(mask).stack().dropna()
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("Mean Correlation", f"{corr_values.mean():.3f}")
with col2:
st.metric("Median Correlation", f"{corr_values.median():.3f}")
with col3:
st.metric("Max Correlation", f"{corr_values.max():.3f}")
with col4:
st.metric("Min Correlation", f"{corr_values.min():.3f}")
# Distribution of correlations (original)
st.subheader("Correlation Distribution")
fig_hist = px.histogram(corr_values,
nbins=20,
title="Distribution of Pairwise Correlations",
labels={'value': 'Correlation Coefficient', 'count': 'Frequency'})
st.plotly_chart(fig_hist, use_container_width=True)
# Methodology note
with st.expander("โน๏ธ About Correlation Methods", expanded=False):
st.markdown("""
**Pearson**: Measures linear relationships. Values range from -1 to +1.
- +1: Perfect positive linear relationship
- 0: No linear relationship
- -1: Perfect negative linear relationship
**Kendall**: Measures ordinal association using concordant/discordant pairs.
- More robust for small samples
- Better for data with many tied values
**Uncertainty-Aware Analysis**: When available, uses measurement standard errors to:
- Weight correlations by inverse measurement variance
- Show reliability coefficients (signal-to-noise ratios) on diagonal
- Provide more accurate correlation estimates for noisy data
""")
def show_scatter_explorer(df, stderr_df):
"""Show the scatter plot explorer with integrated simplified modeling."""
# Initialize session state for scatter plot selections
if 'scatter_x_benchmark' not in st.session_state:
st.session_state.scatter_x_benchmark = df.columns[0] if len(df.columns) > 0 else None
if 'scatter_y_benchmark' not in st.session_state:
st.session_state.scatter_y_benchmark = df.columns[1] if len(df.columns) > 1 else df.columns[0] if len(df.columns) > 0 else None
st.header("๐ Scatter Plot Explorer")
# Benchmark selection
col1, col2 = st.columns(2)
with col1:
# Get current index for x_benchmark
x_index = 0
if st.session_state.scatter_x_benchmark in df.columns:
x_index = list(df.columns).index(st.session_state.scatter_x_benchmark)
x_benchmark = st.selectbox(
"X-axis Benchmark",
df.columns,
index=x_index,
format_func=clean_benchmark_name,
key="scatter_x_benchmark"
)
with col2:
# Get current index for y_benchmark
y_index = 1 if len(df.columns) > 1 else 0
if st.session_state.scatter_y_benchmark in df.columns:
y_index = list(df.columns).index(st.session_state.scatter_y_benchmark)
y_benchmark = st.selectbox(
"Y-axis Benchmark",
df.columns,
index=y_index,
format_func=clean_benchmark_name,
key="scatter_y_benchmark"
)
if x_benchmark and y_benchmark and x_benchmark != y_benchmark:
# Always use the simplified modeling (linear or saturation)
fig, models = create_advanced_scatter_plot(df, x_benchmark, y_benchmark, stderr_df)
if fig and models:
st.plotly_chart(fig, use_container_width=True)
# Show the best model information
best_model = models[0]
st.info(f"**Best fit: {best_model['name']}** (Rยฒ = {best_model['r2']:.3f})")
# Show model interpretation
if best_model['type'] == 'linear':
st.caption("๐ Linear relationship: One benchmark increases proportionally with the other.")
elif best_model['type'] == 'saturation':
if 'direction' in best_model and best_model['direction'] == 'flipped':
st.caption("๐ Inverse saturation: The Y-axis benchmark plateaus as X-axis benchmark increases.")
else:
st.caption("๐ Saturation: One benchmark plateaus as the other increases.")
# Add detailed explanation for saturation fits
with st.expander("โน๏ธ How saturation fitting works", expanded=False):
st.markdown("""
**Saturation Model**: `y = a ร (1 - e^(-bรx)) + c`
**Bidirectional Fitting Process**:
1. **Try both directions**: Fit `y = f(x)` and `x = f(y)`
2. **Choose best fit**: Select direction with higher Rยฒ score
3. **Consistent plotting**: Curve coordinates are computed in the best-fitting direction and plotted identically regardless of axis orientation
**Why this matters**: Some relationships are better modeled in one direction (e.g., performance plateaus as model size increases). The algorithm automatically finds the best direction and ensures the curve looks the same whether you plot X vs Y or Y vs X.
**Parameters**:
- `a`: Maximum change (amplitude)
- `b`: Rate of saturation (higher = faster plateau)
- `c`: Baseline offset
""")
# Add preference reason if applicable
if best_model.get('preferred', False) and 'preference_reason' in best_model:
st.caption(f"โน๏ธ {best_model['preference_reason']}")
else:
st.warning("Insufficient data for modeling (need at least 5 data points).")
# Additional statistics
common_data = df[[x_benchmark, y_benchmark]].dropna()
if len(common_data) >= 3:
col1, col2, col3 = st.columns(3)
# Correlation metrics
pearson_r, pearson_p = pearsonr(common_data[x_benchmark], common_data[y_benchmark])
kendall_r, kendall_p = kendalltau(common_data[x_benchmark], common_data[y_benchmark])
# Format p-values appropriately
def format_pvalue(p):
if p < 0.001:
info = "P-values < 0.001 indicate very strong statistical significance. This results from good sample sizes and meaningful relationships."
return "p < 0.001", info
elif p < 0.05:
info = "P-values < 0.05 indicate moderate statistical significance. This results from reasonable sample sizes and meaningful relationships."
return f"p = {p:.3f}", info
elif p < 0.1:
info = "P-values < 0.1 indicate weak statistical significance. This results from low sample sizes and/or weak relationships."
return f"p = {p:.3f}", info
else:
info = "P-values > 0.1 indicate very weak statistical significance. This results from insufficient sample sizes and/or weak relationships."
return f"p = {p:.3f}", info
with col1:
p_value, info = format_pvalue(pearson_p)
st.metric("Pearson r", f"{pearson_r:.3f}", help="Pearson's r is a parametric measure of linear correlation.")
st.caption(p_value, help=info)
with col2:
p_value, info = format_pvalue(kendall_p)
st.metric("Kendall ฯ", f"{kendall_r:.3f}", help="Kendall's tau is a non-parametric measure of ordinal correlation that is robust to outliers.")
st.caption(p_value, help=info)
with col3:
# Show data table
st.subheader("Data Points")
display_data = common_data.copy()
display_data.columns = [clean_benchmark_name(col) for col in display_data.columns]
st.dataframe(display_data, use_container_width=True)
else:
st.info("Please select two different benchmarks to compare.")
def show_model_performance(df):
"""Show model performance analysis."""
# Initialize session state for model performance selections
if 'model_search_term' not in st.session_state:
st.session_state.model_search_term = ""
if 'use_rank_imputation' not in st.session_state:
st.session_state.use_rank_imputation = True
if 'min_corr' not in st.session_state:
st.session_state.min_corr = 0.3
if 'min_benchmarks_for_ranking' not in st.session_state:
st.session_state.min_benchmarks_for_ranking = 6
st.header("๐ฏ Model Performance Analysis")
# Model search - using session state
search_term = st.text_input(
"๐ Search for models",
value=st.session_state.model_search_term,
placeholder="Enter model name or part of name",
key="model_search_term"
)
# Filter by search term if provided
if search_term:
matching_models = df.index[df.index.str.contains(search_term, case=False, na=False)]
if len(matching_models) > 0:
df_display = df.loc[matching_models]
else:
st.warning(f"No models found matching '{search_term}'")
df_display = df
else:
df_display = df
# Performance ranking
st.subheader("Model Rankings")
# Ranking method controls
col1, col2, col3, col4 = st.columns(4)
with col1:
use_rank_imputation = st.checkbox(
"Use rank-based estimation",
value=st.session_state.use_rank_imputation,
help="Estimate missing rankings using Kendall rank correlations between benchmarks. More fair than simple averaging.",
key="use_rank_imputation"
)
with col2:
if use_rank_imputation:
# Always use Kendall correlation for rank-based estimation
rank_method = "kendall"
st.info("๐ข Using Kendall rank correlation (robust to outliers and tied values)")
else:
rank_method = "none"
with col3:
if use_rank_imputation:
min_corr = st.slider(
"Min correlation threshold",
min_value=0.1,
max_value=0.8,
value=st.session_state.min_corr,
step=0.1,
help="Minimum rank correlation required to use a benchmark for prediction",
key="min_corr"
)
else:
min_corr = 0.3
with col4:
min_benchmarks_for_ranking = st.slider(
"Min benchmarks required",
min_value=1,
max_value=12,
value=st.session_state.min_benchmarks_for_ranking,
step=1,
help="Minimum number of original benchmarks required for a model to be included in ranking",
key="min_benchmarks_for_ranking"
)
# Generate rankings with progress indicator
# Always compute rankings on the FULL dataset to preserve true ranks
if use_rank_imputation and len(df) > 50:
with st.spinner(f"Computing consensus rankings for {len(df)} models..."):
full_ranking_df, rank_matrix, metadata = create_consensus_ranking(
df, # Use full dataset, not df_display
method=rank_method,
use_rank_imputation=use_rank_imputation,
min_benchmarks_for_ranking=min_benchmarks_for_ranking
)
else:
full_ranking_df, rank_matrix, metadata = create_consensus_ranking(
df, # Use full dataset, not df_display
method=rank_method,
use_rank_imputation=use_rank_imputation,
min_benchmarks_for_ranking=min_benchmarks_for_ranking
)
# Filter ranking results to match search if provided
if search_term:
matching_models = df.index[df.index.str.contains(search_term, case=False, na=False)]
if len(matching_models) > 0:
# Filter the ranking to only show matching models, preserving their original ranks
ranking_df = full_ranking_df[full_ranking_df['Full_Model_Name'].isin(matching_models)]
else:
st.warning(f"No models found matching '{search_term}'")
ranking_df = full_ranking_df.head(0) # Empty dataframe
else:
ranking_df = full_ranking_df
# Show filtering information
if search_term:
if len(ranking_df) > 0:
st.info(f"๐ Found {len(ranking_df)} models matching '{search_term}'. "
f"Rankings computed on full dataset of {len(full_ranking_df)} models.")
else:
st.warning(f"No models found matching '{search_term}'")
elif metadata['models_filtered_out'] > 0:
st.info(f"โน๏ธ Filtered out {metadata['models_filtered_out']} models with fewer than {metadata['min_benchmarks_required']} benchmarks. "
f"Ranking {len(ranking_df)} models on {metadata['total_benchmarks']} benchmarks.")
else:
st.success(f"โ
All {len(ranking_df)} models meet the minimum benchmark requirement ({metadata['min_benchmarks_required']} benchmarks).")
# Display ranking information
col1, col2 = st.columns(2)
with col1:
# Change title and behavior based on search term
if search_term:
st.markdown(f"**๐ Models matching '{search_term}'**")
models_to_show = ranking_df # Show all matching models
show_count = len(ranking_df)
else:
st.markdown("**๐ Top 10 Models**")
models_to_show = ranking_df.head(10) # Show only top 10
show_count = min(10, len(ranking_df))
if metadata['imputation_used']:
st.caption(f"๐ฌ Using {metadata['method']} rank correlations with {metadata['total_estimates']} estimated ranks")
else:
st.caption("๐ Using median rank of available rankings")
# Show models with their actual ranking position
# Use a scrollable container if there are many results
if search_term and show_count > 20:
with st.container(height=400): # Scrollable container for long lists
for i, (idx, row) in enumerate(models_to_show.iterrows()):
# Calculate actual rank position in the FULL ranking (1-based)
actual_rank = full_ranking_df.index.get_loc(idx) + 1
estimated_info = f" (+{row['Estimated_Ranks']} est.)" if row['Estimated_Ranks'] > 0 else ""
coverage_info = f"{row['Coverage_Pct']:.0f}%"
if metadata['imputation_used']:
st.write(f"{actual_rank}. **{row['Model']}** (median rank: {row['Consensus_Rank']:.1f})")
st.caption(f" ๐ {row['Original_Benchmarks']}/{row['Total_Benchmarks']} benchmarks{estimated_info}")
else:
st.write(f"{actual_rank}. **{row['Model']}** (median rank: {row['Consensus_Rank']:.1f})")
st.caption(f" ๐ {row['Original_Benchmarks']} benchmarks ({coverage_info} coverage)")
else:
for i, (idx, row) in enumerate(models_to_show.iterrows()):
# Calculate actual rank position in the FULL ranking (1-based)
actual_rank = full_ranking_df.index.get_loc(idx) + 1
estimated_info = f" (+{row['Estimated_Ranks']} est.)" if row['Estimated_Ranks'] > 0 else ""
coverage_info = f"{row['Coverage_Pct']:.0f}%"
if metadata['imputation_used']:
st.write(f"{actual_rank}. **{row['Model']}** (median rank: {row['Consensus_Rank']:.1f})")
st.caption(f" ๐ {row['Original_Benchmarks']}/{row['Total_Benchmarks']} benchmarks{estimated_info}")
else:
st.write(f"{actual_rank}. **{row['Model']}** (median rank: {row['Consensus_Rank']:.1f})")
st.caption(f" ๐ {row['Original_Benchmarks']} benchmarks ({coverage_info} coverage)")
# Show summary when search results are displayed
if search_term:
if show_count == 0:
st.info("No models found matching the search term.")
else:
st.info(f"Found {show_count} model(s) matching '{search_term}'")
with col2:
st.markdown("**๐ Ranking Distribution**")
# Create histogram of consensus ranks
fig = px.histogram(
ranking_df,
x='Consensus_Rank',
nbins=20,
title="Distribution of Consensus Rankings",
labels={'Consensus_Rank': 'Average Rank (lower is better)', 'count': 'Number of Models'}
)
fig.update_layout(height=400)
st.plotly_chart(fig, use_container_width=True)
# Show ranking methodology explanation
if metadata['imputation_used']:
with st.expander("โน๏ธ How Rank-Based Estimation Works"):
st.write(f"""
**Method**: {metadata['method'].title()} rank correlation
**Process**:
1. Convert benchmark scores to ranks (1st, 2nd, 3rd, etc.)
2. Calculate rank correlations between all benchmark pairs
3. For missing data: predict rank using weighted average of available ranks
4. Weights based on rank correlation strength (min threshold: {min_corr})
5. Final consensus rank = median rank across all benchmarks
**Optimizations**:
- Pre-compute correlation matrices for efficiency
- Limit to top 5 most correlated benchmarks per prediction
- Cache results to avoid recomputation
**Upsides**:
- Eliminates bias from models tested only on easier/harder benchmarks
- Uses the correlation structure to make informed predictions
- Focuses on relative ranking rather than absolute scores
- More robust to outliers and scale differences
- Median consensus rank is less affected by extreme outlier rankings
**Statistics**:
- Total rank estimates made: {metadata['total_estimates']:,}
- Models with estimated ranks: {metadata['models_with_estimates']}
""")
else:
with st.expander("โน๏ธ Simple Ranking Method"):
st.write("""
**Method**: Median rank of available rankings
**Limitation**: Models tested on fewer or easier benchmarks may appear artificially better.
**Recommendation**: Enable rank-based estimation for fairer comparisons.
""")
# Model comparison section
st.subheader("Model Comparison")
# Initialize session state for radar chart selections
if 'selected_benchmarks_for_radar' not in st.session_state:
available_benchmarks = list(df_display.columns)
default_benchmarks = available_benchmarks[:min(8, len(available_benchmarks))]
st.session_state.selected_benchmarks_for_radar = default_benchmarks
if 'complete_data_only' not in st.session_state:
st.session_state.complete_data_only = True
if 'selected_models_for_radar' not in st.session_state:
st.session_state.selected_models_for_radar = []
# Benchmark selection for radar chart (always visible)
st.subheader("๐ Benchmark & Model Selection")
col1, col2 = st.columns([2, 1])
with col1:
available_benchmarks = list(df_display.columns)
# Ensure session state benchmarks are still valid
valid_benchmarks = [b for b in st.session_state.selected_benchmarks_for_radar if b in available_benchmarks]
if not valid_benchmarks:
valid_benchmarks = available_benchmarks[:min(8, len(available_benchmarks))]
st.session_state.selected_benchmarks_for_radar = valid_benchmarks
selected_benchmarks_for_radar = st.multiselect(
"Select benchmarks for radar chart",
available_benchmarks,
default=valid_benchmarks,
format_func=clean_benchmark_name,
help="Choose which benchmarks to display in the radar chart",
key="selected_benchmarks_for_radar"
)
with col2:
complete_data_only = st.checkbox(
"Complete data only",
value=st.session_state.complete_data_only,
help="Show only models that have data for ALL selected benchmarks",
key="complete_data_only"
)
# Filter available models based on benchmark selection and complete data requirement
if complete_data_only and selected_benchmarks_for_radar:
# Only show models that have data for all selected benchmarks
models_with_complete_data = []
for model in df_display.index:
has_all_data = True
for benchmark in selected_benchmarks_for_radar:
if pd.isna(df_display.loc[model, benchmark]):
has_all_data = False
break
if has_all_data:
models_with_complete_data.append(model)
available_models_for_selection = models_with_complete_data
models_info = f"({len(available_models_for_selection)} models with complete data)"
else:
available_models_for_selection = df_display.index.tolist()
models_info = f"({len(available_models_for_selection)} models total)"
# Model selection with filtered list - use top ranked models as default
if available_models_for_selection:
# Get top performers from full ranking (not filtered ranking)
top_models_from_ranking = full_ranking_df['Full_Model_Name'].head(5).tolist()
default_selection = [m for m in top_models_from_ranking if m in available_models_for_selection][:3]
# Sort available models by ranking (best to worst)
# Create a ranking order dictionary for quick lookup using FULL ranking
ranking_order = {model: rank for rank, model in enumerate(full_ranking_df['Full_Model_Name'].tolist())}
# Sort available models by their ranking (lower rank number = better performance)
available_models_sorted = sorted(
available_models_for_selection,
key=lambda x: ranking_order.get(x, float('inf')) # Models not in ranking go to end
)
# Ensure session state models are still valid
valid_selected_models = [m for m in st.session_state.selected_models_for_radar if m in available_models_for_selection]
if not valid_selected_models and default_selection:
valid_selected_models = default_selection
st.session_state.selected_models_for_radar = valid_selected_models
else:
default_selection = []
valid_selected_models = []
available_models_sorted = []
selected_models = st.multiselect(
f"Select models to compare {models_info}",
available_models_sorted, # Use sorted list instead of original
default=valid_selected_models,
help="Models are ordered by ranking (best to worst) and filtered based on benchmark selection and complete data setting above",
key="selected_models_for_radar"
)
if selected_models:
comparison_data = df_display.loc[selected_models].T
comparison_data.index = [clean_benchmark_name(idx) for idx in comparison_data.index]
# Performance Radar Chart
st.subheader("๐ Performance Radar Chart")
if not selected_benchmarks_for_radar:
st.info("Please select at least one benchmark above for the radar chart.")
elif len(selected_models) == 0:
st.info("Please select models above to see the radar chart comparison.")
elif len(selected_models) > 10:
st.warning(f"Too many models selected ({len(selected_models)}). Please select 10 or fewer models for the radar chart.")
st.info("๐ก **Tip**: Use the search box above to filter models, then select a smaller subset for comparison.")
else:
# Show radar chart for 1-10 models with optimization
if len(selected_models) > 3 or len(selected_benchmarks_for_radar) > 8:
with st.spinner("Generating radar chart..."):
fig = create_optimized_radar_chart(df_display, selected_models, selected_benchmarks_for_radar)
else:
fig = create_optimized_radar_chart(df_display, selected_models, selected_benchmarks_for_radar)
if fig:
st.plotly_chart(fig, use_container_width=True)
# Add explanation about missing values (only if not using complete data only)
if not complete_data_only:
missing_info = []
for model in selected_models:
missing_benchmarks = []
for benchmark in selected_benchmarks_for_radar:
if pd.isna(df_display.loc[model, benchmark]):
missing_benchmarks.append(clean_benchmark_name(benchmark))
if missing_benchmarks:
missing_info.append(f"โข {model.split('/')[-1]}: {', '.join(missing_benchmarks)}")
if missing_info:
with st.expander("โน๏ธ Missing Data Information"):
st.write("Missing values are shown as 0 in the radar chart:")
for info in missing_info:
st.write(info)
else:
# When complete data only is enabled, all selected models should have complete data
st.info("โ
All selected models have complete data for the chosen benchmarks.")
# Performance tips for large selections
if len(selected_models) > 5:
st.info(f"๐ก **Viewing {len(selected_models)} models**: For better readability, consider selecting fewer models or use the detailed comparison table below.")
# Detailed comparison table
st.subheader("Detailed Comparison")
st.dataframe(comparison_data, use_container_width=True)
def show_uncertainty_analysis(df, stderr_df):
"""Show uncertainty analysis if standard error data is available."""
# Initialize session state for uncertainty analysis selections
if 'uncertainty_x_benchmark' not in st.session_state:
st.session_state.uncertainty_x_benchmark = None
if 'uncertainty_y_benchmark' not in st.session_state:
st.session_state.uncertainty_y_benchmark = None
st.header("๐ฌ Uncertainty Analysis")
if stderr_df is None:
st.warning("Standard error data not available. This analysis requires benchmark_standard_errors.csv")
return
st.info("This section analyzes measurement uncertainty and reliability of benchmark evaluations.")
# Match benchmarks with standard errors
matched_benchmarks = []
for score_col in df.columns:
# Try to find matching stderr column
potential_stderr_cols = [
f"{score_col}_std_err",
f"{score_col.replace('_accuracy', '_accuracy_std_err')}",
f"{score_col.replace('_accuracy_avg', '_accuracy_std_err')}"
]
for stderr_col in potential_stderr_cols:
if stderr_col in stderr_df.columns:
matched_benchmarks.append((score_col, stderr_col))
break
if not matched_benchmarks:
st.warning("No matching standard error data found for the selected benchmarks.")
return
st.success(f"Found standard error data for {len(matched_benchmarks)} benchmarks.")
# Measurement precision analysis
st.subheader("๐ Measurement Precision")
precision_data = []
for score_col, stderr_col in matched_benchmarks:
scores = df[score_col].dropna()
stderrs = stderr_df[stderr_col].dropna()
if len(stderrs) > 0:
mean_stderr = stderrs.mean()
median_stderr = stderrs.median()
# Signal-to-noise ratio
if len(scores) > 0:
signal_std = scores.std()
snr = signal_std / mean_stderr if mean_stderr > 0 else float('inf')
else:
snr = 0
precision_data.append({
'Benchmark': clean_benchmark_name(score_col),
'Mean StdErr': mean_stderr,
'Median StdErr': median_stderr,
'Signal/Noise': snr,
'N Models': len(stderrs)
})
if precision_data:
precision_df = pd.DataFrame(precision_data)
st.dataframe(precision_df, use_container_width=True)
# Visualization
fig = px.scatter(precision_df,
x='Mean StdErr',
y='Signal/Noise',
size='N Models',
hover_name='Benchmark',
title="Measurement Precision: Signal-to-Noise vs Standard Error",
labels={'Signal/Noise': 'Signal-to-Noise Ratio'})
st.plotly_chart(fig, use_container_width=True)
# Uncertainty-aware scatter plot
st.subheader("๐ฏ Uncertainty-Aware Scatter Plot")
# Let user select benchmarks with stderr data
available_benchmarks = [score_col for score_col, _ in matched_benchmarks]
# Initialize session state benchmarks if not set or invalid
if (st.session_state.uncertainty_x_benchmark not in available_benchmarks or
st.session_state.uncertainty_y_benchmark not in available_benchmarks):
st.session_state.uncertainty_x_benchmark = available_benchmarks[0] if available_benchmarks else None
st.session_state.uncertainty_y_benchmark = available_benchmarks[1] if len(available_benchmarks) > 1 else available_benchmarks[0] if available_benchmarks else None
col1, col2 = st.columns(2)
with col1:
# Get current index for x_benchmark
x_index = 0
if st.session_state.uncertainty_x_benchmark in available_benchmarks:
x_index = available_benchmarks.index(st.session_state.uncertainty_x_benchmark)
x_benchmark = st.selectbox(
"X-axis Benchmark (with uncertainty)",
available_benchmarks,
index=x_index,
format_func=clean_benchmark_name,
key="uncertainty_x_benchmark"
)
with col2:
# Get current index for y_benchmark
y_index = 1 if len(available_benchmarks) > 1 else 0
if st.session_state.uncertainty_y_benchmark in available_benchmarks:
y_index = available_benchmarks.index(st.session_state.uncertainty_y_benchmark)
y_benchmark = st.selectbox(
"Y-axis Benchmark (with uncertainty)",
available_benchmarks,
index=y_index,
format_func=clean_benchmark_name,
key="uncertainty_y_benchmark"
)
if x_benchmark and y_benchmark and x_benchmark != y_benchmark:
# Get matched data
matched_data = match_scores_with_stderr(df, stderr_df, {x_benchmark, y_benchmark})
if not matched_data:
st.error("No matching data found between scores and stderr.")
return
# Check if both benchmarks have stderr matches
if x_benchmark not in matched_data or y_benchmark not in matched_data:
missing = []
if x_benchmark not in matched_data:
missing.append(clean_benchmark_name(x_benchmark))
if y_benchmark not in matched_data:
missing.append(clean_benchmark_name(y_benchmark))
st.error(f"No stderr data found for: {', '.join(missing)}")
return
# Get the stderr column names
score_to_stderr_mapping = matched_data
# Create combined dataframe with scores and stderr
combined_data = df[[x_benchmark, y_benchmark]].copy()
stderr_x_col = score_to_stderr_mapping[x_benchmark]
stderr_y_col = score_to_stderr_mapping[y_benchmark]
# Add stderr columns
combined_data[stderr_x_col] = stderr_df[stderr_x_col]
combined_data[stderr_y_col] = stderr_df[stderr_y_col]
# Drop rows with any missing data
matched_data_df = combined_data.dropna()
if len(matched_data_df) < 3:
st.error("Insufficient data points with both scores and stderr (need at least 3).")
return
# Create scatter plot with error bars
if len(matched_data_df) >= 3:
fig = go.Figure()
# Add scatter points with error bars
fig.add_trace(go.Scatter(
x=matched_data_df[x_benchmark],
y=matched_data_df[y_benchmark],
error_x=dict(
type='data',
array=matched_data_df[score_to_stderr_mapping[x_benchmark]],
visible=True
),
error_y=dict(
type='data',
array=matched_data_df[score_to_stderr_mapping[y_benchmark]],
visible=True
),
mode='markers',
marker=dict(size=8, opacity=0.7),
text=matched_data_df.index,
hovertemplate='%{text}
' +
f'{clean_benchmark_name(x_benchmark)}: %{{x:.3f}} ยฑ %{{error_x:.3f}}
' +
f'{clean_benchmark_name(y_benchmark)}: %{{y:.3f}} ยฑ %{{error_y:.3f}}',
name='Models'
))
# Fit linear regression for reference
from sklearn.linear_model import LinearRegression
X = matched_data_df[x_benchmark].values.reshape(-1, 1)
y = matched_data_df[y_benchmark].values
model = LinearRegression()
model.fit(X, y)
x_line = np.linspace(X.min(), X.max(), 100)
y_line = model.predict(x_line.reshape(-1, 1))
fig.add_trace(go.Scatter(
x=x_line,
y=y_line,
mode='lines',
name=f'Linear Fit (Rยฒ = {model.score(X, y):.3f})',
line=dict(dash='dash', color='red')
))
fig.update_layout(
title=f"Uncertainty-Aware Analysis: {clean_benchmark_name(x_benchmark)} vs {clean_benchmark_name(y_benchmark)}",
xaxis_title=clean_benchmark_name(x_benchmark),
yaxis_title=clean_benchmark_name(y_benchmark),
hovermode='closest'
)
st.plotly_chart(fig, use_container_width=True)
# Uncertainty metrics
st.subheader("๐ Uncertainty Metrics")
col1, col2, col3 = st.columns(3)
with col1:
avg_x_err = matched_data_df[score_to_stderr_mapping[x_benchmark]].mean()
st.metric("Avg X Error", f"{avg_x_err:.4f}")
with col2:
avg_y_err = matched_data_df[score_to_stderr_mapping[y_benchmark]].mean()
st.metric("Avg Y Error", f"{avg_y_err:.4f}")
with col3:
# Signal-to-noise ratio
x_snr = matched_data_df[x_benchmark].std() / avg_x_err
st.metric("X Signal/Noise", f"{x_snr:.2f}")
# Data table
st.subheader("๐ Data with Uncertainties")
display_cols = [x_benchmark, score_to_stderr_mapping[x_benchmark],
y_benchmark, score_to_stderr_mapping[y_benchmark]]
display_data = matched_data_df[display_cols].copy()
# Rename columns for display
new_names = {
x_benchmark: f"{clean_benchmark_name(x_benchmark)} (Score)",
score_to_stderr_mapping[x_benchmark]: f"{clean_benchmark_name(x_benchmark)} (ยฑError)",
y_benchmark: f"{clean_benchmark_name(y_benchmark)} (Score)",
score_to_stderr_mapping[y_benchmark]: f"{clean_benchmark_name(y_benchmark)} (ยฑError)"
}
display_data = display_data.rename(columns=new_names)
st.dataframe(display_data, use_container_width=True)
else:
st.warning("Need at least 3 data points for uncertainty analysis.")
# Linear regression model
def fit_linear_model(x, y):
"""Fit a simple linear model."""
try:
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
X = x.reshape(-1, 1)
model = LinearRegression()
model.fit(X, y)
y_pred = model.predict(X)
r2 = r2_score(y, y_pred)
mse = mean_squared_error(y, y_pred)
# Create prediction function
def predict(x_new):
return model.predict(np.array(x_new).reshape(-1, 1))
return {
'params': [model.coef_[0], model.intercept_],
'r2': r2,
'mse': mse,
'model_func': predict,
'name': f'Linear (slope={model.coef_[0]:.3f})',
'type': 'linear'
}
except Exception as e:
print(f"Linear model fitting failed: {e}")
return None
# Hockey stick model fitting
def fit_hockey_stick_model(x, y):
"""
Fit a hockey stick (saturation) model: y = a * min(x, threshold) + b
"""
def hockey_stick(x, a, b, threshold):
return a * np.minimum(x, threshold) + b
try:
# Initial guess: threshold at 75th percentile, linear fit for slope
x_thresh_guess = np.percentile(x, 75)
linear_fit = np.polyfit(x, y, 1)
# Initial parameters: [slope, intercept, threshold]
p0 = [linear_fit[0], linear_fit[1], x_thresh_guess]
# Fit the model
popt, pcov = curve_fit(hockey_stick, x, y, p0=p0, maxfev=2000)
# Calculate Rยฒ
y_pred = hockey_stick(x, *popt)
r2 = r2_score(y, y_pred)
# Calculate residual standard error
residuals = y - y_pred
mse = np.mean(residuals**2)
return {
'params': popt,
'r2': r2,
'mse': mse,
'model_func': lambda x_new: hockey_stick(x_new, *popt),
'name': f'Hockey Stick (threshold={popt[2]:.3f})',
'param_names': ['slope', 'intercept', 'threshold']
}
except:
return None
def fit_saturation_model(x, y):
"""
Fit saturation model: y = a * (1 - exp(-b * x)) + c
Tries both directions (x vs y and y vs x) and chooses the better fit.
Returns curve coordinates computed in the best direction for consistent plotting.
"""
def saturation(x, a, b, c):
return a * (1 - np.exp(-b * x)) + c
def fit_direction(x_data, y_data, direction_name):
"""Helper function to fit saturation in one direction"""
try:
# Initial guess
y_range = np.max(y_data) - np.min(y_data)
p0 = [y_range, 1.0, np.min(y_data)]
# Fit the model
popt, pcov = curve_fit(saturation, x_data, y_data, p0=p0, maxfev=2000)
# Calculate Rยฒ
y_pred = saturation(x_data, *popt)
r2 = r2_score(y_data, y_pred)
# Calculate residual standard error
residuals = y_data - y_pred
mse = np.mean(residuals**2)
return {
'params': popt,
'r2': r2,
'mse': mse,
'direction': direction_name,
'x_data': x_data,
'y_data': y_data
}
except:
return None
# Try normal direction (x vs y)
normal_fit = fit_direction(x, y, 'normal')
# Try flipped direction (y vs x) - we'll need to invert this later
flipped_fit = fit_direction(y, x, 'flipped')
# Choose the better fit based on Rยฒ
best_fit = None
if normal_fit and flipped_fit:
if normal_fit['r2'] >= flipped_fit['r2']:
best_fit = normal_fit
else:
best_fit = flipped_fit
elif normal_fit:
best_fit = normal_fit
elif flipped_fit:
best_fit = flipped_fit
if best_fit is None:
return None
# Compute curve coordinates in the best-fitting direction
if best_fit['direction'] == 'normal':
# Standard saturation: y = f(x)
x_curve_data = best_fit['x_data'] # This is original x
y_curve_data = best_fit['y_data'] # This is original y
curve_x_range = np.linspace(x_curve_data.min(), x_curve_data.max(), 100)
curve_y_values = saturation(curve_x_range, *best_fit['params'])
# Store curve coordinates in original x,y space
curve_coords = {
'x_coords': curve_x_range,
'y_coords': curve_y_values
}
model_func = lambda x_new: saturation(x_new, *best_fit['params'])
name = f'Saturation (rate={best_fit["params"][1]:.3f})'
else:
# Flipped saturation: curve was fit as x = f(y), need to store in x,y space
y_curve_data = best_fit['x_data'] # This was y when flipped
x_curve_data = best_fit['y_data'] # This was x when flipped
# Generate curve in the fitted direction (y vs x)
y_range = np.linspace(y_curve_data.min(), y_curve_data.max(), 100)
x_fitted = saturation(y_range, *best_fit['params'])
# Store curve coordinates in original x,y space (swap back)
curve_coords = {
'x_coords': x_fitted,
'y_coords': y_range
}
# Create inverse function for predictions
a, b, c = best_fit['params']
def inverse_saturation(x_new):
# Solve: x = a * (1 - exp(-b * y)) + c for y
# Rearranging: y = -ln(1 - (x - c) / a) / b
x_new = np.asarray(x_new)
result = np.full_like(x_new, np.nan, dtype=float)
# Simple domain handling - only compute where mathematically valid
if a > 0 and b > 0:
# Valid domain: c <= x < c + a
valid_mask = (x_new >= c) & (x_new < c + a * 0.999) # Leave small margin from asymptote
if np.any(valid_mask):
x_valid = x_new[valid_mask]
ratio = (x_valid - c) / a
ratio = np.clip(ratio, 1e-10, 0.999) # Avoid log(0) and log(negative)
result[valid_mask] = -np.log(1 - ratio) / b
return result
model_func = inverse_saturation
name = f'Saturation-Inv (rate={best_fit["params"][1]:.3f})'
return {
'params': best_fit['params'],
'r2': best_fit['r2'],
'mse': best_fit['mse'],
'model_func': model_func,
'name': name,
'param_names': ['amplitude', 'rate', 'offset'],
'direction': best_fit['direction'],
'curve_coords': curve_coords # Pre-computed curve coordinates
}
def fit_polynomial_model(x, y, degree=2):
"""
Fit polynomial model of specified degree
"""
try:
# Fit polynomial
poly_coeffs = np.polyfit(x, y, degree)
poly_func = np.poly1d(poly_coeffs)
# Calculate Rยฒ
y_pred = poly_func(x)
r2 = r2_score(y, y_pred)
# Calculate residual standard error
residuals = y - y_pred
mse = np.mean(residuals**2)
return {
'params': poly_coeffs,
'r2': r2,
'mse': mse,
'model_func': lambda x_new: poly_func(x_new),
'name': f'Polynomial (degree={degree})',
'param_names': [f'coeff_{i}' for i in range(degree+1)]
}
except:
return None
def fit_random_forest_model(x, y):
"""
Fit Random Forest model for non-parametric regression with overfitting prevention
"""
try:
# Reshape for sklearn
X = x.values.reshape(-1, 1) if hasattr(x, 'values') else x.reshape(-1, 1)
# Use conservative parameters to prevent overfitting on small datasets
n_samples = len(x)
# Adjust parameters based on dataset size
if n_samples < 30:
# Very conservative for small datasets
rf = RandomForestRegressor(
n_estimators=50, # Fewer trees
max_depth=2, # Very shallow trees
min_samples_split=max(2, n_samples // 10), # At least 10% of data to split
min_samples_leaf=max(1, n_samples // 20), # At least 5% of data per leaf
max_features=1, # Only one feature anyway
random_state=42,
bootstrap=True,
oob_score=True if n_samples > 10 else False
)
elif n_samples < 100:
# Moderately conservative
rf = RandomForestRegressor(
n_estimators=100,
max_depth=3, # Shallow trees
min_samples_split=max(2, n_samples // 8),
min_samples_leaf=max(1, n_samples // 15),
max_features=1,
random_state=42,
bootstrap=True,
oob_score=True
)
else:
# Still conservative but allow more complexity
rf = RandomForestRegressor(
n_estimators=100,
max_depth=4, # Slightly deeper
min_samples_split=max(2, n_samples // 6),
min_samples_leaf=max(2, n_samples // 12),
max_features=1,
random_state=42,
bootstrap=True,
oob_score=True
)
rf.fit(X, y)
# Predict
y_pred = rf.predict(X)
r2 = r2_score(y, y_pred)
mse = np.mean((y - y_pred)**2)
# Use OOB score as a better estimate of performance if available
oob_r2 = getattr(rf, 'oob_score_', None)
display_r2 = oob_r2 if oob_r2 is not None else r2
return {
'model': rf,
'r2': display_r2, # Use OOB score if available to reduce overfitting bias
'r2_train': r2, # Keep training Rยฒ for comparison
'mse': mse,
'model_func': lambda x_new: rf.predict(x_new.reshape(-1, 1) if len(x_new.shape) == 1 else x_new),
'name': f'Random Forest (OOB)' if oob_r2 is not None else 'Random Forest',
'param_names': ['n_estimators', 'max_depth', 'min_samples_split']
}
except:
return None
def detect_clusters_and_fit(x, y, n_clusters=2):
"""
Detect clusters in the data and fit separate models
"""
try:
# Prepare data for clustering
data = np.column_stack([x, y])
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)
# Perform clustering
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
cluster_labels = kmeans.fit_predict(data_scaled)
# Fit linear models for each cluster
cluster_models = []
total_r2_weighted = 0
total_mse_weighted = 0
total_points = len(x)
for i in range(n_clusters):
mask = cluster_labels == i
if np.sum(mask) >= 3: # Need at least 3 points
x_cluster = x[mask]
y_cluster = y[mask]
# Fit linear model for this cluster
coeffs = np.polyfit(x_cluster, y_cluster, 1)
poly_func = np.poly1d(coeffs)
y_pred_cluster = poly_func(x_cluster)
r2_cluster = r2_score(y_cluster, y_pred_cluster)
mse_cluster = np.mean((y_cluster - y_pred_cluster)**2)
cluster_models.append({
'coeffs': coeffs,
'mask': mask,
'r2': r2_cluster,
'mse': mse_cluster,
'n_points': np.sum(mask)
})
# Weight by cluster size
weight = np.sum(mask) / total_points
total_r2_weighted += r2_cluster * weight
total_mse_weighted += mse_cluster * weight
if len(cluster_models) > 0:
def cluster_predict(x_new):
# For prediction, assign to nearest cluster centroid
result = np.zeros_like(x_new)
for i, model in enumerate(cluster_models):
if i == 0: # For simplicity, use first cluster model for all predictions
result = np.polyval(model['coeffs'], x_new)
break
return result
return {
'cluster_models': cluster_models,
'cluster_labels': cluster_labels,
'r2': total_r2_weighted,
'mse': total_mse_weighted,
'model_func': cluster_predict,
'name': f'Clustered Linear (k={n_clusters})',
'param_names': [f'cluster_{i}_slope' for i in range(len(cluster_models))]
}
except:
pass
return None
def fit_all_models(x, y):
"""
Fit simplified model set: only linear and saturation models.
Returns only the single best model between linear and saturation.
"""
models = []
# Linear model (baseline)
try:
linear_coeffs = np.polyfit(x, y, 1)
linear_func = np.poly1d(linear_coeffs)
y_pred_linear = linear_func(x)
r2_linear = r2_score(y, y_pred_linear)
mse_linear = np.mean((y - y_pred_linear)**2)
linear_model = {
'name': 'Linear',
'r2': r2_linear,
'mse': mse_linear,
'model_func': lambda x_new: linear_func(x_new),
'params': linear_coeffs,
'param_names': ['slope', 'intercept'],
'type': 'linear'
}
models.append(linear_model)
except:
pass
# Saturation model (with bidirectional fitting)
saturation_result = fit_saturation_model(x, y)
if saturation_result:
saturation_result['type'] = 'saturation'
models.append(saturation_result)
if not models:
return []
# Sort by Rยฒ score (descending) and get the best one
models.sort(key=lambda m: m['r2'], reverse=True)
best_model = models[0]
# Apply preference logic
if len(models) > 1:
# Find linear model
linear_model = next((m for m in models if m['type'] == 'linear'), None)
if linear_model:
# Prefer linear model in two cases:
# 1. When overall performance is poor (Rยฒ < 0.5)
# 2. When linear model performs well (Rยฒ > 0.7) and is not significantly worse than best model
if best_model['r2'] < 0.5:
linear_model['name'] = 'Linear'
linear_model['preferred'] = True
linear_model['preference_reason'] = f"Preferred due to poor overall performance (best Rยฒ = {best_model['r2']:.3f} < 0.5)"
return [linear_model]
elif linear_model['r2'] > 0.7 and (best_model['r2'] - linear_model['r2']) < 0.1:
linear_model['name'] = 'Linear'
linear_model['preferred'] = True
linear_model['preference_reason'] = f"Preferred due to good linear fit (Rยฒ = {linear_model['r2']:.3f}) with minimal improvement from saturation model"
return [linear_model]
# Return only the best model
return [best_model]
def create_advanced_scatter_plot(df, x_bench, y_bench, stderr_df=None):
"""Create an advanced scatter plot with single best model fit."""
if x_bench not in df.columns or y_bench not in df.columns:
return None, None
# Get common data
common_data = df[[x_bench, y_bench]].dropna()
if len(common_data) < 5: # Need more data for advanced fitting
return None, None
x_vals = common_data[x_bench].values
y_vals = common_data[y_bench].values
# Fit models (returns only the best one)
models = fit_all_models(x_vals, y_vals)
if not models:
return None, None
best_model = models[0]
# Create figure
fig = go.Figure()
# Add scatter points
fig.add_trace(go.Scatter(
x=x_vals,
y=y_vals,
mode='markers',
text=common_data.index,
hovertemplate=(
"%{text}
" +
f"{clean_benchmark_name(x_bench)}: %{{x:.3f}}
" +
f"{clean_benchmark_name(y_bench)}: %{{y:.3f}}
" +
""
),
marker=dict(size=8, opacity=0.7, color='steelblue'),
name='Data Points'
))
# Add the best model fit
try:
# Check if we have pre-computed curve coordinates (for saturation models)
if 'curve_coords' in best_model:
x_line_valid = best_model['curve_coords']['x_coords']
y_line_valid = best_model['curve_coords']['y_coords']
# Filter out NaN values for plotting
valid_mask = ~np.isnan(y_line_valid) & ~np.isnan(x_line_valid)
if np.any(valid_mask):
x_line_valid = x_line_valid[valid_mask]
y_line_valid = y_line_valid[valid_mask]
else:
# For other models, generate curve points as before
x_line = np.linspace(x_vals.min(), x_vals.max(), 100)
y_line = best_model['model_func'](x_line)
# Filter out NaN values for plotting
valid_mask = ~np.isnan(y_line)
if np.any(valid_mask):
x_line_valid = x_line[valid_mask]
y_line_valid = y_line[valid_mask]
else:
x_line_valid = y_line_valid = np.array([])
if len(x_line_valid) > 0:
# Format model name and stats
model_name = f"{best_model['name']} (Rยฒ={best_model['r2']:.3f})"
# Style based on model type
if best_model['type'] == 'linear':
line_color = 'red'
line_width = 3
else: # saturation
line_color = 'green'
line_width = 3
# Add star for preferred models
if best_model.get('preferred', False):
model_name = f"{model_name}"
line_color = 'darkblue'
fig.add_trace(go.Scatter(
x=x_line_valid,
y=y_line_valid,
mode='lines',
name=model_name,
line=dict(color=line_color, width=line_width)
))
except Exception as e:
st.warning(f"Could not plot model curve: {e}")
# Update layout
fig.update_layout(
title=f"{clean_benchmark_name(y_bench)} vs {clean_benchmark_name(x_bench)}",
xaxis_title=clean_benchmark_name(x_bench),
yaxis_title=clean_benchmark_name(y_bench),
showlegend=True,
width=800,
height=600
)
return fig, models
def show_advanced_modeling(df, stderr_df):
"""Show the advanced modeling interface for benchmark prediction."""
st.header("๐ค Advanced Benchmark Modeling & Prediction")
st.markdown("""
This section provides advanced modeling capabilities to better understand and predict benchmark relationships.
It handles the different correlation patterns you've identified: hockey stick (saturation), linear, and noisy/clustered patterns.
""")
# Model configuration
st.subheader("๐ ๏ธ Model Configuration")
col1, col2, col3 = st.columns(3)
with col1:
modeling_approach = st.selectbox(
"Choose Modeling Approach",
["Single Pair Analysis", "Multi-Benchmark Prediction", "Ensemble Prediction"],
help="Single Pair: Analyze relationship between two benchmarks\nMulti-Benchmark: Predict one benchmark from multiple others\nEnsemble: Combine multiple models for robust prediction"
)
with col2:
min_data_points = st.slider(
"Minimum Data Points",
min_value=5,
max_value=50,
value=10,
help="Minimum number of models needed for reliable modeling"
)
with col3:
cross_validation = st.checkbox(
"Cross Validation",
value=True,
help="Use cross-validation to assess model generalization"
)
if modeling_approach == "Single Pair Analysis":
show_single_pair_analysis(df, stderr_df, min_data_points, cross_validation)
elif modeling_approach == "Multi-Benchmark Prediction":
show_multi_benchmark_prediction(df, stderr_df, min_data_points, cross_validation)
else: # Ensemble Prediction
show_ensemble_prediction(df, stderr_df, min_data_points, cross_validation)
def show_single_pair_analysis(df, stderr_df, min_data_points, cross_validation):
"""Detailed single pair analysis with all model types."""
st.subheader("๐ Single Pair Deep Analysis")
col1, col2 = st.columns(2)
with col1:
x_benchmark = st.selectbox("Predictor Benchmark", df.columns, format_func=clean_benchmark_name)
with col2:
y_benchmark = st.selectbox("Target Benchmark", df.columns,
index=1 if len(df.columns) > 1 else 0,
format_func=clean_benchmark_name)
if x_benchmark == y_benchmark:
st.warning("Please select different benchmarks for meaningful analysis.")
return
# Get data
common_data = df[[x_benchmark, y_benchmark]].dropna()
if len(common_data) < min_data_points:
st.error(f"Insufficient data: {len(common_data)} points available, {min_data_points} required.")
return
x_vals = common_data[x_benchmark].values
y_vals = common_data[y_benchmark].values
# Fit all models
with st.spinner("Fitting models..."):
models = fit_all_models(x_vals, y_vals)
if not models:
st.error("Failed to fit any models to the data.")
return
# Cross validation if requested
if cross_validation and len(common_data) >= 10:
with st.spinner("Performing cross-validation..."):
cv_results = perform_cross_validation(x_vals, y_vals, models[:5]) # Top 5 models
st.subheader("๐ Cross-Validation Results")
cv_df = pd.DataFrame(cv_results)
st.dataframe(cv_df, use_container_width=True)
# Create visualization
fig, _ = create_advanced_scatter_plot(df, x_benchmark, y_benchmark, stderr_df)
if fig:
st.plotly_chart(fig, use_container_width=True)
# Model comparison
st.subheader("๐ Model Performance Ranking")
model_data = []
for i, model in enumerate(models):
model_data.append({
'Rank': i + 1,
'Model': model['name'],
'Rยฒ Score': f"{model['r2']:.4f}",
'MSE': f"{model['mse']:.6f}",
'Type': model['type'],
'Recommended': get_model_recommendation(model, x_vals, y_vals)
})
model_df = pd.DataFrame(model_data)
st.dataframe(model_df, use_container_width=True)
# Pattern analysis
st.subheader("๐ Pattern Analysis")
best_model = models[0]
pattern_type = analyze_relationship_pattern(x_vals, y_vals, best_model)
pattern_colors = {
'Linear': 'info',
'Hockey Stick': 'warning',
'Saturation': 'warning',
'Non-linear': 'info',
'Clustered': 'error',
'Noisy': 'error'
}
pattern_color = pattern_colors.get(pattern_type, 'info')
if pattern_color == 'warning':
st.warning(f"**Pattern Detected: {pattern_type}**\n\n{get_pattern_explanation(pattern_type)}")
elif pattern_color == 'error':
st.error(f"**Pattern Detected: {pattern_type}**\n\n{get_pattern_explanation(pattern_type)}")
else:
st.info(f"**Pattern Detected: {pattern_type}**\n\n{get_pattern_explanation(pattern_type)}")
# Interactive prediction
st.subheader("๐ฏ Interactive Prediction")
col1, col2, col3 = st.columns(3)
with col1:
selected_model_idx = st.selectbox(
"Choose Model for Prediction",
range(len(models[:5])), # Top 5 models
format_func=lambda i: f"{models[i]['name']} (Rยฒ={models[i]['r2']:.3f})"
)
with col2:
x_input = st.number_input(
f"{clean_benchmark_name(x_benchmark)} Score",
min_value=0.0,
max_value=1.0,
value=0.5,
step=0.01,
format="%.3f"
)
with col3:
if st.button("๐ฎ Predict", type="primary"):
selected_model = models[selected_model_idx]
try:
prediction = selected_model['model_func'](np.array([x_input]))[0]
confidence = calculate_prediction_confidence(selected_model, x_vals, y_vals, x_input)
st.success(f"**Predicted {clean_benchmark_name(y_benchmark)}: {prediction:.3f}**")
st.info(f"Model: {selected_model['name']} | Confidence: {confidence}")
except Exception as e:
st.error(f"Prediction failed: {str(e)}")
def show_multi_benchmark_prediction(df, stderr_df, min_data_points, cross_validation):
"""Multi-benchmark prediction interface."""
st.subheader("๐ฏ Multi-Benchmark Prediction")
st.info("Predict one benchmark using multiple others as predictors.")
# Target selection
target_benchmark = st.selectbox(
"Select Target Benchmark to Predict",
df.columns,
format_func=clean_benchmark_name
)
# Predictor selection
predictor_benchmarks = st.multiselect(
"Select Predictor Benchmarks",
[col for col in df.columns if col != target_benchmark],
default=[col for col in df.columns if col != target_benchmark][:3], # Default first 3
format_func=clean_benchmark_name
)
if not predictor_benchmarks:
st.warning("Please select at least one predictor benchmark.")
return
# Filter data to models with complete data
all_benchmarks = [target_benchmark] + predictor_benchmarks
complete_data = df[all_benchmarks].dropna()
if len(complete_data) < min_data_points:
st.error(f"Insufficient complete data: {len(complete_data)} models available, {min_data_points} required.")
return
# Prepare data
X = complete_data[predictor_benchmarks].values
y = complete_data[target_benchmark].values
# Fit ensemble of models
with st.spinner("Training multi-benchmark models..."):
ensemble_results = fit_multi_benchmark_models(X, y, predictor_benchmarks)
# Display results
st.subheader("๐ Multi-Benchmark Model Performance")
results_data = []
for model_name, result in ensemble_results.items():
results_data.append({
'Model': model_name,
'Rยฒ Score': f"{result['r2']:.4f}",
'MAE': f"{result['mae']:.4f}",
'Feature Importance': result.get('importance', 'N/A')
})
results_df = pd.DataFrame(results_data)
st.dataframe(results_df, use_container_width=True)
# Feature importance visualization
best_model_name = max(ensemble_results.keys(), key=lambda k: ensemble_results[k]['r2'])
best_model = ensemble_results[best_model_name]
if 'feature_importance' in best_model:
st.subheader("๐ Feature Importance")
importance_data = pd.DataFrame({
'Benchmark': [clean_benchmark_name(b) for b in predictor_benchmarks],
'Importance': best_model['feature_importance']
}).sort_values('Importance', ascending=True)
fig_importance = px.bar(
importance_data,
x='Importance',
y='Benchmark',
orientation='h',
title=f"Feature Importance for Predicting {clean_benchmark_name(target_benchmark)}"
)
st.plotly_chart(fig_importance, use_container_width=True)
# Interactive prediction
st.subheader("๐ฏ Multi-Benchmark Prediction")
st.write("Enter scores for predictor benchmarks:")
input_values = {}
cols = st.columns(min(len(predictor_benchmarks), 3))
for i, benchmark in enumerate(predictor_benchmarks):
with cols[i % 3]:
input_values[benchmark] = st.number_input(
clean_benchmark_name(benchmark),
min_value=0.0,
max_value=1.0,
value=float(df[benchmark].median()),
step=0.001,
format="%.3f",
key=f"input_{benchmark}"
)
if st.button("๐ฎ Predict from Multiple Benchmarks", type="primary"):
input_array = np.array([[input_values[b] for b in predictor_benchmarks]])
# Use best model for prediction
prediction = best_model['model'].predict(input_array)[0]
st.success(f"**Predicted {clean_benchmark_name(target_benchmark)}: {prediction:.3f}**")
st.info(f"Using model: {best_model_name} (Rยฒ = {best_model['r2']:.3f})")
def show_ensemble_prediction(df, stderr_df, min_data_points, cross_validation):
"""Ensemble prediction combining multiple approaches."""
st.subheader("๐ญ Ensemble Prediction")
st.info("Combine multiple modeling approaches for robust predictions.")
# Implementation for ensemble prediction
st.write("๐ง Ensemble prediction coming soon! This will combine:")
st.write("- Multiple model types (linear, non-linear, clustering)")
st.write("- Multiple predictor sets")
st.write("- Uncertainty quantification")
st.write("- Robust prediction intervals")
# Helper functions for advanced modeling
def perform_cross_validation(x, y, models, n_folds=5):
"""Perform cross-validation on models."""
from sklearn.model_selection import KFold
cv_results = []
kf = KFold(n_splits=n_folds, shuffle=True, random_state=42)
for model in models:
fold_r2_scores = []
fold_mae_scores = []
for train_idx, test_idx in kf.split(x):
x_train, x_test = x[train_idx], x[test_idx]
y_train, y_test = y[train_idx], y[test_idx]
try:
# Re-fit model on training data
if model['type'] == 'parametric':
if 'Hockey' in model['name']:
fitted_model = fit_hockey_stick_model(x_train, y_train)
elif 'Saturation' in model['name']:
fitted_model = fit_saturation_model(x_train, y_train)
elif 'Polynomial' in model['name']:
degree = 2 if 'degree=2' in model['name'] else 3
fitted_model = fit_polynomial_model(x_train, y_train, degree)
else: # Linear
fitted_model = fit_polynomial_model(x_train, y_train, 1)
if fitted_model:
y_pred = fitted_model['model_func'](x_test)
fold_r2 = r2_score(y_test, y_pred)
fold_mae = mean_absolute_error(y_test, y_pred)
fold_r2_scores.append(fold_r2)
fold_mae_scores.append(fold_mae)
elif model['type'] == 'non_parametric' and 'Random Forest' in model['name']:
# Handle Random Forest models
fitted_model = fit_random_forest_model(x_train, y_train)
if fitted_model:
y_pred = fitted_model['model_func'](x_test)
fold_r2 = r2_score(y_test, y_pred)
fold_mae = mean_absolute_error(y_test, y_pred)
fold_r2_scores.append(fold_r2)
fold_mae_scores.append(fold_mae)
except:
continue
if fold_r2_scores:
cv_results.append({
'Model': model['name'],
'CV Rยฒ Mean': f"{np.mean(fold_r2_scores):.4f}",
'CV Rยฒ Std': f"{np.std(fold_r2_scores):.4f}",
'CV MAE Mean': f"{np.mean(fold_mae_scores):.4f}",
'CV MAE Std': f"{np.std(fold_mae_scores):.4f}"
})
return cv_results
def get_model_recommendation(model, x_vals, y_vals):
"""Get recommendation for when to use this model."""
model_name = model['name']
r2 = model['r2']
# Check if this is a preferred linear model
is_preferred_linear = 'Linear (Preferred' in model_name
has_preference_reason = 'preference_reason' in model
# Check if this is Random Forest with OOB validation
is_rf_oob = 'Random Forest' in model_name and '(OOB)' in model_name
if is_preferred_linear:
if has_preference_reason:
return f"Recommended: {model.get('preference_reason', 'Simple model preferred')}"
else:
return "Recommended: Simple linear model preferred"
elif r2 < 0.3:
return "Poor fit - not recommended"
elif 'Hockey Stick' in model_name:
return "Good for saturation patterns"
elif 'Saturation' in model_name:
return "Good for gradual leveling off"
elif 'Polynomial' in model_name:
return "Good for curved relationships"
elif 'Clustered' in model_name:
return "Good for grouped data"
elif 'Random Forest' in model_name:
if is_rf_oob:
if r2 > 0.7:
return "Excellent non-parametric fit (OOB validated)"
elif r2 > 0.5:
return "Good non-parametric fit (OOB validated)"
else:
return "Moderate non-parametric fit - consider simpler models"
else:
return "Non-parametric model - may overfit on small datasets"
elif 'Linear' in model_name:
if r2 > 0.8:
return "Excellent linear fit - highly recommended"
elif r2 > 0.6:
return "Good linear fit - recommended"
elif r2 > 0.4:
return "Moderate linear fit - simple and interpretable"
else:
return "Weak linear fit - consider other patterns"
elif r2 > 0.8:
return "Excellent fit - highly recommended"
elif r2 > 0.6:
return "Good fit - recommended"
else:
return "Moderate fit - use with caution"
def analyze_relationship_pattern(x_vals, y_vals, best_model):
"""Analyze the relationship pattern between benchmarks."""
model_name = best_model['name']
r2 = best_model['r2']
# Check for different patterns
if 'Hockey Stick' in model_name and r2 > 0.6:
return 'Hockey Stick'
elif 'Saturation' in model_name and r2 > 0.6:
return 'Saturation'
elif 'Clustered' in model_name and r2 > 0.5:
return 'Clustered'
elif r2 < 0.4:
return 'Noisy'
elif 'Polynomial' in model_name and r2 > 0.6:
return 'Non-linear'
else:
return 'Linear'
def get_pattern_explanation(pattern_type):
"""Get explanation for different pattern types."""
explanations = {
'Linear': "The benchmarks show a consistent linear relationship. Performance on one benchmark predicts the other reliably.",
'Hockey Stick': "One benchmark saturates while the other continues improving. This suggests a capability ceiling for one benchmark.",
'Saturation': "The relationship shows gradual leveling off, indicating diminishing returns at higher performance levels.",
'Non-linear': "The relationship is curved but predictable. Consider the full range when making predictions.",
'Clustered': "The data shows distinct groups or clusters. Different model families may follow different patterns.",
'Noisy': "The relationship is weak or highly variable. Predictions should be made with caution and wide confidence intervals."
}
return explanations.get(pattern_type, "Unknown pattern type.")
def calculate_prediction_confidence(model, x_vals, y_vals, x_input):
"""Calculate prediction confidence."""
# Simple confidence based on how close input is to training data
x_min, x_max = x_vals.min(), x_vals.max()
if x_min <= x_input <= x_max:
# Within training range
distance_from_center = abs(x_input - np.median(x_vals))
max_distance = max(abs(x_min - np.median(x_vals)), abs(x_max - np.median(x_vals)))
confidence_score = 1.0 - (distance_from_center / max_distance)
r2_factor = model['r2']
overall_confidence = confidence_score * r2_factor
if overall_confidence > 0.8:
return "High"
elif overall_confidence > 0.5:
return "Medium"
else:
return "Low"
else:
return "Very Low (Extrapolation)"
def fit_multi_benchmark_models(X, y, predictor_names):
"""Fit multiple models for multi-benchmark prediction."""
from sklearn.linear_model import LinearRegression, Ridge
from sklearn.ensemble import RandomForestRegressor
models = {}
# Linear Regression
try:
lr = LinearRegression()
lr.fit(X, y)
y_pred = lr.predict(X)
models['Linear Regression'] = {
'model': lr,
'r2': r2_score(y, y_pred),
'mae': mean_absolute_error(y, y_pred),
'feature_importance': np.abs(lr.coef_),
'importance': 'Linear coefficients'
}
except:
pass
# Ridge Regression
try:
ridge = Ridge(alpha=1.0)
ridge.fit(X, y)
y_pred = ridge.predict(X)
models['Ridge Regression'] = {
'model': ridge,
'r2': r2_score(y, y_pred),
'mae': mean_absolute_error(y, y_pred),
'feature_importance': np.abs(ridge.coef_),
'importance': 'Regularized coefficients'
}
except:
pass
# Random Forest
try:
n_samples, n_features = X.shape
# Adjust Random Forest parameters based on dataset size to prevent overfitting
if n_samples < 30:
# Very conservative for small datasets
rf = RandomForestRegressor(
n_estimators=50,
max_depth=2,
min_samples_split=max(2, n_samples // 8),
min_samples_leaf=max(1, n_samples // 15),
max_features=min(2, n_features), # Limit feature subset
random_state=42,
bootstrap=True,
oob_score=True if n_samples > 10 else False
)
elif n_samples < 100:
# Moderately conservative
rf = RandomForestRegressor(
n_estimators=100,
max_depth=3,
min_samples_split=max(2, n_samples // 6),
min_samples_leaf=max(1, n_samples // 12),
max_features=min(3, max(1, n_features // 2)),
random_state=42,
bootstrap=True,
oob_score=True
)
else:
# Still conservative but allow more complexity
rf = RandomForestRegressor(
n_estimators=100,
max_depth=5,
min_samples_split=max(2, n_samples // 5),
min_samples_leaf=max(2, n_samples // 10),
max_features='sqrt', # Standard sqrt(n_features)
random_state=42,
bootstrap=True,
oob_score=True
)
rf.fit(X, y)
y_pred = rf.predict(X)
# Use OOB score if available as it's a better estimate
oob_r2 = getattr(rf, 'oob_score_', None)
train_r2 = r2_score(y, y_pred)
display_r2 = oob_r2 if oob_r2 is not None else train_r2
models['Random Forest'] = {
'model': rf,
'r2': display_r2,
'r2_train': train_r2, # Keep training Rยฒ for comparison
'mae': mean_absolute_error(y, y_pred),
'feature_importance': rf.feature_importances_,
'importance': f'Tree-based importance {"(OOB validated)" if oob_r2 is not None else ""}'
}
except:
pass
return models
if __name__ == "__main__":
main()