import fs from 'fs';
import path from 'path';
import { modalClient } from './modal-client';
import { nebiusClient } from './nebius-client';
import { FileProcessor } from './file-upload';
import { storage } from './storage';
import { type Document, type InsertDocument } from '@shared/schema';

/** Operations a caller may request for a document. */
export type ProcessingOperation = 'extract_text' | 'generate_embedding' | 'build_index';

/** Outcome of processing a single document. */
export interface ProcessingResult {
  success: boolean;
  extractedText?: string;
  embeddings?: number[];
  modalTaskId?: string;
  error?: string;
  /** Wall-clock duration in milliseconds. */
  processingTime: number;
}

/** Outcome of processing a batch of documents. */
export interface BatchProcessingResult {
  success: boolean;
  processedCount: number;
  failedCount: number;
  results: Array<{
    documentId: number;
    success: boolean;
    extractedText?: string;
    embeddings?: number[];
    error?: string;
  }>;
  /** Wall-clock duration in milliseconds. */
  totalProcessingTime: number;
}

/** Shape of a document as submitted to the Modal extraction endpoints. */
interface ModalDocumentPayload {
  id: string;
  /** File bytes encoded as base64. */
  content: string;
  contentType: string;
}

/**
 * Upper bound on the number of characters sent to the embedding service.
 * Kept conservative because embedding models typically limit input size.
 */
const MAX_EMBEDDING_INPUT_CHARS = 8000;

/** Converts any thrown value into a human-readable message. */
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Projects a single-document result onto the per-document entry used in batch results. */
function toBatchEntry(
  documentId: number,
  result: ProcessingResult
): BatchProcessingResult['results'][number] {
  return {
    documentId,
    success: result.success,
    extractedText: result.extractedText,
    embeddings: result.embeddings,
    error: result.error
  };
}

export class DocumentProcessor {
  private static instance: DocumentProcessor;

  /** Returns the shared processor, creating it on first use. */
  static getInstance(): DocumentProcessor {
    if (!DocumentProcessor.instance) {
      DocumentProcessor.instance = new DocumentProcessor();
    }
    return DocumentProcessor.instance;
  }

  /**
   * Process a single document using Modal for heavy workloads.
   *
   * Extraction and embedding failures are logged as warnings and do not fail
   * the call: the result still reports `success: true`, falling back to the
   * document's stored content and/or leaving `embeddings` undefined.
   * `'build_index'` is accepted in `operations` but this method does not act on it.
   *
   * @param document - Document to process.
   * @param operations - Operations to run; defaults to text extraction only.
   * @returns The processing outcome; `success` is false only if an exception escaped a step.
   */
  async processDocument(
    document: Document,
    operations: ProcessingOperation[] = ['extract_text']
  ): Promise<ProcessingResult> {
    const startTime = Date.now();

    try {
      let extractedText = document.content;
      let embeddings: number[] | undefined;
      let modalTaskId: string | undefined;

      // Step 1: Extract text if needed (for PDFs and images)
      if (operations.includes('extract_text') && document.filePath) {
        const textResult = await this.extractText(document);
        if (textResult.success) {
          // An empty extraction falls back to the stored content.
          extractedText = textResult.extractedText || document.content;
          modalTaskId = textResult.modalTaskId;
        } else {
          console.warn(`Text extraction failed for document ${document.id}: ${textResult.error}`);
        }
      }

      // Step 2: Generate embeddings if requested
      if (operations.includes('generate_embedding') && extractedText) {
        const embeddingResult = await this.generateEmbeddings(extractedText);
        if (embeddingResult.success) {
          embeddings = embeddingResult.embeddings;
        } else {
          console.warn(`Embedding generation failed for document ${document.id}: ${embeddingResult.error}`);
        }
      }

      return {
        success: true,
        extractedText,
        embeddings,
        modalTaskId,
        processingTime: Date.now() - startTime
      };
    } catch (error) {
      return {
        success: false,
        error: toErrorMessage(error),
        processingTime: Date.now() - startTime
      };
    }
  }

