|
"""Shared utility functions used in the project. |
|
|
|
Functions: |
|
|
|
""" |
|
|
|
import os
import datetime
import getpass
import base64
import json
import re
from io import BytesIO

from dotenv import load_dotenv, find_dotenv
import markdown
from markdownify import markdownify as md2text
import pandas as pd
|
|
|
from pydantic import BaseModel, SecretStr

from langchain import hub
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.messages import (
    AIMessage,
    HumanMessage,
    get_buffer_string,
)
from langchain_openai import ChatOpenAI

from PIL import Image

from ea4all.src.shared.prompts import LLAMA31_CHAT_PROMPT_FORMAT
from ea4all.src.shared.configuration import BaseConfiguration as ea4all_config

EA4ALL_ARCHITECTURE = "ea4all_architecture.png"
EA4ALL_PODCAST = "ea4all_podcast.wav"
|
|
|
class CFG:
    """Static configuration values shared across the project."""

    # Assets
    EA4ALL_OVERVIEW = "ea4all_overview.png"
    EA4ALL_ABOUT = "ea4all_overview.txt"
    APM_MOCK_QNA = "apm_qna_mock.txt"
    STREAM_SLEEP = 0.05
    REGEX_BACKTICKS = r"```(.*?)```"

    # LLM sampling
    top_p = 0.95
    repetition_penalty = 1.15

    # Text splitting
    split_chunk_size = 500
    split_overlap = 0

    # Retrieval
    k = 3

    verbose = True

    # Image handling
    RESIZE_TO = 512
    MAX_WIDTH = 1024
    MAX_HEIGHT = 768
    diagram_format = "png"
|
|
|
def _get_datetime():
    """Return the current timestamp as 'MM/DD/YYYY, HH:MM:SS'."""
    now = datetime.datetime.now()
    return now.strftime("%m/%d/%Y, %H:%M:%S")


def _get_formatted_date():
    """Return the current date as 'DD Month YYYY', e.g. '05 January 2025'."""
    current_date = datetime.datetime.now()
    formatted_date = current_date.strftime("%d %B %Y")

    return formatted_date
|
|
|
|
|
def time_elapsed(start, end):
    """Return the elapsed time between two timestamps, rounded to whole seconds, as a string."""
    elapsed = int(round(end - start))

    return f"{elapsed}"
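
# Example (illustrative; assumes `import time`):
#   start = time.time()
#   ...  # do some work
#   print(time_elapsed(start, time.time()))  # e.g. "3"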
|
|
|
def _join_paths(*paths):
    """Join two or more paths using os.path.join.

    Parameters:
        *paths: str
            Two or more path components to be joined.

    Returns:
        str: The joined path.
    """
    return os.path.join(*paths)
|
|
|
|
|
def get_user_identification(request):
    """Best-effort identification of the calling user's IP address."""
    if request:
        try:
            # Prefer the original client IP when behind a proxy; fall back to
            # the direct client host if the header is absent.
            user_ip = request.headers.get('X-Forwarded-For')
            return user_ip.split(',')[0]
        except Exception:
            print(f"user info: {request}")
            return request.client.host

    return "ea4all_agent"

|
|
|
def get_llm_client(model, api_base_url=None, temperature=0, streaming=False, tokens=ea4all_config.max_tokens) -> ChatOpenAI:
    """Initialize and return a ChatOpenAI client for the given model.

    OpenAI models ("gpt-*") are called directly; Llama models are routed through
    an OpenAI-compatible Hugging Face endpoint. Any other model name falls back
    to a default ChatOpenAI client.
    """
    if model.startswith("gpt-"):
        client = ChatOpenAI(
            model=model,
            temperature=temperature,
            streaming=streaming,
            max_completion_tokens=tokens,
            stream_usage=True,
        )
    elif "llama" in model.lower():
        client = ChatOpenAI(
            model=model,
            api_key=SecretStr(os.environ['HUGGINGFACEHUB_API_TOKEN']),
            base_url=_join_paths(api_base_url, model, "v1/"),
            temperature=temperature,
            streaming=streaming,
            max_completion_tokens=tokens,
            stream_usage=True,
        )
    else:
        # Preserve the original fallback: a default client with library defaults.
        client = ChatOpenAI()

    return client
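
# Example usage (illustrative; assumes OPENAI_API_KEY is set in the environment):
#   llm = get_llm_client("gpt-4o-mini", temperature=0)
#   reply = llm.invoke("Summarise the application portfolio.")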
|
|
|
|
|
def load_local_env(local):
    """Load .env variables and prompt for the named key if it is not already set."""
    _ = load_dotenv(find_dotenv())
    if local not in os.environ:
        os.environ[local] = getpass.getpass(f"Provide your {local} Key")
    return os.environ[local]
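
# Example usage (illustrative):
#   token = load_local_env("HUGGINGFACEHUB_API_TOKEN")  # prompts only if the key is missing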
|
|
|
|
|
def load_to_chroma(documents, embeddings, path, collection_name="apm_collection"):
    """Create or rebuild a persistent Chroma collection from the given documents."""
    chroma_collection = Chroma(
        collection_name=collection_name,
        persist_directory=path,
        embedding_function=embeddings,
    )

    # Drop any existing content so the collection is rebuilt from scratch.
    # (The original returned the deleted handle here without reloading.)
    if chroma_collection._collection.count():
        chroma_collection.delete_collection()

    chroma_collection = Chroma.from_documents(
        collection_name=collection_name,
        persist_directory=path,
        documents=documents,
        embedding=embeddings,
    )
    chroma_collection.persist()

    return chroma_collection
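
# Example usage (illustrative; `docs` and the embeddings choice are assumptions):
#   from langchain_openai import OpenAIEmbeddings
#   store = load_to_chroma(docs, OpenAIEmbeddings(), path="./chroma_db")
#   retriever = store.as_retriever(search_kwargs={"k": CFG.k})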
|
|
|
|
|
def get_history_gradio(history, chat_history=None):
    """Convert Gradio chat memory into a LangChain-formatted history string."""
    history_langchain_format = []

    history = history["chat_memory"]

    for human, ai in history:
        history_langchain_format.append(HumanMessage(content=human))
        history_langchain_format.append(AIMessage(content=ai))

    history = {"history": get_buffer_string(history_langchain_format)}

    return history
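
# Example (illustrative input/output shapes):
#   get_history_gradio({"chat_memory": [("Hi", "Hello!")]})
#   -> {"history": "Human: Hi\nAI: Hello!"}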
|
|
|
|
|
def get_vqa_examples() -> list:
    """Return example prompts for the visual question-answering demo."""
    examples = [
        {"text": "Describe this image.", "files": ["ea4all/images/multi-app-architecture.png"]},
        {"text": "Assess any risk and vulnerabilities in the current solution.", "files": ["ea4all/images/ea4all_architecture.png"]},
    ]
    return examples
|
|
|
|
|
def encode_image(image_path):
    """Read an image file and return its contents base64-encoded."""
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')
|
|
|
def resize_image_1(raw_image, input_size):
    """Scale an image so its longest side equals input_size, preserving aspect ratio."""
    w, h = raw_image.size
    scale = input_size / max(w, h)
    new_w = int(w * scale)
    new_h = int(h * scale)
    resized_image = raw_image.resize((new_w, new_h))

    return resized_image


def resize_image_2(image, width):
    """Resize an image to the given width, preserving aspect ratio."""
    wpercent = width / float(image.size[0])
    hsize = int(float(image.size[1]) * wpercent)
    raw_image = image.resize((width, hsize))

    return raw_image
|
|
|
def resize_image_3(image):
    """Shrink an image to fit within CFG.MAX_WIDTH x CFG.MAX_HEIGHT, preserving aspect ratio."""
    width, height = image.size

    if width > CFG.MAX_WIDTH or height > CFG.MAX_HEIGHT:
        ratio = min(CFG.MAX_WIDTH / width, CFG.MAX_HEIGHT / height)
        new_width = int(width * ratio)
        new_height = int(height * ratio)
    else:
        new_width, new_height = width, height

    image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

    return image
|
|
|
|
|
def encode_raw_image(raw_image):
    """Encode a PIL image as a base64 PNG string."""
    buffer = BytesIO()
    raw_image.save(buffer, format='PNG')
    img_bytes = buffer.getvalue()
    img_base64 = base64.b64encode(img_bytes)
    img_str = img_base64.decode('utf-8')

    return img_str
|
|
|
|
|
def get_raw_image(image_path):
    """Open an image, downscale it if oversized, and return it base64-encoded."""
    diagram = Image.open(image_path)
    w, h = diagram.size
    if w > CFG.RESIZE_TO or h > CFG.RESIZE_TO:
        resized_image = resize_image_3(diagram)
    else:
        resized_image = diagram

    raw_image = encode_raw_image(resized_image)

    return raw_image
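
# Example usage (illustrative):
#   b64_png = get_raw_image("ea4all/images/multi-app-architecture.png")
#   data_url = f"data:image/png;base64,{b64_png}"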
|
|
|
def load_mock_content(file_path) -> str:
    """Read a mock-content file from the ea4all store, returning the error text on failure."""
    try:
        with open(_join_paths(ea4all_config.ea4all_store, file_path), "r") as file:
            content = file.read()
            return content
    except OSError as e:
        # open()/read() raise OSError (e.g. FileNotFoundError), not ValueError.
        return str(e)
|
|
|
def print_json_to_md(data, indent=0, column=None):
    """Render nested JSON (dicts and lists of dicts) as simple markdown tables and fields."""
    try:
        result = ""
        header = ""
        body = ""
        if isinstance(data, dict):
            for key, value in data.items():
                result += print_json_to_md(value, indent + 2, key)
            return result
        elif isinstance(data, list):
            if column:
                header = ' ' * indent + f"| {' | '.join(data[0].keys())} | \n"
                header += ' ' * indent + f"| {' | '.join(['---'] * len(data[0]))} | \n"
                for item in data:
                    body += ' ' * indent + f"\n\n | {' | '.join(str(item[k]) for k in item.keys())} |"
                result += header + body
                return result
            else:
                # Build the header once from the first row, then append each row.
                header = ' ' * indent + f"| {' | '.join(data[0].keys())} |"
                for item in data:
                    body += ' ' * indent + f"\n\n | {' | '.join(str(item[k]) for k in item.keys())} |"
                result += header + "\n" + body
                return result
        else:
            header += ' ' * indent + f"| {column} "
            body += f"{str(data)}\n\n"
            result += header + body
            return result

    except Exception as e:
        return f"{e} - {data}"
|
|
|
def markdown_to_plain_text(md):
    """Convert markdown to plain text by rendering it to HTML and stripping the markup."""
    html = markdown.markdown(md)
    plain_text = md2text(html)
    return plain_text
|
|
|
def extract_structured_output(response):
    """Parse a JSON response, falling back to JSON wrapped in triple backticks."""
    try:
        return json.loads(response)
    except ValueError:
        match = re.search(CFG.REGEX_BACKTICKS, response, re.DOTALL)

        if match:
            return json.loads(match.group(1))
        else:
            return None
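
# Example (illustrative):
#   extract_structured_output('```{"capability": "CRM"}```')
#   -> {"capability": "CRM"}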
|
|
|
def get_predicted_num_tokens(llm, content):
    """Return the model's token count for the given content."""
    return llm.get_num_tokens(content)


def get_predicted_num_tokens_from_prompt(llm, prompt, values):
    """Return the model's token count for a prompt after formatting it with values."""
    final_prompt = prompt.format(**values)
    return llm.get_num_tokens(final_prompt)


def set_max_new_tokens(predicted_tokens):
    """Budget new tokens: 95% of whatever remains below the configured maximum."""
    return int((ea4all_config.max_tokens - predicted_tokens) * 0.95)
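
# Example (illustrative; assumes ea4all_config.max_tokens == 4096):
#   set_max_new_tokens(1000)  # int((4096 - 1000) * 0.95) == 2941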
|
|
|
def escape_special_characters(input_string):
    """Escape special characters by round-tripping through json.dumps."""
    escaped_string = json.dumps(input_string)
    # Drop the surrounding double quotes added by json.dumps.
    return escaped_string[1:-1]


def clean_and_load_json(content) -> dict:
    """Parse JSON, tolerating a leading 'json' language tag and embedded newlines."""
    try:
        json_data = json.loads(content)
        return json_data
    except ValueError:
        # Strip a leading "json" tag (left over from a ```json fence) rather than
        # removing "json" everywhere, which could corrupt values.
        clean_string = re.sub(r"^\s*json", "", content.strip()).replace("\n", "")
        json_data = json.loads(clean_string)
        return json_data
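
# Example (illustrative):
#   clean_and_load_json('json\n{"name": "CRM"}')
#   -> {"name": "CRM"}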
|
|
|
def extract_response_from_backticks(response):
    """Return the content between triple backticks, or the response unchanged."""
    match = re.search(CFG.REGEX_BACKTICKS, str(response), re.DOTALL)

    return match.group(1) if match else response


def extract_topic_from_business_input(response) -> dict:
    """Normalize an LLM response into a dict, unwrapping backticks if needed."""
    if isinstance(response, dict):
        return response

    topic = extract_response_from_backticks(response)

    return clean_and_load_json(topic)
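
# Example (illustrative):
#   extract_topic_from_business_input('```json\n{"topic": "payments"}\n```')
#   -> {"topic": "payments"}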
|
|
|
|
|
def extract_landscape(topic):
    """Build the landscape-extraction prompt with format instructions for the given schema."""
    extract_landscape_prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_landscape_business_query')

    parser = PydanticOutputParser(pydantic_object=topic)

    final_prompt = extract_landscape_prompt.partial(
        format_instructions=parser.get_format_instructions(),
        ai_output=LLAMA31_CHAT_PROMPT_FORMAT,
    )

    return final_prompt
|
|
|
def extract_principles(topic):
    """Build a prompt that extracts principles and their meanings as schema-bound JSON."""
    parser = PydanticOutputParser(pydantic_object=topic)

    principle_template = """Identify the list of principles and its meaning from the given context.
    Do not add any superfluous information.
    Context: \n {strategic_principles} \n
    Output your answer as JSON that matches the given schema and nothing else: \n{format_instructions}\n
    """

    prompt = PromptTemplate(
        template=principle_template,
        input_variables=["strategic_principles"],
        partial_variables={
            "format_instructions": parser.get_format_instructions(),
        },
    )

    return prompt
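
# Example usage (illustrative; the Principles schema and raw_text are assumptions):
#   prompt = extract_principles(Principles)
#   chain = prompt | get_llm_client("gpt-4o-mini") | PydanticOutputParser(pydantic_object=Principles)
#   result = chain.invoke({"strategic_principles": raw_text})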
|
|
|
|
|
def extract_detailed_business_requirements(llm, topic: type[BaseModel], name: str, values: dict):
    """Invoke the topic-extraction prompt chain and parse the result into the given schema."""
    parser = PydanticOutputParser(pydantic_object=topic)

    hub_prompt = hub.pull('learn-it-all-do-it-all/ea4all_extract_business_topic')
    hub_prompt = hub_prompt.partial(
        topic=name,
        format_instructions=parser.get_format_instructions(),
    )

    task_1_requirement = hub_prompt | llm | parser
    response = task_1_requirement.invoke(
        input=values,
        config={
            'tags': ['assess_business_query'],
            'run_name': name,
        },
    )

    return response
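
# Example usage (illustrative; BusinessTopic and the input keys are assumptions):
#   class BusinessTopic(BaseModel):
#       topic: str
#       description: str
#   result = extract_detailed_business_requirements(
#       get_llm_client("gpt-4o-mini"), BusinessTopic, "integration",
#       {"business_query": "We need a single view of the customer."},
#   )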
|
|
|
|
|
def format_docs(docs):
    """Join retrieved document contents into a single newline-separated string."""
    return "\n".join(doc.page_content for doc in docs)


def get_relevant_questions(source: str) -> list:
    """Load mock content and return its lines as a list of questions."""
    mock = load_mock_content(source)
    relevant_questions = mock.splitlines()

    return relevant_questions
|
|