import os
import io

import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import nltk
import chromadb
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import gradio as gr
# ---------------------------
# 📦 Paths and Constants
# ---------------------------
MANUALS_DIR = "./Manuals"
CHROMA_PATH = "./chroma_store"
COLLECTION_NAME = "manual_chunks"

# Create the manuals folder if it is missing so the indexing loop below
# has something to list.
os.makedirs(MANUALS_DIR, exist_ok=True)

# Ensure the NLTK sentence tokenizer data is available. Newer NLTK releases
# look for "punkt_tab" instead of "punkt", so fetch both.
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
from nltk.tokenize import sent_tokenize
# ---------------------------
# 🧼 Text cleaning utilities
# ---------------------------
def clean(text):
    """Strip per-line whitespace and drop blank lines."""
    return "\n".join(line.strip() for line in text.splitlines() if line.strip())


def split_sentences(text):
    """Sentence-split with NLTK, falling back to a naive split on failure."""
    try:
        return sent_tokenize(text)
    except Exception as e:
        print("[Tokenizer Error]", e, "\nFalling back to simple split.")
        return text.split(". ")
# ---------------------------
# 📄 PDF and DOCX extraction
# ---------------------------
def extract_pdf_text(pdf_path):
    """Return a list of (page_number, text) tuples, OCR-ing pages with no text layer."""
    doc = fitz.open(pdf_path)
    pages = []
    for i, page in enumerate(doc):
        text = page.get_text().strip()
        if not text:
            # No embedded text layer: rasterize the page and OCR it.
            try:
                pix = page.get_pixmap(dpi=300)
                img = Image.open(io.BytesIO(pix.tobytes("png")))
                text = pytesseract.image_to_string(img)
            except pytesseract.TesseractNotFoundError:
                print("❌ Tesseract not found. Skipping OCR for this page.")
                text = ""
        pages.append((i + 1, text))
    doc.close()
    return pages
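

# The header above also promises DOCX extraction, which the original body never
# implements. A minimal sketch using python-docx (an assumed dependency, not in
# the imports above). DOCX files carry no fixed pagination, so the whole
# document is returned as a single pseudo-page matching extract_pdf_text's shape.
def extract_docx_text(docx_path):
    import docx  # python-docx, assumed installed

    document = docx.Document(docx_path)
    text = "\n".join(p.text for p in document.paragraphs if p.text.strip())
    return [(1, text)]  # (page_number, text)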
# ---------------------------
# 🧠 Embed text using MiniLM
# ---------------------------
def embed_all():
    client = chromadb.PersistentClient(path=CHROMA_PATH)
    # Rebuild the collection from scratch on every launch
    # (delete_collection raises if it does not exist yet).
    try:
        client.delete_collection(COLLECTION_NAME)
    except Exception:
        pass
    collection = client.create_collection(COLLECTION_NAME)
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    chunk_id = 0
    for fname in tqdm(os.listdir(MANUALS_DIR), desc="Embedding manuals"):
        fpath = os.path.join(MANUALS_DIR, fname)
        if fname.lower().endswith(".pdf"):
            pages = extract_pdf_text(fpath)
        elif fname.lower().endswith(".docx"):
            pages = extract_docx_text(fpath)  # sketch above
        else:
            continue
        for page_num, text in pages:
            sents = split_sentences(clean(text))
            # Chunk five sentences at a time, embedding each chunk explicitly
            # with the MiniLM model loaded above.
            for i in range(0, len(sents), 5):
                chunk = " ".join(sents[i:i + 5])
                if chunk.strip():
                    collection.add(
                        documents=[chunk],
                        embeddings=embedder.encode([chunk]).tolist(),
                        metadatas=[{"source": fname, "page": page_num}],
                        ids=[f"{fname}-{page_num}-{i}-{chunk_id}"],
                    )
                    chunk_id += 1
    print(f"✅ Embedded {chunk_id} chunks.")
    return collection, embedder
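

# Quick sanity check (hypothetical snippet; run once embed_all() has finished):
#   collection, embedder = embed_all()
#   hits = collection.query(
#       query_embeddings=embedder.encode(["diagnostics menu"]).tolist(), n_results=3
#   )
#   print(hits["documents"][0])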
# ---------------------------
# 🤖 Load model
# ---------------------------
def load_llm():
    model_id = "meta-llama/Llama-3.1-8B-Instruct"
    token = os.environ.get("HF_TOKEN")
    tokenizer = AutoTokenizer.from_pretrained(model_id, token=token)
    model = AutoModelForCausalLM.from_pretrained(
        model_id, token=token, torch_dtype="auto", device_map="auto"
    )
    pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512)
    return pipe, tokenizer
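

# Note: Llama 3.1 is a gated model on the Hugging Face Hub. HF_TOKEN must be set
# (e.g. as a Space secret) for an account that has accepted Meta's license, and
# the 8B model realistically needs GPU hardware to answer in reasonable time.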
# ---------------------------
# ❓ Ask a question
# ---------------------------
def ask_question(question, db, embedder, pipe, tokenizer):
    # Retrieve the five closest chunks with the same MiniLM embedder used at index time.
    results = db.query(query_embeddings=embedder.encode([question]).tolist(), n_results=5)
    context = "\n\n".join(results["documents"][0])
    # Llama 3 chat format: each message ends with <|eot_id|>, and the trailing
    # assistant header cues the model to generate the answer.
    prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>

You are a helpful assistant that answers questions from technical manuals using only the provided context.
<context>
{context}
</context><|eot_id|><|start_header_id|>user<|end_header_id|>

{question}<|eot_id|><|start_header_id|>assistant<|end_header_id|>

"""
    out = pipe(prompt)[0]["generated_text"]
    # The pipeline echoes the prompt, so keep only the assistant's completion.
    final = out.split("<|start_header_id|>assistant<|end_header_id|>")[-1].strip()
    return final
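

# A less error-prone alternative to hand-rolling Llama 3 header tokens (sketch)
# is the tokenizer's built-in chat template:
#   messages = [
#       {"role": "system", "content": f"Answer only from:\n<context>\n{context}\n</context>"},
#       {"role": "user", "content": question},
#   ]
#   prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)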
# ---------------------------
# 🚀 Build interface
# ---------------------------
with gr.Blocks() as demo:
    gr.Markdown("# 🤖 SmartManuals-AI (Hugging Face Space Edition)")
    with gr.Row():
        qbox = gr.Textbox(label="Ask a Question", placeholder="e.g. How do I access diagnostics on the SE3 console?")
        submit = gr.Button("🔍 Ask")
    abox = gr.Textbox(label="Answer", lines=8)

    # Build the vector store and load the model once, at startup.
    db, embedder = embed_all()
    pipe, tokenizer = load_llm()

    submit.click(fn=lambda q: ask_question(q, db, embedder, pipe, tokenizer), inputs=qbox, outputs=abox)

# For Hugging Face Spaces
if __name__ == "__main__":
    demo.launch()