""" |
|
Script to run lighteval tests in parallel for multiple models |
|
""" |
|
import os
import sys
import json
import time
import tempfile
import asyncio
from pathlib import Path
from typing import Tuple, List, Dict, Any
|
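# Load environment variables (e.g. HF_TOKEN) from a local .env file, if present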
from dotenv import load_dotenv

load_dotenv()

# Make the project root importable so the custom task module can be found
sys.path.append(os.getcwd())
from tasks.yourbench_lighteval_task import create_yourbench_task
|
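# (model_name, provider) pairs to evaluate through Hugging Face inference providers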
INIT_MODELS = [
    ("Qwen/Qwen2.5-72B-Instruct", "novita"),
    ("meta-llama/Llama-3.3-70B-Instruct", "novita"),
    ("deepseek-ai/DeepSeek-R1-Distill-Llama-70B", "novita"),
    ("Qwen/QwQ-32B", "novita"),
]
|
|
async def run_lighteval_test_for_model(model_info: Tuple[str, str]) -> Dict[str, Any]:
    """Run the lighteval test for a single model and return its results."""
    model_name, provider = model_info

    dataset_name = "yourbench_a"
    organization = "yourbench"
    output_dir = f"uploaded_files/test_parallel_{provider}/lighteval_results"
    os.makedirs(output_dir, exist_ok=True)

    dataset_path = f"{organization}/{dataset_name}"
    print(f"Dataset to evaluate for {model_name}: {dataset_path}")
|
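    # Create a temporary custom-tasks module that lighteval loads via --custom-tasks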
    # mkstemp avoids the race condition of the deprecated tempfile.mktemp
    fd, temp_file_path = tempfile.mkstemp(suffix=".py")
    os.close(fd)
    print(f"Creating temporary file for {model_name}: {temp_file_path}")
|
    with open(temp_file_path, 'w') as temp_file:
        temp_file.write(f"""
import os
import sys
sys.path.append("{os.getcwd()}")

from tasks.yourbench_lighteval_task import create_yourbench_task

# Create yourbench task
yourbench = create_yourbench_task("{dataset_path}", "lighteval")

# Define TASKS_TABLE needed by lighteval
TASKS_TABLE = [yourbench]
""")
|
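    # lighteval CLI invocation: evaluate the custom "yourbench" task on this model/provider pair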
    cmd_args = [
        "lighteval",
        "endpoint",
        "inference-providers",
        f"model={model_name},provider={provider}",
        "custom|yourbench|0|0",
        "--custom-tasks", temp_file_path,
        "--max-samples", "5",
        "--output-dir", output_dir,
        "--save-details",
        "--no-push-to-hub",
    ]
|
print(f"Running command for {model_name}: {' '.join(cmd_args)}") |
|
print(f"Start time for {model_name}: {time.strftime('%H:%M:%S')}") |
|
|
|
results = { |
|
"model_name": model_name, |
|
"provider": provider, |
|
"success": False, |
|
"error": None, |
|
"results": None, |
|
"return_code": None |
|
} |
|
|
|
    try:
        # Forward Hugging Face credentials and the target organization to the subprocess
        env = os.environ.copy()
        hf_token = os.getenv("HF_TOKEN")
        if hf_token:
            env["HF_TOKEN"] = hf_token
            env["HUGGING_FACE_HUB_TOKEN"] = hf_token
        env["HF_ORGANIZATION"] = organization

        # Run lighteval as a subprocess so several models can be evaluated concurrently
        process = await asyncio.create_subprocess_exec(
            *cmd_args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )
        stdout, stderr = await process.communicate()

        exit_code = process.returncode
        results["return_code"] = exit_code

        if stdout:
            stdout_lines = stdout.decode().strip().split('\n')
            if stdout_lines:
                print(f"Output from {model_name}: {stdout_lines[0]}")
|
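        # Load the results file that lighteval wrote under output_dir; the directory
        # may hold files from earlier runs, so take the most recently modified one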
        results_dir = Path(output_dir) / "results"
        if results_dir.exists():
            result_files = sorted(
                results_dir.glob("**/*.json"),
                key=lambda p: p.stat().st_mtime,
                reverse=True,
            )
            if result_files:
                with open(result_files[0], 'r') as f:
                    test_results = json.load(f)
                results["results"] = test_results
                results["success"] = True
    except asyncio.CancelledError:
        results["error"] = "Task cancelled"
        print(f"Task cancelled for {model_name}")
    except Exception as e:
        results["error"] = f"Exception: {str(e)}"
        print(f"Error running test for {model_name}: {str(e)}")
    finally:
        # Always clean up the temporary custom-tasks module
        try:
            os.unlink(temp_file_path)
        except OSError:
            pass
|
print(f"End time for {model_name}: {time.strftime('%H:%M:%S')}") |
|
return results |
|
|
|
async def run_parallel_tests(models: List[Tuple[str, str]]) -> List[Dict[str, Any]]:
    """Run tests in parallel for multiple models using asyncio."""
    print(f"Starting parallel tests for {len(models)} models")
|
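    # One coroutine per model; with return_exceptions=True, gather returns
    # exceptions in place of results instead of raising them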
    tasks = [run_lighteval_test_for_model(model) for model in models]
    model_results = await asyncio.gather(*tasks, return_exceptions=True)
|
    results = []
    for i, result in enumerate(model_results):
        if isinstance(result, Exception):
            # Record a failed run for any task that raised an exception
            model_name, provider = models[i]
            print(f"Test failed for {model_name}: {str(result)}")
            results.append({
                "model_name": model_name,
                "provider": provider,
                "success": False,
                "error": str(result),
                "results": None,
                "return_code": None,
            })
        else:
            results.append(result)
            print(f"Test completed for {result['model_name']}")

    return results
|
|
def format_comparison_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Format results for easy comparison between models."""
    comparison = {
        "metadata": {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "total_models_tested": len(results),
            "successful_tests": len([r for r in results if r["success"]]),
        },
        "models_comparison": [],
    }
|
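    # Sort by accuracy, best first; failed runs score -1 and sink to the bottom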
    sorted_results = sorted(
        results,
        key=lambda x: (
            x["results"]["results"]["all"]["accuracy"] if x["success"] and x["results"] else -1,
            x["model_name"],
        ),
        reverse=True,
    )
|
    for result in sorted_results:
        model_result = {
            "model_name": result["model_name"],
            "provider": result["provider"],
            "success": result["success"],
        }

        if result["success"] and result["results"]:
            model_result.update({
                "accuracy": result["results"]["results"]["all"]["accuracy"],
                "accuracy_stderr": result["results"]["results"]["all"]["accuracy_stderr"],
                # "secondes" matches the key name used in lighteval's results file
                "evaluation_time": float(result["results"]["config_general"]["total_evaluation_time_secondes"]),
            })
        else:
            model_result["error"] = result["error"]

        comparison["models_comparison"].append(model_result)

    return comparison
|
|
async def main_async():
    """Async main function to run the parallel tests."""
    print("Starting parallel lighteval tests")
    start_time = time.time()

    results = await run_parallel_tests(INIT_MODELS)
|
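    # Save the raw per-model results, then a formatted side-by-side comparison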
    detailed_output_file = "parallel_test_detailed_results.json"
    with open(detailed_output_file, 'w') as f:
        json.dump(results, f, indent=2)

    comparison = format_comparison_results(results)
    comparison_file = "models_comparison.json"
    with open(comparison_file, 'w') as f:
        json.dump(comparison, f, indent=2)
|
print("\nTest Summary:") |
|
for model in comparison["models_comparison"]: |
|
status = "✅" if model["success"] else "❌" |
|
print(f"{status} {model['model_name']} ({model['provider']})") |
|
if not model["success"]: |
|
print(f" Error: {model['error']}") |
|
else: |
|
print(f" Accuracy: {model['accuracy']:.2%} (±{model['accuracy_stderr']:.2%})") |
|
print(f" Evaluation time: {model['evaluation_time']:.2f}s") |
|
|
|
duration = time.time() - start_time |
|
print(f"\nTotal execution time: {duration:.2f} seconds") |
|
print(f"Detailed results saved to: {detailed_output_file}") |
|
print(f"Comparison results saved to: {comparison_file}") |
|
|
|
def main():
    """Main function to run the parallel tests."""
    # asyncio.run() replaces the deprecated get_event_loop()/run_until_complete() pattern
    asyncio.run(main_async())
|
|
if __name__ == "__main__": |
|
main() |