  /**
   * Process multiple documents in batch using Modal's distributed computing.
   *
   * Documents that have a file and an OCR-requiring MIME type are sent to Modal
   * in one batch when `'extract_text'` is requested; if that batch call fails,
   * they are processed one by one instead. All other documents (including
   * OCR-type documents when extraction was not requested) go through
   * `processDocument` individually, so every input document yields a result entry.
   *
   * @param documents - Documents to process.
   * @param operations - Operations to run; defaults to text extraction only.
   * @returns Aggregate counts plus one entry per document.
   */
  async batchProcessDocuments(
    documents: Document[],
    operations: ProcessingOperation[] = ['extract_text']
  ): Promise<BatchProcessingResult> {
    const startTime = Date.now();
    const results: BatchProcessingResult['results'] = [];

    try {
      // Separate documents by processing requirements. Only route to the Modal
      // batch when extraction was actually requested; otherwise the document
      // would be skipped entirely.
      const wantsExtraction = operations.includes('extract_text');
      const documentsForModal: Document[] = [];
      const documentsForLocal: Document[] = [];
      for (const doc of documents) {
        const needsModal =
          wantsExtraction &&
          Boolean(doc.filePath) &&
          FileProcessor.requiresOCR(doc.mimeType || '');
        (needsModal ? documentsForModal : documentsForLocal).push(doc);
      }

      // Process Modal-required documents in batch
      if (documentsForModal.length > 0) {
        try {
          // NOTE(review): the batch path returns extracted text only; embeddings are
          // not produced here even when 'generate_embedding' is requested — confirm
          // whether the Modal batch response should supply them.
          const modalResults = await this.batchExtractTextModal(documentsForModal);
          results.push(...modalResults);
        } catch (error) {
          console.error('Modal batch processing failed:', error);
          // Fall back to individual processing
          for (const doc of documentsForModal) {
            results.push(toBatchEntry(doc.id, await this.processDocument(doc, operations)));
          }
        }
      }

      // Process local documents
      for (const doc of documentsForLocal) {
        results.push(toBatchEntry(doc.id, await this.processDocument(doc, operations)));
      }

      const successCount = results.filter(r => r.success).length;

      return {
        success: true,
        processedCount: successCount,
        failedCount: results.length - successCount,
        results,
        totalProcessingTime: Date.now() - startTime
      };
    } catch (error) {
      const message = toErrorMessage(error);
      return {
        success: false,
        processedCount: 0,
        failedCount: documents.length,
        results: documents.map(doc => ({
          documentId: doc.id,
          success: false,
          error: message
        })),
        totalProcessingTime: Date.now() - startTime
      };
    }
  }

  /**
   * Extract text from a document using Modal for PDFs/images or direct reading for text files.
   *
   * Documents without a file path, or with a MIME type that is neither a text
   * file nor OCR-requiring, return their stored content unchanged.
   */
  private async extractText(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    if (!document.filePath) {
      return { success: true, extractedText: document.content };
    }

    const mimeType = document.mimeType || '';

    try {
      // For text files, read directly
      if (FileProcessor.isTextFile(mimeType)) {
        const content = await FileProcessor.readTextFile(document.filePath);
        return { success: true, extractedText: content };
      }

      // For PDFs and images, use Modal
      if (FileProcessor.requiresOCR(mimeType)) {
        return await this.extractTextModal(document);
      }

      // Fallback: return existing content
      return { success: true, extractedText: document.content };
    } catch (error) {
      return { success: false, error: toErrorMessage(error) };
    }
  }

