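"""PledgeTracker demo pipeline.

Given a pledge (claim text, author and date), this module retrieves candidate
web pages, scrapes them, generates follow-up questions with the HerO pipeline,
runs an augmented search over those questions, reranks and selects evidence
documents, extracts events with GPT-4, and finally sorts the events
temporally into an Excel sheet.
"""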
from datetime import datetime
import json
import os, time
import subprocess

import pandas as pd
import spacy
from huggingface_hub import login, hf_hub_download

from system.initial_searching import run_initial_searching
from system.scraper import run_scraper
from system.hero_pipeline import run_hero_pipeline, run_hero_reranking
from system.augmented_searching import run_augmented_searching
from system.generate_output import process_manifesto_data_with_metadata
from system.ee import run_gpt4_event_extraction
from system.process_time import extract_and_sort_events
# Make sure the spaCy English model is available; download it on first use.
try:
    spacy.load("en_core_web_sm")
except OSError:
    print("Downloading en_core_web_sm model ...")
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"])
nlp = spacy.load("en_core_web_sm")
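
# Count the events produced by the extraction step across all documents
# in the given JSON results file.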
def count_total_events(output_path):
    with open(output_path, "r", encoding="utf-8") as f:
        results = json.load(f)
    total_events = 0
    for result in results:
        total_events += len(result["output"]["events"])
    print(f"{total_events} events in total")
    return total_events

def run_pipeline(claim, pledge_date, pledge_author, start_date, timestamp, user_id, update_fn=None, suggestion_meta=None):
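    """Run the full pledge-tracking pipeline for one claim.

    Args:
        claim: the pledge text to track.
        pledge_date: date the pledge was made.
        pledge_author: who made the pledge.
        start_date: earliest date for retrieved evidence (the demo uses a YYYYMMDD string).
        timestamp, user_id: used to build the per-run output directory.
        update_fn: optional progress callback, called as update_fn(step_id, message).
        suggestion_meta: if set (a dict with an "index" key), Steps 1-3 are skipped
            and pre-computed questions are loaded from the Hugging Face Hub.

    Returns a dict of paths to the intermediate and final artefacts plus the final step_id.

    A hypothetical progress callback, just to illustrate the calling convention:

        def show_progress(step_id, message):
            print(f"[step {step_id}] {message}")
    """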
    pipeline_base_dir = f"outputs/{timestamp}_{user_id}"
    os.makedirs(pipeline_base_dir, exist_ok=True)
    step_id = 1

    # Step 1: Google search
    if suggestion_meta is None:
        print("Step 1: Initial searching ...")
        initial_tsv_file, claim_json_path = run_initial_searching(
            claim_text=f"{pledge_author} : {claim} ({pledge_date})",
            # pledge_author=pledge_author,
            pipeline_base_dir=pipeline_base_dir,
            start_date=start_date,
            end_date="",
            user_id=user_id,
            claim_id=0,
        )
        with open(initial_tsv_file, "r", encoding="utf-8") as f:
            line_count = sum(1 for line in f)
        if update_fn:
            update_fn(step_id, f"{line_count} URLs are retrieved")
        step_id += 1

        print("Step 2: Scraping URLs ...")
        initial_data_store_dir = os.path.join(pipeline_base_dir, "initial_data_store")
        os.makedirs(initial_data_store_dir, exist_ok=True)
        initial_scraped_output_path = os.path.join(initial_data_store_dir, "0.jsonl")
        run_scraper(initial_tsv_file, initial_scraped_output_path)
        with open(initial_scraped_output_path, "r", encoding="utf-8") as f:
            line_count = sum(1 for line in f if json.loads(line)["url2text"] != [])
        if update_fn:
            update_fn(step_id, f"{line_count} URL pages have been successfully scraped")
        step_id += 1

        print("Step 3: HerO processing ...")
        hero_output_dir = os.path.join(pipeline_base_dir, "hero")
        os.makedirs(hero_output_dir, exist_ok=True)
        run_hero_pipeline(pipeline_base_dir)
        qa_file_path = os.path.join(hero_output_dir, "manifesto_icl_top_k_qa.json")
        with open(qa_file_path, "r", encoding="utf-8") as f:
            questions = {item["question"] for item in json.load(f)["evidence"]}
        questions = list(questions)
        line_count = len(questions)
        if update_fn:
            update_fn(step_id, f"{line_count} relevant queries are generated, for example:\n"
                               f" 1. {questions[0]}\n"
                               f" 2. {questions[1]}\n"
                               f" 3. {questions[2]}\n"
                               f" 4. {questions[3]}\n"
                               f" 5. {questions[4]}")
        step_id += 1
    else:
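        # A suggested pledge was selected: skip Steps 1-3 and reuse the
        # pre-computed question file published on the Hugging Face Hub.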
        claim_json_path = None
        initial_scraped_output_path = None
        initial_tsv_file = None
        hero_output_dir = None
        qa_file_path = hf_hub_download(
            repo_id="PledgeTracker/demo_feedback",
            filename="manifesto_with_QA_icl_top_k_qa.json",
            repo_type="dataset",
            token=os.environ["HF_TOKEN"],
        )
        idx = suggestion_meta["index"]
        with open(qa_file_path, "r", encoding="utf-8") as f:
            qa_line = f.readlines()[idx]
        questions = {item["question"] for item in json.loads(qa_line)["evidence"]}
        questions = list(questions)
        line_count = len(questions)
        if update_fn:
            update_fn(step_id, f"relevant queries are generated, for example:\n"
                               f" 1. {questions[0]}\n"
                               f" 2. {questions[1]}\n"
                               f" 3. {questions[2]}\n"
                               f" 4. {questions[3]}\n"
                               f" 5. {questions[4]}")
        step_id += 1
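
    # Augmented searching: query the web again using the generated questions.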
    try:
        augmented_tsv_file = run_augmented_searching(
            qa_file=qa_file_path,
            pledge_author=pledge_author,
            pledge_date=pledge_date,
            pipeline_base_dir=pipeline_base_dir,
            start_date=start_date,
            suggestion_meta=suggestion_meta,
            end_date="",
        )
        with open(augmented_tsv_file, "r", encoding="utf-8") as f:
            line_count = sum(1 for line in f)
        if update_fn:
            update_fn(step_id, f"{line_count} URLs are retrieved")
        step_id += 1
    except Exception as e:
        if update_fn:
            update_fn(step_id, f"run_augmented_searching failed: {e}")
        raise
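
    # Scrape the pages returned by the augmented search.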
    augmented_data_store_dir = os.path.join(pipeline_base_dir, "augmented_data_store")
    os.makedirs(augmented_data_store_dir, exist_ok=True)
    try:
        augmented_scraped_output_path = os.path.join(augmented_data_store_dir, "0.jsonl")
        run_scraper(augmented_tsv_file, augmented_scraped_output_path)
        with open(augmented_scraped_output_path, "r", encoding="utf-8") as f:
            line_count = sum(1 for line in f if json.loads(line)["url2text"] != [])
        if update_fn:
            update_fn(step_id, f"{line_count} URL pages have been successfully scraped")
        step_id += 1
    except Exception as e:
        if update_fn:
            update_fn(step_id, f"run_scraper failed: {e}")
        raise
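
    # Rerank the scraped documents with HerO and collect the selected evidence URLs.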
    try:
        run_hero_reranking(pipeline_base_dir, suggestion_meta)
        meta_data_dir = process_manifesto_data_with_metadata(input_base_dir=pipeline_base_dir)
        all_info_path = os.path.join(pipeline_base_dir, "all_info_with_txt.json")
        unique_urls = set()
        with open(all_info_path, "r", encoding="utf-8") as f:
            for line in f:
                data = json.loads(line)
                docs = data.get("evidence", [])
                for doc in docs:
                    if "url" in doc:
                        unique_urls.add(doc["url"])
        if update_fn:
            update_fn(step_id, f"{len(unique_urls)} documents are selected")
        step_id += 1
    except Exception as e:
        if update_fn:
            update_fn(step_id, f"run_hero_reranking failed: {e}")
        raise
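
    # Extract events from the selected documents with GPT-4.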
    try:
        extracted_event_path = run_gpt4_event_extraction(data_dir=pipeline_base_dir, max_tokens=100000)
        events_num = count_total_events(extracted_event_path)
        if update_fn:
            update_fn(step_id, f"{events_num} events are extracted from those documents.")
        step_id += 1
    except Exception as e:
        if update_fn:
            update_fn(step_id, f"Event extraction failed: {e}")
        raise
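
    # Sort the extracted events chronologically and write them to an Excel file.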
    print("Sorting events temporally ...")
    sorted_events = extract_and_sort_events(
        data_dir=pipeline_base_dir,
        pledge_date=pledge_date,
        pledge_author=pledge_author,
        claim=claim,
        suggestion_meta=suggestion_meta,
    )
    df = pd.DataFrame(sorted_events)
    sorted_event_path = f"{pipeline_base_dir}/sorted_events.xlsx"
    df.to_excel(sorted_event_path, index=False)
    print(sorted_event_path)

    if update_fn:
        update_fn(step_id, "All done!")
    step_id += 1

    return {
        "claim_json": claim_json_path,
        "initial_scraped_jsonl": initial_scraped_output_path,
        "initial_tsv_file": initial_tsv_file,
        "hero_dir": hero_output_dir,
        "augmented_scraped_jsonl": augmented_scraped_output_path,
        "augmented_tsv_file": augmented_tsv_file,
        "meta_data_dir": meta_data_dir,
        "unsorted_events": extracted_event_path,
        "sorted_events": sorted_event_path,
        "step_id": step_id,
    }
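

# Minimal local run with hard-coded placeholder inputs.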
if __name__ == "__main__":
    start = time.time()
    if os.environ.get("HF_TOKEN"):
        login(token=os.environ["HF_TOKEN"])
    else:
        print("No Hugging Face token found in environment variable HF_TOKEN.")

    claim = "“We will support families with children by introducing free breakfast clubs in every primary school”"
    # Placeholder metadata for a local run; replace with the real pledge details.
    pledge_date = "xxxxx"
    pledge_author = "xxx"
    start_date = "20250504"
    timestamp = "xxxxx"
    user_id = "xxx"

    outputs = run_pipeline(claim, pledge_date, pledge_author, start_date, timestamp, user_id)
    print("Pipeline finished. Outputs:", outputs)
    print(f"Total time: {time.time() - start:.2f} seconds")