// KnowledgeBridge / server/document-processor.ts
// fazeel007's picture
// Fix index
// f36d1f9
import fs from 'fs';
import path from 'path';
import { modalClient } from './modal-client';
import { nebiusClient } from './nebius-client';
import { FileProcessor } from './file-upload';
import { storage } from './storage';
import { type Document, type InsertDocument } from '@shared/schema';
/**
 * Outcome of processing a single document with `DocumentProcessor.processDocument`.
 */
export interface ProcessingResult {
/** False only when processing threw; individual step failures are logged as warnings and do not clear this flag. */
success: boolean;
/** Text obtained from extraction, or the document's existing content when no extraction result was used. */
extractedText?: string;
/** Embedding vector, present when embedding generation was requested and succeeded. */
embeddings?: number[];
/** Task id reported by Modal when text extraction went through Modal. */
modalTaskId?: string;
/** Error message, set when `success` is false. */
error?: string;
/** Elapsed wall-clock time in milliseconds (difference of two `Date.now()` readings). */
processingTime: number;
}
/**
 * Outcome of processing several documents with `DocumentProcessor.batchProcessDocuments`.
 */
export interface BatchProcessingResult {
/** False only when the batch as a whole threw; per-document failures are reported in `results`. */
success: boolean;
/** Number of entries in `results` whose `success` is true (0 when the whole batch failed). */
processedCount: number;
/** Number of entries in `results` that did not succeed (all input documents when the whole batch failed). */
failedCount: number;
/** Per-document outcomes. */
results: Array<{
/** Id of the document this entry refers to. */
documentId: number;
/** Whether processing of this document succeeded. */
success: boolean;
/** Text extracted for this document, when available. */
extractedText?: string;
/** Embedding vector for this document, when one was generated. */
embeddings?: number[];
/** Error message for this document, when one was reported. */
error?: string;
}>;
/** Elapsed wall-clock time for the whole batch in milliseconds. */
totalProcessingTime: number;
}
/**
 * Coordinates text extraction, embedding generation and vector-index
 * operations by delegating to the Modal client, the Nebius client, the
 * `FileProcessor` helpers and the storage layer imported by this module.
 *
 * Use {@link DocumentProcessor.getInstance} to obtain the shared instance.
 */
export class DocumentProcessor {
  private static instance: DocumentProcessor | undefined;

  /** Returns the shared instance, creating it on first use. */
  static getInstance(): DocumentProcessor {
    return (DocumentProcessor.instance ??= new DocumentProcessor());
  }