  /**
   * Extract text using Modal for OCR-required files.
   *
   * Reads the file, submits it base64-encoded as a one-element batch, and
   * reports the first result. Never throws; failures are returned as
   * `{ success: false, error }`.
   */
  private async extractTextModal(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    try {
      if (!document.filePath) {
        throw new Error('No file path provided for Modal processing');
      }

      // Read file and convert to base64
      const fileBuffer = await fs.promises.readFile(document.filePath);

      // Prepare document for Modal
      const modalDocument: ModalDocumentPayload = {
        id: document.id.toString(),
        content: fileBuffer.toString('base64'),
        contentType: document.mimeType || 'application/octet-stream'
      };

      // Call Modal extract-text endpoint
      const result = await modalClient.extractTextFromDocuments([modalDocument]);
      const extractionResult = result.results?.[0];

      if (result.status !== 'completed' || !extractionResult) {
        return { success: false, error: result.error || 'Modal processing failed' };
      }

      if (extractionResult.status !== 'completed') {
        return { success: false, error: extractionResult.error || 'Modal extraction failed' };
      }

      return {
        success: true,
        extractedText: extractionResult.extracted_text,
        modalTaskId: result.task_id
      };
    } catch (error) {
      console.error('Modal text extraction failed:', error);
      return { success: false, error: toErrorMessage(error) };
    }
  }

  /**
   * Batch extract text using Modal.
   *
   * Returns exactly one entry per input document, in input order. Documents
   * whose file could not be read, or for which Modal returned no result, get a
   * failure entry rather than being omitted.
   *
   * @throws Error when the Modal call rejects or does not report a completed
   *   batch, so the caller can fall back to per-document processing.
   */
  private async batchExtractTextModal(
    documents: Document[]
  ): Promise<BatchProcessingResult['results']> {
    const payloads = await Promise.all(
      documents.map(async (doc): Promise<ModalDocumentPayload | null> => {
        if (!doc.filePath) return null;

        try {
          const fileBuffer = await fs.promises.readFile(doc.filePath);
          return {
            id: doc.id.toString(),
            content: fileBuffer.toString('base64'),
            contentType: doc.mimeType || 'application/octet-stream'
          };
        } catch (error) {
          console.error(`Failed to read file for document ${doc.id}:`, error);
          return null;
        }
      })
    );

    const validDocuments = payloads.filter(
      (payload): payload is ModalDocumentPayload => payload !== null
    );

    if (validDocuments.length === 0) {
      return documents.map(doc => ({
        documentId: doc.id,
        success: false,
        error: 'No valid documents for processing'
      }));
    }

    const batchResult = await modalClient.batchProcessDocuments({
      documents: validDocuments,
      modelName: 'text-embedding-3-small',
      batchSize: Math.min(validDocuments.length, 10)
    });

    if (batchResult.status !== 'completed' || !batchResult.extraction_results) {
      throw new Error(batchResult.error || 'Batch processing failed');
    }

    // Index Modal's results by numeric document id so each input can be matched up.
    const resultsById = new Map<number, BatchProcessingResult['results'][number]>();
    for (const result of batchResult.extraction_results) {
      const documentId = Number.parseInt(String(result.id), 10);
      if (Number.isNaN(documentId)) continue;
      resultsById.set(documentId, {
        documentId,
        success: result.status === 'completed',
        extractedText: result.extracted_text,
        error: result.error
      });
    }

    const submittedIds = new Set(validDocuments.map(payload => payload.id));

    return documents.map(
      doc =>
        resultsById.get(doc.id) ?? {
          documentId: doc.id,
          success: false,
          error: submittedIds.has(doc.id.toString())
            ? 'No extraction result returned by Modal'
            : 'Failed to read file for Modal processing'
        }
    );
  }

  /**
   * Generate embeddings using Nebius AI.
   *
   * Input longer than `MAX_EMBEDDING_INPUT_CHARS` characters is truncated
   * before being sent. Never throws; failures are returned as
   * `{ success: false, error }`.
   */
  private async generateEmbeddings(text: string): Promise<{
    success: boolean;
    embeddings?: number[];
    error?: string;
  }> {
    try {
      // Truncate text if too long (most embedding models have token limits)
      const truncatedText = text.slice(0, MAX_EMBEDDING_INPUT_CHARS);

      const result = await nebiusClient.generateEmbeddings(truncatedText);

      if (result.success && result.embeddings) {
        return { success: true, embeddings: result.embeddings };
      }
      return { success: false, error: result.error || 'Embedding generation failed' };
    } catch (error) {
      return { success: false, error: toErrorMessage(error) };
    }
  }

