# Source-viewer residue removed here: "File size: 2,424 Bytes" banner, revision ids 7042c3c / 4a6af9d, and the line-number gutter.
"""This "graph" simply exposes an endpoint for a user to upload docs to be indexed.

Changelog: 2025-06-03
- Refactored code to fix linter and type-checking problems (Standard mode).
- Refactored to use LangGraph state management for MCP compatibility.
- Enabled BYOD (Bring Your Own Data) input for indexing.
"""

from typing import Optional

from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraph

from ea4all.src.ea4all_indexer.configuration import IndexConfiguration
from ea4all.src.ea4all_indexer.state import InputState, OutputState, OverallState
from ea4all.src.shared import vectorstore
from ea4all.src.shared.configuration import BaseConfiguration

async def index_docs(
    state: InputState, *, config: RunnableConfig
) -> dict[str, str]:
    """Index the APM file referenced by ``state.path`` when the store is empty.

    A retriever is opened with ``vectorstore.make_retriever(config)``. If its
    docstore already holds entries nothing is loaded or written. Otherwise the
    content returned by ``vectorstore.get_apm_excel_content`` for ``state.path``
    is added to the retriever and the vector store is persisted with
    ``save_local``.

    Args:
        state: Graph input; only ``state.path`` is read here.
        config: Runnable configuration. ``ea4all_store`` (target folder) and
            ``apm_faiss`` (index name) are looked up in
            ``config["configurable"]``, falling back to the
            ``BaseConfiguration`` defaults when absent.

    Returns:
        The constant update ``{"docs": "delete"}``.
        NOTE(review): presumably consumed by a state reducer to clear ``docs``
        -- confirm against the state definitions.

    Raises:
        ValueError: If ``config`` is ``None`` or empty.
    """
    if not config:
        raise ValueError("Configuration required to run index_docs.")

    # RunnableConfig is a dict: user-supplied settings live under the
    # "configurable" key and are not reachable as attributes of ``config``.
    configurable = config.get("configurable") or {}

    with vectorstore.make_retriever(config) as retriever:
        # Relies on the docstore's private ``_dict`` to detect an empty index.
        if not retriever.vectorstore.docstore._dict:
            apm_docs = vectorstore.get_apm_excel_content(config, file=state.path)
            await retriever.aadd_documents(apm_docs)
            # The getattr fallback keeps the previous lookup for any caller
            # that passes an attribute-style config object.
            retriever.vectorstore.save_local(
                folder_path=configurable.get(
                    "ea4all_store",
                    getattr(config, "ea4all_store", BaseConfiguration.ea4all_store),
                ),
                index_name=configurable.get(
                    "apm_faiss",
                    getattr(config, "apm_faiss", BaseConfiguration.apm_faiss),
                ),
            )

    return {"docs": "delete"}

# Assemble the indexing graph: a single "apm_indexer" node running
# `index_docs`, entered directly from START.
builder = StateGraph(
    OverallState,
    input=InputState,
    output=OutputState,
    config_schema=IndexConfiguration,
)
builder.add_node("apm_indexer", index_docs)
builder.add_edge(START, "apm_indexer")

# Compile the builder into the graph object that is invoked and deployed.
indexer_graph = builder.compile()
# NOTE(review): "onw" looks like a typo for "own"; the name is kept verbatim
# because other code or deployments may refer to the graph by this string.
indexer_graph.name = "ea4all_bring_your_onw_data"