# Pledge_Tracker/system/initial_searching.py
import json
import os
import time
import requests
import pandas as pd
from datetime import datetime
from pathlib import Path
import spacy
import subprocess
# Load the spaCy English model, downloading it first if it is not installed.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], check=True)
    nlp = spacy.load("en_core_web_sm")
def clean_keywords(text):
    """Extract de-duplicated keyword phrases from the claim's noun chunks."""
    doc = nlp(text)
    keywords = []
    for chunk in doc.noun_chunks:
        # Keep only alphabetic, non-stopword tokens within each noun chunk.
        words = [token.text for token in chunk if not token.is_stop and token.is_alpha]
        if words:
            cleaned_phrase = " ".join(words)
            if len(cleaned_phrase) > 2:
                keywords.append(cleaned_phrase)
    return list(set(keywords))
def google_search(query, api_key, search_engine_id, start_date, end_date):
    """Query the Google Custom Search JSON API, restricted to a date range and UK results."""
    print(f"[SYSTEM] Calling Google Search API for: {query}")
    sort = f"date:r:{start_date}:{end_date}"
    url = "https://www.googleapis.com/customsearch/v1"
    params = {
        "q": query,
        "key": api_key,
        "cx": search_engine_id,
        "num": 10,
        "sort": sort,
        "cr": "countryUK",
        "gl": "uk"
    }
    try:
        # Timeout keeps the pipeline from hanging indefinitely on a stalled request.
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        return response.json().get("items", [])
    except Exception as e:
        print(f"[ERROR] Google Search Failed: {e}")
        return []
def save_tsv(file_path, claim_id, claim_text, url_list):
    """Write the retrieved URLs to a headerless TSV of (ID, "claim", url, query) rows."""
    df = pd.DataFrame({
        'ID': [claim_id] * len(url_list),
        'String': ["claim"] * len(url_list),
        'ListValue': url_list,
        'query': [claim_text] * len(url_list)
    })
    df.to_csv(file_path, sep='\t', index=False, header=False)
def ensure_directory_exists(path):
    """Create the parent directory of `path`, refusing locations outside the allowed roots."""
    dir_path = Path(path).expanduser().resolve().parent
    # After .resolve() the path is absolute, so the "outputs" root must also be compared in resolved form.
    allowed_roots = ("/home", "/data", str(Path("outputs").resolve()))
    if not str(dir_path).startswith(allowed_roots):
        raise ValueError(f"[ERROR] Unsafe path: {dir_path}")
    dir_path.mkdir(parents=True, exist_ok=True)
def run_initial_searching(claim_text, pipeline_base_dir, start_date, end_date, user_id, claim_id):
    """Search Google for a claim and its keyword phrases, then persist the claim and result URLs."""
    api_key = os.environ.get("GOOGLE_API_KEY")
    search_engine_id = os.environ.get("GOOGLE_SEARCH_CX")
    if not api_key or not search_engine_id:
        raise EnvironmentError("[ERROR] GOOGLE_API_KEY and GOOGLE_SEARCH_CX must be set in the environment.")

    base_dir = pipeline_base_dir
    manifesto_json_file = os.path.join(base_dir, "claim.json")
    tsv_file_path = os.path.join(base_dir, "initial_search_results.tsv")
    ensure_directory_exists(tsv_file_path)

    # Record the claim in claim.json (currently overwritten on every run).
    claim_record = {"claim_id": claim_id, "claim": claim_text}
    # if os.path.exists(manifesto_json_file):
    #     with open(manifesto_json_file, "r") as f:
    #         records = json.load(f)
    # else:
    records = []
    records.append(claim_record)
    with open(manifesto_json_file, "w") as f:
        json.dump(records, f, indent=1)

    # First search: the full claim text.
    urls = []
    results = google_search(claim_text, api_key, search_engine_id, start_date, end_date)
    urls += [r["link"] for r in results if "link" in r]

    # Second search: the extracted keyword phrases joined into a single query.
    keywords = clean_keywords(claim_text)
    keyword_text = " ".join(keywords)
    # for kw in keywords:
    #     results = google_search(kw, api_key, search_engine_id, start_date, end_date)
    #     urls += [r["link"] for r in results if "link" in r]
    results = google_search(keyword_text, api_key, search_engine_id, start_date, end_date)
    urls += [r["link"] for r in results if "link" in r]

    # De-duplicate while preserving the order in which URLs were found.
    urls = list(dict.fromkeys(urls))
    save_tsv(str(tsv_file_path), claim_id, claim_text, urls)
    print(f"[SYSTEM] Saved {len(urls)} URLs for claim {claim_id} to {tsv_file_path}")
    return str(tsv_file_path), str(manifesto_json_file)
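

# Minimal usage sketch (an illustration only, not part of the pipeline). It assumes
# GOOGLE_API_KEY and GOOGLE_SEARCH_CX are exported; the base directory "outputs/demo",
# the example claim, and the user/claim identifiers are hypothetical values, and the
# YYYYMMDD date strings match the format expected by the date-range sort parameter.
if __name__ == "__main__":
    tsv_path, claim_json_path = run_initial_searching(
        claim_text="We will build 40 new hospitals by 2030",
        pipeline_base_dir="outputs/demo",
        start_date="20240101",
        end_date="20241231",
        user_id="demo_user",
        claim_id="claim_0001",
    )
    print(tsv_path, claim_json_path)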