# Hugging Face Spaces status banner captured during export (non-code residue):
# "Spaces: Sleeping / Sleeping"
import argparse
import csv
import datetime
import json
import os
import random
import re

import pandas as pd
from huggingface_hub import hf_hub_download
from openai import OpenAI
def gpt_4o_useful(input):
    """Send a single text prompt to GPT-4o and return its reply together
    with the first generated token and that token's log-probability
    (used downstream as a confidence score).

    Returns:
        (text, first_token, first_token_logprob); the last two are None
        when the response carries no logprob data.
    """
    client = OpenAI(api_key=os.environ.get("OAI"))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": [{"type": "text", "text": input}]},
        ],
        response_format={"type": "text"},
        temperature=0.0000000001,  # effectively greedy decoding
        max_tokens=4096,
        top_p=0,
        frequency_penalty=0,
        presence_penalty=0,
        logprobs=True,
    )
    choice = response.choices[0]
    text = choice.message.content
    token = None
    logprob = None
    if choice.logprobs and choice.logprobs.content:
        head = choice.logprobs.content[0]
        token = head.token
        logprob = head.logprob
    return text, token, logprob
def get_ICL(data, top_k=None):
    """Format labelled training examples as an in-context-learning block.

    Args:
        data: sequence of dicts with "pledge", "event_description",
            "event_date" and "label" keys.
        top_k: when given, only the first ``top_k`` examples are used.

    Returns:
        One string of "Input: ...\nOutput: ...\n\n" example pairs.
    """
    if top_k is not None:  # `is not None` (was `== None`); 0 still truncates
        data = data[:top_k]
    parts = []
    for line in data:
        pledge = line["pledge"]
        event = line["event_description"]
        time = line["event_date"]
        prompt = (
            f"Pledge: {pledge}\nEvent Summary: {event} (Event Date: {time})\n"
            "Is this event summary useful to track the fulfilment of this pledge"
        ).strip()
        label = line["label"].strip()
        parts.append(f"Input: {prompt}\nOutput: {label}\n\n")
    # join() instead of repeated string += (quadratic on long example lists)
    return "".join(parts)
def load_json(file_path):
    """Read a UTF-8 encoded JSON file and return the parsed object."""
    with open(file_path, encoding='utf-8') as f:
        return json.load(f)
def gpt_eval(test_instance, train_data, instruction, suggestion_meta, ICL_id=None):
    """Label one test instance with GPT-4o using in-context examples.

    When ``suggestion_meta`` is truthy, the examples are restricted to the
    training lines whose pledge_id matches ``ICL_id``; otherwise a fixed
    (seed 42) shuffle of the training data is used. At most 50 examples
    are placed in the prompt.

    Returns:
        (first_token, first_token_logprob); both None when the API call fails.
    """
    if suggestion_meta:
        train_data = [line for line in train_data if str(line.get("pledge_id")) == str(ICL_id)]
    else:
        # Shuffle a copy — the original shuffled the caller's list in place.
        train_data = list(train_data)
        random.seed(42)
        random.shuffle(train_data)
    ICL = get_ICL(train_data, top_k=50)
    prompt = (
        f"{instruction}\nBelow are examples:\n\n{ICL}"
        f"Now, please assign a label for the below instance.\nInput: {test_instance}\nOutput:"
    )
    try:
        _text, tokens, logprobs = gpt_4o_useful(prompt)
    except Exception as e:
        # Best-effort: report the failure but keep the pipeline running.
        print(e)
        tokens = None
        logprobs = None
    return tokens, logprobs
def extract_columns_to_dict(file_path, delimiter='\t'):
    """Map column 3 -> column 4 (0-indexed 2 -> 3) of a delimited file.

    Rows with fewer than four columns are skipped; a repeated key keeps
    the value from its last occurrence.

    Args:
        file_path: path to the delimited text file.
        delimiter: field separator (tab by default).
    """
    data_dict = {}
    # newline='' is required by the csv module so quoted fields containing
    # newlines are parsed correctly.
    with open(file_path, mode='r', newline='', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=delimiter)
        for row in reader:
            if len(row) >= 4:
                data_dict[row[2]] = row[3]
    return data_dict
import datetime
import re


