Spaces:
Sleeping
Sleeping
import json | |
import os | |
from datetime import datetime, timezone | |
from src.display.formatting import styled_error, styled_message, styled_warning | |
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, QUEUE_REPO, EVAL_RESULTS_PATH, RESULTS_REPO | |
from src.submission.check_validity import ( | |
already_submitted_models, | |
check_model_card, | |
get_model_size, | |
is_model_on_hub, | |
) | |
REQUESTED_MODELS = None | |
USERS_TO_SUBMISSION_DATES = None | |
def add_new_eval( | |
model: str, | |
base_model: str, | |
revision: str, | |
precision: str, | |
weight_type: str, | |
model_type: str, | |
): | |
global REQUESTED_MODELS | |
global USERS_TO_SUBMISSION_DATES | |
if not REQUESTED_MODELS: | |
REQUESTED_MODELS, USERS_TO_SUBMISSION_DATES = already_submitted_models(EVAL_REQUESTS_PATH) | |
user_name = "" | |
model_path = model | |
if "/" in model: | |
user_name = model.split("/")[0] | |
model_path = model.split("/")[1] | |
precision = precision.split(" ")[0] | |
current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") | |
if model_type is None or model_type == "": | |
return styled_error("Please select a model type.") | |
# Does the model actually exist? | |
if revision == "": | |
revision = "main" | |
# Is the model on the hub? | |
if weight_type in ["Delta", "Adapter"]: | |
base_model_on_hub, error, _ = is_model_on_hub(model_name=base_model, revision=revision, token=TOKEN, test_tokenizer=True) | |
if not base_model_on_hub: | |
return styled_error(f'Base model "{base_model}" {error}') | |
if not weight_type == "Adapter": | |
model_on_hub, error, _ = is_model_on_hub(model_name=model, revision=revision, token=TOKEN, test_tokenizer=True) | |
if not model_on_hub: | |
return styled_error(f'Model "{model}" {error}') | |
# Is the model info correctly filled? | |
try: | |
model_info = API.model_info(repo_id=model, revision=revision) | |
except Exception: | |
return styled_error("Could not get your model information. Please fill it up properly.") | |
model_size = get_model_size(model_info=model_info, precision=precision) | |
# Were the model card and license filled? | |
try: | |
license = model_info.cardData["license"] | |
except Exception: | |
return styled_error("Please select a license for your model") | |
modelcard_OK, error_msg = check_model_card(model) | |
if not modelcard_OK: | |
return styled_error(error_msg) | |
# Seems good, creating the eval | |
print("Adding new eval") | |
eval_entry = { | |
"model": model, | |
"base_model": base_model, | |
"revision": revision, | |
"precision": precision, | |
"weight_type": weight_type, | |
"status": "PENDING", | |
"submitted_time": current_time, | |
"model_type": model_type, | |
"likes": model_info.likes, | |
"params": model_size, | |
"license": license, | |
"private": False, | |
} | |
# Check for duplicate submission | |
if f"{model}_{revision}_{precision}" in REQUESTED_MODELS: | |
return styled_warning("This model has been already submitted.") | |
print("Creating eval file") | |
OUT_DIR = f"{EVAL_REQUESTS_PATH}/{user_name}" | |
os.makedirs(OUT_DIR, exist_ok=True) | |
out_path = f"{OUT_DIR}/{model_path}_eval_request_False_{precision}_{weight_type}.json" | |
with open(out_path, "w") as f: | |
f.write(json.dumps(eval_entry)) | |
print("Uploading eval file") | |
API.upload_file( | |
path_or_fileobj=out_path, | |
path_in_repo=out_path.split("eval-queue/")[1], | |
repo_id=QUEUE_REPO, | |
repo_type="dataset", | |
commit_message=f"Add {model} to eval queue", | |
) | |
# Remove the local file | |
os.remove(out_path) | |
return styled_message( | |
"Your request has been submitted to the evaluation queue!\nPlease wait for up to an hour for the model to show in the PENDING list." | |
) | |
# -------------------------------------------------------- | |
# Manual metrics submission (bypass evaluation queue) | |
# -------------------------------------------------------- | |
ALL_SUBJECTIVE_FIELDS = [ | |
"readability", | |
"relevance", | |
"explanation_clarity", | |
"problem_identification", | |
"actionability", | |
"completeness", | |
"specificity", | |
"contextual_adequacy", | |
"consistency", | |
"brevity", | |
] | |
def _compute_multimetric(payload: dict) -> float: | |
"""Average of the 10 subjective metrics.""" | |
total = sum(float(payload[f]) for f in ALL_SUBJECTIVE_FIELDS) | |
return total / len(ALL_SUBJECTIVE_FIELDS) | |
def add_manual_results( | |
model: str, | |
revision: str, | |
bleu: float, | |
readability: int, | |
relevance: int, | |
explanation_clarity: int, | |
problem_identification: int, | |
actionability: int, | |
completeness: int, | |
specificity: int, | |
contextual_adequacy: int, | |
consistency: int, | |
brevity: int, | |
pass_at_1: float, | |
pass_at_5: float, | |
pass_at_10: float, | |
): | |
"""Directly submit evaluation metrics for a model and push them to the results dataset.""" | |
# Basic validation | |
if model == "": | |
return styled_error("Please specify a model name.") | |
if revision == "": | |
revision = "main" | |
if pass_at_5 < pass_at_1: | |
return styled_error("pass@5 must be greater or equal to pass@1") | |
if pass_at_10 < pass_at_5: | |
return styled_error("pass@10 must be greater or equal to pass@5") | |
# Prepare dictionary in the same format used by read_evals.py | |
payload_dict = { | |
"model": model, | |
"revision": revision, | |
"bleu": bleu, | |
"readability": readability, | |
"relevance": relevance, | |
"explanation_clarity": explanation_clarity, | |
"problem_identification": problem_identification, | |
"actionability": actionability, | |
"completeness": completeness, | |
"specificity": specificity, | |
"contextual_adequacy": contextual_adequacy, | |
"consistency": consistency, | |
"brevity": brevity, | |
"pass_at_1": pass_at_1, | |
"pass_at_5": pass_at_5, | |
"pass_at_10": pass_at_10, | |
} | |
multimetric = _compute_multimetric(payload_dict) | |
# Compose final results file (same structure as api_submit_results) | |
result_json = { | |
"config": { | |
"model_dtype": "unknown", | |
"model_name": model, | |
"model_sha": revision, | |
}, | |
"results": { | |
"bleu": {"score": bleu}, | |
"multimetric": {"score": multimetric}, | |
"pass_at_1": {"score": pass_at_1}, | |
"pass_at_5": {"score": pass_at_5}, | |
"pass_at_10": {"score": pass_at_10}, | |
}, | |
} | |
# Add subjective metrics | |
for field in ALL_SUBJECTIVE_FIELDS: | |
result_json["results"][field] = {"score": payload_dict[field]} | |
# Write file locally then upload | |
try: | |
os.makedirs(EVAL_RESULTS_PATH, exist_ok=True) | |
except Exception: | |
pass | |
from datetime import datetime, timezone | |
import uuid | |
ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") | |
unique_id = uuid.uuid4().hex[:8] | |
filename = f"results_{model.replace('/', '_')}_{ts}_{unique_id}.json" | |
local_path = os.path.join(EVAL_RESULTS_PATH, filename) | |
try: | |
with open(local_path, "w") as fp: | |
json.dump(result_json, fp) | |
API.upload_file( | |
path_or_fileobj=local_path, | |
path_in_repo=filename, | |
repo_id=RESULTS_REPO, | |
repo_type="dataset", | |
commit_message=f"Add manual results for {model}", | |
) | |
except Exception as e: | |
return styled_error(f"Failed to upload results: {e}") | |
finally: | |
if os.path.exists(local_path): | |
os.remove(local_path) | |
return styled_message("Metrics successfully submitted! The leaderboard will refresh shortly.") | |