# avfranco's picture
# ea4all-gradio-agents-mcp-hackathon-kickoff
# 7042c3c
from langchain_core.runnables import RunnableConfig
from langchain.docstore.document import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStoreRetriever
import ast
import numpy as np
import pandas as pd
from contextlib import contextmanager
from typing import Generator
from ea4all.src.shared.utils import _join_paths
from ea4all.src.shared.configuration import BaseConfiguration
global _vectorstore
_vectorstore = None
def make_text_encoder(model: str) -> Embeddings:
    """Connect to the configured text encoder.

    ``model`` is a "<provider>/<model-name>" spec; only the "openai"
    provider is currently supported.
    """
    provider, model_name = model.split("/", maxsplit=1)
    if provider == "openai":
        # Imported lazily so the dependency is only required when used.
        from langchain_openai import OpenAIEmbeddings
        return OpenAIEmbeddings(model=model_name)
    raise ValueError(f"Unsupported embedding provider: {provider}")
@contextmanager
def make_faiss_retriever(
    configuration: BaseConfiguration, embeddings: Embeddings
) -> Generator[VectorStoreRetriever, None, None]:
    """Configure this agent to connect to a FAISS index & namespaces.

    Lazily loads the persisted FAISS index on first call and caches it in the
    module-level ``_vectorstore``; subsequent calls reuse the cached store.
    Yields a similarity-search retriever over that store.
    """
    from langchain_community.docstore.in_memory import InMemoryDocstore
    from langchain_community.vectorstores import FAISS
    import faiss
    global _vectorstore
    if _vectorstore is None:
        try:
            # allow_dangerous_deserialization is required because FAISS
            # persistence uses pickle — only safe for trusted local files.
            _vectorstore = FAISS.load_local(
                folder_path=configuration.ea4all_store,
                embeddings=embeddings,
                index_name=configuration.apm_faiss,
                allow_dangerous_deserialization=True)
        except Exception as e:
            # Loading failed (presumably the index files are missing —
            # TODO confirm which errors are expected here); fall back to an
            # empty in-memory index so the retriever still works.
            # Create an empty index
            index = faiss.IndexFlatL2(len(embeddings.embed_query("")))
            #Initialize an empty FAISS vectorstore
            _vectorstore = FAISS(
                embedding_function=embeddings,
                index=index,
                docstore=InMemoryDocstore(),
                index_to_docstore_id={},
            )
            #apm_docs = get_apm_excel_content(configuration)
            #_vectorstore = FAISS.from_documents(apm_docs, embeddings)
            #_vectorstore.save_local(folder_path=configuration.ea4all_store, index_name=configuration.apm_faiss,)
    search_kwargs = configuration.search_kwargs
    yield _vectorstore.as_retriever(search_type="similarity", search_kwargs=search_kwargs)
@contextmanager
def make_retriever(
    config: RunnableConfig,
) -> Generator[VectorStoreRetriever, None, None]:
    """Create a retriever for the agent, based on the current configuration."""
    configuration = BaseConfiguration.from_runnable_config(config)
    embeddings = make_text_encoder(configuration.embedding_model)
    provider = configuration.retriever_provider
    if provider == "faiss":
        with make_faiss_retriever(configuration, embeddings) as retriever:
            yield retriever
        return
    # Unknown provider: report the supported options declared on the config.
    supported = ', '.join(
        BaseConfiguration.__annotations__['retriever_provider'].__args__
    )
    raise ValueError(
        "Unrecognized retriever_provider in configuration. "
        f"Expected one of: {supported}\n"
        f"Got: {provider}"
    )
#convert dataframe to langchain document structure, added user_ip
def panda_to_langchain_document(dataframe, user_ip):
    """Convert an APM dataframe into a list of langchain ``Document``s.

    Each row becomes one Document whose ``page_content`` concatenates every
    column as " label:value", and whose metadata captures the well-known APM
    fields (application, capability, description, business fit, roadmap)
    plus the row number and the caller's namespace (``user_ip``).
    """
    apm_documents = []
    for index, row in dataframe.iterrows():
        page_content = ""
        application = ""
        capability = ""
        description = ""
        fit = ""
        roadmap = ""
        for column in dataframe.columns:
            # BUG FIX: normalize whitespace only for the display label and
            # keyword matching; index the row with the ORIGINAL column name,
            # otherwise headers with irregular whitespace raise KeyError.
            label = ' '.join(str(column).split())
            value = row[column]
            page_content += f" {label}:{value}"
            lowered = label.lower()
            if 'application' in lowered:
                application = value
            elif 'capabilit' in lowered:  # matches capability/capabilities
                capability = value
            elif 'desc' in lowered:
                description = value
            elif 'business fit' in lowered:
                fit = value
            elif 'roadmap' in lowered:
                roadmap = value
        doc = Document(
            page_content=page_content,
            metadata={
                "source": application,
                "capability": capability,
                "description": description,
                "business fit": fit,
                "roadmap": roadmap,
                "row_number": index,
                "namespace": user_ip,
            },
        )
        apm_documents.append(doc)
    return apm_documents
