|
"""This "graph" simply exposes an endpoint for a user to upload docs to be indexed.""" |
|
"""Changelog: 2025-06-03 |
|
- Refactored code to fix problems with linter and type checking (Standard mode) |
|
- Refactored to use langgraph state management for MCP compatibility. |
|
- Enabled input BYOD (Bring Your Own Data) for indexing. |
|
""" |
|
|
|
from typing import Optional |
|
|
|
from langchain_core.runnables import RunnableConfig |
|
from langgraph.graph import END, START, StateGraph |
|
|
|
from ea4all.src.ea4all_indexer.configuration import IndexConfiguration |
|
from ea4all.src.ea4all_indexer.state import InputState, OutputState, OverallState |
|
from ea4all.src.shared import vectorstore |
|
from ea4all.src.shared.configuration import BaseConfiguration |
|
|
|
async def index_docs(
    state: InputState, *, config: RunnableConfig
) -> dict[str, str]:
    """Populate the vector store from the file referenced by ``state.path``.

    Opens the retriever built from ``config``. When its vector store holds no
    documents yet, the content returned by
    ``vectorstore.get_apm_excel_content`` for ``state.path`` is added to the
    retriever and the index is persisted with ``save_local``. A store that
    already contains documents is left untouched.

    Args:
        state: Graph input; only ``state.path`` is read here.
        config: Runnable configuration. The ``ea4all_store`` (target folder)
            and ``apm_faiss`` (index name) settings are taken from its
            ``"configurable"`` mapping, falling back to the
            ``BaseConfiguration`` defaults when absent.

    Returns:
        The state update ``{"docs": "delete"}``.

    Raises:
        ValueError: If ``config`` is missing or empty.
    """
    if not config:
        raise ValueError("Configuration required to run index_docs.")

    with vectorstore.make_retriever(config) as retriever:
        store = retriever.vectorstore

        # NOTE: emptiness is detected through the docstore's private `_dict`
        # mapping; this depends on the concrete docstore implementation.
        if not store.docstore._dict:
            apm_docs = vectorstore.get_apm_excel_content(config, file=state.path)
            await retriever.aadd_documents(apm_docs)

            # RunnableConfig is a dict, so attribute lookup on it never finds
            # user settings; they live under the "configurable" key.
            configurable = config.get("configurable") or {}
            store.save_local(
                folder_path=configurable.get(
                    "ea4all_store", BaseConfiguration.ea4all_store
                ),
                index_name=configurable.get(
                    "apm_faiss", BaseConfiguration.apm_faiss
                ),
            )

    return {"docs": "delete"}
|
|
|
|
|
# Assemble the indexing graph: a single node, entered directly from START.
builder = StateGraph(
    OverallState,
    input=InputState,
    output=OutputState,
    config_schema=IndexConfiguration,
)
builder.add_node("apm_indexer", index_docs)
builder.add_edge(START, "apm_indexer")

# Compile the graph and give it the name it is published under.
# NOTE(review): the name reads "onw" (presumably "own"); kept as-is because
# external clients may reference the graph by this exact string — confirm
# before renaming.
indexer_graph = builder.compile()
indexer_graph.name = "ea4all_bring_your_onw_data"
|
|