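"""SmartManuals-AI: a retrieval-augmented Q&A app for equipment manuals.

PDF and DOCX files in ./Manuals are extracted (with an OCR fallback for
scanned pages), split into overlapping sentence chunks, embedded with
MiniLM, and stored in a persistent ChromaDB collection. Questions are
answered by Llama 3.1 8B Instruct over the retrieved context, behind a
Gradio UI.
"""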
import os
import json
import fitz # PyMuPDF
import re
from tqdm import tqdm
from docx import Document
from PIL import Image
import pytesseract
import io
import torch
import chromadb
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import gradio as gr
# ---------------------------
# Configuration
# ---------------------------
MANUALS_FOLDER = "./Manuals"
CHROMA_PATH = "./chroma_store"
COLLECTION_NAME = "manual_chunks"
CHUNK_SIZE = 750        # target chunk size, in whitespace-separated words
CHUNK_OVERLAP = 100     # words of trailing context carried into the next chunk
MAX_CONTEXT_CHUNKS = 3  # chunks retrieved per question
HF_MODEL = "meta-llama/Llama-3.1-8B-Instruct"
HF_TOKEN = os.environ.get("HF_TOKEN")  # required to download the gated Llama 3.1 weights
# ---------------------------
# Helpers
# ---------------------------
def clean(text):
    lines = text.splitlines()
    return "\n".join(line.strip() for line in lines if line.strip())
def split_sentences(text):
    return re.split(r'(?<=[.!?])\s+', text.strip())
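
# Illustrative: split_sentences("Stop the belt. Unplug the unit!")
# -> ["Stop the belt.", "Unplug the unit!"]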
def chunk_sentences(sentences, max_len=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Greedily pack sentences into chunks of at most ~max_len words,
    carrying roughly `overlap` words of trailing context into the next chunk."""
    chunks, chunk, length = [], [], 0
    for sent in sentences:
        tokens = len(sent.split())
        if length + tokens > max_len and chunk:
            chunks.append(" ".join(chunk))
            # Keep trailing sentences totalling at most `overlap` words.
            # (Slicing chunk[-overlap:] would keep `overlap` *sentences*,
            # far more than the intended word overlap.)
            kept, kept_len = [], 0
            for s in reversed(chunk):
                s_len = len(s.split())
                if kept_len + s_len > overlap:
                    break
                kept.insert(0, s)
                kept_len += s_len
            chunk, length = kept, kept_len
        chunk.append(sent)
        length += tokens
    if chunk:
        chunks.append(" ".join(chunk))
    return chunks
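
# With the defaults (750-word chunks, 100-word overlap), a 2,000-word manual
# section yields roughly three chunks, each repeating ~100 trailing words of
# its predecessor so answers that straddle a chunk boundary stay retrievable.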
def extract_text_from_pdf(path):
    doc = fitz.open(path)
    full_text = []
    for page in doc:
        text = page.get_text().strip()
        if not text:
            # Scanned page with no text layer: rasterize it and fall back to OCR.
            try:
                pix = page.get_pixmap(dpi=300)
                img_data = pix.tobytes("png")
                img = Image.open(io.BytesIO(img_data))
                text = pytesseract.image_to_string(img).strip()
            except Exception:
                text = ""
        full_text.append(text)
    return "\n".join(full_text)
def extract_text_from_docx(path):
    doc = Document(path)
    return "\n".join(para.text for para in doc.paragraphs if para.text.strip())
def extract_metadata(filename):
    """Infer equipment model and document type from the filename."""
    name = filename.lower()
    # Longer model names precede their prefixes ("se3hd" before "se3")
    # so the first match is the most specific one.
    model = next((m for m in ["se3hd", "se3", "se4", "symbio", "explore", "integrity x", "integrity sl", "everest", "engage", "inspire", "discover", "95t", "95x", "95c", "95r", "97c"] if m in name), "unknown")
    if "om" in name or "owner" in name:
        doc_type = "owner manual"
    elif "sm" in name or "service" in name:
        doc_type = "service manual"
    elif "assembly" in name:
        doc_type = "assembly instructions"
    elif "alert" in name:
        doc_type = "installer alert"
    elif "parts" in name:
        doc_type = "parts manual"
    elif "bulletin" in name:
        doc_type = "service bulletin"
    else:
        doc_type = "unknown"
    return model, doc_type
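
# Illustrative example (hypothetical filename):
#   extract_metadata("95T_Integrity_SM.pdf") -> ("95t", "service manual")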
# ---------------------------
# Build ChromaDB at Startup
# ---------------------------
def embed_all():
    """Re-extract, chunk, and embed every manual into a fresh Chroma collection."""
    client = chromadb.PersistentClient(path=CHROMA_PATH)
    # Rebuild from scratch on every startup so the index always matches the folder.
    try:
        client.delete_collection(COLLECTION_NAME)
    except Exception:
        pass  # collection did not exist yet
    collection = client.create_collection(COLLECTION_NAME)
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    records = []
    for fname in os.listdir(MANUALS_FOLDER):
        path = os.path.join(MANUALS_FOLDER, fname)
        if not fname.lower().endswith((".pdf", ".docx")):
            continue
        # Match the extension case-insensitively, consistent with the filter above.
        text = extract_text_from_pdf(path) if fname.lower().endswith(".pdf") else extract_text_from_docx(path)
        sents = split_sentences(clean(text))
        chunks = chunk_sentences(sents)
        model, doc_type = extract_metadata(fname)
        for i, chunk in enumerate(chunks):
            records.append({
                "id": f"{fname}::chunk_{i+1}",
                "text": chunk,
                "metadata": {"source_file": fname, "model": model, "doc_type": doc_type}
            })
    # Embed and insert in small batches to bound memory use.
    for i in tqdm(range(0, len(records), 16), desc="Embedding chunks"):
        batch = records[i:i+16]
        texts = [r["text"] for r in batch]
        ids = [r["id"] for r in batch]
        metas = [r["metadata"] for r in batch]
        embeddings = embedder.encode(texts).tolist()
        collection.add(documents=texts, ids=ids, metadatas=metas, embeddings=embeddings)
    return collection, embedder
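
# Note: the stored metadata allows filtered retrieval later, e.g. (illustrative)
#   collection.query(query_embeddings=[q], n_results=3, where={"model": "se3"})
# run_query below does not currently filter by model or doc_type.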
# ---------------------------
# Load HF Model
# ---------------------------
llm_pipe = None
if HF_TOKEN:
    # Llama 3.1 is a gated repo: the token must belong to an account that has
    # accepted the model license. float32 on CPU (device=-1) is slow but portable.
    tokenizer = AutoTokenizer.from_pretrained(HF_MODEL, token=HF_TOKEN)
    model = AutoModelForCausalLM.from_pretrained(HF_MODEL, token=HF_TOKEN, torch_dtype=torch.float32)
    llm_pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=-1)
# ---------------------------
# RAG Function
# ---------------------------
def run_query(question):
    if not question.strip():
        return "Please enter a question."
    if db is None or embedder is None:
        return "Chroma or embedder not ready."
    # Embed the question and retrieve the closest manual chunks.
    q_embed = embedder.encode(question).tolist()
    res = db.query(query_embeddings=[q_embed], n_results=MAX_CONTEXT_CHUNKS)
    contexts = res["documents"][0]
    prompt = (
        "You are a technical assistant.\n"
        "Answer only using the context below.\n"
        "Say 'I don't know' if not found.\n"
    )
    context_text = "\n\n".join(contexts)
    final_prompt = prompt + f"Context:\n{context_text}\n\nQuestion: {question}\nAnswer:"
    if llm_pipe:
        result = llm_pipe(final_prompt, max_new_tokens=300)[0]["generated_text"]
        # The pipeline echoes the prompt, so keep only the text after "Answer:".
        return result.split("Answer:")[-1].strip()
    return "Model not loaded."
# ---------------------------
# Init embeddings once
# ---------------------------
db, embedder = embed_all()
# ---------------------------
# Gradio Interface
# ---------------------------
with gr.Blocks() as demo:
    gr.Markdown("# SmartManuals-AI: Ask Technical Questions about Your Manuals")
    question = gr.Textbox(placeholder="e.g. How do I reset the treadmill console?", label="Enter Question")
    submit = gr.Button("Get Answer")
    output = gr.Textbox(label="Answer")
    submit.click(fn=run_query, inputs=question, outputs=output)

demo.launch()
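
# On Hugging Face Spaces, launch() above is sufficient: the platform routes
# HTTP traffic to the default Gradio port automatically.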