from concurrent.futures import ThreadPoolExecutor, as_completed
import json
from pathlib import Path
from time import sleep

import fitz  # PyMuPDF
import pandas as pd
import requests

from system.html2lines import url2lines, line_correction, html2metadata

MAX_RETRIES = 3
TIMEOUT = 5  # seconds


def scrape_text_from_url(url, temp_name):
    """Fetch a URL and return its text as a list of corrected lines."""
    response = None
    for attempt in range(MAX_RETRIES):
        try:
            response = requests.get(url, timeout=TIMEOUT)
            break
        except requests.RequestException:
            # Back off briefly before retrying; no sleep after the last attempt.
            if attempt < MAX_RETRIES - 1:
                sleep(3)

    # Every attempt failed, or the server reported itself unavailable.
    if response is None or response.status_code == 503:
        return []

    if url.endswith(".pdf"):
        # Save the PDF locally, then extract its text page by page.
        pdf_dir = Path("/tmp/pdf_dir")
        pdf_dir.mkdir(parents=True, exist_ok=True)
        pdf_path = pdf_dir / f"{temp_name}.pdf"
        with open(pdf_path, "wb") as f:
            f.write(response.content)

        extracted_text = ""
        with fitz.open(str(pdf_path)) as doc:
            for page in doc:
                extracted_text += page.get_text() or ""
        return line_correction(extracted_text.split("\n"))

    # Plain HTML page: delegate line extraction to url2lines.
    return line_correction(url2lines(url))


def process_row(row, claim_id):
    """Scrape one search-result row and package it as a JSON record."""
    try:
        url = row[2]
        json_data = {
            "claim_id": claim_id,
            "type": row[1],
            "query": row[3],
            "url": url,
            "url2text": scrape_text_from_url(url, claim_id),
            "metadata": {},
        }
        meta = html2metadata(url)
        json_data["metadata"] = {
            "title": meta.get("title"),
            "date": meta.get("date"),
        }
        return json_data
    except Exception as e:
        print(f"[WARN] Failed to scrape {row[2]}: {e}")
        return None


def run_scraper(tsv_file_path: str, output_jsonl_path: str, max_workers: int = 10):
    """Scrape every URL listed in a TSV file and write the results as JSONL."""
    claim_id = Path(tsv_file_path).stem
    output_jsonl_path = Path(output_jsonl_path)
    output_jsonl_path.parent.mkdir(parents=True, exist_ok=True)

    if output_jsonl_path.exists():
        print(f"[INFO] Skipping processing as output file already exists: {output_jsonl_path}")
        return str(output_jsonl_path)

    try:
        df = pd.read_csv(tsv_file_path, sep="\t", header=None)
        print("[INFO] Data loaded successfully with Pandas.")
    except Exception as e:
        raise RuntimeError(f"[ERROR] Failed to load TSV: {e}")

    # Scrape rows concurrently; rows that fail return None and are dropped.
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_row, row, claim_id) for _, row in df.iterrows()]
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)

    with open(output_jsonl_path, "w", encoding="utf-8") as json_file:
        for item in results:
            json_file.write(json.dumps(item, ensure_ascii=False) + "\n")

    print(f"[SYSTEM] Output saved to {output_jsonl_path}")
    return str(output_jsonl_path)
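

# ---------------------------------------------------------------------------
# Hypothetical usage sketch (not part of the module above). It assumes the
# TSV holds one search result per row with at least four columns, where
# column 1 is the result type, column 2 the URL, and column 3 the query,
# matching the row[1]/row[2]/row[3] accesses in process_row. The file paths
# below are illustrative placeholders, not paths from the original code.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    out_path = run_scraper(
        tsv_file_path="data/search_results/claim_001.tsv",   # assumed input
        output_jsonl_path="data/scraped/claim_001.jsonl",    # assumed output
        max_workers=10,
    )
    print(f"[SYSTEM] Finished scraping; results at {out_path}")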