import os import time from datetime import datetime import logging from pathlib import Path import requests import json import numpy as np import pandas as pd import spacy from sentence_transformers import CrossEncoder import litellm # from litellm import completion from tqdm import tqdm from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, AutoConfig, pipeline # from accelerate import PartialState # from accelerate.inference import prepare_pippy import torch import cohere from openai import OpenAI # import google import google.generativeai as genai import src.backend.util as util import src.envs as envs # litellm.set_verbose=False litellm.set_verbose=True # Set up basic configuration for logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Load spacy model for word tokenization nlp = spacy.load("en_core_web_sm") os.environ["HUGGINGFACE_API_KEY"] = envs.TOKEN os.environ["OPENAI_API_KEY"] = "sk-None-tanhMyavhUtpX2G1kmPuT3BlbkFJGEhM5jmyGyhrTd3LdHDI" def load_evaluation_model(model_path): """Load the evaluation model from the given path Args: model_path (str): Path to the evaluation model Returns: CrossEncoder: The evaluation model """ model = CrossEncoder(model_path) return model class ModelLoadingException(Exception): """Exception raised for errors in loading a model. Attributes: model_id (str): The model identifier. revision (str): The model revision. """ def __init__(self, model_id, revision, messages="Error initializing model"): self.model_id = model_id self.revision = revision super().__init__(f"{messages} id={model_id} revision={revision}") class SummaryGenerator: """A class to generate summaries using a causal language model. Attributes: model (str): huggingface/{model_id} api_base (str): https://api-inference.huggingface.co/models/{model_id} summaries_df (DataFrame): DataFrame to store generated summaries. revision (str): Model revision. avg_length (float): Average length of summaries. answer_rate (float): Rate of non-empty summaries. """ def __init__(self, model_id, revision): """ Initializes the SummaryGenerator with a model. Args: model_id (str): Identifier for the model. revision (str): Revision of the model. """ self.model_id = model_id self.model = f"huggingface/{model_id}" self.api_base = f"https://api-inference.huggingface.co/models/{model_id}" self.summaries_df = pd.DataFrame() self.revision = revision self.avg_length = None self.answer_rate = None self.exceptions = None self.local_model = None def generate_summaries(self, dataset, df_prompt, save_path=None): """Generate summaries for a given DataFrame of source docs. 修改这里拉取模型生成结果 Args: df (DataFrame): DataFrame containing source docs. Returns: summaries_df (DataFrame): Generated summaries by the model. """ exceptions = [] if (save_path is not None) and os.path.exists(save_path): '''已存在文件,可以读取已经存在的测试文本''' self.summaries_df = pd.read_csv(save_path) # print(self.summaries_df['Experiment']) print(f'Loaded generated summaries from {save_path}') else: '''测试文件不存在,则需要调用指定的模型来进行测试''' # prompt = {} # for index, row in tqdm(df_prompt.iterrows(), total=df_prompt.shape[0]): # prompt['E' + row['Item']] = row['Prompt'] xls = pd.ExcelFile(dataset) sheet_names = xls.sheet_names # sheet_names = df.sheetnames print(f"Total: {len(sheet_names)}") print(sheet_names) item_ID, questions_ID, user_prompt, response = [], [], [], [] for i, sheet_name in enumerate(sheet_names[0:1], start=1): # 读取每个工作表 df_sheet = pd.read_excel(xls, sheet_name=sheet_name) # 假设第一列是'Prompt0',但这里我们使用列名来避免硬编码 if 'Prompt0' in df_sheet.columns: prompt_column = df_sheet['Prompt0'] else: # 如果'Prompt0'列不存在,则跳过该工作表或进行其他处理 continue # 遍历Prompt0列的值 for j, prompt_value in enumerate(tqdm(prompt_column, desc=f"Processing {sheet_name}"), start=1): ID = 'E' + str(i) q_ID = ID + '_' + str(j) # print(ID, q_ID, prompt_value) for i in range(2): system_prompt = envs.SYSTEM_PROMPT # user_prompt = f"{envs.USER_PROMPT}\nPassage:\n{_source}" _user_prompt = prompt_value while True: try: '''调用''' print('开始调用LLM-API') _response = self.generate_summary(system_prompt, _user_prompt) # print(f"Finish index {index}") break except Exception as e: if 'Rate limit reached' in str(e): wait_time = 3660 current_time = datetime.now().strftime('%H:%M:%S') print(f"Rate limit hit at {current_time}. Waiting for 1 hour before retrying...") time.sleep(wait_time) elif 'is currently loading' in str(e): wait_time = 200 print(f"Model is loading, wait for {wait_time}") time.sleep(wait_time) elif '429 Resource has been exhausted' in str(e): # for gemini models wait_time = 60 print(f"Quota has reached, wait for {wait_time}") time.sleep(wait_time) else: print(f"Error at index {i}: {e}") _response = "" exceptions.append(i) break item_ID.append(ID) questions_ID.append(q_ID) user_prompt.append(_user_prompt) response.append(_response) print(_response) # exit() # Sleep to prevent hitting rate limits too frequently time.sleep(1) self.summaries_df = pd.DataFrame(list(zip(item_ID, questions_ID, user_prompt, response)), columns=["Experiment", "Question_ID", "User_prompt", "Response"]) if save_path is not None: print(f'Save summaries to {save_path}') fpath = Path(save_path) fpath.parent.mkdir(parents=True, exist_ok=True) self.summaries_df.to_csv(fpath) self.exceptions = exceptions # self._compute_avg_length() # self._compute_answer_rate() return self.summaries_df def generate_summary(self, system_prompt: str, user_prompt: str): # Using Together AI API using_together_api = False together_ai_api_models = ['mixtral', 'dbrx', 'wizardlm', 'llama-3'] for together_ai_api_model in together_ai_api_models: if together_ai_api_model in self.model_id.lower(): using_together_api = True break # print('适用哪一种LLM',together_ai_api_model , using_together_api) # print(self.model_id.lower()) #meta-llama/llama-2-7b-chat-hf # print('local',self.local_model) $None # exit() # if 'mixtral' in self.model_id.lower() or 'dbrx' in self.model_id.lower() or 'wizardlm' in self.model_id.lower(): # For mixtral and dbrx models, use Together AI API if using_together_api: # suffix = "completions" if ('mixtral' in self.model_id.lower() or 'base' in self.model_id.lower()) else "chat/completions" suffix = "chat/completions" url = f"https://api.together.xyz/v1/{suffix}" payload = { "model": self.model_id, # "max_tokens": 4096, 'max_new_tokens': 250, "temperature": 0.0, # 'repetition_penalty': 1.1 if 'mixtral' in self.model_id.lower() else 1 } # if 'mixtral' in self.model_id.lower(): # # payload['prompt'] = user_prompt # # payload['prompt'] = "Write a summary of the following passage:\nPassage:\n" + user_prompt.split('Passage:\n')[-1] + '\n\nSummary:' # payload['prompt'] = 'You must stick to the passage provided. Provide a concise summary of the following passage, covering the core pieces of information described:\nPassage:\n' + user_prompt.split('Passage:\n')[-1] + '\n\nSummary:' # print(payload) # else: # payload['messages'] = [{"role": "system", "content": system_prompt}, # {"role": "user", "content": user_prompt}] payload['messages'] = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}] headers = { "accept": "application/json", "content-type": "application/json", "Authorization": f"Bearer {os.environ['TOGETHER_API_KEY']}" } response = requests.post(url, json=payload, headers=headers) try: result = json.loads(response.text) # print(result) result = result["choices"][0] if 'message' in result: result = result["message"]["content"].strip() else: result = result["text"] result_candidates = [result_cancdidate for result_cancdidate in result.split('\n\n') if len(result_cancdidate) > 0] result = result_candidates[0] print(result) except: print(response) result = '' print(result) return result # Using OpenAI API elif 'gpt' in self.model_id.lower(): response = litellm.completion( model=self.model_id.replace('openai/',''), messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}], temperature=0.0, max_tokens=250, ) result = response['choices'][0]['message']['content'] print(result) return result # Using Google AI API for Gemini models elif 'gemini' in self.model_id.lower(): genai.configure(api_key=os.getenv('GOOGLE_AI_API_KEY')) generation_config = { "temperature": 0, "top_p": 0.95, # cannot change "top_k": 0, "max_output_tokens": 250, # "response_mime_type": "application/json", } safety_settings = [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE" }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE" }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE" }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE" }, ] model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest" if "gemini-1.5-pro" in self.model_id.lower() else self.model_id.lower().split('google/')[-1], generation_config=generation_config, system_instruction=system_prompt, safety_settings=safety_settings) convo = model.start_chat(history=[]) convo.send_message(user_prompt) # print(convo.last) result = convo.last.text print(result) return result # Using HF API or download checkpoints elif self.local_model is None: # print(self.model_id) # exit() try: # try use HuggingFace API response = litellm.completion( model='command-r-plus' if 'command' in self.model_id else self.model_id, messages=[{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}], temperature=0.0, max_tokens=1024, api_base=self.api_base, ) result = response['choices'][0]['message']['content'] print(result) return result # exit() except: # fail to call api. run it locally. self.tokenizer = AutoTokenizer.from_pretrained(self.model_id, trust_remote_code=True) print("Tokenizer loaded") self.local_model = AutoModelForCausalLM.from_pretrained(self.model_id, trust_remote_code=True, device_map="auto", torch_dtype="auto", cache_dir='/home/paperspace/cache') print("Local model loaded") # exit() # Using local model if self.local_model: # cannot call API. using local model messages=[ {"role": "system", "content": system_prompt}, # gemma-1.1 does not accept system role {"role": "user", "content": user_prompt} ] try: # some models support pipeline pipe = pipeline( "text-generation", model=self.local_model, tokenizer=self.tokenizer, ) generation_args = { "max_new_tokens": 250, "return_full_text": False, "temperature": 0.0, "do_sample": False, } output = pipe(messages, **generation_args) result = output[0]['generated_text'] print(result) except: prompt = self.tokenizer.apply_chat_template(messages,add_generation_prompt=True, tokenize=False) print(prompt) input_ids = self.tokenizer(prompt, return_tensors="pt").to('cuda') with torch.no_grad(): outputs = self.local_model.generate(**input_ids, max_new_tokens=250, do_sample=True, temperature=0.01, pad_token_id=self.tokenizer.eos_token_id) result = self.tokenizer.decode(outputs[0], skip_special_tokens=True) result = result.replace(prompt[0], '') print(result) return result def _compute_avg_length(self): """ Compute the average length of non-empty summaries using SpaCy. """ total_word_count = 0 total_count = 0 for summary in self.summaries_df['summary']: if util.is_summary_valid(summary): doc = nlp(summary) words = [token.text for token in doc if token.is_alpha] total_word_count += len(words) total_count += 1 self.avg_length = 0 if total_count == 0 else total_word_count / total_count def _compute_answer_rate(self): """ Compute the rate of non-empty summaries. """ valid_count = sum(1 for summary in self.summaries_df['summary'] if util.is_summary_valid(summary)) total_count = len(self.summaries_df) self.answer_rate = 0 if total_count == 0 else valid_count / total_count class EvaluationModel: """A class to evaluate generated summaries. Attributes: model (CrossEncoder): The evaluation model. scores (list): List of evaluation scores. accuracy (float): Accuracy of the summaries. hallucination_rate (float): Rate of hallucination in summaries. """ def __init__(self, model_path): """ Initializes the EvaluationModel with a CrossEncoder model. Args: model_path (str): Path to the CrossEncoder model. """ self.model = load_evaluation_model(model_path) self.scores = [] self.factual_consistency_rate = None self.hallucination_rate = None self.humanlike_score = None def code_results(self, summaries_df): '''code results from LLM's response''' output = [] '''item1''' # print(len(summaries_df['Experiment']),len(summaries_df['Response'])) # exit() '''人类数据需要处理Item3''' item3 = pd.read_csv('/Users/tangtang/Desktop/leaderboard/src/datasets/Experiment_3_Items.csv') item2word = {} for j in range(len(item3['Item'])): item2word[item3['Item'][j]] = [item3['Field 2'][j], item3['Field 3'][j]] male_keyword = ["he", "his", "himself"] female_keyword = ["she", "her", "herself"] for i in range(len(summaries_df['Experiment'])): # vote_1_1, vote_1_2, vote_1_3 = 0, 0, 0 if summaries_df["Experiment"][i] == "E1": if summaries_df["Response"][i].strip() == "Round": # vote_1_1 += 1 output.append("Round") elif summaries_df["Response"][i].strip() == "Spiky": output.append("Round") else: output.append("NA") # print() '''item2''' # vote_2_1, vote_2_2, vote_2_3 = 0, 0, 0 if summaries_df["Experiment"][i] == "E2": rs = summaries_df["Response"][i].strip() rs = rs.split(' ') male, female = 0, 0 for word in rs: if word in female_keyword and male != 1: female = 1 output.append("Female") break if word in male_keyword and female != 1: male = 1 output.append("Male") break if male == 0 and female == 0 : output.append("NA") '''item3''' if summaries_df["Experiment"][i] == "E3": rs = summaries_df["Response"][i].strip() id = summaries_df["Item"][i].strip() if '2' in rs: item2word[id][0] '''item4''' '''item5''' '''item6''' '''item7''' if summaries_df["Experiment"][i] == "E7": rs = summaries_df["Response"][i].strip() if rs == "No": output.append("0") elif rs == "Yes": output.append("1") else: output.append("NA") '''item8''' if summaries_df["Experiment"][i] == "E8": rs = summaries_df["Response"][i].strip() if rs == "Something is wrong with the question": output.append("1") else: output.append("0") '''item9''' if summaries_df["Experiment"][i] == "E9": male, female = 0, 0 rs = summaries_df["Response"][i].strip() if "because" in rs: rs = rs.split("because")[1] else: rs = rs condition = summaries_df["Factor 2"][i].strip() rs = rs.split(" ") for w in rs: if w in male_keyword and female != 1: male = 1 break if w in female_keyword and male != 1: break if male == 0 and female == 0: output.append('NA') else: if male == 1 and female==0: if condition == "MF": output.append("Subject") elif condition == "FM": output.append("Object") else: output.append("NA") elif female == 1 and male ==0: if condition == "MF": output.append("Object") elif condition == "FM": output.append("Subject") else: output.append("NA") '''item10''' if summaries_df["Experiment"][i] == "E10": rs = summaries_df["Response"][i].strip() if rs == "Yes": output.append("1") else: output.append("0") '''是不是有不同的问题,如何计算''' def evaluate_humanlike(self, summaries_df, human_data_path): ''' evaluate humanlike score 1. code the result 2. comput the similaritirs between human and model process model responses''' huamn_df = pd.read_csv(human_data_path) self.code_results(summaries_df) return 9.00 def evaluate_hallucination(self, summaries_df): """ Evaluate the hallucination rate in summaries. Updates the 'scores' attribute of the instance with the computed scores. Args: summaries_df (DataFrame): DataFrame containing source docs and summaries. Returns: list: List of hallucination scores. Also updates the 'scores' attribute of the instance. """ hem_scores = [] sources = [] summaries = [] source_summary_pairs = util.create_pairs(summaries_df) '''评价模型结果''' for doc, summary in tqdm(source_summary_pairs, desc="Evaluating Humanlikeness"): if util.is_summary_valid(summary): try: summary = summary.replace('','').replace('','') score = self.model.predict([doc, summary])# [0] if not isinstance(score, float): try: score = score.item() except: logging.warning(f"Score type mismatch: Expected float, got {type(score)}.") continue hem_scores.append(score) sources.append(doc) summaries.append(summary) except Exception as e: logging.error(f"Error while running HEM: {e}") raise self.scores = hem_scores eval_results = {'source': sources, 'summary': summaries, 'HEM scores': hem_scores} return hem_scores, eval_results # for doc, summary in tqdm(source_summary_pairs, desc="Evaluating hallucinations"): # if util.is_summary_valid(summary): # try: # # summary_pieces = summary.split('\n') # # summary = summary_pieces[0] if len(summary_pieces[0].strip()) > 0 else summary_pieces[1] # summary = summary.replace('','').replace('','') # # print([doc, summary]) # # print(self.model.predict([doc, summary])) # score = self.model.predict([doc, summary])# [0] # if not isinstance(score, float): # try: # score = score.item() # except: # logging.warning(f"Score type mismatch: Expected float, got {type(score)}.") # continue # hem_scores.append(score) # sources.append(doc) # summaries.append(summary) # except Exception as e: # logging.error(f"Error while running HEM: {e}") # raise # self.scores = hem_scores # eval_results = {'source': sources, 'summary': summaries, 'HEM scores': hem_scores} # return hem_scores, eval_results def compute_factual_consistency_rate(self, threshold=0.5): """ Compute the factual consistency rate of the evaluated summaries based on the previously calculated scores. This method relies on the 'scores' attribute being populated, typically via the 'evaluate_hallucination' method. Returns: float: Factual Consistency Rate. Also updates the 'factual_consistency_rate' and 'hallucination_rate' attributes of the instance. Raises: ValueError: If scores have not been calculated prior to calling this method. """ if not self.scores: error_msg = "Scores not calculated. Call evaluate_hallucination() first." logging.error(error_msg) raise ValueError(error_msg) # Use threshold of 0.5 to compute factual_consistency_rate num_above_threshold = sum(score >= threshold for score in self.scores) num_total = len(self.scores) if not num_total: raise ValueError("No scores available to compute factual consistency rate.") self.factual_consistency_rate = (num_above_threshold / num_total) * 100 self.hallucination_rate = 100 - self.factual_consistency_rate return self.factual_consistency_rate