import csv
import datetime
import json
import os
import random
import re

from huggingface_hub import hf_hub_download
from openai import OpenAI


def gpt_4o_useful(prompt):
    """Query gpt-4o and return the reply text plus the first output token
    and its log-probability (used downstream as a confidence score)."""
    client = OpenAI(api_key=os.environ.get("OAI"))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": [{"type": "text", "text": prompt}],
            }
        ],
        response_format={"type": "text"},
        temperature=1e-10,  # near-zero for practically deterministic output
        max_tokens=4096,
        top_p=0,
        frequency_penalty=0,
        presence_penalty=0,
        logprobs=True,
    )
    text = response.choices[0].message.content
    if response.choices[0].logprobs and response.choices[0].logprobs.content:
        first_token_logprob = response.choices[0].logprobs.content[0]
        token = first_token_logprob.token
        logprob = first_token_logprob.logprob
    else:
        token = None
        logprob = None
    return text, token, logprob


def get_ICL(data, top_k=None):
    """Format up to top_k labelled examples as an in-context-learning block."""
    if top_k is not None:
        data = data[:top_k]
    examples = []
    for line in data:
        pledge = line["pledge"]
        event = line["event_description"]
        time = line["event_date"]
        example_input = (
            f"Pledge: {pledge}\nEvent Summary: {event} (Event Date: {time})\n"
            f"Is this event summary useful to track the fulfilment of this pledge"
        ).strip()
        output = line["label"].strip()
        examples.append(f"Input: {example_input}\nOutput: {output}\n\n")
    return "".join(examples)


def load_json(file_path):
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)


def gpt_eval(test_instance, train_data, instruction, suggestion_meta, ICL_id=None):
    """Label one test instance with gpt-4o, using either the examples for a
    user-suggested pledge (ICL_id) or a fixed random sample of train_data."""
    if suggestion_meta:
        train_data = [
            line for line in train_data
            if str(line.get("pledge_id")) == str(ICL_id)
        ]
    else:
        # Shuffle a copy: shuffling in place would re-permute the caller's
        # list on every call, so the ICL examples would drift between test
        # instances despite the fixed seed.
        train_data = list(train_data)
        random.seed(42)
        random.shuffle(train_data)
    ICL = get_ICL(train_data, top_k=50)
    prompt = (
        f"{instruction}\nBelow are examples:\n\n{ICL}"
        f"Now, please assign a label for the below instance.\n"
        f"Input: {test_instance}\nOutput:"
    )
    try:
        _text, tokens, logprobs = gpt_4o_useful(prompt)
    except Exception as e:
        print(e)
        tokens = None
        logprobs = None
    return tokens, logprobs


def extract_columns_to_dict(file_path, delimiter="\t"):
    """Map column 3 to column 4 of a delimited file (currently unused; see
    the commented-out call in extract_and_sort_events)."""
    data_dict = {}
    with open(file_path, mode="r", encoding="utf-8") as file:
        reader = csv.reader(file, delimiter=delimiter)
        for row in reader:
            if len(row) >= 4:
                data_dict[row[2]] = row[3]
    return data_dict


def parse_date(date_str):
    """Parse the date formats emitted by the event-extraction step.

    Returns (datetime, original_string); the datetime is None when the
    string cannot be parsed.
    """
    if not date_str:
        return None, date_str
    date_str = date_str.strip()

    # Case 1: YYYY-MM-DD
    try:
        return datetime.datetime.strptime(date_str, "%Y-%m-%d"), date_str
    except ValueError:
        pass

    # Case 2: relative date, e.g. "last week (relative to 2024-05-01)"
    match = re.search(r'(.*) \(relative to (\d{4}-\d{2}-\d{2})\)', date_str)
    if match:
        reference = datetime.datetime.strptime(match.group(2), "%Y-%m-%d")
        relative_term = match.group(1).strip().lower()
        if relative_term == "last month":
            target_date = reference - datetime.timedelta(days=30)
        elif relative_term == "yesterday":
            target_date = reference - datetime.timedelta(days=1)
        elif relative_term == "last week":
            target_date = reference - datetime.timedelta(days=7)
        elif relative_term == "this week":
            target_date = reference
        else:
            return None, date_str
        return target_date, date_str

    # Case 3: bare year (YYYY) -> January 1st of that year
    match = re.fullmatch(r'(\d{4})', date_str)
    if match:
        return datetime.datetime(int(match.group(1)), 1, 1), date_str

    # Case 4: full month name and year, e.g. "March 2024"
    match = re.fullmatch(r'(\w+) (\d{4})', date_str)
    if match:
        try:
            return datetime.datetime.strptime(date_str, "%B %Y"), date_str
        except ValueError:
            return None, date_str

    # Case 5: quarter, e.g. "2024-Q2" -> first month of the quarter
    match = re.fullmatch(r'(\d{4})-Q(\d)', date_str)
    if match:
        year, quarter = int(match.group(1)), int(match.group(2))
        month = (quarter - 1) * 3 + 1
        return datetime.datetime(year, month, 1), date_str

    # Case 6: season, e.g. "2024 Summer" -> first month of the season
    match = re.fullmatch(r'(\d{4}) (Spring|Summer|Autumn|Fall|Winter)',
                         date_str, re.IGNORECASE)
    if match:
        year = int(match.group(1))
        season_map = {"spring": 3, "summer": 6, "autumn": 9, "fall": 9, "winter": 12}
        month = season_map[match.group(2).lower()]
        return datetime.datetime(year, month, 1), date_str

    return None, date_str


def extract_and_sort_events(data_dir, pledge_date, pledge_author, claim, suggestion_meta):
    """Load GPT-4o-extracted events, label each as useful or not for tracking
    the pledge, and return the events sorted newest-first."""
    events = []
    # url_path = os.path.join(data_dir, "augmented_search_results.tsv")
    # url_query_dict = extract_columns_to_dict(file_path=url_path, delimiter='\t')
    pledge = claim.strip()
    file_path = os.path.join(data_dir, "gpt4_event_extraction", "gpt4o_results_0_claim.json")
    gpt4_results_json = load_json(file_path)

    train_file_path = hf_hub_download(
        repo_id="PledgeTracker/demo_feedback",
        filename="train_useful.json",
        repo_type="dataset",
        token=os.environ["HF_TOKEN"],
    )
    with open(train_file_path, "r", encoding="utf-8") as f:
        train_data = json.load(f)

    instruction_path = hf_hub_download(
        repo_id="PledgeTracker/demo_feedback",
        filename="instruction.txt",
        repo_type="dataset",
        token=os.environ["HF_TOKEN"],
    )
    with open(instruction_path, "r", encoding="utf-8") as f:
        instruction = f.read()

    map_file_path = hf_hub_download(
        repo_id="PledgeTracker/demo_feedback",
        filename="mapping.txt",
        repo_type="dataset",
        token=os.environ["HF_TOKEN"],
    )
    # mapping.txt: line N holds the pledge_id for suggestion index N.
    mapping = {}
    with open(map_file_path, "r", encoding="utf-8") as f:
        for map_id, line in enumerate(f):
            mapping[map_id] = int(line.strip())

    ICL_id = None
    if suggestion_meta:
        try:
            idx = int(suggestion_meta["index"])
            ICL_id = mapping.get(idx)
            print(f"[Suggestion] index: {idx} → pledge_id: {ICL_id}")
        except Exception as e:
            print(f"[Mapping error]: {e}")

    for doc in gpt4_results_json:
        pub_date = doc["date"]  # publication date of the source document
        for event in doc.get("output", {}).get("events", []):
            parsed_date, original_date = parse_date(event["date"])
            if parsed_date:
                parsed_date_str = parsed_date.strftime("%Y-%m-%d")
                if parsed_date_str != pub_date:
                    event_date_and_pub_date = f"{parsed_date_str} ({pub_date})"
                else:
                    event_date_and_pub_date = parsed_date_str
                test_instance = (
                    f"Pledge: {pledge} (Speaker: {pledge_author}; Pledge Date: {pledge_date})\n"
                    f"Event Summary: {event['event']} (Event Date: {original_date})\n"
                    f"Is this event summary useful to track the fulfilment of this pledge"
                )
                label, score = gpt_eval(
                    test_instance, train_data, instruction, suggestion_meta, ICL_id=ICL_id
                )
                events.append({
                    "date": original_date,
                    "event date (publication date if different)": event_date_and_pub_date,
                    "event": event["event"],
                    "url": doc["url"],
                    "label": label,
                    "confident": score,  # log-probability of the first label token
                })

    # Newest first; every stored date parsed successfully above.
    events.sort(key=lambda x: parse_date(x["date"])[0], reverse=True)
    return events
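

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the pipeline). The directory path,
# pledge text, and speaker below are hypothetical; running it also assumes
# the "OAI" and "HF_TOKEN" environment variables are set and that data_dir
# already contains gpt4_event_extraction/gpt4o_results_0_claim.json produced
# by an upstream extraction step.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # parse_date handles several shapes; two quick offline checks:
    print(parse_date("2024-Q2")[0])                              # 2024-04-01 00:00:00
    print(parse_date("last week (relative to 2024-05-08)")[0])   # 2024-05-01 00:00:00

    demo_events = extract_and_sort_events(
        data_dir="data/example_run",                  # hypothetical directory
        pledge_date="2024-07-04",
        pledge_author="Example Speaker",              # hypothetical speaker
        claim="We will build 1.5 million new homes.",
        suggestion_meta=None,                         # no curated ICL pledge selected
    )
    for e in demo_events[:5]:
        print(e["event date (publication date if different)"], "|", e["label"], "|", e["event"])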