  /**
   * Process a single document using Modal for heavy workloads.
   *
   * @param document - The document to process.
   * @param operations - Steps to run. Only `'extract_text'` and
   *   `'generate_embedding'` are acted on here; `'build_index'` is accepted
   *   but this method has no step for it.
   * @returns A result whose `success` is false only when an exception was
   *   thrown. A failed extraction or embedding step is logged as a warning and
   *   the result still reports success (best-effort).
   */
  async processDocument(
    document: Document,
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'> = ['extract_text']
  ): Promise<ProcessingResult> {
    const startTime = Date.now();
    try {
      let extractedText = document.content;
      let embeddings: number[] | undefined;
      let modalTaskId: string | undefined;

      // Step 1: extract text if requested and the document is backed by a file.
      if (operations.includes('extract_text') && document.filePath) {
        const textResult = await this.extractText(document);
        if (textResult.success) {
          // An empty extraction falls back to the content already stored on the document.
          extractedText = textResult.extractedText || document.content;
          modalTaskId = textResult.modalTaskId;
        } else {
          console.warn(`Text extraction failed for document ${document.id}: ${textResult.error}`);
        }
      }

      // Step 2: generate embeddings if requested and there is text to embed.
      if (operations.includes('generate_embedding') && extractedText) {
        const embeddingResult = await this.generateEmbeddings(extractedText);
        if (embeddingResult.success) {
          embeddings = embeddingResult.embeddings;
        } else {
          console.warn(`Embedding generation failed for document ${document.id}: ${embeddingResult.error}`);
        }
      }

      return {
        success: true,
        extractedText,
        embeddings,
        modalTaskId,
        processingTime: Date.now() - startTime
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error),
        processingTime: Date.now() - startTime
      };
    }
  }

  /**
   * Process multiple documents in batch using Modal's distributed computing.
   *
   * Documents that have a file path and a MIME type for which
   * `FileProcessor.requiresOCR` is true are sent to Modal in one batch when
   * `'extract_text'` is requested. All other documents go through
   * {@link processDocument} one at a time. This includes OCR-type documents
   * when extraction was not requested, so no input document is silently
   * skipped.
   *
   * @param documents - Documents to process.
   * @param operations - Steps to run for each document.
   * @returns Aggregate counts plus per-document outcomes. `success` is false
   *   only when the batch as a whole threw.
   */
  async batchProcessDocuments(
    documents: Document[],
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'> = ['extract_text']
  ): Promise<BatchProcessingResult> {
    const startTime = Date.now();
    const results: BatchProcessingResult['results'] = [];
    try {
      const wantsExtraction = operations.includes('extract_text');
      const wantsEmbeddings = operations.includes('generate_embedding');

      // Single-pass partition. A document only belongs to the Modal batch when
      // extraction is actually requested; otherwise it is handled locally.
      const documentsForModal: Document[] = [];
      const documentsForLocal: Document[] = [];
      for (const doc of documents) {
        const needsModalBatch =
          wantsExtraction && Boolean(doc.filePath) && FileProcessor.requiresOCR(doc.mimeType || '');
        (needsModalBatch ? documentsForModal : documentsForLocal).push(doc);
      }

      if (documentsForModal.length > 0) {
        try {
          const extracted = await this.batchExtractTextModal(documentsForModal);
          // Collect into a local list first so a late failure cannot leave
          // partial entries in `results` before the fallback below runs.
          const modalEntries: BatchProcessingResult['results'] = [];
          for (const extraction of extracted) {
            const entry: BatchProcessingResult['results'][number] = { ...extraction };
            // Mirror processDocument: embed the extracted text when requested.
            if (wantsEmbeddings && entry.success && entry.extractedText) {
              const embeddingResult = await this.generateEmbeddings(entry.extractedText);
              if (embeddingResult.success) {
                entry.embeddings = embeddingResult.embeddings;
              } else {
                console.warn(
                  `Embedding generation failed for document ${entry.documentId}: ${embeddingResult.error}`
                );
              }
            }
            modalEntries.push(entry);
          }
          results.push(...modalEntries);
        } catch (error) {
          // Defensive safety net: batchExtractTextModal reports its own
          // failures as result entries, so this only runs on unexpected errors.
          console.error('Modal batch processing failed:', error);
          for (const doc of documentsForModal) {
            results.push(await this.processForBatch(doc, operations));
          }
        }
      }

      // Process the remaining documents one at a time.
      for (const doc of documentsForLocal) {
        results.push(await this.processForBatch(doc, operations));
      }

      const successCount = results.filter(r => r.success).length;
      return {
        success: true,
        processedCount: successCount,
        failedCount: results.length - successCount,
        results,
        totalProcessingTime: Date.now() - startTime
      };
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      return {
        success: false,
        processedCount: 0,
        failedCount: documents.length,
        results: documents.map(doc => ({
          documentId: doc.id,
          success: false,
          error: message
        })),
        totalProcessingTime: Date.now() - startTime
      };
    }
  }

  /** Runs {@link processDocument} and reshapes its result into a batch result entry. */
  private async processForBatch(
    doc: Document,
    operations: Array<'extract_text' | 'generate_embedding' | 'build_index'>
  ): Promise<BatchProcessingResult['results'][number]> {
    const result = await this.processDocument(doc, operations);
    return {
      documentId: doc.id,
      success: result.success,
      extractedText: result.extractedText,
      embeddings: result.embeddings,
      error: result.error
    };
  }

  /**
   * Extract text from a document using Modal for PDFs/images or direct reading for text files.
   *
   * Without a file path, or for a MIME type that is neither a text file nor
   * OCR-required, the document's existing content is returned unchanged.
   * Errors are returned as `{ success: false, error }` rather than thrown.
   */
  private async extractText(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    if (!document.filePath) {
      return { success: true, extractedText: document.content };
    }
    const mimeType = document.mimeType || '';
    try {
      // Text files are read directly.
      if (FileProcessor.isTextFile(mimeType)) {
        const content = await FileProcessor.readTextFile(document.filePath);
        return { success: true, extractedText: content };
      }
      // OCR-required files (PDFs and images) go through Modal.
      if (FileProcessor.requiresOCR(mimeType)) {
        return await this.extractTextModal(document);
      }
      // Fallback: return existing content.
      return { success: true, extractedText: document.content };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Extract text using Modal for OCR-required files.
   *
   * Reads the file, sends it base64-encoded to Modal's extract-text call and
   * returns the first extraction result. Failures (including a missing file
   * path) are logged and returned as `{ success: false, error }`.
   */
  private async extractTextModal(document: Document): Promise<{
    success: boolean;
    extractedText?: string;
    modalTaskId?: string;
    error?: string;
  }> {
    try {
      if (!document.filePath) {
        throw new Error('No file path provided for Modal processing');
      }
      // Read file and convert to base64.
      const fileBuffer = await fs.promises.readFile(document.filePath);
      const modalDocument = {
        id: document.id.toString(),
        content: fileBuffer.toString('base64'),
        contentType: document.mimeType || 'application/octet-stream'
      };

      const result = await modalClient.extractTextFromDocuments([modalDocument]);
      if (result.status !== 'completed' || !((result.results?.length ?? 0) > 0)) {
        return {
          success: false,
          error: result.error || 'Modal processing failed'
        };
      }

      const extractionResult = result.results[0];
      if (extractionResult.status !== 'completed') {
        return {
          success: false,
          error: extractionResult.error || 'Modal extraction failed'
        };
      }
      return {
        success: true,
        extractedText: extractionResult.extracted_text,
        modalTaskId: result.task_id
      };
    } catch (error) {
      console.error('Modal text extraction failed:', error);
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Batch extract text using Modal.
   *
   * Returns exactly one entry per input document, in input order. Documents
   * whose file could not be read, or for which Modal returned no result, are
   * reported as failures instead of being omitted. This method does not throw
   * for read or Modal errors; they are returned as failed entries.
   */
  private async batchExtractTextModal(documents: Document[]): Promise<Array<{
    documentId: number;
    success: boolean;
    extractedText?: string;
    error?: string;
  }>> {
    type ModalPayload = { id: string; content: string; contentType: string };

    const payloads = await Promise.all(
      documents.map(async (doc): Promise<ModalPayload | null> => {
        if (!doc.filePath) return null;
        try {
          const fileBuffer = await fs.promises.readFile(doc.filePath);
          return {
            id: doc.id.toString(),
            content: fileBuffer.toString('base64'),
            contentType: doc.mimeType || 'application/octet-stream'
          };
        } catch (error) {
          console.error(`Failed to read file for document ${doc.id}:`, error);
          return null;
        }
      })
    );
    const validDocuments = payloads.filter(
      (payload): payload is ModalPayload => payload !== null
    );
    if (validDocuments.length === 0) {
      return documents.map(doc => ({
        documentId: doc.id,
        success: false,
        error: 'No valid documents for processing'
      }));
    }
    const submittedIds = new Set(validDocuments.map(payload => payload.id));

    try {
      const batchResult = await modalClient.batchProcessDocuments({
        documents: validDocuments,
        modelName: 'text-embedding-3-small',
        batchSize: Math.min(validDocuments.length, 10)
      });
      if (batchResult.status !== 'completed' || !batchResult.extraction_results) {
        throw new Error(batchResult.error || 'Batch processing failed');
      }

      // Index Modal's results by document id so every input can be matched.
      // The payload shape comes from the Modal client, which is not visible
      // from this module, hence the `any`.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const resultsById = new Map<string, any>();
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      batchResult.extraction_results.forEach((result: any) => {
        resultsById.set(String(result.id), result);
      });

      return documents.map(doc => {
        const key = doc.id.toString();
        if (!submittedIds.has(key)) {
          return {
            documentId: doc.id,
            success: false,
            error: 'Failed to read file for processing'
          };
        }
        const result = resultsById.get(key);
        if (!result) {
          return {
            documentId: doc.id,
            success: false,
            error: 'No extraction result returned for document'
          };
        }
        return {
          documentId: doc.id,
          success: result.status === 'completed',
          extractedText: result.extracted_text,
          error: result.error
        };
      });
    } catch (error) {
      console.error('Modal batch processing failed:', error);
      const message = error instanceof Error ? error.message : String(error);
      return documents.map(doc => ({
        documentId: doc.id,
        success: false,
        error: message
      }));
    }
  }

  /**
   * Generate embeddings using Nebius AI.
   *
   * Input longer than 8000 characters is truncated before the request.
   * Failures are returned as `{ success: false, error }` rather than thrown.
   */
  private async generateEmbeddings(text: string): Promise<{
    success: boolean;
    embeddings?: number[];
    error?: string;
  }> {
    try {
      // Truncate text if too long (most embedding models have token limits).
      const maxLength = 8000; // Conservative limit
      const truncatedText = text.length > maxLength ? text.substring(0, maxLength) : text;
      const result = await nebiusClient.generateEmbeddings(truncatedText);
      if (result.success && result.embeddings) {
        return {
          success: true,
          embeddings: result.embeddings
        };
      }
      return {
        success: false,
        error: result.error || 'Embedding generation failed'
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Build vector index using Modal.
   *
   * @param documents - Documents whose id, content, title and source are sent to Modal.
   * @param indexName - Name of the index to build.
   * @returns The index name and document count reported by Modal on success,
   *   otherwise `{ success: false, error }`. Errors are returned, not thrown.
   */
  async buildVectorIndex(
    documents: Document[],
    indexName = 'research_papers_clean_v2'
  ): Promise<{
    success: boolean;
    indexName?: string;
    documentCount?: number;
    error?: string;
  }> {
    try {
      const modalDocuments = documents.map(doc => ({
        id: doc.id.toString(),
        content: doc.content,
        title: doc.title,
        source: doc.source
      }));
      // One list per ten documents, clamped to the range [10, 100].
      const nlist = Math.min(100, Math.max(10, Math.floor(documents.length / 10)));
      const result = await modalClient.buildVectorIndex(modalDocuments, {
        indexName,
        dimension: 1536, // Standard OpenAI embedding dimension (per the original author's note)
        indexType: 'IVF',
        nlist
      });
      if (result.status !== 'completed') {
        return {
          success: false,
          error: result.error || 'Index building failed'
        };
      }
      return {
        success: true,
        indexName: result.index_name,
        documentCount: result.document_count
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }

  /**
   * Search vector index using Modal.
   *
   * Each hit is enriched with the matching document from storage when one is
   * found. Otherwise (no match, non-numeric id, or a lookup error) the raw
   * Modal hit is returned unchanged.
   *
   * NOTE(review): the declared result type lists `id: string`, while enriched
   * entries carry the stored document's `id` as-is and also include extra
   * fields (`sourceType`, `url`, `metadata`, `createdAt`). Confirm what
   * consumers expect before tightening either side.
   *
   * @param query - Search query text.
   * @param indexName - Name of the index to search.
   * @param maxResults - Maximum number of hits to request.
   */
  async searchVectorIndex(
    query: string,
    indexName = 'research_papers_clean_v2',
    maxResults = 10
  ): Promise<{
    success: boolean;
    results?: Array<{
      id: string;
      title: string;
      content: string;
      source: string;
      relevanceScore: number;
      rank: number;
      snippet: string;
    }>;
    error?: string;
  }> {
    try {
      const result = await modalClient.vectorSearch(query, indexName, maxResults);
      if (result.status !== 'completed') {
        return {
          success: false,
          error: result.error || 'Vector search failed'
        };
      }

      // A completed search without a results array is treated as "no hits".
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const vectorResults: any[] = Array.isArray(result.results) ? result.results : [];

      // Enrich vector search results with complete document data from the database.
      const enrichedResults = await Promise.all(
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        vectorResults.map(async (vectorResult: any) => {
          try {
            const documentId = Number.parseInt(vectorResult.id, 10);
            if (Number.isNaN(documentId)) {
              // Non-numeric id: nothing to look up, keep the raw hit.
              return vectorResult;
            }
            const dbDocument = await storage.getDocument(documentId);
            if (!dbDocument) {
              // Fallback to vector result if database document not found.
              return vectorResult;
            }
            // Merge vector search metadata with the database document.
            return {
              id: dbDocument.id,
              title: dbDocument.title,
              content: dbDocument.content,
              source: dbDocument.source,
              sourceType: dbDocument.sourceType,
              url: dbDocument.url, // Explicitly preserve URL from the database
              metadata: dbDocument.metadata,
              createdAt: dbDocument.createdAt,
              // Vector search specific fields
              relevanceScore: vectorResult.relevanceScore,
              rank: vectorResult.rank,
              snippet: vectorResult.snippet || dbDocument.content.substring(0, 200) + '...'
            };
          } catch (error) {
            console.warn(`Failed to enrich vector result for ID ${vectorResult.id}:`, error);
            return vectorResult;
          }
        })
      );
      return {
        success: true,
        results: enrichedResults
      };
    } catch (error) {
      return {
        success: false,
        error: error instanceof Error ? error.message : String(error)
      };
    }
  }
}
/** Shared `DocumentProcessor` instance obtained from `DocumentProcessor.getInstance()`. */
export const documentProcessor = DocumentProcessor.getInstance();