import json
import os
import time
import requests
import pandas as pd
from datetime import datetime
from pathlib import Path
import spacy
import subprocess

# Load the spaCy English model, downloading it on first run if it is missing.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    subprocess.run(["python", "-m", "spacy", "download", "en_core_web_sm"], check=True)
    nlp = spacy.load("en_core_web_sm")

def clean_keywords(text):
    """Extract deduplicated noun-phrase keywords from a claim, dropping stop words."""
    doc = nlp(text)
    keywords = []
    for chunk in doc.noun_chunks:
        # Keep only alphabetic, non-stop-word tokens from each noun chunk.
        words = [token.text for token in chunk if not token.is_stop and token.is_alpha]
        if words:
            cleaned_phrase = " ".join(words)
            if len(cleaned_phrase) > 2:
                keywords.append(cleaned_phrase)
    return list(set(keywords))
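
# Illustrative example (not from the original file; output depends on the spaCy model):
#   clean_keywords("The UK government raised income tax in 2023")
#   -> ["UK government", "income tax"]   # order not guaranteed, since set() removes duplicates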

def google_search(query, api_key, search_engine_id, start_date, end_date):
    """Query the Google Custom Search API, restricted to a date range and UK results."""
    print(f"[SYSTEM] Calling Google Search API for: {query}")
    # Restrict results to the given date range (dates in YYYYMMDD form).
    sort = f"date:r:{start_date}:{end_date}"
    url = "https://www.googleapis.com/customsearch/v1"
    params = {
        "q": query,
        "key": api_key,
        "cx": search_engine_id,
        "num": 10,
        "sort": sort,
        "cr": "countryUK",
        "gl": "uk",
    }
    try:
        # A timeout keeps the pipeline from hanging on an unresponsive API.
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        return response.json().get("items", [])
    except Exception as e:
        print(f"[ERROR] Google Search Failed: {e}")
        return []

def save_tsv(file_path, claim_id, claim_text, url_list):
    """Write one TSV row per URL: claim ID, the literal string "claim", the URL, and the claim text."""
    df = pd.DataFrame({
        'ID': [claim_id] * len(url_list),
        'String': ["claim"] * len(url_list),
        'ListValue': url_list,
        'query': [claim_text] * len(url_list)
    })
    df.to_csv(file_path, sep='\t', index=False, header=False)

def ensure_directory_exists(path):
    dir_path = Path(path).expanduser().resolve().parent
    # Only allow writes under /home, /data, or the local outputs/ directory;
    # "outputs" is resolved to an absolute path so the check still works after resolve() above.
    allowed_prefixes = ("/home", "/data", str(Path("outputs").resolve()))
    if not str(dir_path).startswith(allowed_prefixes):
        raise ValueError(f"[ERROR] Unsafe path: {dir_path}")
    dir_path.mkdir(parents=True, exist_ok=True)

def run_initial_searching(claim_text, pipeline_base_dir, start_date, end_date, user_id, claim_id):
    api_key = os.environ.get("GOOGLE_API_KEY")
    search_engine_id = os.environ.get("GOOGLE_SEARCH_CX")
    if not api_key or not search_engine_id:
        raise EnvironmentError("[ERROR] GOOGLE_API_KEY and GOOGLE_SEARCH_CX must be set in environment.")
    base_dir = pipeline_base_dir
    manifesto_json_file = os.path.join(base_dir, "claim.json")
    tsv_file_path = os.path.join(base_dir, "initial_search_results.tsv")
    ensure_directory_exists(tsv_file_path)
    # Record the claim itself alongside the search results.
    claim_record = {"claim_id": claim_id, "claim": claim_text}
    # if Path(manifesto_json_file).exists():
    #     with open(manifesto_json_file, "r") as f:
    #         records = json.load(f)
    # else:
    records = []
    records.append(claim_record)
    with open(manifesto_json_file, "w") as f:
        json.dump(records, f, indent=1)
    urls = []
    # First pass: search with the full claim text.
    results = google_search(claim_text, api_key, search_engine_id, start_date, end_date)
    urls += [r["link"] for r in results if "link" in r]
    # Second pass: search with the extracted noun-phrase keywords joined into one query.
    keywords = clean_keywords(claim_text)
    keyword_text = " ".join(keywords)
    # for kw in keywords:
    #     results = google_search(kw, api_key, search_engine_id, start_date, end_date)
    #     urls += [r["link"] for r in results if "link" in r]
    results = google_search(keyword_text, api_key, search_engine_id, start_date, end_date)
    urls += [r["link"] for r in results if "link" in r]
    # De-duplicate while preserving order.
    urls = list(dict.fromkeys(urls))
    save_tsv(str(tsv_file_path), claim_id, claim_text, urls)
    print(f"[SYSTEM] Saved {len(urls)} URLs for claim {claim_id} to {tsv_file_path}")
    return str(tsv_file_path), str(manifesto_json_file)
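

# Minimal usage sketch (not part of the original file): the claim text, dates,
# output directory, and IDs below are illustrative values only, and the sketch
# assumes GOOGLE_API_KEY / GOOGLE_SEARCH_CX are already exported in the shell.
if __name__ == "__main__":
    tsv_path, json_path = run_initial_searching(
        claim_text="The UK government raised income tax in 2023",  # hypothetical claim
        pipeline_base_dir="outputs/demo_claim",   # assumed output directory
        start_date="20230101",                    # Custom Search date restriction, YYYYMMDD
        end_date="20231231",
        user_id="demo_user",                      # accepted by the signature but unused here
        claim_id="claim_001",
    )
    print(f"[SYSTEM] Demo run wrote {tsv_path} and {json_path}")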