"""
Task to evaluate models on a YourBench dataset using LightEval.
"""

import os
import sys
import json
import time
import tempfile
import asyncio
import threading
from pathlib import Path
from typing import Optional, List, Dict, Any, Tuple

from loguru import logger
from huggingface_hub import HfApi, CommitOperationAdd

from tasks.yourbench_lighteval_task import create_yourbench_task


class EvaluationTask:
    """
    Task to evaluate models using LightEval on a YourBench dataset.

    See the example block at the bottom of this module for typical usage.
    """

    def __init__(self, session_uid: str, dataset_name: str):
        """
        Initialize the evaluation task.

        Args:
            session_uid: Session ID for this task
            dataset_name: Name of the dataset to evaluate
        """
        self.session_uid = session_uid
        self.dataset_name = dataset_name
        self.logs: List[str] = []
        self.is_completed = False
        self.organization = os.getenv("HF_ORGANIZATION", "yourbench")
        self.results: Dict[str, Any] = {}
        self.output_dir = f"uploaded_files/{session_uid}/lighteval_results"

        # (model_name, inference_provider) pairs passed to LightEval's
        # inference-providers endpoint.
        self.models = [
            ("Qwen/Qwen2.5-72B-Instruct", "novita"),
            ("Qwen/QwQ-32B", "novita"),
        ]

        self._add_log("[INFO] Initializing evaluation task")
        self._add_log(f"[INFO] Dataset to evaluate: {self.organization}/{dataset_name}")
        self._add_log(f"[INFO] Output directory: {self.output_dir}")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list.

        Args:
            message: Log message to add
        """
        if message not in self.logs:
            self.logs.append(message)
            # Rebind to a copy so self.logs points at a new list object.
            self.logs = self.logs.copy()

        logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task.

        Returns:
            List of log messages
        """
        return self.logs.copy()

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed.

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    async def _evaluate_model(self, model_info: Tuple[str, str]) -> Dict[str, Any]:
        """
        Evaluate a specific model.

        Args:
            model_info: Tuple of (model_name, provider)

        Returns:
            Dictionary with evaluation results
        """
        model_name, provider = model_info
        self._add_log(f"[INFO] Starting evaluation for {model_name} with {provider}")

        os.makedirs(self.output_dir, exist_ok=True)

        dataset_path = f"{self.organization}/{self.dataset_name}"

        # Write a temporary module defining the custom LightEval task for this dataset.
        fd, temp_file_path = tempfile.mkstemp(suffix=".py")
        self._add_log(f"[INFO] Creating temporary file for {model_name}: {temp_file_path}")

        with os.fdopen(fd, "w") as temp_file:
            temp_file.write(f"""
import os
import sys
sys.path.append("{os.getcwd()}")

from tasks.yourbench_lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{dataset_path}", "lighteval")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")
        cmd_args = [
            "lighteval",
            "endpoint",
            "inference-providers",
            f"model={model_name},provider={provider}",
            "custom|yourbench|0|0",
            "--custom-tasks",
            temp_file_path,
            "--max-samples", "5",
            "--output-dir", self.output_dir,
            "--save-details",
            "--no-push-to-hub"
        ]
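
        # "custom|yourbench|0|0" appears to follow LightEval's
        # "suite|task|num_fewshot|truncate" task-spec convention, and
        # --max-samples caps each run at 5 samples; verify both against the
        # installed LightEval version before relying on them.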

        self._add_log(f"[INFO] Running command for {model_name}: {' '.join(cmd_args)}")

        results = {
            "model_name": model_name,
            "provider": provider,
            "success": False,
            "error": None,
            "results": None,
            "return_code": None
        }

        try:
            # Forward Hugging Face credentials so the LightEval subprocess can
            # authenticate against the Hub.
            env = os.environ.copy()
            hf_token = os.getenv("HF_TOKEN")
            if hf_token:
                env["HF_TOKEN"] = hf_token
                env["HUGGING_FACE_HUB_TOKEN"] = hf_token
                env["HF_ORGANIZATION"] = self.organization

            process = await asyncio.create_subprocess_exec(
                *cmd_args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )

            stdout, stderr = await process.communicate()

            exit_code = process.returncode
            results["return_code"] = exit_code

            # Echo the first few lines of subprocess output into the task logs.
            if stdout:
                stdout_lines = stdout.decode().strip().split('\n')
                for line in stdout_lines[:5]:
                    self._add_log(f"[INFO] {model_name} - {line}")

            if stderr and exit_code != 0:
                stderr_lines = stderr.decode().strip().split('\n')
                for line in stderr_lines[:5]:
                    self._add_log(f"[ERROR] {model_name} - {line}")

            # LightEval writes result JSON files under <output_dir>/results/;
            # keep only the ones that belong to this model (the file path may
            # contain the model name in several forms).
            result_files = []
            results_dir = Path(self.output_dir) / "results"
            if results_dir.exists():
                for json_file in results_dir.glob("**/*.json"):
                    model_parts = [
                        model_name,
                        model_name.replace('/', '_'),
                        model_name.split('/')[-1]
                    ]

                    if any(part in str(json_file) for part in model_parts):
                        result_files.append(json_file)

            if result_files:
                result_files.sort(key=lambda x: x.stat().st_mtime, reverse=True)
                latest_result = result_files[0]
                self._add_log(f"[INFO] {model_name} - Found result file: {latest_result}")

                try:
                    with open(latest_result, 'r') as f:
                        test_results = json.load(f)

                    if (test_results and
                            isinstance(test_results, dict) and
                            "results" in test_results and
                            "all" in test_results["results"]):
                        results["results"] = test_results
                        results["success"] = True

                        accuracy = test_results["results"]["all"]["accuracy"]
                        accuracy_stderr = test_results["results"]["all"]["accuracy_stderr"]
                        self._add_log(f"[SUCCESS] {model_name} - Accuracy: {accuracy:.4f} ± {accuracy_stderr:.4f}")
                    else:
                        results["error"] = "Incomplete or unexpected result format"
                        self._add_log(f"[WARNING] {model_name} - Unexpected result format")

                except (json.JSONDecodeError, KeyError) as e:
                    results["error"] = f"Error reading results: {str(e)}"
                    self._add_log(f"[ERROR] {model_name} - {results['error']}")

            if not results["success"]:
                if exit_code == 0:
                    results["error"] = "Execution completed without error but no results found"
                    self._add_log(f"[WARNING] {model_name} - {results['error']}")
                else:
                    results["error"] = f"Execution error (code: {exit_code})"
                    self._add_log(f"[ERROR] {model_name} - {results['error']}")

        except Exception as e:
            results["error"] = f"Exception: {str(e)}"
            self._add_log(f"[ERROR] Exception during evaluation of {model_name}: {str(e)}")
        finally:
            # Always clean up the temporary custom-task file.
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass

        return results

    async def _run_evaluations(self) -> List[Dict[str, Any]]:
        """
        Run evaluations for all models.

        Returns:
            List of evaluation results
        """
        self._add_log(f"[INFO] Starting evaluations for {len(self.models)} models")

        # Evaluate all models concurrently; exceptions are returned rather than
        # raised so one failure does not cancel the others.
        tasks = [self._evaluate_model(model) for model in self.models]
        model_results = await asyncio.gather(*tasks, return_exceptions=True)

        results = []
        for i, result in enumerate(model_results):
            if isinstance(result, Exception):
                model_name, provider = self.models[i]
                self._add_log(f"[ERROR] Evaluation failed for {model_name}: {str(result)}")
                results.append({
                    "model_name": model_name,
                    "provider": provider,
                    "success": False,
                    "error": str(result),
                    "results": None,
                    "return_code": None
                })
            else:
                results.append(result)

        return results

    def _format_comparison_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Format results for easy comparison between models.

        Args:
            results: List of evaluation results

        Returns:
            Dictionary with formatted comparison results
        """
        comparison = {
            "metadata": {
                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                "dataset": f"{self.organization}/{self.dataset_name}",
                "total_models_tested": len(results),
                "successful_tests": len([r for r in results if r["success"]])
            },
            "models_comparison": []
        }

        successful_models = [r for r in results if r["success"]]
        failed_models = [r for r in results if not r["success"]]

        if successful_models:
            sorted_successful = sorted(
                successful_models,
                key=lambda x: x["results"]["results"]["all"]["accuracy"],
                reverse=True
            )
        else:
            sorted_successful = []

        sorted_failed = sorted(failed_models, key=lambda x: x["model_name"])

        sorted_results = sorted_successful + sorted_failed

        for result in sorted_results:
            model_result = {
                "model_name": result["model_name"],
                "provider": result["provider"],
                "success": result["success"]
            }

            if result["success"]:
                model_result.update({
                    "accuracy": result["results"]["results"]["all"]["accuracy"],
                    "accuracy_stderr": result["results"]["results"]["all"]["accuracy_stderr"],
                    "evaluation_time": float(result["results"]["config_general"]["total_evaluation_time_secondes"])
                })
            else:
                model_result["error"] = result.get("error", "Unknown reason")

            comparison["models_comparison"].append(model_result)

        return comparison
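
    # Illustrative shape of the comparison payload built above (as serialized to
    # JSON; values are placeholders):
    # {
    #   "metadata": {
    #     "timestamp": "2025-01-01 12:00:00",
    #     "dataset": "yourbench/<dataset_name>",
    #     "total_models_tested": 2,
    #     "successful_tests": 1
    #   },
    #   "models_comparison": [
    #     {"model_name": "Qwen/Qwen2.5-72B-Instruct", "provider": "novita", "success": true,
    #      "accuracy": 0.80, "accuracy_stderr": 0.05, "evaluation_time": 42.0},
    #     {"model_name": "Qwen/QwQ-32B", "provider": "novita", "success": false, "error": "..."}
    #   ]
    # }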

    async def _upload_results_to_dataset(self, comparison_results: Dict[str, Any]) -> bool:
        """
        Upload evaluation results to the HuggingFace dataset.

        Args:
            comparison_results: The formatted comparison results

        Returns:
            bool: True if upload succeeded, False otherwise
        """
        try:
            timestamp = time.strftime("%Y%m%d_%H%M%S")
            # A constant filename means each upload overwrites the previous
            # results file; the timestamp only appears in the commit message.
            result_filename = "lighteval_results.json"

            # Serialize the results to a temporary file before committing it.
            fd, temp_file_path = tempfile.mkstemp(suffix=".json")
            with os.fdopen(fd, "w") as f:
                json.dump(comparison_results, f, indent=2)

            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                self._add_log("[ERROR] HF_TOKEN not found, cannot upload results to dataset")
                return False

            api = HfApi(token=hf_token)
            dataset_id = f"{self.organization}/{self.dataset_name}"

            operation = CommitOperationAdd(
                path_in_repo=f"lighteval_results/{result_filename}",
                path_or_fileobj=temp_file_path
            )

            self._add_log(f"[INFO] Uploading results to dataset {dataset_id}")
            api.create_commit(
                repo_id=dataset_id,
                repo_type="dataset",
                operations=[operation],
                commit_message=f"Add evaluation results from {timestamp}"
            )

            os.unlink(temp_file_path)

            self._add_log(f"[SUCCESS] Results uploaded to dataset {dataset_id} at lighteval_results/{result_filename}")
            return True

        except Exception as e:
            self._add_log(f"[ERROR] Failed to upload results to dataset: {str(e)}")
            return False
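
    # Note: the committed file can later be retrieved with, for example,
    #   huggingface_hub.hf_hub_download(repo_id="<org>/<dataset>",
    #       filename="lighteval_results/lighteval_results.json", repo_type="dataset")
    # (illustrative; adjust to however downstream code consumes the results).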

    async def _process_evaluation_results(self, results: List[Dict[str, Any]]) -> None:
        """
        Process evaluation results, create summaries and save files.

        Args:
            results: List of evaluation results
        """
        if results:
            try:
                detailed_output_file = f"{self.output_dir}/detailed_results.json"
                os.makedirs(os.path.dirname(detailed_output_file), exist_ok=True)
                with open(detailed_output_file, 'w') as f:
                    json.dump(results, f, indent=2)
                self._add_log(f"[INFO] Detailed results saved in {detailed_output_file}")

                comparison = self._format_comparison_results(results)
                comparison_file = f"{self.output_dir}/models_comparison.json"
                with open(comparison_file, 'w') as f:
                    json.dump(comparison, f, indent=2)
                self._add_log(f"[INFO] Models comparison saved in {comparison_file}")

                await self._upload_results_to_dataset(comparison)

                self.results = comparison
                self._add_log("[SUCCESS] Evaluation completed")
            except Exception as e:
                self._add_log(f"[ERROR] Error during evaluation execution: {str(e)}")
            finally:
                self.is_completed = True

    def _async_run(self) -> None:
        """
        Run the evaluation asynchronously.
        """
        async def run_async():
            try:
                results = await self._run_evaluations()
                await self._process_evaluation_results(results)
            except Exception as e:
                self._add_log(f"[ERROR] Error during evaluation execution: {str(e)}")
            finally:
                self.is_completed = True

        # This method runs in a worker thread (see run() below), so it needs its
        # own event loop.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(run_async())
        loop.close()

    def run(self) -> None:
        """
        Run the evaluation task in a separate thread.
        """
        self._add_log("[INFO] Starting evaluation")

        # Daemon thread so a pending evaluation does not block interpreter exit.
        thread = threading.Thread(target=self._async_run)
        thread.daemon = True
        thread.start()
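

# Minimal usage sketch (illustrative only): assumes HF_TOKEN (and optionally
# HF_ORGANIZATION) are set in the environment and that a YourBench dataset named
# "my-yourbench-dataset" exists under that organization.
if __name__ == "__main__":
    task = EvaluationTask(session_uid="demo-session", dataset_name="my-yourbench-dataset")
    task.run()  # the evaluation runs in a background daemon thread

    # Poll until the background thread reports completion, then dump the logs.
    while not task.is_task_completed():
        time.sleep(5)

    for line in task.get_logs():
        print(line)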
|