import fs from 'fs';
import path from 'path';

import { modalClient } from './modal-client';
import { nebiusClient } from './nebius-client';
import { FileProcessor } from './file-upload';
import { storage } from './storage';

import { type Document, type InsertDocument } from '@shared/schema';

/**
 * Outcome of processing a single document.
 */
export interface ProcessingResult {
  /** `false` only when processing aborted with an exception. */
  success: boolean;
  /** Text extracted from the file, or the document's stored content. */
  extractedText?: string;
  /** Embedding vector, present when embedding generation was requested and succeeded. */
  embeddings?: number[];
  /** Task identifier reported by the Modal extraction call, when one was made. */
  modalTaskId?: string;
  /** Failure message; set when `success` is `false`. */
  error?: string;
  /** Wall-clock duration in milliseconds (difference of two `Date.now()` readings). */
  processingTime: number;
}

/**
 * Aggregate outcome of processing several documents in one call.
 */
export interface BatchProcessingResult {
  /** `false` only when the batch as a whole aborted with an exception. */
  success: boolean;
  /** Number of entries in `results` whose `success` is `true`. */
  processedCount: number;
  /** Number of entries in `results` whose `success` is `false`. */
  failedCount: number;
  /** Per-document outcomes. */
  results: Array<{
    /** Identifier of the document this entry describes. */
    documentId: number;
    /** Whether this document was processed without error. */
    success: boolean;
    /** Extracted text, when available. */
    extractedText?: string;
    /** Embedding vector, when one was generated. */
    embeddings?: number[];
    /** Failure message for this document, if any. */
    error?: string;
  }>;
  /** Wall-clock duration of the whole batch in milliseconds. */
  totalProcessingTime: number;
}

/**
 * Coordinates document text extraction, embedding generation, and vector-index
 * build/search by delegating to `FileProcessor`, `modalClient`, `nebiusClient`
 * and `storage`.
 *
 * Every public method reports failure through its return value (`success:
 * false` plus `error`) instead of throwing.
 */
export class DocumentProcessor {
  private static instance: DocumentProcessor | undefined;

  /** Returns the shared instance, creating it on first call. */
  static getInstance(): DocumentProcessor {
    return (DocumentProcessor.instance ??= new DocumentProcessor());
  }

