|
from langchain_core.runnables import RunnableConfig |
|
from langchain_core.documents import Document
|
from langchain_core.embeddings import Embeddings |
|
from langchain_core.vectorstores import VectorStoreRetriever |
|
|
|
import ast |
|
import numpy as np |
|
import pandas as pd |
|
from contextlib import contextmanager |
|
from typing import Generator |
|
|
|
from ea4all.src.shared.utils import _join_paths |
|
from ea4all.src.shared.configuration import BaseConfiguration |
|
|
|
# Module-level cache so the FAISS vector store is loaded once and reused across calls.
_vectorstore = None
|
|
|
def make_text_encoder(model: str) -> Embeddings: |
|
"""Connect to the configured text encoder.""" |
|
provider, model = model.split("/", maxsplit=1) |
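    # e.g. "openai/text-embedding-3-small" splits into provider="openai" and model="text-embedding-3-small"
    # (the model name here is illustrative; the configured default lives in BaseConfiguration.embedding_model).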
|
match provider: |
|
case "openai": |
|
from langchain_openai import OpenAIEmbeddings |
|
|
|
return OpenAIEmbeddings(model=model) |
|
case _: |
|
raise ValueError(f"Unsupported embedding provider: {provider}") |
|
|
|
@contextmanager |
|
def make_faiss_retriever( |
|
configuration: BaseConfiguration, embeddings: Embeddings |
|
) -> Generator[VectorStoreRetriever, None, None]: |
|
"""Configure this agent to connect to a FAISS index & namespaces.""" |
|
from langchain_community.docstore.in_memory import InMemoryDocstore |
|
from langchain_community.vectorstores import FAISS |
|
import faiss |
|
|
|
global _vectorstore |
|
|
|
if _vectorstore is None: |
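        # Try to load a persisted index from disk; on failure, fall back to an empty index below.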
|
try: |
|
_vectorstore = FAISS.load_local( |
|
folder_path=configuration.ea4all_store, |
|
embeddings=embeddings, |
|
index_name=configuration.apm_faiss, |
|
allow_dangerous_deserialization=True) |
|
|
|
        except Exception:
            # No persisted index was found (or it failed to load): start from an empty
            # in-memory index sized to the embedding dimension of the configured encoder.
            index = faiss.IndexFlatL2(len(embeddings.embed_query("")))
            _vectorstore = FAISS(
                embedding_function=embeddings,
                index=index,
                docstore=InMemoryDocstore(),
                index_to_docstore_id={},
            )
|
search_kwargs = configuration.search_kwargs |
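    # Expose the cached store as a similarity retriever; search_kwargs (e.g. "k", "filter") come from the configuration.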
|
|
|
yield _vectorstore.as_retriever(search_type="similarity", search_kwargs=search_kwargs) |
|
|
|
@contextmanager |
|
def make_retriever( |
|
config: RunnableConfig, |
|
) -> Generator[VectorStoreRetriever, None, None]: |
|
"""Create a retriever for the agent, based on the current configuration.""" |
|
configuration = BaseConfiguration.from_runnable_config(config) |
|
embeddings = make_text_encoder(configuration.embedding_model) |
|
match configuration.retriever_provider: |
|
case "faiss": |
|
with make_faiss_retriever(configuration, embeddings) as retriever: |
|
yield retriever |
|
|
|
case _: |
|
raise ValueError( |
|
"Unrecognized retriever_provider in configuration. " |
|
f"Expected one of: {', '.join(BaseConfiguration.__annotations__['retriever_provider'].__args__)}\n" |
|
f"Got: {configuration.retriever_provider}" |
|
) |
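
# Hedged usage sketch (illustrative, not part of the original module): how a caller might obtain a
# retriever. The "configurable" keys are assumed to map onto BaseConfiguration fields.
#
#   config = {"configurable": {"retriever_provider": "faiss"}}
#   with make_retriever(config) as retriever:
#       docs = retriever.invoke("applications supporting the payments capability")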
|
|
|
|
|
def panda_to_langchain_document(dataframe, user_ip):
    """Convert each row of an APM DataFrame into a LangChain Document tagged with the user's namespace."""
    apm_documents = []
|
    for index, row in dataframe.iterrows():
        page_content = ""
        application = ""
        capability = ""
        description = ""
        fit = ""
        roadmap = ""
|
        for column in dataframe.columns:
            # Normalise whitespace in the header for display, but keep the original
            # column name as the lookup key so irregular headers still resolve.
            label = ' '.join(str(column).split())
            value = row[column]
            page_content += f" {label}:{value}"
            if 'application' in label.lower(): application = value
            elif 'capabilit' in label.lower(): capability = value
            elif 'desc' in label.lower(): description = value
            elif 'business fit' in label.lower(): fit = value
            elif 'roadmap' in label.lower(): roadmap = value
|
        doc = Document(
            page_content=page_content,
            metadata={
                "source": application,
                "capability": capability,
                "description": description,
                "business fit": fit,
                "roadmap": roadmap,
                "row_number": index,
                "namespace": user_ip,
            },
        )
|
|
|
apm_documents.append(doc) |
|
    return apm_documents
|
|
|
|
|
def apm_dataframe_loader(file):
    """Load an APM catalogue spreadsheet into a DataFrame, dropping fully empty rows and columns."""
    pd.set_option('display.max_colwidth', None)
    df = pd.read_excel(file)
    df = df.dropna(axis=0, how='all')
    df = df.dropna(axis=1, how='all')
    # fillna returns a new DataFrame; assign it back so missing values are actually replaced.
    df = df.fillna('NaN')
    return df
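
# Hedged example (the file name is illustrative):
#   df = apm_dataframe_loader("apm_catalogue.xlsx")
# returns a DataFrame with fully empty rows/columns dropped, ready for panda_to_langchain_document().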
|
|
|
|
|
|
|
def get_apm_excel_content(config: RunnableConfig, file=None, user_ip="ea4all_agent"):
    """Load the APM catalogue (an uploaded file or the configured default) as LangChain Documents."""
    if file is None:
|
file = _join_paths( |
|
getattr(config, "ea4all_store", BaseConfiguration.ea4all_store), |
|
getattr(config, "apm_catalogue", BaseConfiguration.apm_catalogue) |
|
) |
|
|
|
|
|
df = apm_dataframe_loader(file) |
|
|
|
df['namespace'] = user_ip |
|
|
|
apm_docs = panda_to_langchain_document(df, user_ip) |
|
return apm_docs |
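
# Hedged follow-up (assumption, not shown in this module): callers typically index these documents with
# something like vectorstore.add_documents(get_apm_excel_content(config, file, user_ip)), which is what
# lets retriever_faiss() filter on the "namespace" metadata later.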
|
|
|
def remove_user_apm_faiss(config, db, ea4all_user):
    """Remove a user's uploaded APM documents from the FAISS store and persist the result."""
    # The user has "brought their own data" only if their identifier appears in the docstore.
    byod = ea4all_user in str(db.docstore._dict.values())
|
if byod: |
|
        removed_ids = []
        # Collect the docstore ids of every document tagged with this user's namespace.
        for doc_id, doc in db.docstore._dict.items():
            if doc.metadata['namespace'] == ea4all_user:
                removed_ids.append(doc_id)
|
if removed_ids: |
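            # Map the docstore ids being removed to their positions in the FAISS index.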
|
index_ids = [ |
|
i_id |
|
for i_id, d_id in db.index_to_docstore_id.items() |
|
if d_id in removed_ids |
|
] |
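            # Delete the user's documents and keep the FAISS index and id mapping in sync.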
|
|
|
db.delete(ids=removed_ids) |
|
|
|
            db.index.remove_ids(np.array(index_ids, dtype=np.int64))
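            # Re-number index_to_docstore_id so FAISS positions stay contiguous after removal.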
|
|
|
db.index_to_docstore_id = { |
|
i: d_id |
|
for i, d_id in enumerate(db.index_to_docstore_id.values()) |
|
} |
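        # Persist the pruned index and docstore so the removal survives restarts.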
|
|
|
db.save_local(folder_path=config.ea4all_store, index_name=config.apm_faiss) |
|
|
|
|
|
def retriever_faiss(db, user_ip="ea4all_agent"):
    """Return a similarity retriever scoped to the user's namespace, falling back to the shared catalogue."""
    # Use the caller's namespace only if their documents are actually present in the docstore.
    byod = user_ip in str(db.docstore._dict.values())
    namespace = user_ip if byod else "ea4all_agent"
|
|
|
    retriever = db.as_retriever(
        search_type="similarity",
        search_kwargs={'k': 50, 'score_threshold': 0.8, 'filter': {'namespace': namespace}},
    )
|
|
|
return retriever |
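
# Hedged usage sketch (illustrative values): retrieve only documents tagged for a given user, falling back
# to the shared "ea4all_agent" namespace when that user has not uploaded their own catalogue.
#
#   retriever = retriever_faiss(db, user_ip="10.0.0.1")
#   docs = retriever.invoke("applications marked for decommission")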
|
|