|
""" |
|
Task to ingest and transform documents to markdown using yourbench |
|
""" |
|
import os |
|
import time |
|
import pathlib |
|
import subprocess |
|
import threading |
|
from typing import Optional, List, Tuple, Dict, Any |
|
import yaml |
|
|
|
from loguru import logger |
|
|
|
|
|
class CreateBenchTask:
    """
    Task to ingest and transform documents to markdown using yourbench.

    In normal mode the task launches ``yourbench run --config <config_path>``
    as a subprocess. A background thread streams the process's combined
    stdout/stderr into ``self.logs``. When the ``DEVELOPMENT_MODE``
    environment variable is ``"true"``, the ingestion is simulated in a
    background thread instead.
    """

    def __init__(self, session_uid: str, config_path: Optional[str] = None):
        """
        Initialize the ingestion task

        Args:
            session_uid: Session ID for this task
            config_path: Path to the configuration file; defaults to
                ``uploaded_files/<session_uid>/config.yml`` when None
        """
        self.session_uid = session_uid
        self.logs: List[str] = []
        # Logs are appended from background threads (output capture /
        # simulation) and read via get_logs(), so guard them with a lock.
        self._logs_lock = threading.Lock()
        self.is_completed = False
        self.process: Optional[subprocess.Popen] = None
        self.is_running_flag = threading.Event()

        if config_path is None:
            config_path = f"uploaded_files/{session_uid}/config.yml"
        self.config_path = config_path

        self.command = ["yourbench", "run", "--config", str(self.config_path)]

        self._add_log("[INFO] Initializing ingestion task")
        self._add_log(f"[INFO] Using configuration file: {self.config_path}")

    def _add_log(self, message: str) -> None:
        """
        Add a log message to the logs list and echo it through loguru.

        A message that is already present in ``self.logs`` is not appended
        again, but it is still forwarded to the logger.

        Args:
            message: Log message to add
        """
        with self._logs_lock:
            if message not in self.logs:
                self.logs.append(message)
        logger.info(f"[{self.session_uid}] {message}")

    def get_logs(self) -> List[str]:
        """
        Get all logs for this task

        Returns:
            A copy of the list of log messages (safe to mutate by the caller)
        """
        with self._logs_lock:
            return self.logs.copy()

    def is_task_completed(self) -> bool:
        """
        Check if the task is completed

        Returns:
            True if completed, False otherwise
        """
        return self.is_completed

    def is_running(self) -> bool:
        """
        Check if the process is running

        Returns:
            True if running, False otherwise
        """
        return self.is_running_flag.is_set()

    def stop(self) -> None:
        """
        Stop the process if it's running.

        The process is first sent terminate() and given 5 seconds to exit.
        If it does not exit in time, it is killed and then reaped.
        """
        if self.process and self.is_running():
            self._add_log("[INFO] Stopping ingestion process")
            try:
                self.process.terminate()
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self._add_log("[WARN] Process not responding, forcing termination")
                self.process.kill()
                # Reap the killed child so it does not linger as a zombie.
                self.process.wait()
            finally:
                self.is_running_flag.clear()
                self.is_completed = True
                self._add_log("[INFO] Ingestion process stopped")

    @staticmethod
    def _format_output_line(line: str) -> str:
        """
        Map one stripped line of yourbench output to a tagged log message.

        Lines containing "ERROR" become [ERROR] and lines containing
        "WARNING" become [WARN]. "Completed stage:" lines become [SUCCESS];
        the stage name is the first single-quoted part, or the whole line if
        it has no quotes. Everything else becomes [INFO].
        """
        if "ERROR" in line:
            return f"[ERROR] {line}"
        if "WARNING" in line:
            return f"[WARN] {line}"
        if "Completed stage:" in line:
            stage = line.split("'")[1] if "'" in line else line
            return f"[SUCCESS] Stage completed: {stage}"
        return f"[INFO] {line}"

    def _capture_output(self) -> None:
        """
        Capture and process the output from the yourbench process.

        This runs on a background thread started by run(). It reads lines
        until the running flag is cleared, or until stdout hits EOF and the
        process has exited. It then logs the exit status and closes the
        stdout pipe.
        """
        self._add_log("[INFO] Starting output capture")

        try:
            while self.is_running() and self.process:
                line = self.process.stdout.readline()
                if not line:
                    # EOF (or nothing yet): finish once the process has exited.
                    if self.process.poll() is not None:
                        self.is_running_flag.clear()
                        break
                    time.sleep(0.1)
                    continue

                line = line.strip()
                if line:
                    self._add_log(self._format_output_line(line))

            if self.process:
                exit_code = self.process.poll()
                if exit_code == 0:
                    self._add_log("[SUCCESS] Ingestion process completed successfully")
                else:
                    self._add_log(f"[ERROR] Ingestion process terminated with error code: {exit_code}")
        except Exception as e:
            self._add_log(f"[ERROR] Error during output capture: {str(e)}")
        finally:
            # Release the pipe's file descriptor; nothing else reads from it.
            if self.process is not None and self.process.stdout is not None:
                self.process.stdout.close()
            self.is_completed = True
            self.is_running_flag.clear()
            self._add_log("[INFO] Output capture completed")

    @staticmethod
    def _ingestion_settings(config: Any) -> Dict[str, Any]:
        """
        Return the ``pipeline.ingestion`` mapping of a parsed config, or {}.

        Tolerates an empty YAML document (None) and missing or null
        ``pipeline`` / ``ingestion`` sections.
        """
        if not isinstance(config, dict):
            return {}
        pipeline = config.get("pipeline")
        if not isinstance(pipeline, dict):
            return {}
        ingestion = pipeline.get("ingestion")
        return ingestion if isinstance(ingestion, dict) else {}

    def _load_config(self) -> Any:
        """Parse the YAML configuration file at ``self.config_path``."""
        with open(self.config_path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f)

    def _log_config_summary(self) -> None:
        """
        Log the source/output directories from the config and the files found.

        Raises whatever reading or parsing the config or listing the source
        directory raises; run() treats those errors as warnings.
        """
        ingestion = self._ingestion_settings(self._load_config())
        source_dir = ingestion.get("source_documents_dir", "")
        output_dir = ingestion.get("output_dir", "")

        if source_dir:
            self._add_log(f"[INFO] Source directory: {source_dir}")
        if output_dir:
            self._add_log(f"[INFO] Output directory: {output_dir}")

        if source_dir and os.path.exists(source_dir):
            files = os.listdir(source_dir)
            if files:
                self._add_log(f"[INFO] Files to process: {', '.join(files)}")
            else:
                self._add_log("[WARN] No files found in source directory")

    @staticmethod
    def _build_env(hf_token: Optional[str]) -> Dict[str, str]:
        """
        Build the child-process environment from the current one.

        When ``hf_token`` is truthy, it is exported as HF_TOKEN and
        HUGGING_FACE_HUB_TOKEN. HF_ORGANIZATION is also exported, taken from
        the environment with a default of "yourbench".
        """
        env = os.environ.copy()
        if hf_token:
            env["HF_TOKEN"] = hf_token
            env["HUGGING_FACE_HUB_TOKEN"] = hf_token
            env["HF_ORGANIZATION"] = os.getenv("HF_ORGANIZATION", "yourbench")
        return env

    def run(self, token: Optional[str] = None) -> None:
        """
        Run the ingestion task

        Args:
            token: Hugging Face token. When given, it takes precedence over
                the HF_TOKEN environment variable.

        Errors are not raised to the caller. They are logged and the task is
        marked completed.
        """
        try:
            self._add_log("[INFO] Starting ingestion process")

            if not os.path.exists(self.config_path):
                raise FileNotFoundError(f"Configuration file does not exist: {self.config_path}")

            # Best effort: a config we cannot summarize only produces a warning.
            try:
                self._log_config_summary()
            except Exception as e:
                self._add_log(f"[WARN] Unable to read configuration: {str(e)}")

            hf_token = token or os.getenv("HF_TOKEN")
            env = self._build_env(hf_token)
            if hf_token:
                self._add_log("[INFO] Environment variables HF_TOKEN, HUGGING_FACE_HUB_TOKEN and HF_ORGANIZATION exported")

            if os.environ.get("DEVELOPMENT_MODE", "").lower() == "true":
                self._add_log("[INFO] Development mode enabled, simulating ingestion")
                self._simulate_ingestion_process()
                return

            self._add_log(f"[INFO] Executing command: {' '.join(self.command)}")

            # stderr is merged into stdout so a single reader sees everything;
            # undecodable bytes are replaced rather than aborting the capture.
            self.process = subprocess.Popen(
                self.command,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                errors="replace",
                bufsize=1,
                env=env,
            )

            self.is_running_flag.set()

            output_thread = threading.Thread(target=self._capture_output, daemon=True)
            output_thread.start()

            self._add_log(f"[INFO] Process started with PID: {self.process.pid}")

        except Exception as e:
            self._add_log(f"[ERROR] Error starting ingestion process: {str(e)}")
            self.is_completed = True

    def _simulate_ingestion_process(self) -> None:
        """
        Simulate the ingestion process for testing/development.

        This will be removed in production. It starts _simulate_logs on a
        new (non-daemon) thread and returns immediately.
        """
        threading.Thread(target=self._simulate_logs).start()

    @staticmethod
    def _simulated_markdown(base_name: str) -> str:
        """Return the placeholder markdown written for a simulated document."""
        return (
            f"# {base_name}\n\n"
            "This is a markdown document automatically generated by the simulation.\n\n"
            "## Section 1\n\n"
            "Content of section 1...\n\n"
            "## Section 2\n\n"
            "Content of section 2...\n\n"
        )

    def _simulate_logs(self) -> None:
        """
        Simulate logs for testing/development.

        This is used when yourbench isn't installed or in development mode.
        It logs fake progress with sleeps in between. For each source file it
        writes a placeholder markdown file to the configured output
        directory, if one is set. is_completed is always set at the end,
        even if the simulation fails.
        """
        try:
            self._add_log("[INFO] Simulation mode enabled (yourbench is not actually running)")

            ingestion: Dict[str, Any] = {}
            try:
                ingestion = self._ingestion_settings(self._load_config())
                source_dir = ingestion.get("source_documents_dir", "")
                source_files: List[str] = []
                if source_dir and os.path.exists(source_dir):
                    source_files = [
                        name for name in os.listdir(source_dir)
                        if os.path.isfile(os.path.join(source_dir, name))
                    ]
            except Exception:
                # Unreadable config: fall back to placeholder file names.
                source_files = ["document.pdf", "document.txt"]

            output_dir = ingestion.get("output_dir", "")
            if output_dir:
                try:
                    os.makedirs(output_dir, exist_ok=True)
                except Exception as e:
                    # Keep going; per-file writes below will report their own errors.
                    self._add_log(f"[WARN] Unable to create output directory: {str(e)}")

            time.sleep(1)
            self._add_log("[INFO] Initializing document ingestion")
            time.sleep(1.5)
            self._add_log("[INFO] Loading configuration parameters")
            time.sleep(1)
            self._add_log("[INFO] Verifying source files")

            for file in source_files:
                time.sleep(1.5)
                self._add_log(f"[INFO] Processing file: {file}")
                time.sleep(2)
                self._add_log(f"[INFO] Extracting content from {file}")
                time.sleep(1.5)
                self._add_log(f"[INFO] Converting to markdown: {file}")

                if output_dir:
                    base_name = os.path.splitext(file)[0]
                    output_file = os.path.join(output_dir, f"{base_name}.md")
                    try:
                        with open(output_file, "w", encoding="utf-8") as f:
                            f.write(self._simulated_markdown(base_name))
                        self._add_log(f"[INFO] Markdown file created: {output_file}")
                    except Exception as e:
                        self._add_log(f"[ERROR] Error creating markdown file: {str(e)}")

            time.sleep(2)
            self._add_log("[INFO] Finalizing processing")
            time.sleep(1)
            self._add_log("[SUCCESS] Stage completed: ingestion")
            time.sleep(0.5)
            self._add_log("[SUCCESS] Ingestion completed successfully")
        except Exception as e:
            self._add_log(f"[ERROR] Error during simulation: {str(e)}")
        finally:
            # Without this, callers polling is_task_completed() would wait forever
            # if the simulation raised.
            self.is_completed = True