  /**
   * Runs the requested operations on one document.
   *
   * Extraction and embedding are best-effort: a failure in either step is
   * logged with `console.warn` and the result still has `success: true`.
   * `success: false` is returned only when an exception escapes.
   *
   * @param document - Document to process.
   * @param operations - Steps to run. `'build_index'` is accepted but not acted
   *   on by this method; use {@link buildVectorIndex} for that.
   * @returns Extracted text (falling back to `document.content`), embeddings
   *   when generated, and the elapsed time in milliseconds.
   */
  async processDocument(
    document: Document,
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'> = ['extract_text']
  ): Promise<ProcessingResult> {
    const startTime = Date.now();

    try {
      let extractedText = document.content;
      let embeddings: number[] | undefined;
      let modalTaskId: string | undefined;

      // Extraction only applies to documents backed by a file on disk.
      if (operations.includes('extract_text') && document.filePath) {
        const textResult = await this.extractText(document);
        if (textResult.success) {
          // An empty extraction falls back to the stored content.
          extractedText = textResult.extractedText || document.content;
          modalTaskId = textResult.modalTaskId;
        } else {
          console.warn(`Text extraction failed for document ${document.id}: ${textResult.error}`);
        }
      }

      if (operations.includes('generate_embedding') && extractedText) {
        const embeddingResult = await this.generateEmbeddings(extractedText);
        if (embeddingResult.success) {
          embeddings = embeddingResult.embeddings;
        } else {
          console.warn(`Embedding generation failed for document ${document.id}: ${embeddingResult.error}`);
        }
      }

      return {
        success: true,
        extractedText,
        embeddings,
        modalTaskId,
        processingTime: Date.now() - startTime
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error),
        processingTime: Date.now() - startTime
      };
    }
  }

  /**
   * Processes several documents and returns one result entry per document.
   *
   * When `'extract_text'` is requested, file-backed documents whose MIME type
   * `FileProcessor.requiresOCR` accepts are sent to Modal in a single batch;
   * everything else goes through {@link processDocument} one at a time. When
   * `'extract_text'` is NOT requested, every document goes through
   * {@link processDocument}, so OCR-type documents are no longer dropped.
   * Modal-extracted documents also receive embeddings when
   * `'generate_embedding'` is requested.
   *
   * @param documents - Documents to process.
   * @param operations - Steps to run for each document.
   * @returns Per-document results (Modal-batched documents first) with
   *   success/failure counts. The top-level `success` is `false` only when an
   *   unexpected exception aborts the batch.
   */
  async batchProcessDocuments(
    documents: Document[],
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'> = ['extract_text']
  ): Promise<BatchProcessingResult> {
    const startTime = Date.now();
    const results: BatchProcessingResult['results'] = [];

    try {
      const wantsExtraction = operations.includes('extract_text');
      const wantsEmbedding = operations.includes('generate_embedding');

      // Partition once. Documents only take the Modal route when extraction
      // was actually requested; otherwise they stay on the per-document path.
      const documentsForModal: Document[] = [];
      const documentsForLocal: Document[] = [];
      for (const doc of documents) {
        const useModal =
          wantsExtraction && Boolean(doc.filePath) && FileProcessor.requiresOCR(doc.mimeType || '');
        (useModal ? documentsForModal : documentsForLocal).push(doc);
      }

      if (documentsForModal.length > 0) {
        let modalResults: Awaited<ReturnType<DocumentProcessor['batchExtractTextModal']>> | undefined;
        try {
          modalResults = await this.batchExtractTextModal(documentsForModal);
        } catch (error) {
          // Defensive: batchExtractTextModal reports failures in its return
          // value, so this path is only reached on an unexpected exception.
          console.error('Modal batch processing failed:', error);
        }

        if (modalResults) {
          for (const modalResult of modalResults) {
            const entry: BatchProcessingResult['results'][number] = { ...modalResult };
            if (wantsEmbedding && entry.success && entry.extractedText) {
              const embeddingResult = await this.generateEmbeddings(entry.extractedText);
              if (embeddingResult.success) {
                entry.embeddings = embeddingResult.embeddings;
              } else {
                console.warn(`Embedding generation failed for document ${entry.documentId}: ${embeddingResult.error}`);
              }
            }
            results.push(entry);
          }
        } else {
          for (const doc of documentsForModal) {
            results.push(await this.processForBatch(doc, operations));
          }
        }
      }

      for (const doc of documentsForLocal) {
        results.push(await this.processForBatch(doc, operations));
      }

      const successCount = results.filter((entry) => entry.success).length;

      return {
        success: true,
        processedCount: successCount,
        failedCount: results.length - successCount,
        results,
        totalProcessingTime: Date.now() - startTime
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        success: false,
        processedCount: 0,
        failedCount: documents.length,
        results: documents.map((doc) => ({
          documentId: doc.id,
          success: false,
          error: message
        })),
        totalProcessingTime: Date.now() - startTime
      };
    }
  }

  /** Runs {@link processDocument} and reshapes its result into a batch entry. */
  private async processForBatch(
    doc: Document,
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'>
  ): Promise<BatchProcessingResult['results'][number]> {
    const result = await this.processDocument(doc, operations);
    return {
      documentId: doc.id,
      success: result.success,
      extractedText: result.extractedText,
      embeddings: result.embeddings,
      error: result.error
    };
  }

  /**
   * Extracts text for one document, choosing the strategy by MIME type:
   * plain-text files are read through `FileProcessor.readTextFile`, OCR types
   * go to Modal, and anything else returns the stored `document.content`.
   * Documents without a file path also return the stored content.
   */
  private async extractText(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    if (!document.filePath) {
      return { success: true, extractedText: document.content };
    }

    const mimeType = document.mimeType || '';

    try {
      if (FileProcessor.isTextFile(mimeType)) {
        const content = await FileProcessor.readTextFile(document.filePath);
        return { success: true, extractedText: content };
      }

      if (FileProcessor.requiresOCR(mimeType)) {
        return await this.extractTextModal(document);
      }

      return { success: true, extractedText: document.content };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Reads the document's file, base64-encodes it, and asks
   * `modalClient.extractTextFromDocuments` to extract its text.
   *
   * @returns The extracted text and Modal task id on success; otherwise
   *   `success: false` with the most specific error message available.
   */
  private async extractTextModal(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    try {
      if (!document.filePath) {
        throw new Error('No file path provided for Modal processing');
      }

      const fileBuffer = await fs.promises.readFile(document.filePath);
      const modalDocument = {
        id: document.id.toString(),
        content: fileBuffer.toString('base64'),
        contentType: document.mimeType || 'application/octet-stream'
      };

      const result = await modalClient.extractTextFromDocuments([modalDocument]);

      // Only one document was submitted, so only the first result matters.
      const extractionResult = result.results?.[0];
      if (result.status !== 'completed' || !extractionResult) {
        return {
          success: false,
          error: result.error || 'Modal processing failed'
        };
      }

      if (extractionResult.status !== 'completed') {
        return {
          success: false,
          error: extractionResult.error || 'Modal extraction failed'
        };
      }

      return {
        success: true,
        extractedText: extractionResult.extracted_text,
        modalTaskId: result.task_id
      };
    } catch (error) {
      console.error('Modal text extraction failed:', error);
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Extracts text for many documents with a single
   * `modalClient.batchProcessDocuments` call.
   *
   * Always returns exactly one entry per input document, in input order.
   * Documents whose file could not be read, or for which the service returned
   * no result, are reported as failures instead of being omitted. Never throws;
   * a failed batch call marks every document as failed.
   */
  private async batchExtractTextModal(documents: Document[]): Promise<Array<{
    documentId: number;
    success: boolean;
    extractedText?: string;
    error?: string;
  }>> {
    type ModalPayload = { id: string; content: string; contentType: string };

    const payloads = await Promise.all(
      documents.map(async (doc): Promise<ModalPayload | null> => {
        if (!doc.filePath) return null;

        try {
          const fileBuffer = await fs.promises.readFile(doc.filePath);
          return {
            id: doc.id.toString(),
            content: fileBuffer.toString('base64'),
            contentType: doc.mimeType || 'application/octet-stream'
          };
        } catch (error) {
          console.error(`Failed to read file for document ${doc.id}:`, error);
          return null;
        }
      })
    );

    const validDocuments = payloads.filter(
      (payload): payload is ModalPayload => payload !== null
    );

    if (validDocuments.length === 0) {
      return documents.map((doc) => ({
        documentId: doc.id,
        success: false,
        error: 'No valid documents for processing'
      }));
    }

    try {
      const batchResult = await modalClient.batchProcessDocuments({
        documents: validDocuments,
        modelName: 'text-embedding-3-small',
        batchSize: Math.min(validDocuments.length, 10)
      });

      if (batchResult.status !== 'completed' || !batchResult.extraction_results) {
        throw new Error(batchResult.error || 'Batch processing failed');
      }

      // Index the service's rows by id so each input document can be matched
      // to its own result regardless of the order the service returns them in.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- result row shape is owned by modalClient
      const resultsById = new Map<string, any>();
      for (const row of batchResult.extraction_results) {
        resultsById.set(String(row.id), row);
      }
      const submittedIds = new Set(validDocuments.map((payload) => payload.id));

      return documents.map((doc) => {
        const key = doc.id.toString();
        if (!submittedIds.has(key)) {
          return {
            documentId: doc.id,
            success: false,
            error: 'Failed to read file for Modal processing'
          };
        }

        const row = resultsById.get(key);
        if (!row) {
          return {
            documentId: doc.id,
            success: false,
            error: 'No extraction result returned for document'
          };
        }

        return {
          documentId: doc.id,
          success: row.status === 'completed',
          extractedText: row.extracted_text,
          error: row.error
        };
      });
    } catch (error) {
      console.error('Modal batch processing failed:', error);
      const message = error instanceof Error ? error.message : String(error);
      return documents.map((doc) => ({
        documentId: doc.id,
        success: false,
        error: message
      }));
    }
  }

  /**
   * Generates an embedding for `text` through `nebiusClient.generateEmbeddings`.
   *
   * Input longer than 8000 UTF-16 code units is truncated first. The cut is
   * moved back by one unit when it would otherwise split a surrogate pair, so
   * the service never receives a lone surrogate.
   */
  private async generateEmbeddings(text: string): Promise<{
    success: boolean;
    embeddings?: number[];
    error?: string;
  }> {
    try {
      const maxLength = 8000;
      let truncatedText = text;
      if (text.length > maxLength) {
        const lastUnit = text.charCodeAt(maxLength - 1);
        const endsOnHighSurrogate = lastUnit >= 0xd800 && lastUnit <= 0xdbff;
        truncatedText = text.substring(0, endsOnHighSurrogate ? maxLength - 1 : maxLength);
      }

      const result = await nebiusClient.generateEmbeddings(truncatedText);

      if (result.success && result.embeddings) {
        return {
          success: true,
          embeddings: result.embeddings
        };
      }

      return {
        success: false,
        error: result.error || 'Embedding generation failed'
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Builds a vector index from the given documents through
   * `modalClient.buildVectorIndex`.
   *
   * @param documents - Documents whose id, content, title and source are sent.
   * @param indexName - Name of the index to build.
   * @returns The index name and document count reported by the service, or an
   *   error message.
   */
  async buildVectorIndex(
    documents: Document[],
    indexName = 'research_papers_clean_v2'
  ): Promise<{
    success: boolean;
    indexName?: string;
    documentCount?: number;
    error?: string;
  }> {
    try {
      const modalDocuments = documents.map((doc) => ({
        id: doc.id.toString(),
        content: doc.content,
        title: doc.title,
        source: doc.source
      }));

      const result = await modalClient.buildVectorIndex(modalDocuments, {
        indexName,
        // NOTE(review): 1536 presumably matches the embedding model's output size — confirm.
        dimension: 1536,
        indexType: 'IVF',
        // One list per ten documents, clamped to the range [10, 100].
        nlist: Math.min(100, Math.max(10, Math.floor(documents.length / 10)))
      });

      if (result.status === 'completed') {
        return {
          success: true,
          indexName: result.index_name,
          documentCount: result.document_count
        };
      }

      return {
        success: false,
        error: result.error || 'Index building failed'
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Searches a vector index and enriches each hit with the matching database
   * record from `storage.getDocument`.
   *
   * A hit whose id is not numeric, has no database record, or whose lookup
   * throws is returned exactly as the search service produced it. Enriched
   * hits carry `id` as a string, matching the declared return type and the
   * un-enriched hits.
   *
   * @param query - Search text.
   * @param indexName - Index to search.
   * @param maxResults - Maximum number of hits requested from the service.
   */
  async searchVectorIndex(
    query: string,
    indexName = 'research_papers_clean_v2',
    maxResults = 10
  ): Promise<{
    success: boolean;
    results?: Array<{
      id: string;
      title: string;
      content: string;
      source: string;
      relevanceScore: number;
      rank: number;
      snippet: string;
    }>;
    error?: string;
  }> {
    try {
      const result = await modalClient.vectorSearch(query, indexName, maxResults);

      if (result.status !== 'completed') {
        return {
          success: false,
          error: result.error || 'Vector search failed'
        };
      }

      const enrichedResults = await Promise.all(
        // eslint-disable-next-line @typescript-eslint/no-explicit-any -- hit shape is owned by modalClient
        (result.results ?? []).map(async (vectorResult: any) => {
          try {
            const documentId = Number.parseInt(vectorResult.id, 10);
            if (Number.isNaN(documentId)) {
              // Not a database id; nothing to look up.
              return vectorResult;
            }

            const dbDocument = await storage.getDocument(documentId);
            if (!dbDocument) {
              return vectorResult;
            }

            return {
              id: String(dbDocument.id),
              title: dbDocument.title,
              content: dbDocument.content,
              source: dbDocument.source,
              sourceType: dbDocument.sourceType,
              url: dbDocument.url,
              metadata: dbDocument.metadata,
              createdAt: dbDocument.createdAt,

              relevanceScore: vectorResult.relevanceScore,
              rank: vectorResult.rank,
              snippet: vectorResult.snippet || dbDocument.content.substring(0, 200) + '...'
            };
          } catch (error) {
            console.warn(`Failed to enrich vector result for ID ${vectorResult.id}:`, error);
            return vectorResult;
          }
        })
      );

      return {
        success: true,
        results: enrichedResults
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }
}

/** Module-level shared instance, obtained from `DocumentProcessor.getInstance()`. */
export const documentProcessor = DocumentProcessor.getInstance();