avfranco's picture
ea4all-mcp-lgs-sync-UAT-passed
4a6af9d
"""This "graph" simply exposes an endpoint for a user to upload docs to be indexed."""
"""Changelog: 2025-06-03
- Refactored code to fix problems with linter and type checking (Standard mode)
- Refactored to use langgraph state management for MCP compatibility.
- Enabled input BYOD (Bring Your Own Data) for indexing.
"""
from typing import Optional
from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph
from ea4all.src.ea4all_indexer.configuration import IndexConfiguration
from ea4all.src.ea4all_indexer.state import InputState, OutputState, OverallState
from ea4all.src.shared import vectorstore
from ea4all.src.shared.configuration import BaseConfiguration
async def index_docs(
    state: InputState, *, config: RunnableConfig
) -> dict[str, str]:
    """Populate the retriever's vector store from ``state.path`` when it is empty.

    A retriever is obtained from ``vectorstore.make_retriever(config)``. If its
    docstore holds no entries, content is loaded with
    ``vectorstore.get_apm_excel_content`` from the file at ``state.path``,
    added to the retriever, and the vector store is saved locally. A store
    that already contains entries is left untouched.

    Args:
        state: Input state; only ``state.path`` is read here.
        config: Runnable configuration, passed through to the ``vectorstore``
            helpers. ``ea4all_store`` and ``apm_faiss`` are looked up under
            its ``"configurable"`` mapping to decide where the index is saved,
            falling back to the ``BaseConfiguration`` class-level defaults.

    Returns:
        ``{"docs": "delete"}`` in every case.

    Raises:
        ValueError: If ``config`` is missing or empty.
    """
    if not config:
        raise ValueError("Configuration required to run index_docs.")

    # RunnableConfig is a dict at runtime: user-supplied settings live under
    # its "configurable" key, not as attributes. A bare getattr(config, ...)
    # would therefore always fall through to the default.
    configurable = (config.get("configurable") if isinstance(config, dict) else None) or {}

    def _setting(name: str, default: str) -> str:
        """Resolve a setting from ``configurable``, then attributes, then default."""
        if name in configurable:
            return configurable[name]
        # Kept for backward compatibility with attribute-style config objects.
        return getattr(config, name, default)

    with vectorstore.make_retriever(config) as retriever:
        # NOTE(review): this reads the docstore's private ``_dict`` to detect an
        # empty index; presumably an in-memory docstore — confirm it stays
        # available across library upgrades.
        if not retriever.vectorstore.docstore._dict:
            apm_docs = vectorstore.get_apm_excel_content(config, file=state.path)
            await retriever.aadd_documents(apm_docs)
            # Persist the freshly built index so later runs can reuse it.
            retriever.vectorstore.save_local(
                folder_path=_setting("ea4all_store", BaseConfiguration.ea4all_store),
                index_name=_setting("apm_faiss", BaseConfiguration.apm_faiss),
            )

    return {"docs": "delete"}
# Assemble the indexing graph: one node, entered directly from START.
# NOTE(review): no explicit edge to END is declared; confirm the installed
# langgraph version treats a node without outgoing edges as terminal.
builder = StateGraph(
    OverallState,
    input=InputState,
    output=OutputState,
    config_schema=IndexConfiguration,
)
builder.add_node("apm_indexer", index_docs)
builder.add_edge(START, "apm_indexer")

# Compile the builder into a graph object that can be invoked and deployed.
indexer_graph = builder.compile()
# NOTE(review): "onw" looks like a typo for "own"; the string is kept as-is
# because the graph name may be referenced by external callers.
indexer_graph.name = "ea4all_bring_your_onw_data"