#local landscape data (excel file)
def apm_dataframe_loader(file):
    """Load an APM landscape Excel file into a cleaned DataFrame.

    Drops rows/columns that are entirely empty and replaces remaining
    missing cells with the literal string 'NaN'.
    """
    pd.set_option('display.max_colwidth', None)
    df = pd.read_excel(file)
    df = df.dropna(axis=0, how='all')  # drop fully empty rows
    df = df.dropna(axis=1, how='all')  # drop fully empty columns
    # BUG FIX: fillna returns a new frame — the original discarded the
    # result, leaving NaN cells in the returned DataFrame.
    df = df.fillna('NaN')
    return df
##New APM Excel loader
#Removed df from return
def get_apm_excel_content(config:RunnableConfig, file=None, user_ip="ea4all_agent"):
    """Load the APM catalogue spreadsheet and return it as langchain Documents.

    When ``file`` is not supplied, falls back to the configured store path and
    catalogue name (using ``BaseConfiguration`` defaults if absent on config).
    Every row is tagged with the caller's namespace before conversion.
    """
    if file is None:
        store = getattr(config, "ea4all_store", BaseConfiguration.ea4all_store)
        catalogue = getattr(config, "apm_catalogue", BaseConfiguration.apm_catalogue)
        file = _join_paths(store, catalogue)
    # Load the spreadsheet and stamp each row with the user's namespace.
    frame = apm_dataframe_loader(file)
    frame['namespace'] = user_ip
    return panda_to_langchain_document(frame, user_ip)
def remove_user_apm_faiss(config, db, ea4all_user):
    """Remove a user's uploaded APM documents from the FAISS store ``db``.

    Finds every docstore entry whose metadata namespace equals
    ``ea4all_user``, deletes those documents, removes their embeddings from
    the underlying FAISS index, compacts the index-to-docstore-id mapping,
    and persists the updated index to disk.
    """
    #apm_vectorstore.docstore.__dict__["_dict"][apm_vectorstore.index_to_docstore_id[0]].metadata
    #check if user's uploaded any apm before
    # NOTE(review): this is a substring match over the stringified docstore,
    # not an exact metadata lookup — a user id embedded in another doc's
    # text would also match. Verify this is acceptable.
    byod = ea4all_user in str(db.docstore._dict.values())
    #if yes
    if byod:
        removed_ids = []
        # Collect docstore ids whose metadata namespace belongs to this user.
        for id, doc in db.docstore._dict.items():
            if doc.metadata['namespace'] == ea4all_user:
                removed_ids.append(id)
        ##save updated index
        if removed_ids:
            # Map docstore ids back to their positional FAISS index ids,
            # BEFORE deletion mutates the docstore.
            index_ids = [
                i_id
                for i_id, d_id in db.index_to_docstore_id.items()
                if d_id in removed_ids
            ]
            #Remove ids from docstore
            db.delete(ids=removed_ids)
            #Remove the corresponding embeddings from the FAISS index
            db.index.remove_ids(np.array(index_ids,dtype=np.int64))
            #Reorg embeddings: re-number the surviving entries 0..n-1 so the
            #mapping stays aligned with the compacted FAISS index.
            db.index_to_docstore_id = {
                i: d_id
                for i, d_id in enumerate(db.index_to_docstore_id.values())
            }
            #save updated index
            db.save_local(folder_path=config.ea4all_store, index_name=config.apm_faiss)
#Get faiss index as a retriever
def retriever_faiss(db, user_ip="ea4all_agent"):
    """Return a similarity retriever over ``db`` filtered by namespace.

    If ``user_ip`` appears in the docstore (i.e. the user brought their own
    data), the retriever is filtered to that user's namespace; otherwise it
    falls back to the shared "ea4all_agent" namespace.
    """
    ##size: len(retriever.vectorstore.index_to_docstore_id), retriever.vectorstore.index.ntotal
    #check if user's BYOData
    # NOTE(review): substring match over the stringified docstore values,
    # not an exact metadata lookup — confirm this is intended.
    byod = user_ip in str(db.docstore._dict.values())
    namespace = user_ip if byod else "ea4all_agent"
    retriever = db.as_retriever(
        search_type="similarity",
        search_kwargs={'k': 50, 'score_threshold': 0.8,
                       'filter': {'namespace': namespace}},
    )
    return retriever