"""
KnowledgeBridge Modal App

Provides distributed computing capabilities for document processing and vector search.
"""

import os
from typing import Any, Dict, List, Optional

import modal

app = modal.App("knowledgebridge-main")
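
# Typical workflow (the filename below is an assumption; substitute the actual path):
#   modal serve modal_app.py    # hot-reloading dev server for the FastAPI endpoints
#   modal deploy modal_app.py   # deploy the app and its functions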
|
|
|
|
|
image = (
    modal.Image.debian_slim(python_version="3.11")
    .pip_install([
        "fastapi[standard]",
        "numpy",
        "faiss-cpu",
        "PyPDF2",
        "pillow",
        "pytesseract",
        "requests",
        "scikit-learn",
        "sentence-transformers",
        "openai",
        "tiktoken",
    ])
    .apt_install(["tesseract-ocr", "tesseract-ocr-eng", "poppler-utils"])
)

volume = modal.Volume.from_name("knowledgebridge-storage", create_if_missing=True)
|
|
|
@app.function(
    image=image,
    volumes={"/storage": volume},
    timeout=300,
    memory=2048
)
def extract_text_from_documents(documents: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Extract text from documents using PDF parsing and OCR."""
    import base64
    import hashlib
    from io import BytesIO

    import PyPDF2
    import pytesseract
    from PIL import Image

    results = []

    for doc in documents:
        try:
            doc_id = doc.get('id', f"doc_{len(results)}")
            content_type = doc.get('contentType', 'text/plain')
            content = doc.get('content', '')

            extracted_text = ""

            if content_type == 'application/pdf':
                # PDF content is expected as a base64-encoded string.
                try:
                    pdf_data = base64.b64decode(content)
                    pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_data))
                    for page_num, page in enumerate(pdf_reader.pages):
                        page_text = page.extract_text()
                        extracted_text += f"Page {page_num + 1}:\n{page_text}\n\n"
                except Exception as pdf_error:
                    extracted_text = f"PDF extraction failed: {str(pdf_error)}"

            elif content_type.startswith('image/'):
                # Images are also base64-encoded; run them through Tesseract OCR.
                try:
                    image_data = base64.b64decode(content)
                    image = Image.open(BytesIO(image_data))
                    extracted_text = pytesseract.image_to_string(image)
                except Exception as ocr_error:
                    extracted_text = f"OCR extraction failed: {str(ocr_error)}"

            else:
                # Anything else is treated as plain text and passed through unchanged.
                extracted_text = content

            results.append({
                'id': doc_id,
                'extracted_text': extracted_text,
                'original_type': content_type,
                'status': 'completed'
            })

        except Exception as e:
            results.append({
                'id': doc.get('id', f"doc_{len(results)}"),
                'extracted_text': "",
                'original_type': doc.get('contentType', 'unknown'),
                'status': 'failed',
                'error': str(e)
            })

    # Derive a short task id from the request payload for traceability.
    task_id = f"extract_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}"

    return {
        'task_id': task_id,
        'status': 'completed',
        'results': results,
        'processed_count': len(results)
    }
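
# For reference, each input document is a plain dict shaped roughly like the example
# below (field names taken from the code above; the values are illustrative only):
#
#   {
#       "id": "doc_42",
#       "contentType": "application/pdf",   # or "image/png", "text/plain", ...
#       "content": "<base64-encoded bytes, or raw text for text/plain>"
#   }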
|
|
|
@app.function(
    image=image,
    volumes={"/storage": volume},
    timeout=600,
    memory=4096,
    cpu=2
)
def build_vector_index(documents: List[Dict[str, Any]], index_name: str = "main_index") -> Dict[str, Any]:
    """Build a FAISS vector index from documents and persist it to storage."""
    import hashlib
    import pickle

    import faiss
    import numpy as np

    try:
        from sentence_transformers import SentenceTransformer

        model = SentenceTransformer('all-MiniLM-L6-v2')

        # Collect indexable texts and keep per-document metadata alongside them.
        texts = []
        doc_metadata = []

        for doc in documents:
            text = doc.get('content', doc.get('extracted_text', ''))
            if text and len(text.strip()) > 10:
                texts.append(text[:8000])  # cap very long documents before embedding
                doc_metadata.append({
                    'id': doc.get('id'),
                    'title': doc.get('title', 'Untitled'),
                    'source': doc.get('source', 'Unknown'),
                    'content': text
                })

        if not texts:
            task_id = f"index_{index_name}_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}"
            return {
                'task_id': task_id,
                'status': 'failed',
                'error': 'No valid texts to index'
            }

        # Embed, L2-normalize, and add to an inner-product index (cosine similarity).
        embeddings = model.encode(texts, show_progress_bar=False)
        embeddings = np.array(embeddings).astype('float32')

        dimension = embeddings.shape[1]
        index = faiss.IndexFlatIP(dimension)

        faiss.normalize_L2(embeddings)
        index.add(embeddings)

        # Prefer the mounted volume, but fall back to ephemeral locations if needed.
        storage_paths = ["/storage", "/tmp", "."]
        index_path = None
        metadata_path = None

        for storage_dir in storage_paths:
            try:
                os.makedirs(storage_dir, exist_ok=True)
                test_index_path = f"{storage_dir}/{index_name}.index"
                test_metadata_path = f"{storage_dir}/{index_name}_metadata.pkl"

                # Verify the directory is writable before committing to it.
                test_file = f"{storage_dir}/test_write_{index_name}.tmp"
                with open(test_file, 'w') as f:
                    f.write("test")
                os.remove(test_file)

                index_path = test_index_path
                metadata_path = test_metadata_path
                print(f"Using storage directory: {storage_dir}")
                break

            except Exception as e:
                print(f"Cannot write to {storage_dir}: {e}")
                continue

        if not index_path:
            raise Exception("No writable storage directory found")

        print(f"Writing index to: {index_path}")
        faiss.write_index(index, index_path)

        print(f"Writing metadata to: {metadata_path}")
        with open(metadata_path, 'wb') as f:
            pickle.dump(doc_metadata, f)

        # Persist changes to the Modal volume so other functions can see the index.
        if index_path.startswith("/storage"):
            volume.commit()

        task_id = f"index_{index_name}_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}"
        return {
            'task_id': task_id,
            'status': 'completed',
            'index_name': index_name,
            'document_count': len(doc_metadata),
            'dimension': dimension,
            'index_path': index_path
        }

    except Exception as e:
        task_id = f"index_{index_name}_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}"
        return {
            'task_id': task_id,
            'status': 'failed',
            'error': str(e)
        }
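
# Hedged example (not part of this app): once deployed, other backend code could
# trigger an index build by looking the function up by its deployed name. The app
# name matches modal.App("knowledgebridge-main") above; the exact lookup API may
# vary with your Modal version.
#
#   import modal
#   build = modal.Function.from_name("knowledgebridge-main", "build_vector_index")
#   result = build.remote(documents, "main_index")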
|
|
|
@app.function(
    image=image,
    volumes={"/storage": volume},
    timeout=60,
    memory=2048
)
def vector_search(query: str, index_name: str = "main_index", max_results: int = 10) -> Dict[str, Any]:
    """Perform vector search against a previously built FAISS index."""
    import pickle

    import faiss
    import numpy as np

    try:
        from sentence_transformers import SentenceTransformer

        model = SentenceTransformer('all-MiniLM-L6-v2')

        # Pick up the latest committed state of the volume in case the index was
        # rebuilt after this container started.
        try:
            volume.reload()
        except Exception:
            pass

        # Look for the index in the same locations build_vector_index writes to.
        storage_paths = ["/storage", "/tmp", "."]
        index_path = None
        metadata_path = None

        for storage_dir in storage_paths:
            test_index_path = f"{storage_dir}/{index_name}.index"
            test_metadata_path = f"{storage_dir}/{index_name}_metadata.pkl"

            if os.path.exists(test_index_path) and os.path.exists(test_metadata_path):
                index_path = test_index_path
                metadata_path = test_metadata_path
                print(f"Found index in: {storage_dir}")
                break

        if not index_path or not metadata_path:
            return {
                'status': 'failed',
                'error': f'Index {index_name} not found in any storage location. Please build index first.',
                'results': []
            }

        index = faiss.read_index(index_path)

        with open(metadata_path, 'rb') as f:
            doc_metadata = pickle.load(f)

        # Embed and normalize the query the same way the documents were indexed.
        query_embedding = model.encode([query])
        query_embedding = np.array(query_embedding).astype('float32')
        faiss.normalize_L2(query_embedding)

        scores, indices = index.search(query_embedding, min(max_results, len(doc_metadata)))

        results = []
        for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
            if 0 <= idx < len(doc_metadata):
                doc = doc_metadata[idx]
                results.append({
                    'id': doc['id'],
                    'title': doc['title'],
                    'content': doc['content'],
                    'source': doc['source'],
                    'relevanceScore': float(score),
                    'rank': i + 1,
                    'snippet': doc['content'][:200] + '...' if len(doc['content']) > 200 else doc['content']
                })

        return {
            'status': 'completed',
            'results': results,
            'query': query,
            'total_found': len(results)
        }

    except Exception as e:
        return {
            'status': 'failed',
            'error': str(e),
            'results': []
        }
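
# A minimal end-to-end smoke test, run with e.g. `modal run modal_app.py::smoke_test`
# (filename assumed). This is a convenience sketch, not part of the service itself;
# the sample documents and the "smoke_index" name are made up for illustration.
@app.local_entrypoint()
def smoke_test():
    sample_docs = [
        {
            "id": "doc_1",
            "title": "FAISS",
            "source": "example",
            "content": "FAISS is a library for efficient similarity search over dense vectors.",
        },
        {
            "id": "doc_2",
            "title": "Modal",
            "source": "example",
            "content": "Modal runs Python functions in the cloud with attached storage volumes.",
        },
    ]
    build_result = build_vector_index.remote(sample_docs, "smoke_index")
    print("build:", build_result.get("status"), "docs:", build_result.get("document_count"))

    search_result = vector_search.remote("library for similarity search", "smoke_index", 2)
    for hit in search_result.get("results", []):
        print(hit["rank"], hit["title"], round(hit["relevanceScore"], 3))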
|
|
|
@app.function(
    image=image,
    timeout=300,
    memory=2048
)
def batch_process_documents(request: Dict[str, Any]) -> Dict[str, Any]:
    """Process multiple documents in batch."""
    import hashlib

    try:
        documents = request.get('documents', [])
        operations = request.get('operations', ['extract_text'])

        task_id = f"batch_{hashlib.md5(str(request).encode()).hexdigest()[:8]}"
        results = {
            'task_id': task_id,
            'status': 'completed',
            'operations_completed': [],
            'document_count': len(documents)
        }

        if 'extract_text' in operations:
            # Dispatch to the dedicated extraction function rather than calling it
            # as a plain Python function.
            extraction_result = extract_text_from_documents.remote(documents)
            results['operations_completed'].append('extract_text')
            results['extraction_results'] = extraction_result.get('results', [])

        if 'build_index' in operations:
            index_name = request.get('index_name', 'batch_index')
            # Run remotely so the indexing container has the storage volume mounted.
            index_result = build_vector_index.remote(documents, index_name)
            results['operations_completed'].append('build_index')
            results['index_results'] = index_result

        return results

    except Exception as e:
        task_id = f"batch_{hashlib.md5(str(request).encode()).hexdigest()[:8]}"
        return {
            'task_id': task_id,
            'status': 'failed',
            'error': str(e)
        }
|
|
|
|
|
# Placeholder for future asynchronous task tracking; the functions above currently
# run synchronously and return their results directly.
task_statuses = {}


@app.function(timeout=30)
def get_task_status(task_id: str) -> Dict[str, Any]:
    """Get the status of a processing task (stubbed: tasks complete synchronously)."""
    return {
        'task_id': task_id,
        'status': 'completed',
        'progress': 100,
        'message': 'Task completed successfully'
    }
|
|
|
|
|
# Web API layer: a FastAPI app served by Modal as an ASGI app (see fastapi_app below).
import datetime

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel


class VectorSearchRequest(BaseModel):
    query: str
    index_name: str = "main_index"
    max_results: int = 10


class DocumentRequest(BaseModel):
    documents: List[Dict[str, Any]]


class IndexRequest(BaseModel):
    documents: List[Dict[str, Any]]
    index_name: str = "main_index"


class BatchRequest(BaseModel):
    documents: List[Dict[str, Any]]
    operations: List[str] = ["extract_text"]
    index_name: str = "batch_index"


web_app = FastAPI(title="KnowledgeBridge Modal API")
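
# Example request body for POST /batch-process (illustrative values only; the field
# names mirror BatchRequest above):
#
#   {
#       "documents": [{"id": "doc_1", "contentType": "text/plain", "content": "..."}],
#       "operations": ["extract_text", "build_index"],
#       "index_name": "main_index"
#   }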
|
|
|
@web_app.post("/vector-search") |
|
async def api_vector_search(request: VectorSearchRequest): |
|
try: |
|
result = vector_search.remote(request.query, request.index_name, request.max_results) |
|
return result |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@web_app.post("/extract-text") |
|
async def api_extract_text(request: DocumentRequest): |
|
try: |
|
result = extract_text_from_documents.remote(request.documents) |
|
return result |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@web_app.post("/build-index") |
|
async def api_build_index(request: IndexRequest): |
|
try: |
|
result = build_vector_index.remote(request.documents, request.index_name) |
|
return result |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@web_app.post("/batch-process") |
|
async def api_batch_process(request: BatchRequest): |
|
try: |
|
result = batch_process_documents.remote({ |
|
"documents": request.documents, |
|
"operations": request.operations, |
|
"index_name": request.index_name |
|
}) |
|
return result |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@web_app.get("/task-status/{task_id}") |
|
async def api_task_status(task_id: str): |
|
try: |
|
return { |
|
'task_id': task_id, |
|
'status': 'completed', |
|
'progress': 100, |
|
'message': 'Task completed successfully' |
|
} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
@web_app.get("/health") |
|
async def api_health(): |
|
return { |
|
'status': 'healthy', |
|
'service': 'KnowledgeBridge Modal App', |
|
'version': '1.0.0', |
|
'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat() |
|
} |
|
|
|
@app.function(image=image)
@modal.asgi_app()
def fastapi_app():
    return web_app
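
# Hedged example: after `modal deploy`, the endpoints are reachable at the URL printed
# in the deploy output (shown here only as a placeholder):
#
#   curl -X POST https://<your-modal-endpoint>/vector-search \
#        -H "Content-Type: application/json" \
#        -d '{"query": "vector databases", "max_results": 5}'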
|
|
|
if __name__ == "__main__":
    print("KnowledgeBridge Modal App")
    print("Available functions:")
    print("- extract_text_from_documents")
    print("- build_vector_index")
    print("- vector_search")
    print("- batch_process_documents")
    print("- get_task_status")