"""Shared utility functions used in the project.
Functions:
"""
import os
import datetime
import getpass
import base64
import json
import re
from dotenv import load_dotenv, find_dotenv
import markdown
from markdownify import markdownify as md2text
from io import BytesIO
import pandas as pd
from pydantic import BaseModel, SecretStr
from langchain_community.vectorstores import Chroma
from langchain import hub
from langchain_core.prompts import PromptTemplate
#Model & Index & Embeddings
from langchain_openai import (
ChatOpenAI,
)
from langchain_core.output_parsers import (
PydanticOutputParser,
)
from langchain_core.messages import (
AIMessage,
HumanMessage,
get_buffer_string,
)
from PIL import Image
from ea4all.src.shared.prompts import (
LLAMA31_CHAT_PROMPT_FORMAT,
)
from ea4all.src.shared.configuration import BaseConfiguration as ea4all_config
############
##INIT model
############
#initialise model / constants
EA4ALL_ARCHITECTURE = "ea4all_architecture.png"
EA4ALL_PODCAST = "ea4all_podcast.wav"
class CFG:
# Constants
EA4ALL_OVERVIEW = "ea4all_overview.png"
EA4ALL_ABOUT = "ea4all_overview.txt"
APM_MOCK_QNA = "apm_qna_mock.txt"
STREAM_SLEEP = 0.05
REGEX_BACKTICKS = r"```(.*?)```"
# LLMs
#model = {"gpt-4":'gpt-4o-mini', "gpt-4o":'gpt-4o'}
#llama = {"11": "meta-llama/llama-3.2-11B-Vision-Instruct", "90":"meta-llama/llama-3.2-90B-Vision-Instruct", "70":"meta-llama/Llama-3.1-70B-Instruct", "73":"meta-llama/Llama-3.3-70B-Instruct"}
#hf_model="meta-llama/Llama-3.1-70B-Instruct"
#hf_api_base="https://api-inference.huggingface.co/models/"
#hf_max_tokens=16192
#max_new_tokens = 4096
#llama32_max_tokens = 4096 ##TOKEN ISSUE LLAMA-3.2 w/ ChatOpenAI not working tokens > 4096 2024-10-13
#temperature = 0
top_p = 0.95
repetition_penalty = 1.15
# splitting
split_chunk_size = 500
split_overlap = 0
# embeddings
#embeddings_model = OpenAIEmbeddings()
# similar passages
k = 3
#debug
verbose=True
    #streaming
    #streaming=True
#VQA resized images - maximum resolution for Llama-3.2
RESIZE_TO = 512
MAX_WIDTH = 1024
MAX_HEIGHT = 768
##Diagrams format
diagram_format = "png"
# paths ea4all/src/tools
#apm_store = "/Users/avfranco/Documents/GitHub/ea4all-agentic-staging/ea4all/apm_store/"
#apm_path = apm_store + 'APM-ea4all (test-split).xlsx'
#dbr_demo = apm_store + "reference_architecture_dbr_demo.txt"
#'ea4all_images = "/Users/avfranco/Documents/GitHub/ea4all-agentic-staging/ea4all/images/"
#apm_faiss = "apm_store"
#faiss_index = 'apm_faiss_index'
###################################
##COLLECTION of re-usable functions
###################################
#return current date-time
def _get_datetime():
now = datetime.datetime.now()
return now.strftime("%m/%d/%Y, %H:%M:%S")
def _get_formatted_date():
current_date = datetime.datetime.now()
formatted_date = current_date.strftime("%d %B %Y")
return formatted_date
#calculate the elapsed time (in seconds) between start and end of an execution
def time_elapsed(start, end):
    elapsed = int(round(end - start, 0))
    return f'{elapsed}'
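# Illustrative usage (not executed on import): time a call with the standard library.
#   import time
#   start = time.perf_counter()
#   ...  # some work
#   end = time.perf_counter()
#   print(f"Elapsed: {time_elapsed(start, end)}s")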
def _join_paths(*paths):
"""
Join two or more paths using os.path.join.
Parameters:
*paths: str
Two or more path components to be joined.
Returns:
str
The joined path.
"""
return os.path.join(*paths)
#get user request info
def get_user_identification(request):
if request:
try:
            user_ip = request.headers.get('X-Forwarded-For')
            return user_ip.split(',')[0]
except Exception:
print(f"user info: {request}")
return request.client.host
return "ea4all_agent"
#Initialise model
## SETUP LLM CLIENT
def get_llm_client(model, api_base_url=None,temperature=0, streaming=False, tokens=ea4all_config.max_tokens) -> ChatOpenAI:
"""Initializes and returns a ChatOpenAI client based on the specified model and parameters."""
client = ChatOpenAI()
if model.startswith("gpt-"):
client = ChatOpenAI(
model=model,
temperature=temperature,
streaming=streaming,
max_completion_tokens=tokens,
stream_usage=True
)
elif "llama" in model.lower(): # Meta-llama models
client = ChatOpenAI(
model=model,
api_key=SecretStr(os.environ['HUGGINGFACEHUB_API_TOKEN']),
base_url=_join_paths(api_base_url, model, "v1/"),
temperature=temperature,
streaming=streaming,
max_completion_tokens=tokens,
stream_usage=True,
)
return client
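# Illustrative usage (a sketch; assumes OPENAI_API_KEY, and for Llama models HUGGINGFACEHUB_API_TOKEN, are set):
#   llm = get_llm_client("gpt-4o-mini", temperature=0, streaming=False)
#   reply = llm.invoke("Summarise the role of an application portfolio in one sentence.")
#   print(reply.content)
# For a Hugging Face-hosted Llama model, pass the inference API base URL:
#   llm = get_llm_client("meta-llama/Llama-3.1-70B-Instruct", api_base_url="https://api-inference.huggingface.co/models/")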
#load local env variables
def load_local_env(local):
###read local .env file
_ = load_dotenv(find_dotenv())
if local not in os.environ:
os.environ[local] = getpass.getpass(f"Provide your {local} Key")
return os.environ[local]
#load landscape data into chroma
def load_to_chroma(documents, embeddings, path, collection_name="apm_collection"):
    #Read chromadb chroma-apm-db
    chroma_collection = Chroma(
        collection_name=collection_name,
        persist_directory=path,
        embedding_function=embeddings
    )
    #Drop any stale records before reloading the landscape
    if chroma_collection._collection.count():
        chroma_collection.delete_collection()
    #Add apm records
    chroma_collection = Chroma.from_documents(
        collection_name=collection_name,
        persist_directory=path,
        documents=documents,
        embedding=embeddings
    )
    chroma_collection.persist()
    return chroma_collection
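# Illustrative usage (a sketch; assumes pre-split Document objects and an embeddings instance, e.g. OpenAIEmbeddings):
#   from langchain_openai import OpenAIEmbeddings
#   vectorstore = load_to_chroma(documents=docs, embeddings=OpenAIEmbeddings(), path="apm_store")
#   hits = vectorstore.similarity_search("Which applications support payments?", k=CFG.k)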
##Convert gradio chat_history to langchain chat_history_format
def get_history_gradio(history, chat_history=None):
history_langchain_format = []
#triggered by loaded memory runnable to replace ConversationMemoryBuffer.load_memory_variables
#if chat_history or not history:
# memory = chat_history
#triggered by loaded_memory runnable
#else:
history = history["chat_memory"]
for human, ai in history:
history_langchain_format.append(HumanMessage(content=human))
history_langchain_format.append(AIMessage(content=ai))
history = {"history":get_buffer_string(history_langchain_format)}
return history
#example VQA prompts (question + diagram) offered to the user
def get_vqa_examples() -> list:
examples=[
{"text": "Describe this image.", "files": ["ea4all/images/multi-app-architecture.png"]},
{"text": "Assess any risk and vulnerabilities in the current solution.", "files": ["ea4all/images/ea4all_architecture.png"]},
]
return examples
# Function to encode the image
def encode_image(image_path):
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
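# Illustrative usage: base64-encode a diagram shipped with the project (path taken from get_vqa_examples above).
#   b64 = encode_image("ea4all/images/ea4all_architecture.png")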
def resize_image_1(raw_image, input_size):
w, h = raw_image.size
scale = input_size / max(w, h)
new_w = int(w * scale)
new_h = int(h * scale)
resized_image = raw_image.resize((new_w, new_h))
return resized_image
def resize_image_2(image, width):
wpercent = width / float(image.size[0])
hsize = int( float(image.size[1]) * wpercent )
raw_image = image.resize([width, hsize])
return raw_image
def resize_image_3(image):
# Get the current size
width, height = image.size
# Calculate the new size maintaining the aspect ratio
if width > CFG.MAX_WIDTH or height > CFG.MAX_HEIGHT:
ratio = min(CFG.MAX_WIDTH / width, CFG.MAX_HEIGHT / height)
new_width = int(width * ratio)
new_height = int(height * ratio)
else:
new_width, new_height = width, height
# Resize the image
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
# Return new resized image
return image
#Encode PIL.Image to base64
def encode_raw_image(raw_image):
# Create a BytesIO buffer
buffer = BytesIO()
# Save the image to the buffer in PNG format
raw_image.save(buffer, format='PNG')
# Get the content of the buffer
img_bytes = buffer.getvalue()
# Encode the bytes to base64
img_base64 = base64.b64encode(img_bytes)
# Convert the bytes to string
img_str = img_base64.decode('utf-8')
return img_str
#Return a resized, base64-encoded image ready for OpenAI GPT-4 Vision
def get_raw_image(image_path):
# Open & Resize & Encode image
diagram = Image.open(image_path)
w, h = diagram.size
if w > CFG.RESIZE_TO or h > CFG.RESIZE_TO:
resized_image = resize_image_3(diagram)
else:
resized_image = diagram
#Encode diagram
raw_image = encode_raw_image(resized_image)
return raw_image
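# Illustrative usage (a sketch of an OpenAI-style vision message; the diagram path is from get_vqa_examples above):
#   image_b64 = get_raw_image("ea4all/images/multi-app-architecture.png")
#   message = HumanMessage(content=[
#       {"type": "text", "text": "Describe this image."},
#       {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_b64}"}},
#   ])
#   response = get_llm_client("gpt-4o").invoke([message])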
def load_mock_content(file_path) -> str:
    try:
        with open(_join_paths(ea4all_config.ea4all_store,file_path), "r") as file:
            content = file.read()
            return content
    except (OSError, ValueError) as e:
        #return the error message so callers can surface it instead of crashing
        return str(e)
def print_json_to_md(data, indent=0, column=None):
try:
result = ""
header = ""
body = ""
if isinstance(data, dict):
for key, value in data.items():
result += print_json_to_md(value, indent + 2, key)
return result
elif isinstance(data, list):
if column: # Print list items as a Markdown table
header = ' ' * indent + f"| {' | '.join(data[0].keys())} | \n"
header += ' ' * indent + f"| {' | '.join(['---'] * len(data[0]))} | \n"
for item in data:
body += ' ' * indent + f"\n\n | {' | '.join(str(item[k]) for k in item.keys())} |"
result += header + body
return result
else:
for item in data:
header = ' ' * indent + f"| {' | '.join(data[0].keys())} |"
body += ' ' * indent + f"\n\n | {' | '.join(str(item[k]) for k in item.keys())} |"
result += header + "\n" + body
return result
else:
header += ' ' * indent + f"| {column} "
body += f"{str(data)}\n\n"
result += header + body
return result
except Exception as e:
return f"{e} - {data}"
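# Illustrative usage: render a structured answer (a dict holding a list of row-dicts) as a Markdown table.
#   data = {"applications": [{"name": "CRM", "owner": "Sales"}, {"name": "ERP", "owner": "Finance"}]}
#   print(print_json_to_md(data))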
def markdown_to_plain_text(md):
# Convert Markdown to HTML
html = markdown.markdown(md)
# Convert HTML to plain text using markdownify
plain_text = md2text(html)
return plain_text
def extract_structured_output(response):
##EXTRACT Topic from the content
try:
return json.loads(response)
except ValueError:
match = re.search(CFG.REGEX_BACKTICKS, response, re.DOTALL)
if match:
return json.loads(match.group(1))
else:
return None
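# Illustrative usage: handles both plain JSON and JSON wrapped in triple backticks.
#   extract_structured_output('{"topic": "resilience"}')
#   extract_structured_output('```{"topic": "resilience"}```')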
def get_predicted_num_tokens(llm, content):
return llm.get_num_tokens(content)
def get_predicted_num_tokens_from_prompt(llm, prompt, values):
final_prompt = prompt.format(**values)
return llm.get_num_tokens(final_prompt)
def set_max_new_tokens(predicted_tokens):
#Return max new tokens to be generated
return int((ea4all_config.max_tokens - predicted_tokens) * 0.95)
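# Illustrative usage (a sketch; assumes an `llm` client such as the one returned by get_llm_client above):
#   prompt = PromptTemplate.from_template("Summarise: {text}")
#   prompt_tokens = get_predicted_num_tokens_from_prompt(llm, prompt, {"text": "..."})
#   max_new = set_max_new_tokens(prompt_tokens)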
def escape_special_characters(input_string):
# Use json.dumps to escape special characters
escaped_string = json.dumps(input_string)
# Remove the surrounding double quotes added by json.dumps
return escaped_string[1:-1]
def clean_and_load_json(content) -> dict:
try:
json_data = json.loads(content)
return json_data
except ValueError:
clean_string = content.replace("\n","").replace("json","")
json_data = json.loads(clean_string)
return json_data
def extract_response_from_backticks(response):
pattern = r"```(.*?)```"
match = re.search(pattern, str(response), re.DOTALL)
return match.group(1) if match else response
def extract_topic_from_business_input(response) -> dict:
##IS JSON already
if isinstance(response, dict):
return response
##EXTRACT Topic from the content
topic = extract_response_from_backticks(response)
return clean_and_load_json(topic)
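# Illustrative usage: an LLM answer wrapped in backticks is normalised to a dict.
#   extract_topic_from_business_input('```json\n{"topic": "customer onboarding"}\n```')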
## LLM STRUCTURED OUTPUT Helper functions
def extract_landscape(topic):
# Prompt
extract_landscape_prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_landscape_business_query')
# Set up a parser: LandscapeAsIs
parser = PydanticOutputParser(pydantic_object=topic)
final_prompt = extract_landscape_prompt.partial(
format_instructions=parser.get_format_instructions(),
ai_output=LLAMA31_CHAT_PROMPT_FORMAT,
)
return final_prompt
def extract_principles(topic):
# Set up a parser: LandscapeAsIs
parser = PydanticOutputParser(pydantic_object=topic)
#PROMPT REVISED TO WORK w/ Llama-3
    principle_template = """Identify the list of principles and their meaning from the given context.
Do not add any superfluous information.
Context: \n {strategic_principles} \n
Output your answer as JSON that matches the given schema and nothing else: \n{format_instructions}\n
"""
prompt = PromptTemplate(
template=principle_template,
input_variables=["strategic_principles"],
partial_variables={
"format_instructions": parser.get_format_instructions(),
},
)
return prompt
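# Illustrative usage (a sketch; `ArchitecturePrinciples` is a hypothetical Pydantic model, not defined in this module):
#   prompt = extract_principles(ArchitecturePrinciples)
#   chain = prompt | llm | PydanticOutputParser(pydantic_object=ArchitecturePrinciples)
#   result = chain.invoke({"strategic_principles": "Buy before build. Cloud first."})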
# Task-1: Identify the business requirements, objectives, user journey, and all other relevant information
def extract_detailed_business_requirements(llm, topic: type[BaseModel], name:str, values:dict):
parser = PydanticOutputParser(pydantic_object=topic)
hub_prompt = hub.pull('learn-it-all-do-it-all/ea4all_extract_business_topic')
hub_prompt = hub_prompt.partial(
topic=name,
format_instructions=parser.get_format_instructions(),
)
task_1_requirement = hub_prompt | llm | parser
response = task_1_requirement.invoke(
input=values,
config={
'tags': ['assess_business_query'],
'run_name': name # Custom run name
}
)
return response
# Post-processing
def format_docs(docs):
return "\n".join(doc.page_content for doc in docs)
#load mock data
def get_relevant_questions(source: str) -> list:
    mock = load_mock_content(source)
    return mock.splitlines()
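# Illustrative usage: load the mock APM Q&A shipped with the project (file name from CFG above).
#   questions = get_relevant_questions(CFG.APM_MOCK_QNA)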