// KnowledgeBridge — server/smart-ingestion.ts
// Author: fazeel007 — commit 7e9dcae ("Add detailed Modal usage logging for debugging")
/**
* Smart Document Ingestion Service
* Combines Modal's serverless compute with Nebius AI analysis
*/
import { modalClient, type DocumentProcessingTask } from './modal-client';
import { nebiusClient } from './nebius-client';
import { storage } from './storage';
import type { InsertDocument } from '@shared/schema';
/** A single document submitted for ingestion. */
interface DocumentUpload {
file: Buffer | string; // raw file bytes, or already-decoded text content
filename: string; // used as both document title and source identifier
contentType: string; // MIME type; 'pdf'/'image' content is routed through Modal OCR
metadata?: Record<string, any>; // caller-supplied extras (e.g. url) merged into stored metadata
}
/** Outcome of a single-document ingestion run. */
interface IngestionResult {
documentId: number; // id of the persisted document row
processingTaskId: string; // Modal vector-indexing task id (indexing may still be in flight)
analysis: {
category: string;
summary: string;
keyPoints: string[];
qualityScore: number; // parsed "x/10" score; defaults to 7.0 when unparseable
};
embeddings: number[]; // embedding vector for the (token-limited) content
status: 'processing' | 'completed' | 'failed';
}
/**
 * Orchestrates the document-ingestion pipeline across three backends:
 * - Modal serverless compute (OCR extraction, batch processing, vector
 *   index building and high-performance vector search),
 * - Nebius AI (document analysis, embeddings, query enhancement,
 *   citation-relevance scoring, research synthesis),
 * - local storage (document persistence and fallback search).
 */
