AnseMin commited on
Commit
f901c17
·
unverified ·
2 Parent(s): 63f3b68 3f1b4af

Merge pull request #4 from ansemin/development

Browse files

Refactor document ingestion and output file handling

src/parsers/gemini_flash_parser.py CHANGED
@@ -85,7 +85,7 @@ class GeminiFlashParser(DocumentParser):
85
 
86
  # Generate the response
87
  response = client.models.generate_content(
88
- model="gemini-2.0-flash",
89
  contents=[
90
  prompt,
91
  genai.types.Part.from_bytes(
 
85
 
86
  # Generate the response
87
  response = client.models.generate_content(
88
+ model=config.model.gemini_model,
89
  contents=[
90
  prompt,
91
  genai.types.Part.from_bytes(
src/rag/ingestion.py CHANGED
@@ -18,12 +18,11 @@ class DocumentIngestionService:
18
 
19
  def __init__(self):
20
  """Initialize the document ingestion service."""
21
- self.processed_documents = set() # Track processed document hashes
22
  logger.info("Document ingestion service initialized")
23
 
24
- def create_document_hash(self, content: str) -> str:
25
- """Create a hash for document content to avoid duplicates."""
26
- return hashlib.sha256(content.encode('utf-8')).hexdigest()[:16]
27
 
28
  def prepare_document_metadata(self,
29
  source_path: Optional[str] = None,
@@ -44,7 +43,6 @@ class DocumentIngestionService:
44
  "source": source_path or "user_upload",
45
  "doc_type": doc_type,
46
  "processed_at": datetime.now().isoformat(),
47
- "source_id": self.create_document_hash(source_path or ""),
48
  "ingestion_version": "1.0"
49
  }
50
 
@@ -53,10 +51,35 @@ class DocumentIngestionService:
53
 
54
  return metadata
55
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  def ingest_markdown_content(self,
57
  markdown_content: str,
58
  source_path: Optional[str] = None,
59
- metadata: Optional[Dict[str, Any]] = None) -> Tuple[bool, str, Dict[str, Any]]:
 
60
  """
61
  Ingest markdown content into the RAG system.
62
 
@@ -64,6 +87,7 @@ class DocumentIngestionService:
64
  markdown_content: The markdown content to ingest
65
  source_path: Optional source path/filename
66
  metadata: Optional additional metadata
 
67
 
68
  Returns:
69
  Tuple of (success, message, ingestion_stats)
@@ -72,24 +96,34 @@ class DocumentIngestionService:
72
  if not markdown_content or not markdown_content.strip():
73
  return False, "No content provided for ingestion", {}
74
 
75
- # Create document hash to check for duplicates
76
- content_hash = self.create_document_hash(markdown_content)
 
77
 
78
- if content_hash in self.processed_documents:
79
- logger.info(f"Document already processed: {content_hash}")
80
- return True, "Document already exists in the system", {"status": "duplicate"}
81
 
82
- # Prepare document metadata
 
 
 
 
 
 
 
 
83
  doc_metadata = self.prepare_document_metadata(
84
  source_path=source_path,
85
  doc_type="markdown",
86
  additional_metadata=metadata
87
  )
88
- doc_metadata["content_hash"] = content_hash
89
  doc_metadata["content_length"] = len(markdown_content)
 
90
 
91
  # Chunk the document using markdown-aware chunking
92
- logger.info(f"Chunking document: {content_hash}")
93
  chunks = document_chunker.chunk_document(markdown_content, doc_metadata)
94
 
95
  if not chunks:
@@ -102,23 +136,22 @@ class DocumentIngestionService:
102
  if not doc_ids:
103
  return False, "Failed to add documents to vector store", {}
104
 
105
- # Mark document as processed
106
- self.processed_documents.add(content_hash)
107
-
108
  # Prepare ingestion statistics
109
  ingestion_stats = {
110
  "status": "success",
111
- "content_hash": content_hash,
112
  "total_chunks": len(chunks),
113
  "document_ids": doc_ids,
114
  "content_length": len(markdown_content),
115
  "has_tables": any(chunk.metadata.get("has_table", False) for chunk in chunks),
116
  "has_code": any(chunk.metadata.get("has_code", False) for chunk in chunks),
117
- "processed_at": datetime.now().isoformat()
 
118
  }
119
 
120
- success_msg = f"Successfully ingested document with {len(chunks)} chunks"
121
- logger.info(f"{success_msg}: {content_hash}")
 
122
 
123
  return True, success_msg, ingestion_stats
124
 
@@ -147,6 +180,7 @@ class DocumentIngestionService:
147
  # Extract metadata from conversion result
148
  original_filename = conversion_result.get("original_filename", "unknown")
149
  conversion_method = conversion_result.get("conversion_method", "unknown")
 
150
 
151
  additional_metadata = {
152
  "original_filename": original_filename,
@@ -155,11 +189,12 @@ class DocumentIngestionService:
155
  "conversion_time": conversion_result.get("conversion_time", 0)
156
  }
157
 
158
- # Ingest the markdown content
159
  return self.ingest_markdown_content(
160
  markdown_content=markdown_content,
161
  source_path=original_filename,
162
- metadata=additional_metadata
 
163
  )
164
 
165
  except Exception as e:
@@ -175,7 +210,7 @@ class DocumentIngestionService:
175
  Dictionary with system status information
176
  """
177
  status = {
178
- "processed_documents": len(self.processed_documents),
179
  "embedding_model_available": False,
180
  "vector_store_available": False,
181
  "system_ready": False
@@ -202,10 +237,6 @@ class DocumentIngestionService:
202
 
203
  return status
204
 
205
- def clear_processed_documents(self) -> None:
206
- """Clear the set of processed documents."""
207
- self.processed_documents.clear()
208
- logger.info("Cleared processed documents cache")
209
 
210
  def test_ingestion_pipeline(self) -> Dict[str, Any]:
211
  """
 
18
 
19
  def __init__(self):
20
  """Initialize the document ingestion service."""
 
21
  logger.info("Document ingestion service initialized")
22
 
23
+ def create_file_hash(self, content: str) -> str:
24
+ """Create a full SHA-256 hash for file content to avoid duplicates."""
25
+ return hashlib.sha256(content.encode('utf-8')).hexdigest()
26
 
27
  def prepare_document_metadata(self,
28
  source_path: Optional[str] = None,
 
43
  "source": source_path or "user_upload",
44
  "doc_type": doc_type,
45
  "processed_at": datetime.now().isoformat(),
 
46
  "ingestion_version": "1.0"
47
  }
48
 
 
51
 
52
  return metadata
53
 
54
+ def check_duplicate_in_vector_store(self, file_hash: str) -> bool:
55
+ """Check if document with given file hash already exists in vector store."""
56
+ try:
57
+ existing_docs = vector_store_manager.get_vector_store()._collection.get(
58
+ where={"file_hash": file_hash},
59
+ limit=1
60
+ )
61
+ return len(existing_docs.get('ids', [])) > 0
62
+ except Exception as e:
63
+ logger.error(f"Error checking for duplicates: {e}")
64
+ return False
65
+
66
+ def delete_existing_document(self, file_hash: str) -> bool:
67
+ """Delete existing document with given file hash from vector store."""
68
+ try:
69
+ vector_store_manager.get_vector_store()._collection.delete(
70
+ where={"file_hash": file_hash}
71
+ )
72
+ logger.info(f"Deleted existing document with hash: {file_hash}")
73
+ return True
74
+ except Exception as e:
75
+ logger.error(f"Error deleting existing document: {e}")
76
+ return False
77
+
78
  def ingest_markdown_content(self,
79
  markdown_content: str,
80
  source_path: Optional[str] = None,
81
+ metadata: Optional[Dict[str, Any]] = None,
82
+ original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
83
  """
84
  Ingest markdown content into the RAG system.
85
 
 
87
  markdown_content: The markdown content to ingest
88
  source_path: Optional source path/filename
89
  metadata: Optional additional metadata
90
+ original_file_content: Original file content for hash calculation
91
 
92
  Returns:
93
  Tuple of (success, message, ingestion_stats)
 
96
  if not markdown_content or not markdown_content.strip():
97
  return False, "No content provided for ingestion", {}
98
 
99
+ # Create file hash using original content if available, otherwise use markdown content
100
+ file_content_for_hash = original_file_content or markdown_content
101
+ file_hash = self.create_file_hash(file_content_for_hash)
102
 
103
+ # Check for duplicates in vector store
104
+ is_duplicate = self.check_duplicate_in_vector_store(file_hash)
105
+ replacement_mode = False
106
 
107
+ if is_duplicate:
108
+ logger.info(f"Document with hash {file_hash} already exists, replacing...")
109
+ # Delete existing document
110
+ if self.delete_existing_document(file_hash):
111
+ replacement_mode = True
112
+ else:
113
+ return False, "Failed to replace existing document", {"status": "error"}
114
+
115
+ # Prepare document metadata with file hash
116
  doc_metadata = self.prepare_document_metadata(
117
  source_path=source_path,
118
  doc_type="markdown",
119
  additional_metadata=metadata
120
  )
121
+ doc_metadata["file_hash"] = file_hash
122
  doc_metadata["content_length"] = len(markdown_content)
123
+ doc_metadata["upload_timestamp"] = datetime.now().isoformat()
124
 
125
  # Chunk the document using markdown-aware chunking
126
+ logger.info(f"Chunking document: {file_hash}")
127
  chunks = document_chunker.chunk_document(markdown_content, doc_metadata)
128
 
129
  if not chunks:
 
136
  if not doc_ids:
137
  return False, "Failed to add documents to vector store", {}
138
 
 
 
 
139
  # Prepare ingestion statistics
140
  ingestion_stats = {
141
  "status": "success",
142
+ "file_hash": file_hash,
143
  "total_chunks": len(chunks),
144
  "document_ids": doc_ids,
145
  "content_length": len(markdown_content),
146
  "has_tables": any(chunk.metadata.get("has_table", False) for chunk in chunks),
147
  "has_code": any(chunk.metadata.get("has_code", False) for chunk in chunks),
148
+ "processed_at": datetime.now().isoformat(),
149
+ "replacement_mode": replacement_mode
150
  }
151
 
152
+ action = "Updated existing" if replacement_mode else "Successfully ingested"
153
+ success_msg = f"{action} document with {len(chunks)} chunks"
154
+ logger.info(f"{success_msg}: {file_hash}")
155
 
156
  return True, success_msg, ingestion_stats
157
 
 
180
  # Extract metadata from conversion result
181
  original_filename = conversion_result.get("original_filename", "unknown")
182
  conversion_method = conversion_result.get("conversion_method", "unknown")
183
+ original_file_content = conversion_result.get("original_file_content")
184
 
185
  additional_metadata = {
186
  "original_filename": original_filename,
 
189
  "conversion_time": conversion_result.get("conversion_time", 0)
190
  }
191
 
192
+ # Ingest the markdown content with original file content for proper hashing
193
  return self.ingest_markdown_content(
194
  markdown_content=markdown_content,
195
  source_path=original_filename,
196
+ metadata=additional_metadata,
197
+ original_file_content=original_file_content
198
  )
199
 
200
  except Exception as e:
 
210
  Dictionary with system status information
211
  """
212
  status = {
213
+ "processed_documents": 0, # Will be updated from vector store
214
  "embedding_model_available": False,
215
  "vector_store_available": False,
216
  "system_ready": False
 
237
 
238
  return status
239
 
 
 
 
 
240
 
241
  def test_ingestion_pipeline(self) -> Dict[str, Any]:
242
  """
src/services/document_service.py CHANGED
@@ -118,8 +118,8 @@ class DocumentService:
118
 
119
  return content
120
 
121
- def _create_output_file(self, content: str, output_format: str) -> str:
122
- """Create output file with proper extension."""
123
  # Determine file extension
124
  format_extensions = {
125
  "markdown": ".md",
@@ -132,18 +132,47 @@ class DocumentService:
132
  if self._check_cancellation():
133
  raise ConversionError("Conversion cancelled before output file creation")
134
 
135
- # Create temporary output file
136
- with tempfile.NamedTemporaryFile(mode="w", suffix=ext, delete=False, encoding="utf-8") as tmp:
137
- tmp_path = tmp.name
138
-
139
- # Write in chunks with cancellation checks
140
- chunk_size = 10000 # characters
141
- for i in range(0, len(content), chunk_size):
142
- if self._check_cancellation():
143
- self._safe_delete_file(tmp_path)
144
- raise ConversionError("Conversion cancelled during output file writing")
145
-
146
- tmp.write(content[i:i+chunk_size])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
 
148
  return tmp_path
149
 
@@ -218,7 +247,7 @@ class DocumentService:
218
  raise ConversionError("Conversion cancelled")
219
 
220
  # Create output file
221
- output_path = self._create_output_file(content, output_format)
222
 
223
  return content, output_path
224
 
 
118
 
119
  return content
120
 
121
+ def _create_output_file(self, content: str, output_format: str, original_file_path: Optional[str] = None) -> str:
122
+ """Create output file with proper extension and preserved filename."""
123
  # Determine file extension
124
  format_extensions = {
125
  "markdown": ".md",
 
132
  if self._check_cancellation():
133
  raise ConversionError("Conversion cancelled before output file creation")
134
 
135
+ # Create output filename based on original filename if provided
136
+ if original_file_path:
137
+ original_name = Path(original_file_path).stem # Get filename without extension
138
+ # Clean the filename to be filesystem-safe while preserving spaces and common characters
139
+ clean_name = "".join(c for c in original_name if c.isalnum() or c in (' ', '-', '_', '.', '(', ')')).strip()
140
+ # Replace multiple spaces with single spaces
141
+ clean_name = ' '.join(clean_name.split())
142
+ if not clean_name: # Fallback if cleaning removes everything
143
+ clean_name = "converted_document"
144
+
145
+ # Create output file in temp directory with proper name
146
+ temp_dir = tempfile.gettempdir()
147
+ output_filename = f"{clean_name}{ext}"
148
+ tmp_path = os.path.join(temp_dir, output_filename)
149
+
150
+ # Handle filename conflicts by adding a number suffix
151
+ counter = 1
152
+ base_path = tmp_path
153
+ while os.path.exists(tmp_path):
154
+ name_part = f"{clean_name}_{counter}"
155
+ tmp_path = os.path.join(temp_dir, f"{name_part}{ext}")
156
+ counter += 1
157
+ else:
158
+ # Fallback to random temporary file
159
+ with tempfile.NamedTemporaryFile(mode="w", suffix=ext, delete=False, encoding="utf-8") as tmp:
160
+ tmp_path = tmp.name
161
+
162
+ # Write content to file
163
+ try:
164
+ with open(tmp_path, "w", encoding="utf-8") as f:
165
+ # Write in chunks with cancellation checks
166
+ chunk_size = 10000 # characters
167
+ for i in range(0, len(content), chunk_size):
168
+ if self._check_cancellation():
169
+ self._safe_delete_file(tmp_path)
170
+ raise ConversionError("Conversion cancelled during output file writing")
171
+
172
+ f.write(content[i:i+chunk_size])
173
+ except Exception as e:
174
+ self._safe_delete_file(tmp_path)
175
+ raise ConversionError(f"Failed to write output file: {str(e)}")
176
 
177
  return tmp_path
178
 
 
247
  raise ConversionError("Conversion cancelled")
248
 
249
  # Create output file
250
+ output_path = self._create_output_file(content, output_format, file_path)
251
 
252
  return content, output_path
253
 
src/ui/ui.py CHANGED
@@ -170,12 +170,22 @@ def handle_convert(file_path, parser_name, ocr_method_name, output_format, is_ca
170
 
171
  # Auto-ingest the converted document for RAG
172
  try:
 
 
 
 
 
 
 
 
 
173
  conversion_result = {
174
  "markdown_content": content,
175
  "original_filename": Path(file_path).name if file_path else "unknown",
176
  "conversion_method": parser_name,
177
  "file_size": Path(file_path).stat().st_size if file_path and Path(file_path).exists() else 0,
178
- "conversion_time": 0 # Could be tracked if needed
 
179
  }
180
 
181
  success, ingestion_msg, stats = document_ingestion_service.ingest_from_conversion_result(conversion_result)
 
170
 
171
  # Auto-ingest the converted document for RAG
172
  try:
173
+ # Read original file content for proper deduplication hashing
174
+ original_file_content = None
175
+ if file_path and Path(file_path).exists():
176
+ try:
177
+ with open(file_path, 'rb') as f:
178
+ original_file_content = f.read().decode('utf-8', errors='ignore')
179
+ except Exception as e:
180
+ logger.warning(f"Could not read original file content: {e}")
181
+
182
  conversion_result = {
183
  "markdown_content": content,
184
  "original_filename": Path(file_path).name if file_path else "unknown",
185
  "conversion_method": parser_name,
186
  "file_size": Path(file_path).stat().st_size if file_path and Path(file_path).exists() else 0,
187
+ "conversion_time": 0, # Could be tracked if needed
188
+ "original_file_content": original_file_content
189
  }
190
 
191
  success, ingestion_msg, stats = document_ingestion_service.ingest_from_conversion_result(conversion_result)