"""
Task to run evaluation using lighteval
"""

import concurrent.futures
import json
import os
import subprocess
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List

from dotenv import load_dotenv
from huggingface_hub import HfApi

from tasks.get_model_providers import get_model_providers


class EvaluationTask:
    """
    Task to run evaluation using lighteval
    """

    def __init__(self, session_uid: str, dataset_name: str):
        """
        Initialize the evaluation task

        Args:
            session_uid: Session ID for this task
            dataset_name: Name of the dataset to evaluate
        """
        self.session_uid = session_uid
        self.dataset_name = dataset_name
        self.is_completed = False
        self.results = []
        self.hf_api = HfApi()

    def _save_results_to_hub(self) -> None:
        """
        Save evaluation results to the dataset on the Hub
        """
        try:
            results_dir = Path("data/lighteval_results")
            results_dir.mkdir(parents=True, exist_ok=True)

            results_file = results_dir / "lighteval_results.json"
            with open(results_file, "w") as f:
                json.dump(self.results, f, indent=2)

            self.hf_api.upload_file(
                path_or_fileobj=str(results_file),
                path_in_repo="lighteval_results.json",
                repo_id=self.dataset_name,
                repo_type="dataset",
                commit_message="Add lighteval evaluation results"
            )

            print(f"[{datetime.now().strftime('%H:%M:%S')}] Results saved to Hub at {self.dataset_name}/lighteval_results.json")
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to save results to Hub: {str(e)}")

    def _run_lighteval(self, model_name: str, provider: str, dataset_name: str) -> dict:
        """
        Run lighteval for a single model/provider pair and return a result summary
        """
        start_time = time.time()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting evaluation with {provider} provider for {model_name}")

        # Write a temporary custom-tasks file exposing the yourbench task to lighteval
        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as temp_file:
            temp_file_path = temp_file.name
            temp_file.write(f"""
from lighteval_task.lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{dataset_name}", "single_shot_questions")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")

        cmd_args = [
            "lighteval",
            "endpoint",
            "inference-providers",
            f"model={model_name},provider={provider}",
            "custom|yourbench|0|0",
            "--custom-tasks",
            temp_file_path,
            "--max-samples", "15",
            "--output-dir", "data/lighteval_results",
            "--no-push-to-hub"
        ]

        try:
            subprocess.run(cmd_args, env=os.environ, timeout=60)
        except subprocess.TimeoutExpired:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Evaluation timed out for {model_name} after {time.time() - start_time:.2f}s")
            return {
                "model": model_name,
                "provider": provider,
                "accuracy": 0.0,
                "execution_time": time.time() - start_time,
                "status": "timeout"
            }
        finally:
            # Remove the temporary custom-tasks file whether or not the run timed out
            os.unlink(temp_file_path)

        execution_time = time.time() - start_time
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Finished evaluation for {model_name} in {execution_time:.2f}s")

        try:
            # Results are expected under <output-dir>/results/<model_name>/results_*.json
            results_dir = Path("data/lighteval_results/results") / model_name
            results_file = next(results_dir.glob("results_*.json"))

            with open(results_file) as f:
                results = json.load(f)
            accuracy = results["results"]["all"]["accuracy"]

            return {
                "model": model_name,
                "provider": provider,
                "accuracy": accuracy,
                "execution_time": execution_time,
                "status": "success"
            }
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Failed to parse results for {model_name} after {execution_time:.2f}s: {str(e)}")
            return {
                "model": model_name,
                "provider": provider,
                "accuracy": 0.0,
                "execution_time": execution_time,
                "status": "parse_error"
            }

    def run_parallel(self) -> List[Dict]:
        """
        Run the evaluation task with multiple models in parallel using ProcessPoolExecutor

        Returns:
            List of results for each model
        """
        script_start_time = time.time()

        # Load environment variables from a local .env file
        load_dotenv()

        # Models to evaluate; each one is run against its first available provider
        models = [
            "Qwen/QwQ-32B",
            "Qwen/Qwen2.5-72B-Instruct",
            "deepseek-ai/DeepSeek-V3-0324",
            "deepseek-ai/DeepSeek-R1-Distill-Llama-70B",
        ]

        model_providers = get_model_providers(models)

        print(f"[{datetime.now().strftime('%H:%M:%S')}] Starting parallel evaluations")

        # Launch one lighteval run per model, skipping models without any provider
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [
                executor.submit(self._run_lighteval, model_name, providers[0], self.dataset_name)
                for model_name, providers in model_providers
                if providers
            ]
            self.results = [future.result() for future in concurrent.futures.as_completed(futures)]

        total_time = time.time() - script_start_time
        print(f"[{datetime.now().strftime('%H:%M:%S')}] All evaluations completed in {total_time:.2f}s")

        self._save_results_to_hub()

        self.is_completed = True

        return self.results

    def get_logs(self) -> List[str]:
        """
        Get logs for this task (empty list since we don't track logs anymore)

        Returns:
            Empty list of logs
        """
        return []

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def run(self) -> None:
        """
        Run the evaluation task (wrapper around run_parallel)
        """
        self.run_parallel()
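

# --- Usage sketch (illustrative) ---
# A minimal way to exercise this task locally, assuming the required HF token is
# available in the environment and the dataset repo below already contains the
# "single_shot_questions" subset expected by the lighteval task. Both the session
# id and the dataset name are hypothetical placeholders, not values from the repo.
if __name__ == "__main__":
    task = EvaluationTask(
        session_uid="local-debug-session",            # hypothetical session id
        dataset_name="your-username/yourbench-demo",  # hypothetical dataset repo
    )
    results = task.run_parallel()
    print(json.dumps(results, indent=2))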