def parse_date(date_str):
    """Parse a heterogeneous date string into a datetime.

    Formats are tried in order: ISO "YYYY-MM-DD"; a relative phrase such
    as "yesterday (relative to YYYY-MM-DD)"; a bare year; "Month YYYY";
    "YYYY-QX" quarters; and "YYYY Season".

    Returns:
        (parsed_datetime_or_None, original_string) — None when no format
        matched.
    """
    if not date_str:
        return None, date_str
    date_str = date_str.strip()

    # ISO date.
    try:
        return datetime.datetime.strptime(date_str, "%Y-%m-%d"), date_str
    except ValueError:
        pass

    # Relative phrase anchored to an explicit reference date.
    rel = re.search(r'(.*) \(relative to (\d{4}-\d{2}-\d{2})\)', date_str)
    if rel:
        anchor = datetime.datetime.strptime(rel.group(2), "%Y-%m-%d")
        offsets = {
            "last month": datetime.timedelta(days=30),  # 30-day approximation
            "yesterday": datetime.timedelta(days=1),
            "last week": datetime.timedelta(days=7),
            "this week": datetime.timedelta(),  # treated as the anchor itself
        }
        term = rel.group(1).strip().lower()
        if term not in offsets:
            return None, date_str
        return anchor - offsets[term], date_str

    # Bare year -> January 1st of that year.
    year_only = re.fullmatch(r'(\d{4})', date_str)
    if year_only:
        return datetime.datetime(int(year_only.group(1)), 1, 1), date_str

    # "Month YYYY" (full English month name).
    if re.fullmatch(r'(\w+) (\d{4})', date_str):
        try:
            return datetime.datetime.strptime(date_str, "%B %Y"), date_str
        except ValueError:
            return None, date_str

    # Quarter -> first month of that quarter.
    quarter = re.fullmatch(r'(\d{4})-Q(\d)', date_str)
    if quarter:
        first_month = (int(quarter.group(2)) - 1) * 3 + 1
        return datetime.datetime(int(quarter.group(1)), first_month, 1), date_str

    # Season -> representative starting month.
    season = re.fullmatch(r'(\d{4}) (Spring|Summer|Autumn|Fall|Winter)', date_str, re.IGNORECASE)
    if season:
        months = {"spring": 3, "summer": 6, "autumn": 9, "fall": 9, "winter": 12}
        month = months[season.group(2).lower()]
        return datetime.datetime(int(season.group(1)), month, 1), date_str

    return None, date_str
def extract_and_sort_events(data_dir, pledge_date, pledge_author, claim, suggestion_meta):
    """Collect GPT-4o-extracted events for a pledge, label each one for
    usefulness, and return them sorted newest-first.

    Args:
        data_dir: directory containing gpt4_event_extraction/gpt4o_results_0_claim.json.
        pledge_date: date the pledge was made (string, used in the prompt).
        pledge_author: speaker of the pledge (used in the prompt).
        claim: the pledge text itself.
        suggestion_meta: optional dict with an "index" key selecting a
            curated ICL example set via the mapping file; falsy to use the
            default shuffled examples.

    Returns:
        List of event dicts (date, event, url, label, confident, ...)
        sorted by parsed event date, most recent first.
    """
    pledge = claim.strip()
    file_path = os.path.join(data_dir, "gpt4_event_extraction", "gpt4o_results_0_claim.json")
    gpt4_results_json = load_json(file_path)

    # Training examples, instruction prompt and index->pledge_id mapping all
    # live in a private HF dataset; HF_TOKEN must be set in the environment.
    train_file_path = hf_hub_download(
        repo_id="PledgeTracker/demo_feedback",
        filename="train_useful.json",
        repo_type="dataset",
        token=os.environ["HF_TOKEN"],
    )
    with open(train_file_path, "r", encoding="utf-8") as f:
        train_data = json.load(f)

    instruction_path = hf_hub_download(
        repo_id="PledgeTracker/demo_feedback",
        filename="instruction.txt",
        repo_type="dataset",
        token=os.environ["HF_TOKEN"],
    )
    # Context managers replace the original open(...).read()/readlines(),
    # which leaked the file handles.
    with open(instruction_path, "r") as f:
        instruction = f.read()

    map_file_path = hf_hub_download(
        repo_id="PledgeTracker/demo_feedback",
        filename="mapping.txt",
        repo_type="dataset",
        token=os.environ["HF_TOKEN"],
    )
    with open(map_file_path, "r") as f:
        mapping = {map_id: int(line.strip()) for map_id, line in enumerate(f)}

    ICL_id = None
    if suggestion_meta:
        try:
            idx = int(suggestion_meta["index"])
            ICL_id = mapping.get(idx)
            print(f"[Suggestion] index: {idx} → pledge_id: {ICL_id}")
        except Exception as e:
            print(f"[Mapping error]: {e}")

    events = []
    for doc in gpt4_results_json:
        pub_date = doc["date"]  # publication date of the source document
        for event in doc.get("output", {}).get("events", []):
            parsed_date, original_date = parse_date(event["date"])
            if not parsed_date:
                continue  # skip events whose date cannot be interpreted
            parsed_date_str = parsed_date.strftime("%Y-%m-%d")
            # Show the publication date alongside the event date only when
            # they differ.
            if parsed_date_str != pub_date:
                event_date_and_pub_date = f"{parsed_date_str} ({pub_date})"
            else:
                event_date_and_pub_date = parsed_date_str
            test_instance = (
                f"Pledge: {pledge} (Speaker: {pledge_author}; Pledge Date: {pledge_date})\n"
                f"Event Summary: {event['event']} (Event Date: {original_date})\n"
                "Is this event summary useful to track the fulfilment of this pledge"
            )
            label, score = gpt_eval(test_instance, train_data, instruction, suggestion_meta, ICL_id=ICL_id)
            events.append({
                "date": original_date,
                "event date (publication date if different)": event_date_and_pub_date,
                "event": event["event"],
                "url": doc["url"],
                "label": label,
                "confident": score,
            })
    # Only events with a parseable date were appended, so the key is never None.
    events.sort(key=lambda x: parse_date(x["date"])[0], reverse=True)
    return events