# Pledge_Tracker / system / pledge_tracking.py
from huggingface_hub import login, hf_hub_download
from datetime import datetime
import os, time
import pandas as pd
from system.initial_searching import run_initial_searching
from system.scraper import run_scraper
from system.hero_pipeline import run_hero_pipeline, run_hero_reranking
from system.augmented_searching import run_augmented_searching
from system.generate_output import process_manifesto_data_with_metadata
from system.ee import run_gpt4_event_extraction
from system.process_time import extract_and_sort_events
import spacy
import subprocess
import json
# Make sure the spaCy English model is available; download it on first run.
try:
    spacy.load("en_core_web_sm")
except OSError:
    print("🔍 Downloading en_core_web_sm model ...")
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"])
nlp = spacy.load("en_core_web_sm")
def count_total_events(output_path):
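    """Count and print the total number of events across all results in an extraction output file."""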
with open(output_path, "r", encoding="utf-8") as f:
results = json.load(f)
total_events = 0
for result in results:
        total_events += len(result["output"]["events"])
print(f"{total_events} events in total")
return total_events
def run_pipeline(claim, pledge_date, pledge_author, start_date, timestamp, user_id, update_fn=None, suggestion_meta=None):
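    """Run the end-to-end pledge-tracking pipeline for a single claim.

    The stages are: initial web search, scraping, HerO question generation
    (skipped when suggestion_meta points at a precomputed QA file on the Hub),
    augmented search over the generated questions, scraping, HerO reranking,
    GPT-4 event extraction, and temporal sorting of the extracted events.
    Progress is reported through update_fn(step_id, message) when provided.
    Returns a dict of intermediate and final output paths.
    """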
pipeline_base_dir = f"outputs/{timestamp}_{user_id}"
os.makedirs(pipeline_base_dir, exist_ok=True)
    step_id = 1

    # Step 1: Google search
    if suggestion_meta is None:
        print("🔍 Step 1: Initial searching ...")
initial_tsv_file, claim_json_path = run_initial_searching(
claim_text=f"{pledge_author} : {claim} ({pledge_date})",
# pledge_author=pledge_author,
pipeline_base_dir=pipeline_base_dir,
start_date=start_date,
end_date="",
user_id=user_id,
claim_id=0,
)
with open(initial_tsv_file, "r", encoding="utf-8") as f:
line_count = sum(1 for line in f)
if update_fn:
update_fn(step_id, f"{line_count} URLs are retrieved")
step_id+=1
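
        # Step 2: scrape every retrieved URL; only pages with non-empty extracted text count as successes.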
print("🌐 Step 2: Scraping URLs ...")
initial_data_store_dir = os.path.join(pipeline_base_dir, "initial_data_store")
os.makedirs(initial_data_store_dir, exist_ok=True)
initial_scraped_output_path = os.path.join(initial_data_store_dir, "0.jsonl")
run_scraper(initial_tsv_file, initial_scraped_output_path)
with open(initial_scraped_output_path, "r", encoding="utf-8") as f:
line_count = sum(1 for line in f if json.loads(line)["url2text"] != [])
if update_fn:
update_fn(step_id, f"{line_count} URL pages have been successefully scraped")
step_id+=1
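
        # Step 3: run the HerO pipeline over the scraped pages to generate follow-up questions for the claim.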
print("🧠 Step 3: HerO processing ...")
hero_output_dir = os.path.join(pipeline_base_dir, "hero")
os.makedirs(hero_output_dir, exist_ok=True)
run_hero_pipeline(pipeline_base_dir)
qa_file_path = os.path.join(hero_output_dir, "manifesto_icl_top_k_qa.json")
with open(qa_file_path, "r", encoding="utf-8") as f:
questions = {line["question"] for line in json.load(f)["evidence"]}
questions = list(questions)
line_count = len(questions)
if update_fn:
update_fn(step_id, f"{line_count} relevant queries are generated, for example:\n"
f"    1. {questions[0]}\n"
f"    2. {questions[1]}\n"
f"    3. {questions[2]}\n"
f"    4. {questions[3]}\n"
f"    5. {questions[4]}")
step_id+=1
else:
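        # A suggested pledge was selected: skip steps 1-3 and reuse the
        # precomputed QA file from the PledgeTracker/demo_feedback dataset.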
claim_json_path = None
initial_scraped_output_path = None
initial_tsv_file = None
hero_output_dir = None
qa_file_path = hf_hub_download(
repo_id="PledgeTracker/demo_feedback",
filename="manifesto_with_QA_icl_top_k_qa.json",
repo_type="dataset",
token=os.environ["HF_TOKEN"]
)
idx = suggestion_meta["index"]
        with open(qa_file_path, "r", encoding="utf-8") as f:
            qa_line = f.readlines()[idx]
        questions = {item["question"] for item in json.loads(qa_line)["evidence"]}
questions = list(questions)
line_count = len(questions)
if update_fn:
update_fn(step_id, f"relevant queries are generated, for example:\n"
f"    1. {questions[0]}\n"
f"    2. {questions[1]}\n"
f"    3. {questions[2]}\n"
f"    4. {questions[3]}\n"
f"    5. {questions[4]}")
step_id+=1
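
    # Augmented search: issue the generated questions as additional search queries.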
try:
augmented_tsv_file = run_augmented_searching(
qa_file=qa_file_path,
pledge_author=pledge_author,
pledge_date=pledge_date,
pipeline_base_dir=pipeline_base_dir,
start_date=start_date,
suggestion_meta=suggestion_meta,
end_date="",
)
with open(augmented_tsv_file, "r", encoding="utf-8") as f:
line_count = sum(1 for line in f)
if update_fn:
update_fn(step_id, f"{line_count} URLs are retrieved")
step_id+=1
except Exception as e:
if update_fn:
update_fn(step_id, f"❌ run_augmented_searching failed: {e}")
raise
augmented_data_store_dir = os.path.join(pipeline_base_dir, "augmented_data_store")
os.makedirs(augmented_data_store_dir, exist_ok=True)
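
    # Scrape the URLs returned by the augmented search, mirroring the earlier scraping step.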
try:
augmented_scraped_output_path = os.path.join(augmented_data_store_dir, "0.jsonl")
run_scraper(augmented_tsv_file, augmented_scraped_output_path)
with open(augmented_scraped_output_path, "r", encoding="utf-8") as f:
line_count = sum(1 for line in f if json.loads(line)["url2text"] != [])
if update_fn:
update_fn(step_id, f"{line_count} URL pages have been successefully scraped")
step_id+=1
except Exception as e:
if update_fn:
update_fn(step_id, f"❌ run_scraper failed: {e}")
raise
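
    # Rerank the scraped evidence with HerO and attach document metadata;
    # the number of unique source URLs that survive selection is reported.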
try:
run_hero_reranking(pipeline_base_dir, suggestion_meta)
meta_data_dir = process_manifesto_data_with_metadata(input_base_dir=pipeline_base_dir)
all_info_path = os.path.join(pipeline_base_dir, "all_info_with_txt.json")
unique_urls = set()
with open(all_info_path, "r", encoding="utf-8") as f:
for line in f:
data = json.loads(line)
docs = data.get("evidence", [])
for doc in docs:
if "url" in doc:
unique_urls.add(doc["url"])
if update_fn:
update_fn(step_id, f"{len(unique_urls)} documents are selected")
step_id+=1
except Exception as e:
if update_fn:
update_fn(step_id, f"❌ run_hero_reranking failed: {e}")
raise
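
    # Extract dated events from the selected documents with GPT-4.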
try:
extracted_event_path = run_gpt4_event_extraction(data_dir=pipeline_base_dir, max_tokens=100000)
events_num = count_total_events(extracted_event_path)
if update_fn:
update_fn(step_id, f"{events_num} events are extracted from those documents.")
step_id+=1
except Exception as e:
if update_fn:
update_fn(step_id, f"❌ Event extraction failed: {e}")
raise
    print("📅 Sorting events temporally ...")
sorted_events = extract_and_sort_events(
data_dir=pipeline_base_dir,
pledge_date=pledge_date,
pledge_author=pledge_author,
claim=claim,
suggestion_meta=suggestion_meta
)
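
    # Export the chronologically sorted events to an Excel file.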
df = pd.DataFrame(sorted_events)
sorted_event_path = f"{pipeline_base_dir}/sorted_events.xlsx"
df.to_excel(sorted_event_path, index=False)
print(sorted_event_path)
if update_fn:
update_fn(step_id, "All done!")
step_id += 1
return {
"claim_json": claim_json_path,
"initial_scraped_jsonl": initial_scraped_output_path,
"initial_tsv_file": initial_tsv_file,
"hero_dir": hero_output_dir,
"augmented_scraped_jsonl": augmented_scraped_output_path,
"augmented_tsv_file": augmented_tsv_file,
"meta_data_dir": meta_data_dir,
"unsorted_events": extracted_event_path,
"sorted_events": sorted_event_path,
"step_id": step_id
}
if __name__ == "__main__":
start = time.time()
if os.environ.get("HF_TOKEN"):
login(token=os.environ["HF_TOKEN"])
else:
print("No Hugging Face token found in environment variable HF_TOKEN.")
claim = "β€œWe will support families with children by introducing free breakfast clubs in every primary school”"
start_date = "20250504"
timestamp = "xxxxx"
user_id = "xxx"
outputs = run_pipeline(claim, time_start, timestamp, user_id)
print("🎯 Pipeline finished. Outputs:", outputs)
print(f"⏱️ Total time: {time.time() - start:.2f} seconds")