|
|
|
import streamlit as st |
|
import logging |
|
import os |
|
|
|
from annotated_text import annotation |
|
from json import JSONDecodeError |
|
from markdown import markdown |
|
from utils.config import parser |
|
from utils.haystack import start_document_store, start_haystack_extractive, start_haystack_rag, query, start_preprocessor_node, start_retriever, start_reader |
|
from utils.ui import reset_results, set_initial_state, upload_doc |
|
|
|
|
|
# Integer tunables read from the environment once, at import time (default 3).
# NOTE(review): neither is referenced in the visible part of this file --
# presumably consumed elsewhere; confirm before removing.
DEFAULT_DOCS_FROM_RETRIEVER = int(os.getenv("DEFAULT_DOCS_FROM_RETRIEVER", "3"))
DEFAULT_NUMBER_OF_ANSWERS = int(os.getenv("DEFAULT_NUMBER_OF_ANSWERS", "3"))

# Feature switch for the sidebar upload widget.  bool() of any non-empty
# string is True, so "false" or "0" used to *disable* uploads; the value is
# now parsed as a flag: unset/empty and the usual "off" spellings mean False,
# every other value still means True.
DISABLE_FILE_UPLOAD = os.getenv("DISABLE_FILE_UPLOAD", "").strip().lower() not in {
    "",
    "0",
    "false",
    "no",
    "off",
}
UPLOAD_DOCUMENTS = []
|
|
|
|
|
|
|
|
|
def upload_files():
    """Render the sidebar upload widget and return its current value.

    The widget accepts several files at once, restricted to the pdf, txt and
    docx extensions, and is drawn with its label hidden.  Whatever
    ``st.sidebar.file_uploader`` returns is handed back unchanged.
    """
    return st.sidebar.file_uploader(
        "upload",
        type=["pdf", "txt", "docx"],
        accept_multiple_files=True,
        label_visibility="hidden",
    )
|
|
|
|
|
|
|
def process_file(data_file, preprocesor, document_store, retriever=None):
    """Index one uploaded file into ``document_store`` unless it is already there.

    The file is read in full, wrapped in a single document dict (``content``
    plus ``meta.name``), passed through ``preprocesor.process`` and written to
    the store; the store is then asked to update its embeddings.  A file whose
    name already appears in the metadata of a stored document is skipped.

    Args:
        data_file: Uploaded file object exposing ``read()`` and ``name``.
        preprocesor: Object whose ``process(docs)`` returns the documents to
            write.
        document_store: Store exposing ``get_all_documents()``,
            ``write_documents()`` and ``update_embeddings()``.
        retriever: Retriever passed to ``update_embeddings``.  When omitted,
            the module-level ``retriever`` is looked up, which is what the
            function relied on implicitly before.

    Raises:
        Exception: Any error raised while checking, preprocessing or writing
            is logged with its traceback and re-raised, so the caller's own
            error handling can flag the upload as failed.
    """
    raw = data_file.read()
    # str() on bytes yields the "b'...'" repr rather than the text, so decode.
    # NOTE(review): binary formats (pdf/docx) are not parsed here; their bytes
    # are merely decoded leniently -- confirm whether a converter is intended.
    if isinstance(raw, (bytes, bytearray)):
        content = raw.decode("utf-8", errors="replace")
    else:
        content = str(raw)
    docs = [{"content": content, "meta": {"name": str(data_file.name)}}]

    try:
        already_indexed = any(
            stored.meta.get("name") == data_file.name
            for stored in document_store.get_all_documents()
        )
        if already_indexed:
            print(f"{data_file.name} already processed")
            return

        print(f'preprocessing uploaded doc {data_file.name}.......')
        preprocessed_docs = preprocesor.process(docs)
        print('writing to document store.......')
        document_store.write_documents(preprocessed_docs)
        print('updating emebdding.......')
        if retriever is None:
            # Backward-compatible fallback to the script-level retriever.
            retriever = globals()["retriever"]
        document_store.update_embeddings(retriever)
    except Exception:
        # Printing and swallowing hid failures from the caller, whose
        # "could not be parsed" branch could then never run.
        logging.exception("Failed to process uploaded file %s", data_file.name)
        raise
