Spaces:
Running
Running
""" | |
Database schema for Dynamic Highscores system. | |
This module defines the SQLite database schema for the Dynamic Highscores system, | |
which integrates benchmark selection, model evaluation, and leaderboard functionality. | |
""" | |
import sqlite3 | |
import os | |
import json | |
import threading | |
from datetime import datetime, timedelta | |
import pandas as pd | |
class ThreadLocalDB:
    """Thread-local SQLite connection manager.

    Each thread that uses an instance gets its own lazily created
    ``sqlite3`` connection (with ``sqlite3.Row`` as row factory) and a single
    cached cursor on that connection.

    The thread-local storage is allocated per instance, so two managers
    opened on different database paths never hand out each other's
    connection, even when used from the same thread.
    """

    def __init__(self, db_path):
        """Remember the database path; no connection is opened yet.

        Args:
            db_path: Path passed to ``sqlite3.connect`` on first use.
        """
        self.db_path = db_path
        # Per-instance storage: a class-level ``threading.local()`` would be
        # shared by every manager and leak connections across databases.
        self._thread_local = threading.local()

    def get_connection(self):
        """Return the calling thread's connection, opening it on first use."""
        conn = getattr(self._thread_local, 'conn', None)
        if conn is None:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            self._thread_local.conn = conn
        return conn

    def get_cursor(self):
        """Return the calling thread's cached cursor, creating it on first use.

        The same cursor object is returned on every call from a given thread,
        so a new ``execute`` discards any result rows not yet fetched.
        """
        conn = self.get_connection()
        cursor = getattr(self._thread_local, 'cursor', None)
        if cursor is None:
            cursor = conn.cursor()
            self._thread_local.cursor = cursor
        return cursor

    def close(self):
        """Close the calling thread's cursor and connection, if any.

        Safe to call repeatedly; connections owned by other threads are not
        affected.
        """
        conn = getattr(self._thread_local, 'conn', None)
        if conn is None:
            return
        cursor = getattr(self._thread_local, 'cursor', None)
        if cursor is not None:
            cursor.close()
            self._thread_local.cursor = None
        conn.close()
        self._thread_local.conn = None