  /**
   * Build vector index using Modal.
   *
   * @param documents - Documents whose id, content, title and source are sent to Modal.
   * @param indexName - Name of the index to build.
   * @returns The index name and document count reported by Modal, or an error message.
   */
  async buildVectorIndex(
    documents: Document[],
    indexName = 'research_papers_clean_v2'
  ): Promise<{
    success: boolean;
    indexName?: string;
    documentCount?: number;
    error?: string;
  }> {
    try {
      const modalDocuments = documents.map(doc => ({
        id: doc.id.toString(),
        content: doc.content,
        title: doc.title,
        source: doc.source
      }));

      const result = await modalClient.buildVectorIndex(modalDocuments, {
        indexName,
        dimension: 1536, // Standard OpenAI embedding dimension
        indexType: 'IVF',
        // One list per ten documents, clamped to the range [10, 100].
        nlist: Math.min(100, Math.max(10, Math.floor(documents.length / 10)))
      });

      if (result.status === 'completed') {
        return {
          success: true,
          indexName: result.index_name,
          documentCount: result.document_count
        };
      }
      return { success: false, error: result.error || 'Index building failed' };
    } catch (error) {
      return { success: false, error: toErrorMessage(error) };
    }
  }

  /**
   * Search vector index using Modal.
   *
   * Each hit is enriched with the matching database document when one can be
   * loaded; otherwise the raw Modal hit is returned unchanged.
   *
   * NOTE(review): enriched entries carry the numeric database id and extra
   * fields (sourceType, url, metadata, createdAt) beyond the declared result
   * shape — confirm what callers rely on before tightening the types.
   *
   * @param query - Search text.
   * @param indexName - Name of the index to query.
   * @param maxResults - Maximum number of hits requested from Modal.
   */
  async searchVectorIndex(
    query: string,
    indexName = 'research_papers_clean_v2',
    maxResults = 10
  ): Promise<{
    success: boolean;
    results?: Array<{
      id: string;
      title: string;
      content: string;
      source: string;
      relevanceScore: number;
      rank: number;
      snippet: string;
    }>;
    error?: string;
  }> {
    try {
      const result = await modalClient.vectorSearch(query, indexName, maxResults);

      if (result.status !== 'completed') {
        return { success: false, error: result.error || 'Vector search failed' };
      }

      // Enrich vector search results with complete document data from database
      const enrichedResults = await Promise.all(
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- Modal hit shape is not declared in this module
        (result.results ?? []).map(async (vectorResult: any) => {
          try {
            const documentId = Number.parseInt(String(vectorResult.id), 10);
            if (Number.isNaN(documentId)) {
              // No usable id to look up; keep the raw hit.
              return vectorResult;
            }

            // Get complete document data from database using the ID
            const dbDocument = await storage.getDocument(documentId);
            if (!dbDocument) {
              // Fallback to vector result if database document not found
              return vectorResult;
            }

            // Merge vector search metadata with database document
            return {
              id: dbDocument.id,
              title: dbDocument.title,
              content: dbDocument.content,
              source: dbDocument.source,
              sourceType: dbDocument.sourceType,
              url: dbDocument.url, // Explicitly preserve URL
              metadata: dbDocument.metadata,
              createdAt: dbDocument.createdAt,
              // Add vector search specific fields
              relevanceScore: vectorResult.relevanceScore,
              rank: vectorResult.rank,
              snippet: vectorResult.snippet || dbDocument.content.substring(0, 200) + '...'
            };
          } catch (error) {
            console.warn(`Failed to enrich vector result for ID ${vectorResult.id}:`, error);
            return vectorResult;
          }
        })
      );

      return { success: true, results: enrichedResults };
    } catch (error) {
      return { success: false, error: toErrorMessage(error) };
    }
  }
}

export const documentProcessor = DocumentProcessor.getInstance();