|
|
|
# Streamlit entry script: parse options, build the search pipeline, offer an
# optional sidebar upload, run the user's question and render the results.
# NOTE(review): SOURCE's indentation was lost and its emoji were mis-encoded;
# nesting and emoji below are reconstructed (the three search/error emoji are
# best guesses) -- confirm against the original file.
try:
    # NOTE(review): `parser` is presumably an argparse parser, whose
    # parse_args() raises SystemExit on --help or bad arguments -- confirm.
    args = parser.parse_args()

    set_initial_state()
    st.write('# ' + args.name)
    session_state = st.session_state

    # Build the pipeline components; `retriever` stays a module-level name
    # because process_file() looks it up when refreshing embeddings.
    preprocesor = start_preprocessor_node()
    document_store = start_document_store(args.store)
    retriever = start_retriever(document_store)
    reader = start_reader()
    if args.task == 'extractive':
        pipeline = start_haystack_extractive(document_store, retriever, reader)
    else:
        pipeline = start_haystack_rag(document_store, retriever)

    if not DISABLE_FILE_UPLOAD:
        st.sidebar.write("## File Upload:")
        data_files = upload_files()
        # `or []` covers both "no widget value" (None) and an empty selection.
        for data_file in data_files or []:
            if not data_file:
                continue
            try:
                # Only the in-memory store is indexed from the UI.
                # NOTE(review): the success mark is assumed to belong to this
                # branch; other stores then show no mark -- confirm.
                if args.store == 'inmemory':
                    process_file(data_file, preprocesor, document_store)
                    st.sidebar.write(str(data_file.name) + " ✅ ")
            except Exception:
                # The message below points the user at the logs, so log here.
                logging.exception("Could not index uploaded file %s", data_file.name)
                st.sidebar.write(str(data_file.name) + " ❌ ")
                st.sidebar.write("_This file could not be parsed, see the logs for more information._")

    question = st.text_input(
        "Ask a question",
        value=st.session_state.question,
        max_chars=100,
        on_change=reset_results,
    )
    run_pressed = st.button("Run")

    # Run on an explicit click or when the text differs from the stored question.
    run_query = run_pressed or question != st.session_state.question

    if run_query and question:
        reset_results()
        st.session_state.question = question
        with st.spinner("🔎 Running your pipeline"):
            try:
                st.session_state.results = query(pipeline, question)
            except JSONDecodeError:
                logging.exception("Could not decode the pipeline results")
                st.error(
                    "👓 An error occurred reading the results. Is the document store working?"
                )
            except Exception as e:
                logging.exception(e)
                st.error("🐞 An error occurred during the request.")

    if st.session_state.results:
        results = st.session_state.results

        if args.task == 'extractive':
            for answer in results['answers']:
                if answer.answer:
                    text = answer.answer
                    # NOTE(review): context is assumed to be possibly None.
                    context = answer.context or ""
                    # partition() avoids the -1 returned by find() being used
                    # as a slice index when the answer is not in the context.
                    before, found, after = context.partition(text)
                    if not found:
                        # Lead with the highlighted answer, keep context whole.
                        before, after = "", context
                    highlighted = str(
                        annotation(body=text, label='ANSWER', background='#964448', color='#ffffff')
                    )
                    st.write(
                        f" Answer: {markdown(before + highlighted + after)}",
                        unsafe_allow_html=True,
                    )
                else:
                    st.info(
                        "🤔 Haystack is unsure whether any of the documents contain an answer to your question. Try to reformulate it!"
                    )
        elif args.task == 'rag':
            st.write(f" Answer: {results['results'][0]}")

        retrieved_documents = results['documents']
        st.subheader("Retriever Results:")
        for document in retrieved_documents:
            # .get() mirrors process_file(): a document without a name in its
            # metadata should not abort rendering.
            st.write(f"Document Name: {document.meta.get('name')}")
            st.write(f"Score: {document.score}")
            st.write(f"Text: {document.content}")

except SystemExit as e:
    # Hard-exit with the requested status.  os._exit() only accepts an int,
    # while SystemExit.code may be None (success) or an arbitrary object
    # (conventionally status 1).
    exit_code = e.code
    if exit_code is None:
        exit_code = 0
    elif not isinstance(exit_code, int):
        exit_code = 1
    os._exit(exit_code)