"""Shared utility functions used in the project. Functions: """ import os import datetime import getpass import base64 import json import re from dotenv import load_dotenv, find_dotenv import markdown from markdownify import markdownify as md2text from io import BytesIO import pandas as pd from pydantic import BaseModel, SecretStr from langchain_community.vectorstores import Chroma from langchain import hub from langchain_core.prompts import PromptTemplate #Model & Index & Embeddings from langchain_openai import ( ChatOpenAI, ) from langchain_core.output_parsers import ( PydanticOutputParser, ) from langchain_core.messages import ( AIMessage, HumanMessage, get_buffer_string, ) from PIL import Image from ea4all.src.shared.prompts import ( LLAMA31_CHAT_PROMPT_FORMAT, ) from ea4all.src.shared.configuration import BaseConfiguration as ea4all_config ############ ##INIT model ############ #initialise model / constants EA4ALL_ARCHITECTURE = "ea4all_architecture.png" EA4ALL_PODCAST = "ea4all_podcast.wav" class CFG: # Constants EA4ALL_OVERVIEW = "ea4all_overview.png" EA4ALL_ABOUT = "ea4all_overview.txt" APM_MOCK_QNA = "apm_qna_mock.txt" STREAM_SLEEP = 0.05 REGEX_BACKTICKS = r"```(.*?)```" # LLMs #model = {"gpt-4":'gpt-4o-mini', "gpt-4o":'gpt-4o'} #llama = {"11": "meta-llama/llama-3.2-11B-Vision-Instruct", "90":"meta-llama/llama-3.2-90B-Vision-Instruct", "70":"meta-llama/Llama-3.1-70B-Instruct", "73":"meta-llama/Llama-3.3-70B-Instruct"} #hf_model="meta-llama/Llama-3.1-70B-Instruct" #hf_api_base="https://api-inference.huggingface.co/models/" #hf_max_tokens=16192 #max_new_tokens = 4096 #llama32_max_tokens = 4096 ##TOKEN ISSUE LLAMA-3.2 w/ ChatOpenAI not working tokens > 4096 2024-10-13 #temperature = 0 top_p = 0.95 repetition_penalty = 1.15 # splitting split_chunk_size = 500 split_overlap = 0 # embeddings #embeddings_model = OpenAIEmbeddings() # similar passages k = 3 #debug verbose=True #streamming #streamming=True #VQA resized images - maximum resolution for Llama-3.2 RESIZE_TO = 512 MAX_WIDTH = 1024 MAX_HEIGHT = 768 ##Diagrams format diagram_format = "png" # paths ea4all/src/tools #apm_store = "/Users/avfranco/Documents/GitHub/ea4all-agentic-staging/ea4all/apm_store/" #apm_path = apm_store + 'APM-ea4all (test-split).xlsx' #dbr_demo = apm_store + "reference_architecture_dbr_demo.txt" #'ea4all_images = "/Users/avfranco/Documents/GitHub/ea4all-agentic-staging/ea4all/images/" #apm_faiss = "apm_store" #faiss_index = 'apm_faiss_index' ################################### ##COLLECTION of re-usable functions ################################### #return current date-time def _get_datetime(): now = datetime.datetime.now() return now.strftime("%m/%d/%Y, %H:%M:%S") def _get_formatted_date(): current_date = datetime.datetime.now() formatted_date = current_date.strftime("%d %B %Y") return formatted_date #calculate dif end-start execution def time_elapsed(start,end): time_elapsed = int(round(end - start, 0)) time_elapsed_str = f'{time_elapsed}' return time_elapsed_str def _join_paths(*paths): """ Join two or more paths using os.path.join. Parameters: *paths: str Two or more path components to be joined. Returns: str The joined path. 
""" return os.path.join(*paths) #get user request info def get_user_identification(request): if request: try: user_pip = request.headers.get('X-Forwarded-For') return user_pip.split(',')[0] except Exception: print(f"user info: {request}") return request.client.host return "ea4all_agent" #Initialise model ## SETUP LLM CLIENT def get_llm_client(model, api_base_url=None,temperature=0, streaming=False, tokens=ea4all_config.max_tokens) -> ChatOpenAI: """Initializes and returns a ChatOpenAI client based on the specified model and parameters.""" client = ChatOpenAI() if model.startswith("gpt-"): client = ChatOpenAI( model=model, temperature=temperature, streaming=streaming, max_completion_tokens=tokens, stream_usage=True ) elif "llama" in model.lower(): # Meta-llama models client = ChatOpenAI( model=model, api_key=SecretStr(os.environ['HUGGINGFACEHUB_API_TOKEN']), base_url=_join_paths(api_base_url, model, "v1/"), temperature=temperature, streaming=streaming, max_completion_tokens=tokens, stream_usage=True, ) return client #load local env variables def load_local_env(local): ###read local .env file _ = load_dotenv(find_dotenv()) if local not in os.environ: os.environ[local] = getpass.getpass(f"Provide your {local} Key") return os.environ[local] #locad landscape data into chroma def load_to_chroma(documents, embeddings, path, collection_name="apm_collection"): #Read chromadb chroma-apm-db chroma_collection = Chroma ( collection_name=collection_name, persist_directory=path, embedding_function=embeddings ) if chroma_collection._collection.count(): chroma_collection.delete_collection() else: #Add apm records chroma_collection = Chroma.from_documents( collection_name=collection_name, persist_directory=path, documents=documents, embedding=embeddings ) chroma_collection.persist() return chroma_collection ##Convert gradio chat_history to langchain chat_history_format def get_history_gradio(history, chat_history=[]): history_langchain_format = [] #triggered by loaded memory runnable to replace ConversationMemoryBuffer.load_memory_variables #if chat_history or not history: # memory = chat_history #triggered by loaded_memory runnable #else: history = history["chat_memory"] for human, ai in history: history_langchain_format.append(HumanMessage(content=human)) history_langchain_format.append(AIMessage(content=ai)) history = {"history":get_buffer_string(history_langchain_format)} return history #retrieve relevant questions based on user interaction def get_vqa_examples() -> list: examples=[ {"text": "Describe this image.", "files": ["ea4all/images/multi-app-architecture.png"]}, {"text": "Assess any risk and vulnerabilities in the current solution.", "files": ["ea4all/images/ea4all_architecture.png"]}, ] return examples # Function to encode the image def encode_image(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def resize_image_1(raw_image, input_size): w, h = raw_image.size scale = input_size / max(w, h) new_w = int(w * scale) new_h = int(h * scale) resized_image = raw_image.resize((new_w, new_h)) return resized_image def resize_image_2(image, width): wpercent = width / float(image.size[0]) hsize = int( float(image.size[1]) * wpercent ) raw_image = image.resize([width, hsize]) return raw_image def resize_image_3(image): from PIL import Image # Get the current size width, height = image.size # Calculate the new size maintaining the aspect ratio if width > CFG.MAX_WIDTH or height > CFG.MAX_HEIGHT: ratio = min(CFG.MAX_WIDTH / width, 
#resize so neither dimension exceeds CFG.MAX_WIDTH x CFG.MAX_HEIGHT
def resize_image_3(image):
    # Get the current size
    width, height = image.size
    # Calculate the new size maintaining the aspect ratio
    if width > CFG.MAX_WIDTH or height > CFG.MAX_HEIGHT:
        ratio = min(CFG.MAX_WIDTH / width, CFG.MAX_HEIGHT / height)
        new_width = int(width * ratio)
        new_height = int(height * ratio)
    else:
        new_width, new_height = width, height
    # Resize the image
    image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
    # Return new resized image
    return image

#Encode PIL.Image to base64
def encode_raw_image(raw_image):
    # Create a BytesIO buffer
    buffer = BytesIO()
    # Save the image to the buffer in PNG format
    raw_image.save(buffer, format='PNG')
    # Get the content of the buffer
    img_bytes = buffer.getvalue()
    # Encode the bytes to base64
    img_base64 = base64.b64encode(img_bytes)
    # Convert the bytes to string
    img_str = img_base64.decode('utf-8')
    return img_str

#Return a raw image ready for OpenAI GPT-4 Vision
def get_raw_image(image_path):
    # Open & Resize & Encode image
    diagram = Image.open(image_path)
    w, h = diagram.size
    if w > CFG.RESIZE_TO or h > CFG.RESIZE_TO:
        resized_image = resize_image_3(diagram)
    else:
        resized_image = diagram
    #Encode diagram
    raw_image = encode_raw_image(resized_image)
    return raw_image

def load_mock_content(file_path) -> str:
    try:
        with open(_join_paths(ea4all_config.ea4all_store, file_path), "r") as file:
            content = file.read()
        return content
    except (OSError, ValueError) as e:
        return str(e)

def print_json_to_md(data, indent=0, column=None):
    try:
        result = ""
        header = ""
        body = ""
        if isinstance(data, dict):
            for key, value in data.items():
                result += print_json_to_md(value, indent + 2, key)
            return result
        elif isinstance(data, list):
            if column:
                # Print list items as a Markdown table
                header = ' ' * indent + f"| {' | '.join(data[0].keys())} | \n"
                header += ' ' * indent + f"| {' | '.join(['---'] * len(data[0]))} | \n"
                for item in data:
                    body += ' ' * indent + f"\n\n | {' | '.join(str(item[k]) for k in item.keys())} |"
                result += header + body
                return result
            else:
                for item in data:
                    header = ' ' * indent + f"| {' | '.join(data[0].keys())} |"
                    body += ' ' * indent + f"\n\n | {' | '.join(str(item[k]) for k in item.keys())} |"
                result += header + "\n" + body
                return result
        else:
            header += ' ' * indent + f"| {column} "
            body += f"{str(data)}\n\n"
            result += header + body
            return result
    except Exception as e:
        return f"{e} - {data}"

def markdown_to_plain_text(md):
    # Convert Markdown to HTML
    html = markdown.markdown(md)
    # Convert HTML to plain text using markdownify
    plain_text = md2text(html)
    return plain_text

def extract_structured_output(response):
    ##EXTRACT structured content from a plain or backtick-fenced JSON response
    try:
        return json.loads(response)
    except ValueError:
        match = re.search(CFG.REGEX_BACKTICKS, response, re.DOTALL)
        if match:
            return json.loads(match.group(1))
        return None

def get_predicted_num_tokens(llm, content):
    return llm.get_num_tokens(content)

def get_predicted_num_tokens_from_prompt(llm, prompt, values):
    final_prompt = prompt.format(**values)
    return llm.get_num_tokens(final_prompt)

def set_max_new_tokens(predicted_tokens):
    #Return max new tokens to be generated
    return int((ea4all_config.max_tokens - predicted_tokens) * 0.95)

def escape_special_characters(input_string):
    # Use json.dumps to escape special characters
    escaped_string = json.dumps(input_string)
    # Remove the surrounding double quotes added by json.dumps
    return escaped_string[1:-1]

def clean_and_load_json(content) -> dict:
    try:
        json_data = json.loads(content)
        return json_data
    except ValueError:
        clean_string = content.replace("\n", "").replace("json", "")
        json_data = json.loads(clean_string)
        return json_data

def extract_response_from_backticks(response):
    pattern = r"```(.*?)```"
    match = re.search(pattern, str(response), re.DOTALL)
    return match.group(1) if match else response
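# Example (illustrative only): extract_topic_from_business_input below composes the
# two helpers above - it strips the ```...``` fence with extract_response_from_backticks
# and then parses the payload with clean_and_load_json, which tolerates a leading
# "json" tag and embedded newlines. The sample values are made up.
#
#   raw = 'Here is the topic:\n```json\n{"capability": "CRM", "owner": "Sales"}\n```'
#   extract_response_from_backticks(raw)    # 'json\n{"capability": "CRM", "owner": "Sales"}\n'
#   extract_topic_from_business_input(raw)  # {'capability': 'CRM', 'owner': 'Sales'}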
def extract_topic_from_business_input(response) -> dict:
    ##IS JSON already
    if isinstance(response, dict):
        return response
    ##EXTRACT Topic from the content
    topic = extract_response_from_backticks(response)
    return clean_and_load_json(topic)

## LLM STRUCTURED OUTPUT Helper functions
def extract_landscape(topic):
    # Prompt
    extract_landscape_prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_landscape_business_query')
    # Set up a parser: LandscapeAsIs
    parser = PydanticOutputParser(pydantic_object=topic)
    final_prompt = extract_landscape_prompt.partial(
        format_instructions=parser.get_format_instructions(),
        ai_output=LLAMA31_CHAT_PROMPT_FORMAT,
    )
    return final_prompt

def extract_principles(topic):
    # Set up a parser: LandscapeAsIs
    parser = PydanticOutputParser(pydantic_object=topic)
    #PROMPT REVISED TO WORK w/ Llama-3
    principle_template = """Identify the list of principles and their meaning from the given context. Do not add any superfluous information.
    Context: \n {strategic_principles} \n
    Output your answer as JSON that matches the given schema and nothing else: \n{format_instructions}\n
    """
    prompt = PromptTemplate(
        template=principle_template,
        input_variables=["strategic_principles"],
        partial_variables={
            "format_instructions": parser.get_format_instructions(),
        },
    )
    return prompt

# Task-1: Identify the business requirements, objectives, user journey, and all other relevant information
def extract_detailed_business_requirements(llm, topic: type[BaseModel], name: str, values: dict):
    parser = PydanticOutputParser(pydantic_object=topic)
    hub_prompt = hub.pull('learn-it-all-do-it-all/ea4all_extract_business_topic')
    hub_prompt = hub_prompt.partial(
        topic=name,
        format_instructions=parser.get_format_instructions(),
    )
    task_1_requirement = hub_prompt | llm | parser
    response = task_1_requirement.invoke(
        input=values,
        config={
            'tags': ['assess_business_query'],
            'run_name': name  # Custom run name
        }
    )
    return response

# Post-processing
def format_docs(docs):
    return "\n".join(doc.page_content for doc in docs)

#load mock data and return it as a list of relevant questions
def get_relevant_questions(source: str) -> list:
    relevant_questions = []
    mock = load_mock_content(source)
    for line in mock.splitlines():
        relevant_questions.append(line)
    return relevant_questions
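# Minimal smoke test of the offline helpers (illustrative only): no LLM or
# vector-store calls, just the pure string/date utilities. Assumes the module's
# own imports resolve; the sample inputs are made up.
if __name__ == "__main__":
    print(_get_formatted_date())
    print(markdown_to_plain_text("# Title\nSome **bold** text"))
    print(extract_structured_output('```{"status": "ok"}```'))
    print(escape_special_characters('line one\nline "two"'))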