|
""" |
|
Task to create and save the configuration file |
|
""" |
|
import os |
|
import pathlib |
|
import uuid |
|
import yaml |
|
import shutil |
|
import time |
|
import threading |
|
from typing import Optional, Dict, Any, List, Tuple |
|
|
|
from loguru import logger |
|
from huggingface_hub import HfApi |
|
|
|
|
|
class CreateBenchConfigTask:
    """
    Task to create and save a configuration file for YourbenchSimpleDemo.

    The work runs on a background daemon thread (see ``run``), so the shared
    log buffer is guarded by a lock: the worker thread appends while the
    caller thread reads snapshots via ``get_logs``.
    """

    def __init__(self, session_uid: Optional[str] = None):
        """
        Initialize the task with a session ID.

        Args:
            session_uid: Optional session ID; a random UUID4 string is
                generated if None.
        """
        self.session_uid = session_uid or str(uuid.uuid4())
        self.logs: List[str] = []
        self.is_completed = False
        self.is_running_flag = threading.Event()
        self.thread: Optional[threading.Thread] = None
        # Protects self.logs against concurrent access: _add_log runs on the
        # worker thread while get_logs is called from the caller thread.
        self._logs_lock = threading.Lock()
        self._add_log("[INFO] Initializing configuration creation task")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list and mirror it to loguru.

        Exact duplicate messages are skipped so repeated progress updates do
        not accumulate in the buffer.

        Args:
            message: Log message to add
        """
        with self._logs_lock:
            if message not in self.logs:
                self.logs.append(message)
        # Logging happens outside the lock to keep the critical section small.
        logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get a snapshot of all logs for this task.

        Returns:
            A copy of the log messages; safe to use while the worker thread
            is still appending.
        """
        with self._logs_lock:
            return self.logs.copy()

    def save_uploaded_file(self, file_path: str) -> str:
        """
        Process the uploaded file that is already in the correct directory.

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the file (same as input)

        Raises:
            RuntimeError: If the path cannot be processed.
        """
        try:
            self._add_log(f"[INFO] Processing file: {os.path.basename(file_path)}")
            return file_path
        except Exception as e:
            error_msg = f"Error processing file: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(error_msg) from e

    def generate_base_config(self, hf_org: str, hf_dataset_name: str) -> Dict[str, Any]:
        """
        Create the base configuration dictionary.

        Args:
            hf_org: Hugging Face organization name. NOTE(review): currently
                unused — the generated config references the literal
                ``$HF_ORGANIZATION`` placeholder, resolved later from the
                environment. Kept for interface compatibility.
            hf_dataset_name: Hugging Face dataset name

        Returns:
            Configuration dictionary ready to be dumped to YAML.

        Raises:
            RuntimeError: If the HF_TOKEN environment variable is not set.
        """
        self._add_log(f"[INFO] Generating base configuration for {hf_dataset_name}")

        # Fail fast if the token is missing, even though the config itself
        # only embeds the "$HF_TOKEN" placeholder (substituted downstream).
        hf_token = os.getenv("HF_TOKEN")
        if not hf_token:
            raise RuntimeError("HF_TOKEN environment variable is not defined")

        return {
            "hf_configuration": {
                "token": "$HF_TOKEN",
                "hf_organization": "$HF_ORGANIZATION",
                "private": True,
                "hf_dataset_name": hf_dataset_name,
                "concat_if_exist": False,
            },
            "model_list": [
                {
                    "model_name": "Qwen/Qwen2.5-VL-72B-Instruct",
                    "provider": "novita",
                    "api_key": "$HF_TOKEN",
                    "max_concurrent_requests": 32,
                },
                {
                    "model_name": "Qwen/Qwen2.5-72B-Instruct",
                    "provider": "novita",
                    "api_key": "$HF_TOKEN",
                    "max_concurrent_requests": 32,
                },
            ],
            "model_roles": {
                "ingestion": ["Qwen/Qwen2.5-VL-72B-Instruct"],
                "summarization": ["Qwen/Qwen2.5-72B-Instruct"],
                "chunking": ["intfloat/multilingual-e5-large-instruct"],
                "single_shot_question_generation": ["Qwen/Qwen2.5-72B-Instruct"],
                "multi_hop_question_generation": ["Qwen/Qwen2.5-72B-Instruct"],
            },
            "pipeline": {
                "ingestion": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/uploaded_files/",
                    "output_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,
                },
                "upload_ingest_to_hub": {
                    "source_documents_dir": f"uploaded_files/{self.session_uid}/ingested",
                    "run": True,
                },
                "summarization": {
                    "run": True,
                },
                "chunking": {
                    "run": True,
                    "chunking_configuration": {
                        "l_min_tokens": 64,
                        "l_max_tokens": 128,
                        "tau_threshold": 0.8,
                        "h_min": 2,
                        "h_max": 5,
                        "num_multihops_factor": 2,
                    },
                },
                "single_shot_question_generation": {
                    "run": True,
                    "additional_instructions": "Generate questions to test a curious adult",
                    "chunk_sampling": {
                        "mode": "count",
                        "value": 5,
                        "random_seed": 123,
                    },
                },
                "multi_hop_question_generation": {
                    "run": True,
                    "additional_instructions": "Generate questions to test a curious adult",
                    "chunk_sampling": {
                        "mode": "percentage",
                        "value": 0.3,
                        "random_seed": 42,
                    },
                },
                "lighteval": {
                    "run": True,
                },
            },
        }

    def save_yaml_file(self, config: Dict[str, Any], path: str) -> str:
        """
        Save the given configuration dictionary to a YAML file.

        Args:
            config: Configuration dictionary
            path: Path to save the file

        Returns:
            Path to the saved file

        Raises:
            RuntimeError: If the file cannot be written.
        """
        try:
            parent = os.path.dirname(path)
            # os.makedirs("") raises FileNotFoundError, so only create the
            # directory when the path actually has a parent component.
            if parent:
                os.makedirs(parent, exist_ok=True)

            with open(path, "w") as file:
                yaml.dump(config, file, default_flow_style=False, sort_keys=False)

            self._add_log(f"[INFO] Configuration saved: {path}")
            return path
        except Exception as e:
            error_msg = f"Error saving configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            raise RuntimeError(error_msg) from e

    def _run_task(self, file_path: str) -> str:
        """
        Internal method to run the task in a separate thread.

        NOTE(review): exceptions raised here do not propagate to the caller
        of ``run`` — they surface only through the log buffer, since this
        runs on a daemon thread.

        Args:
            file_path: Path to the uploaded file

        Returns:
            Path to the configuration file

        Raises:
            RuntimeError: On any failure during configuration generation.
        """
        try:
            org_name = os.getenv("HF_ORGANIZATION")

            # Check early so a missing token fails before any work is done.
            hf_token = os.getenv("HF_TOKEN")
            if not hf_token:
                raise RuntimeError("HF_TOKEN environment variable is not defined")

            self._add_log(f"[INFO] Organization: {org_name}")

            # The sleeps below pace the log output so the UI can display
            # progress messages incrementally; they are not required for
            # correctness.
            time.sleep(0.5)

            saved_file_path = self.save_uploaded_file(file_path)

            time.sleep(1)

            config_dir = pathlib.Path(f"uploaded_files/{self.session_uid}")
            config_path = config_dir / "config.yml"

            dataset_name = f"yourbench_{self.session_uid}"
            self._add_log(f"[INFO] Dataset name: {dataset_name}")

            time.sleep(0.8)

            config = self.generate_base_config(org_name, dataset_name)

            time.sleep(1.2)

            config_file_path = self.save_yaml_file(config, str(config_path))

            self._add_log(f"[INFO] Configuration generated successfully: {config_file_path}")

            time.sleep(1.5)
            self._add_log("[INFO] Starting ingestion")

            time.sleep(2)
            self._add_log(f"[INFO] Processing file: {dataset_name}")

            time.sleep(2)
            self._add_log("[SUCCESS] Stage completed: config_generation")

            self.mark_task_completed()

            return str(config_path)
        except Exception as e:
            error_msg = f"Error generating configuration: {str(e)}"
            self._add_log(f"[ERROR] {error_msg}")
            # Mark completed even on failure so is_running() reports False.
            self.mark_task_completed()
            raise RuntimeError(error_msg) from e

    def run(self, file_path: str, token: Optional[str] = None) -> str:
        """
        Run the task to create and save the configuration file asynchronously.

        Args:
            file_path: Path to the uploaded file
            token: Hugging Face token (not used, using HF_TOKEN from environment)

        Returns:
            Expected path to the configuration file. The file may not exist
            yet when this returns — poll ``is_task_completed`` to know when
            the worker thread has finished writing it.
        """
        self.is_running_flag.set()

        # Daemon thread: it will not block interpreter shutdown.
        self.thread = threading.Thread(target=self._run_task, args=(file_path,))
        self.thread.daemon = True
        self.thread.start()

        return f"uploaded_files/{self.session_uid}/config.yml"

    def is_running(self) -> bool:
        """
        Check if the task is running.

        Returns:
            True if running, False otherwise
        """
        return self.is_running_flag.is_set() and not self.is_completed

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed.

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def mark_task_completed(self) -> None:
        """
        Mark the task as completed and clear the running flag.
        """
        self.is_completed = True
        self.is_running_flag.clear()
        self._add_log("[INFO] Configuration generation task completed")