""" KnowledgeBridge Modal App Provides distributed computing capabilities for document processing and vector search """ import modal from typing import List, Dict, Any, Optional import os # Create Modal app app = modal.App("knowledgebridge-main") # Define the image with required dependencies image = ( modal.Image.debian_slim(python_version="3.11") .pip_install([ "fastapi[standard]", "numpy", "faiss-cpu", "PyPDF2", "pillow", "pytesseract", "requests", "scikit-learn", "sentence-transformers", "openai", "tiktoken" ]) .apt_install(["tesseract-ocr", "tesseract-ocr-eng", "poppler-utils"]) ) # Shared volume for storing vector indices volume = modal.Volume.from_name("knowledgebridge-storage", create_if_missing=True) @app.function( image=image, volumes={"/storage": volume}, timeout=300, memory=2048 ) def extract_text_from_documents(documents: List[Dict[str, Any]]) -> Dict[str, Any]: """ Extract text from documents using OCR and PDF parsing """ import json import base64 from io import BytesIO import PyPDF2 import pytesseract from PIL import Image results = [] for doc in documents: try: doc_id = doc.get('id', f"doc_{len(results)}") content_type = doc.get('contentType', 'text/plain') content = doc.get('content', '') extracted_text = "" if content_type == 'application/pdf': # Handle PDF content try: # Assume content is base64 encoded PDF pdf_data = base64.b64decode(content) pdf_reader = PyPDF2.PdfReader(BytesIO(pdf_data)) for page_num, page in enumerate(pdf_reader.pages): page_text = page.extract_text() extracted_text += f"Page {page_num + 1}:\n{page_text}\n\n" except Exception as pdf_error: extracted_text = f"PDF extraction failed: {str(pdf_error)}" elif content_type.startswith('image/'): # Handle image content with OCR try: image_data = base64.b64decode(content) image = Image.open(BytesIO(image_data)) extracted_text = pytesseract.image_to_string(image) except Exception as ocr_error: extracted_text = f"OCR extraction failed: {str(ocr_error)}" else: # Plain text or other formats extracted_text = content results.append({ 'id': doc_id, 'extracted_text': extracted_text, 'original_type': content_type, 'status': 'completed' }) except Exception as e: results.append({ 'id': doc.get('id', f"doc_{len(results)}"), 'extracted_text': "", 'original_type': doc.get('contentType', 'unknown'), 'status': 'failed', 'error': str(e) }) import hashlib task_id = f"extract_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}" return { 'task_id': task_id, 'status': 'completed', 'results': results, 'processed_count': len(results) } @app.function( image=image, volumes={"/storage": volume}, timeout=600, memory=4096, cpu=2 ) def build_vector_index(documents: List[Dict[str, Any]], index_name: str = "main_index") -> Dict[str, Any]: """ Build FAISS vector index from documents """ import numpy as np import faiss import pickle import hashlib try: from sentence_transformers import SentenceTransformer # Load embedding model model = SentenceTransformer('all-MiniLM-L6-v2') # Extract texts and create embeddings texts = [] doc_metadata = [] for doc in documents: text = doc.get('content', doc.get('extracted_text', '')) if text and len(text.strip()) > 10: # Only process non-empty texts texts.append(text[:8000]) # Limit text length doc_metadata.append({ 'id': doc.get('id'), 'title': doc.get('title', 'Untitled'), 'source': doc.get('source', 'Unknown'), 'content': text }) if not texts: task_id = f"index_{index_name}_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}" return { 'task_id': task_id, 'status': 'failed', 'error': 'No valid texts 
to index' } # Generate embeddings embeddings = model.encode(texts, show_progress_bar=False) embeddings = np.array(embeddings).astype('float32') # Create FAISS index dimension = embeddings.shape[1] index = faiss.IndexFlatIP(dimension) # Inner product for cosine similarity # Normalize embeddings for cosine similarity faiss.normalize_L2(embeddings) index.add(embeddings) # Try multiple storage locations with fallbacks storage_paths = ["/storage", "/tmp", "."] index_path = None metadata_path = None for storage_dir in storage_paths: try: os.makedirs(storage_dir, exist_ok=True) test_index_path = f"{storage_dir}/{index_name}.index" test_metadata_path = f"{storage_dir}/{index_name}_metadata.pkl" # Test write permissions test_file = f"{storage_dir}/test_write_{index_name}.tmp" with open(test_file, 'w') as f: f.write("test") os.remove(test_file) # If we get here, we can write to this directory index_path = test_index_path metadata_path = test_metadata_path print(f"Using storage directory: {storage_dir}") break except Exception as e: print(f"Cannot write to {storage_dir}: {e}") continue if not index_path: raise Exception("No writable storage directory found") print(f"Writing index to: {index_path}") faiss.write_index(index, index_path) print(f"Writing metadata to: {metadata_path}") with open(metadata_path, 'wb') as f: pickle.dump(doc_metadata, f) # Only commit volume if we used /storage if index_path.startswith("/storage"): volume.commit() task_id = f"index_{index_name}_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}" return { 'task_id': task_id, 'status': 'completed', 'index_name': index_name, 'document_count': len(doc_metadata), 'dimension': dimension, 'index_path': index_path } except Exception as e: task_id = f"index_{index_name}_{hashlib.md5(str(documents).encode()).hexdigest()[:8]}" return { 'task_id': task_id, 'status': 'failed', 'error': str(e) } @app.function( image=image, volumes={"/storage": volume}, timeout=60, memory=2048 ) def vector_search(query: str, index_name: str = "main_index", max_results: int = 10) -> Dict[str, Any]: """ Perform vector search using FAISS index """ import numpy as np import faiss import pickle try: from sentence_transformers import SentenceTransformer # Load embedding model model = SentenceTransformer('all-MiniLM-L6-v2') # Try to find index in multiple storage locations storage_paths = ["/storage", "/tmp", "."] index_path = None metadata_path = None for storage_dir in storage_paths: test_index_path = f"{storage_dir}/{index_name}.index" test_metadata_path = f"{storage_dir}/{index_name}_metadata.pkl" if os.path.exists(test_index_path) and os.path.exists(test_metadata_path): index_path = test_index_path metadata_path = test_metadata_path print(f"Found index in: {storage_dir}") break if not index_path or not metadata_path: return { 'status': 'failed', 'error': f'Index {index_name} not found in any storage location. 
Please build index first.', 'results': [] } # Load FAISS index index = faiss.read_index(index_path) # Load metadata with open(metadata_path, 'rb') as f: doc_metadata = pickle.load(f) # Generate query embedding query_embedding = model.encode([query]) query_embedding = np.array(query_embedding).astype('float32') faiss.normalize_L2(query_embedding) # Search scores, indices = index.search(query_embedding, min(max_results, len(doc_metadata))) # Format results results = [] for i, (score, idx) in enumerate(zip(scores[0], indices[0])): if idx >= 0 and idx < len(doc_metadata): # Valid index doc = doc_metadata[idx] results.append({ 'id': doc['id'], 'title': doc['title'], 'content': doc['content'], 'source': doc['source'], 'relevanceScore': float(score), 'rank': i + 1, 'snippet': doc['content'][:200] + '...' if len(doc['content']) > 200 else doc['content'] }) return { 'status': 'completed', 'results': results, 'query': query, 'total_found': len(results) } except Exception as e: return { 'status': 'failed', 'error': str(e), 'results': [] } @app.function( image=image, timeout=300, memory=2048 ) def batch_process_documents(request: Dict[str, Any]) -> Dict[str, Any]: """ Process multiple documents in batch """ import hashlib try: documents = request.get('documents', []) operations = request.get('operations', ['extract_text']) task_id = f"batch_{hashlib.md5(str(request).encode()).hexdigest()[:8]}" results = { 'task_id': task_id, 'status': 'completed', 'operations_completed': [], 'document_count': len(documents) } # Extract text if requested if 'extract_text' in operations: extraction_result = extract_text_from_documents(documents) results['operations_completed'].append('extract_text') results['extraction_results'] = extraction_result.get('results', []) # Build index if requested if 'build_index' in operations: index_name = request.get('index_name', 'batch_index') index_result = build_vector_index(documents, index_name) results['operations_completed'].append('build_index') results['index_results'] = index_result return results except Exception as e: task_id = f"batch_{hashlib.md5(str(request).encode()).hexdigest()[:8]}" return { 'task_id': task_id, 'status': 'failed', 'error': str(e) } # Simple task status tracking (in-memory for demo) task_statuses = {} @app.function(timeout=30) def get_task_status(task_id: str) -> Dict[str, Any]: """ Get status of a processing task """ # In a real implementation, this would check a database # For now, return a simple status return { 'task_id': task_id, 'status': 'completed', # Simplified for demo 'progress': 100, 'message': 'Task completed successfully' } # Web endpoints using FastAPI from fastapi import FastAPI, HTTPException from fastapi.responses import JSONResponse from pydantic import BaseModel from typing import List, Dict, Any, Optional import datetime # Pydantic models class VectorSearchRequest(BaseModel): query: str index_name: str = "main_index" max_results: int = 10 class DocumentRequest(BaseModel): documents: List[Dict[str, Any]] class IndexRequest(BaseModel): documents: List[Dict[str, Any]] index_name: str = "main_index" class BatchRequest(BaseModel): documents: List[Dict[str, Any]] operations: List[str] = ["extract_text"] index_name: str = "batch_index" web_app = FastAPI(title="KnowledgeBridge Modal API") @web_app.post("/vector-search") async def api_vector_search(request: VectorSearchRequest): try: result = vector_search.remote(request.query, request.index_name, request.max_results) return result except Exception as e: raise HTTPException(status_code=500, 
detail=str(e)) @web_app.post("/extract-text") async def api_extract_text(request: DocumentRequest): try: result = extract_text_from_documents.remote(request.documents) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @web_app.post("/build-index") async def api_build_index(request: IndexRequest): try: result = build_vector_index.remote(request.documents, request.index_name) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @web_app.post("/batch-process") async def api_batch_process(request: BatchRequest): try: result = batch_process_documents.remote({ "documents": request.documents, "operations": request.operations, "index_name": request.index_name }) return result except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @web_app.get("/task-status/{task_id}") async def api_task_status(task_id: str): try: return { 'task_id': task_id, 'status': 'completed', 'progress': 100, 'message': 'Task completed successfully' } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @web_app.get("/health") async def api_health(): return { 'status': 'healthy', 'service': 'KnowledgeBridge Modal App', 'version': '1.0.0', 'timestamp': datetime.datetime.now(datetime.timezone.utc).isoformat() } @app.function(image=image) @modal.asgi_app() def fastapi_app(): return web_app if __name__ == "__main__": print("KnowledgeBridge Modal App") print("Available functions:") print("- extract_text_from_documents") print("- build_vector_index") print("- vector_search") print("- batch_process_documents") print("- get_task_status")
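

# --- Usage sketch (illustrative, not part of the original app) ---
# A minimal local entrypoint for smoke-testing the deployed functions, assuming
# the module is invoked with `modal run <this_file>.py`. The entrypoint name,
# sample document, and query below are hypothetical placeholders.
@app.local_entrypoint()
def smoke_test():
    sample_docs = [{
        'id': 'doc_1',
        'title': 'Sample document',
        'source': 'inline',
        'contentType': 'text/plain',
        'content': 'Modal functions handle document extraction, FAISS indexing, '
                   'and vector search for KnowledgeBridge.'
    }]
    # Build an index from the sample document, then query it remotely.
    print(build_vector_index.remote(sample_docs, 'main_index'))
    print(vector_search.remote('vector search', 'main_index', 3))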