class SmartIngestionService {
  /**
   * Process an uploaded document with the full AI pipeline:
   * text extraction → parallel analysis → embeddings → persistence →
   * vector-index queueing.
   *
   * @param upload - File payload plus filename, MIME type and optional metadata.
   * @returns Stored document id, Modal indexing task id, the AI analysis
   *   summary and the embedding vector.
   * @throws Error wrapping the underlying failure message when any stage fails.
   */
  async ingestDocument(upload: DocumentUpload): Promise<IngestionResult> {
    try {
      // Step 1: Extract text content. PDFs and images go through Modal OCR;
      // everything else is treated as already-decoded text.
      let textContent: string;
      if (upload.contentType.includes('pdf') || upload.contentType.includes('image')) {
        const ocrTask = await modalClient.extractTextFromDocuments([
          this.uploadToTempStorage(upload)
        ]);
        // Poll for OCR completion (simplified for demo; production should
        // use webhooks instead of polling).
        textContent = await this.waitForTaskCompletion(ocrTask.taskId);
      } else {
        textContent = upload.file.toString();
      }

      // Step 2: Run the four independent Nebius analyses in parallel.
      const [
        categoryAnalysis,
        summaryAnalysis,
        keyPointsAnalysis,
        qualityAnalysis
      ] = await Promise.all([
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'classification' }),
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'summary' }),
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'key_points' }),
        nebiusClient.analyzeDocument({ content: textContent, analysisType: 'quality_score' })
      ]);

      // Step 3: Generate embeddings (input truncated to respect token limits).
      const embeddingResponse = await nebiusClient.createEmbeddings({
        input: textContent.substring(0, 8000), // Limit for token constraints
        model: 'text-embedding-3-large'
      });

      // Step 4: Persist the document, embedding analysis results in metadata.
      const documentData: InsertDocument = {
        title: upload.filename,
        content: textContent,
        sourceType: this.extractSourceType(categoryAnalysis.analysis),
        source: upload.filename,
        url: upload.metadata?.url,
        metadata: {
          ...upload.metadata,
          analysis: {
            category: categoryAnalysis.analysis,
            summary: summaryAnalysis.analysis,
            keyPoints: keyPointsAnalysis.analysis,
            qualityScore: this.extractQualityScore(qualityAnalysis.analysis)
          },
          contentType: upload.contentType,
          // Fix: the original ternary had identical branches. For string
          // payloads report the UTF-8 byte size, not the UTF-16 code-unit
          // count (they differ for any non-ASCII content).
          fileSize: Buffer.isBuffer(upload.file)
            ? upload.file.length
            : Buffer.byteLength(upload.file, 'utf8'),
          processingTimestamp: new Date().toISOString()
        }
      };
      const document = await storage.createDocument(documentData);

      // Step 5: Queue vector indexing with Modal (fire-and-track via taskId).
      const indexTask = await modalClient.buildVectorIndex([{
        id: document.id.toString(),
        content: textContent,
        embeddings: embeddingResponse.data[0].embedding,
        metadata: document.metadata
      }]);

      // NOTE(review): status is reported 'completed' even though indexing was
      // only queued above — callers can track indexTask via processingTaskId.
      return {
        documentId: document.id,
        processingTaskId: indexTask.taskId,
        analysis: {
          category: this.extractCategory(categoryAnalysis.analysis),
          summary: summaryAnalysis.analysis,
          keyPoints: this.parseKeyPoints(keyPointsAnalysis.analysis),
          qualityScore: this.extractQualityScore(qualityAnalysis.analysis)
        },
        embeddings: embeddingResponse.data[0].embedding,
        status: 'completed'
      };
    } catch (error) {
      console.error('Document ingestion failed:', error);
      throw new Error(`Ingestion failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
    }
  }

  /**
   * Batch-process multiple documents using Modal's distributed compute.
   *
   * @param uploads - Documents to queue (content is sent as decoded text).
   * @returns Modal task id, a rough completion estimate (~2s per document)
   *   and the number of documents queued.
   */
  async batchIngestDocuments(uploads: DocumentUpload[]): Promise<{
    taskId: string;
    estimatedCompletion: Date;
    documentsQueued: number;
  }> {
    // Prepare documents for batch processing.
    const documents = uploads.map((upload, index) => ({
      id: `batch_${Date.now()}_${index}`,
      content: upload.file.toString(),
      metadata: {
        filename: upload.filename,
        contentType: upload.contentType,
        ...upload.metadata
      }
    }));

    // Submit to Modal for distributed processing.
    const task = await modalClient.batchProcessDocuments({
      documents,
      modelName: 'text-embedding-3-large',
      batchSize: 20
    });

    return {
      taskId: task.taskId,
      estimatedCompletion: new Date(Date.now() + uploads.length * 2000), // 2s per doc estimate
      documentsQueued: uploads.length
    };
  }

  /**
   * Enhanced search combining Modal vector search (when configured) with
   * local storage search, optional Nebius query enhancement, and AI
   * relevance re-ranking of the merged result set.
   *
   * @param query - Raw user query.
   * @param options - maxResults (default 10), searchType (default 'semantic'),
   *   useQueryEnhancement (default true).
   * @returns Ranked results plus the enhanced query and aggregate insights.
   */
  async enhancedSearch(query: string, options: {
    maxResults?: number;
    searchType?: 'semantic' | 'hybrid';
    useQueryEnhancement?: boolean;
  } = {}): Promise<{
    results: any[];
    enhancedQuery?: any;
    searchInsights?: any;
  }> {
    const { maxResults = 10, searchType = 'semantic', useQueryEnhancement = true } = options;

    // Step 1: Enhance query using Nebius AI (falls back to the raw query).
    let enhancedQueryData;
    let searchQuery = query;
    if (useQueryEnhancement) {
      enhancedQueryData = await nebiusClient.enhanceQuery(query);
      searchQuery = enhancedQueryData.enhancedQuery;
    }

    // Step 2: Perform vector search using Modal's high-performance endpoint.
    // Fix: annotate explicitly (was an implicit any[] under strict mode).
    let modalResults: any[] = [];
    // Skip Modal if not configured properly.
    if (process.env.MODAL_TOKEN_ID && process.env.MODAL_TOKEN_SECRET) {
      try {
        console.log('🔄 Attempting Modal vector search...');
        const modalResponse = await modalClient.vectorSearch(
          searchQuery,
          'main_index', // Assuming we have a main index
          maxResults
        );
        modalResults = modalResponse.results || [];
        console.log(`✅ Modal search returned ${modalResults.length} results`);
      } catch (error) {
        // Modal failure is non-fatal: local search below still serves results.
        console.log('❌ Modal search failed:', error instanceof Error ? error.message : String(error));
        console.log('🔄 Falling back to local search');
      }
    } else {
      console.log('⚠️ Modal not configured, using local search only');
    }

    // Step 3: Get local results as backup/supplement.
    const localResults = await storage.searchDocuments({
      query: searchQuery,
      searchType: searchType as "semantic" | "keyword" | "hybrid",
      limit: maxResults,
      offset: 0
    });

    // Step 4: Combine sources; keep extra candidates for re-ranking.
    const combinedResults = [...modalResults, ...localResults.results]
      .slice(0, maxResults * 2);

    // Step 5: Score relevance using Nebius AI. A per-result scoring failure
    // degrades gracefully to a neutral 0.5 instead of failing the search.
    const scoredResults = await Promise.all(
      combinedResults.map(async (result) => {
        try {
          const relevanceData = await nebiusClient.scoreCitationRelevance(query, {
            title: result.title,
            content: result.content,
            snippet: result.snippet || result.content.substring(0, 200)
          });
          return {
            ...result,
            relevanceScore: relevanceData.relevanceScore,
            aiExplanation: relevanceData.explanation,
            keyReasons: relevanceData.keyReasons
          };
        } catch (error) {
          return { ...result, relevanceScore: result.relevanceScore || 0.5 };
        }
      })
    );

    // Step 6: Sort by AI-enhanced relevance scores and trim.
    const finalResults = scoredResults
      .sort((a, b) => (b.relevanceScore || 0) - (a.relevanceScore || 0))
      .slice(0, maxResults);

    return {
      results: finalResults,
      enhancedQuery: enhancedQueryData,
      searchInsights: {
        totalResults: finalResults.length,
        // Fix: guard against division by zero (was NaN on empty results).
        avgRelevanceScore: finalResults.length > 0
          ? finalResults.reduce((acc, r) => acc + (r.relevanceScore || 0), 0) / finalResults.length
          : 0,
        modalResultsCount: modalResults.length,
        localResultsCount: localResults.results.length
      }
    };
  }

  /**
   * Generate a research synthesis across documents using Nebius AI.
   * Returns a placeholder synthesis object when no documents are supplied.
   */
  async generateResearchSynthesis(query: string, documents: any[]): Promise<any> {
    if (documents.length === 0) {
      return {
        synthesis: 'No documents available for synthesis',
        keyFindings: [],
        gaps: ['Insufficient source material'],
        recommendations: ['Search for more relevant documents']
      };
    }
    return nebiusClient.generateResearchInsights(
      documents.map(doc => ({
        title: doc.title,
        content: doc.content,
        metadata: doc.metadata
      })),
      query
    );
  }

  // ----- Helper methods -----

  /** Build a temp-storage URI for an upload (production: real cloud upload + URL). */
  private uploadToTempStorage(upload: DocumentUpload): string {
    return `temp://documents/${upload.filename}`;
  }

  /**
   * Poll a Modal task every 2s until completion (max 30 attempts ≈ 60s).
   * Simplified polling for demo — production should use webhooks.
   *
   * @throws Error when the task reports failure or the poll budget is exhausted.
   */
  private async waitForTaskCompletion(taskId: string): Promise<string> {
    const maxAttempts = 30;
    let attempts = 0;
    while (attempts < maxAttempts) {
      const status = await modalClient.getTaskStatus(taskId);
      if (status.status === 'completed') {
        return status.result?.extractedText || 'Text extraction completed';
      } else if (status.status === 'failed') {
        throw new Error(`Task failed: ${status.error}`);
      }
      await new Promise(resolve => setTimeout(resolve, 2000));
      attempts++;
    }
    throw new Error('Task timed out');
  }

  /** Map a free-text classification to a storage sourceType ('general' fallback). */
  private extractSourceType(analysis: string): string {
    const types: Record<string, string> = {
      'academic_paper': 'academic',
      'technical_documentation': 'technical',
      'research_report': 'research',
      'code_repository': 'code',
      'blog_post': 'web',
      'news_article': 'news'
    };
    for (const [key, value] of Object.entries(types)) {
      if (analysis.toLowerCase().includes(key)) {
        return value;
      }
    }
    return 'general';
  }

  /** First line of the classification text is treated as the category label. */
  private extractCategory(analysis: string): string {
    return analysis.split('\n')[0] || 'Unknown';
  }

  /**
   * Parse up to five bullet points ("-", "•", or "1."-style) from analysis text.
   * Fix: the original stripped only one leading character, so numbered items
   * like "1. foo" came back as ". foo"; the prefix is now removed as a unit.
   */
  private parseKeyPoints(analysis: string): string[] {
    return analysis.split('\n')
      .map(line => line.trim())
      .filter(line => line.startsWith('-') || line.startsWith('•') || /^\d+\./.test(line))
      .map(line => line.replace(/^(?:[-•]|\d+\.)\s*/, '').trim())
      .slice(0, 5);
  }

  /** Extract an "x/10"-style score from analysis text; defaults to 7.0. */
  private extractQualityScore(analysis: string): number {
    const scoreMatch = analysis.match(/(\d+(?:\.\d+)?)\s*\/?\s*10/);
    if (scoreMatch) {
      return parseFloat(scoreMatch[1]);
    }
    return 7.0; // Default score
  }
}
// Shared singleton instance for consumers of this module.
export const smartIngestionService = new SmartIngestionService();
export type { DocumentUpload, IngestionResult };