class DynamicHighscoresDB:
    """Database manager for the Dynamic Highscores system.

    Wraps a ``ThreadLocalDB`` and exposes helpers for the five tables created
    by ``create_tables``: ``users``, ``benchmarks``, ``models``,
    ``evaluations`` and ``queue``. Every write helper commits before
    returning.
    """

    def __init__(self, db_path="dynamic_highscores.db"):
        """Open (lazily, per thread) the database and ensure the tables exist.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.thread_local_db = ThreadLocalDB(db_path)
        self.create_tables()

    def get_conn(self):
        """Get the thread-local database connection."""
        return self.thread_local_db.get_connection()

    def get_cursor(self):
        """Get the thread-local database cursor."""
        return self.thread_local_db.get_cursor()

    def close(self):
        """Close the thread-local database connection."""
        self.thread_local_db.close()

    @staticmethod
    def _is_all(value):
        """Return True when *value* is the string "all" (case-insensitive).

        Non-string values (e.g. integer ids) are never the wildcard.
        """
        return isinstance(value, str) and value.lower() == "all"

    @staticmethod
    def _decode_metrics(benchmark):
        """Replace the ``metrics`` JSON text of a benchmark dict in place.

        Empty or NULL metrics become an empty dict.
        """
        raw = benchmark['metrics']
        benchmark['metrics'] = json.loads(raw) if raw else {}
        return benchmark

    def create_tables(self):
        """Create all necessary tables if they don't exist."""
        cursor = self.get_cursor()
        conn = self.get_conn()

        # Users table - stores user information
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT UNIQUE NOT NULL,
            hf_user_id TEXT UNIQUE NOT NULL,
            is_admin BOOLEAN DEFAULT 0,
            last_submission_date TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''')

        # Benchmarks table - stores information about available benchmarks
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS benchmarks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            dataset_id TEXT NOT NULL,
            description TEXT,
            metrics TEXT, -- JSON string of metrics
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        )
        ''')

        # Models table - stores information about submitted models
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS models (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            hf_model_id TEXT NOT NULL,
            user_id INTEGER NOT NULL,
            tag TEXT NOT NULL, -- One of: Merge, Agent, Reasoning, Coding, etc.
            parameters TEXT, -- Number of parameters (can be NULL)
            description TEXT,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users (id),
            UNIQUE (hf_model_id, user_id)
        )
        ''')

        # Evaluations table - stores evaluation results
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS evaluations (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            model_id INTEGER NOT NULL,
            benchmark_id INTEGER NOT NULL,
            status TEXT NOT NULL, -- pending, running, completed, failed
            results TEXT, -- JSON string of results
            score REAL, -- Overall score (can be NULL)
            submitted_at TEXT DEFAULT CURRENT_TIMESTAMP,
            started_at TEXT,
            completed_at TEXT,
            FOREIGN KEY (model_id) REFERENCES models (id),
            FOREIGN KEY (benchmark_id) REFERENCES benchmarks (id)
        )
        ''')

        # Queue table - stores evaluation queue
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS queue (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            evaluation_id INTEGER NOT NULL,
            priority INTEGER DEFAULT 0, -- Higher number = higher priority
            added_at TEXT DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (evaluation_id) REFERENCES evaluations (id)
        )
        ''')

        conn.commit()

    # User management methods

    def add_user(self, username, hf_user_id, is_admin=False):
        """Add a new user and return its row id.

        If the insert violates a uniqueness constraint, the id of the existing
        row with the same ``hf_user_id`` is returned instead, or ``None`` when
        no such row exists (e.g. only the username collided).
        """
        cursor = self.get_cursor()
        conn = self.get_conn()
        try:
            cursor.execute(
                "INSERT INTO users (username, hf_user_id, is_admin) VALUES (?, ?, ?)",
                (username, hf_user_id, is_admin)
            )
        except sqlite3.IntegrityError:
            # End the transaction implicitly opened for the failed INSERT so
            # it is not left open on this connection.
            conn.rollback()
            cursor.execute(
                "SELECT id FROM users WHERE hf_user_id = ?",
                (hf_user_id,)
            )
            row = cursor.fetchone()
            return row[0] if row else None
        conn.commit()
        return cursor.lastrowid

    def get_user(self, hf_user_id):
        """Get user information by HuggingFace user ID, or None if unknown."""
        cursor = self.get_cursor()
        cursor.execute(
            "SELECT * FROM users WHERE hf_user_id = ?",
            (hf_user_id,)
        )
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_user_by_username(self, username):
        """Get user information by username, or None if unknown."""
        cursor = self.get_cursor()
        cursor.execute(
            "SELECT * FROM users WHERE username = ?",
            (username,)
        )
        row = cursor.fetchone()
        return dict(row) if row else None

    def can_submit_today(self, user_id):
        """Check if a user can submit a benchmark evaluation today.

        Returns False for an unknown user, True for admins and for users with
        no recorded submission, and otherwise True only when the recorded
        submission timestamp is earlier than today's local midnight.
        """
        cursor = self.get_cursor()
        cursor.execute(
            "SELECT is_admin, last_submission_date FROM users WHERE id = ?",
            (user_id,)
        )
        row = cursor.fetchone()
        if not row:
            return False

        user_data = dict(row)
        # Admins are exempt; a user who never submitted is always allowed.
        if user_data['is_admin'] or not user_data['last_submission_date']:
            return True

        last_date = datetime.fromisoformat(user_data['last_submission_date'])
        today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        return last_date < today

    def update_submission_date(self, user_id):
        """Set the user's last submission date to the current local time."""
        cursor = self.get_cursor()
        conn = self.get_conn()
        current_time = datetime.now().isoformat()
        cursor.execute(
            "UPDATE users SET last_submission_date = ? WHERE id = ?",
            (current_time, user_id)
        )
        conn.commit()

    # Benchmark management methods

    def add_benchmark(self, name, dataset_id, description="", metrics=None):
        """Add a new benchmark and return its row id.

        ``metrics`` is stored as JSON text; ``None`` is stored as ``{}``.
        """
        cursor = self.get_cursor()
        conn = self.get_conn()
        metrics_json = json.dumps({} if metrics is None else metrics)
        try:
            cursor.execute(
                "INSERT INTO benchmarks (name, dataset_id, description, metrics) VALUES (?, ?, ?, ?)",
                (name, dataset_id, description, metrics_json)
            )
        except sqlite3.IntegrityError:
            # NOTE(review): the benchmarks table created by create_tables has
            # no UNIQUE constraint, so this fallback is only reachable if the
            # database was created with a stricter schema - confirm intent.
            conn.rollback()
            cursor.execute(
                "SELECT id FROM benchmarks WHERE dataset_id = ?",
                (dataset_id,)
            )
            row = cursor.fetchone()
            return row[0] if row else None
        conn.commit()
        return cursor.lastrowid

    def get_benchmarks(self):
        """Get all available benchmarks with ``metrics`` decoded from JSON."""
        cursor = self.get_cursor()
        cursor.execute("SELECT * FROM benchmarks")
        return [self._decode_metrics(dict(row)) for row in cursor.fetchall()]

    def get_benchmark(self, benchmark_id):
        """Get benchmark information by ID, or None if it does not exist.

        ``metrics`` is decoded from JSON; empty or NULL metrics are returned
        as an empty dict, matching ``get_benchmarks``.
        """
        cursor = self.get_cursor()
        cursor.execute(
            "SELECT * FROM benchmarks WHERE id = ?",
            (benchmark_id,)
        )
        row = cursor.fetchone()
        if not row:
            return None
        return self._decode_metrics(dict(row))

    # Model management methods

    def add_model(self, name, hf_model_id, user_id, tag, parameters=None, description=""):
        """Add a new model and return its row id.

        If the (hf_model_id, user_id) pair already exists, the id of the
        existing row is returned instead.
        """
        cursor = self.get_cursor()
        conn = self.get_conn()
        try:
            cursor.execute(
                "INSERT INTO models (name, hf_model_id, user_id, tag, parameters, description) VALUES (?, ?, ?, ?, ?, ?)",
                (name, hf_model_id, user_id, tag, parameters, description)
            )
        except sqlite3.IntegrityError:
            # Model already exists for this user; close the implicitly opened
            # transaction before looking it up.
            conn.rollback()
            cursor.execute(
                "SELECT id FROM models WHERE hf_model_id = ? AND user_id = ?",
                (hf_model_id, user_id)
            )
            row = cursor.fetchone()
            return row[0] if row else None
        conn.commit()
        return cursor.lastrowid

    def get_models(self, tag=None):
        """Get all models, optionally filtered by tag ("all" means no filter)."""
        cursor = self.get_cursor()
        if tag and not self._is_all(tag):
            cursor.execute(
                "SELECT * FROM models WHERE tag = ?",
                (tag,)
            )
        else:
            cursor.execute("SELECT * FROM models")
        return [dict(row) for row in cursor.fetchall()]

    def get_model(self, model_id):
        """Get model information by ID, or None if it does not exist."""
        cursor = self.get_cursor()
        cursor.execute(
            "SELECT * FROM models WHERE id = ?",
            (model_id,)
        )
        row = cursor.fetchone()
        return dict(row) if row else None

    # Evaluation management methods

    def add_evaluation(self, model_id, benchmark_id, priority=0):
        """Add a pending evaluation plus its queue entry; return the evaluation id.

        Both rows are written in one transaction: if either insert fails the
        transaction is rolled back and the error is re-raised.
        """
        cursor = self.get_cursor()
        conn = self.get_conn()
        try:
            cursor.execute(
                "INSERT INTO evaluations (model_id, benchmark_id, status) VALUES (?, ?, 'pending')",
                (model_id, benchmark_id)
            )
            evaluation_id = cursor.lastrowid
            cursor.execute(
                "INSERT INTO queue (evaluation_id, priority) VALUES (?, ?)",
                (evaluation_id, priority)
            )
        except sqlite3.Error:
            # Never leave an evaluation row without its queue entry.
            conn.rollback()
            raise
        conn.commit()
        return evaluation_id

    def update_evaluation_status(self, evaluation_id, status, results=None, score=None):
        """Update the status of an evaluation.

        Args:
            evaluation_id: Row id of the evaluation to update.
            status: New status. 'running' also stamps ``started_at``;
                'completed' and 'failed' also stamp ``completed_at``.
            results: Optional JSON-serialisable results; stored only when
                not None.
            score: Optional overall score; stored only when not None.
        """
        cursor = self.get_cursor()
        conn = self.get_conn()

        # Column assignments and their parameters are appended in lockstep so
        # placeholder order always matches parameter order.
        assignments = ["status = ?"]
        params = [status]
        if results is not None:
            assignments.append("results = ?")
            params.append(json.dumps(results))
        if score is not None:
            assignments.append("score = ?")
            params.append(score)
        if status in ('completed', 'failed'):
            assignments.append("completed_at = datetime('now')")
        elif status == 'running':
            assignments.append("started_at = datetime('now')")
        params.append(evaluation_id)

        sql = "UPDATE evaluations SET " + ", ".join(assignments) + " WHERE id = ?"
        cursor.execute(sql, params)
        conn.commit()

    def get_next_in_queue(self):
        """Get the next pending evaluation in the queue, or None if there is none.

        Entries are ordered by priority (highest first), then by the time
        they were queued (oldest first).
        """
        cursor = self.get_cursor()
        cursor.execute("""
        SELECT q.*, e.id as evaluation_id, e.model_id, e.benchmark_id, e.status
        FROM queue q
        JOIN evaluations e ON q.evaluation_id = e.id
        WHERE e.status = 'pending'
        ORDER BY q.priority DESC, q.added_at ASC
        LIMIT 1
        """)
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_evaluation_results(self, model_id=None, benchmark_id=None, tag=None, status=None, limit=None):
        """Get evaluation results, newest submission first.

        Args:
            model_id: Only include this model when truthy.
            benchmark_id: Only include this benchmark; may be an integer id or
                a string. The string "all" (any case) disables the filter.
            tag: Only include models with this tag; "all" disables the filter.
            status: Only include evaluations with this status when truthy.
            limit: Maximum number of rows when truthy.

        Returns:
            A list of dicts. ``results`` is decoded from JSON; text that
            cannot be decoded is replaced by an empty dict, and empty/NULL
            values are left untouched.
        """
        cursor = self.get_cursor()

        sql = """
        SELECT e.id, e.model_id, e.benchmark_id, e.status, e.results, e.score,
               e.submitted_at, e.started_at, e.completed_at, m.name as model_name, m.tag,
               b.name as benchmark_name
        FROM evaluations e
        JOIN models m ON e.model_id = m.id
        JOIN benchmarks b ON e.benchmark_id = b.id
        WHERE 1=1
        """
        params = []

        if status:
            sql += " AND e.status = ?"
            params.append(status)
        if model_id:
            sql += " AND e.model_id = ?"
            params.append(model_id)
        # benchmark_id is frequently an integer primary key, so the "all"
        # wildcard check must not assume a string.
        if benchmark_id and not self._is_all(benchmark_id):
            sql += " AND e.benchmark_id = ?"
            params.append(benchmark_id)
        if tag and not self._is_all(tag):
            sql += " AND m.tag = ?"
            params.append(tag)

        sql += " ORDER BY e.submitted_at DESC"
        if limit:
            sql += " LIMIT ?"
            params.append(limit)

        cursor.execute(sql, params)
        results = [dict(row) for row in cursor.fetchall()]

        # Parse results JSON
        for result in results:
            if result['results']:
                try:
                    result['results'] = json.loads(result['results'])
                except (TypeError, ValueError):
                    # json.JSONDecodeError is a ValueError subclass.
                    result['results'] = {}
        return results

    def get_leaderboard_df(self, tag=None, benchmark_id=None):
        """Get a pandas DataFrame of the leaderboard.

        Only completed evaluations are included, optionally filtered by tag
        and benchmark. Numeric entries of each evaluation's results dict are
        added as extra columns unless they would shadow a base column. Rows
        are sorted by ``score`` in descending order; an empty DataFrame is
        returned when there are no completed evaluations.
        """
        results = self.get_evaluation_results(tag=tag, benchmark_id=benchmark_id, status="completed")
        if not results:
            return pd.DataFrame()

        leaderboard_data = []
        for result in results:
            entry = {
                'model_name': result['model_name'],
                'tag': result['tag'],
                'benchmark_name': result['benchmark_name'],
                'score': result['score'],
                'completed_at': result['completed_at']
            }
            # Add any additional numeric metrics from results
            extra = result['results']
            if extra and isinstance(extra, dict):
                for key, value in extra.items():
                    if isinstance(value, (int, float)) and key not in entry:
                        entry[key] = value
            leaderboard_data.append(entry)

        df = pd.DataFrame(leaderboard_data)
        if not df.empty and 'score' in df.columns:
            df = df.sort_values('score', ascending=False)
        return df