# This file handles all of the data preparation. Run it once before running app.py.
import re
import os
import sqlite3
import datetime
import chromadb
from pypdf import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from together import Together
from langchain_together import TogetherEmbeddings
from dotenv import load_dotenv
from chromadb.utils import embedding_functions
from langchain_core.documents import Document
# Load environment variables from .env file
load_dotenv()
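# load_dotenv() reads a local .env file; the embedding calls below expect TOGETHER_API_KEY
# to be defined there (or already exported in the environment).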
def extract_text_from_pdf(pdf_path):
"""
Extract all text content from PDF files.
"""
text = ""
try:
reader = PdfReader(pdf_path)
for page in reader.pages:
            text += page.extract_text() + "\n"  # extract each page's text and append a newline
except Exception as e:
print(f"Error reading PDF {pdf_path}: {e}")
return text
def clean_text_content(text):
"""
Remove irrelevant content, such as copyright information, legal notices, and long blank spaces.
"""
text = re.sub(r'\n\s*\n', '\n\n', text)
text = re.sub(r'^\s*\d+\s*$', '', text, flags=re.MULTILINE)
text = re.sub(r'\s+\d+\s*$', '', text, flags=re.MULTILINE)
text = re.sub(r'Copyright © \d{4} [^\n]*\. All Rights Reserved\.', '', text, flags=re.IGNORECASE)
text = re.sub(r'Confidential and Proprietary Information[^\n]*', '', text, flags=re.IGNORECASE)
text = re.sub(r'Disclaimer:[^.]*\.', '', text, flags=re.IGNORECASE)
    # Collapse runs of spaces and tabs only, keeping newlines so the paragraph handling in standardize_text_format() still has line breaks to work with
    text = re.sub(r'[ \t]+', ' ', text).strip()
return text
def standardize_text_format(text):
"""
Try to organize everything into clear paragraphs.
"""
text = re.sub(r'([a-zA-Z])-(\n)([a-zA-Z])', r'\1\3', text)
text = re.sub(r'([.?!])\s*([A-Z])', r'\1 \2', text)
    text = re.sub(r'[ \t]+', ' ', text).strip()  # collapse spaces/tabs but keep newlines for the paragraph fixes below
text = re.sub(r'([^\n])\n([^\n])', r'\1 \2', text)
text = re.sub(r'\n{2,}', '\n\n', text)
return text
def create_database_table(db_path):
"""
Create a SQLite database and define the table structure.
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
original_filename TEXT NOT NULL UNIQUE, -- Add UNIQUE constraint
source_type TEXT,
processed_text TEXT NOT NULL,
processed_date TEXT
)
''')
conn.commit()
conn.close()
print(f"SQLite table 'documents' ensured in {db_path}")
def create_chunks_table(db_path):
"""
Create a table to store the segmented text blocks.
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS chunks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id INTEGER NOT NULL,
chunk_index INTEGER NOT NULL,
chunk_content TEXT NOT NULL,
chunk_length INTEGER,
created_at TEXT,
UNIQUE(document_id, chunk_index), -- Ensure chunks are unique per document
FOREIGN KEY (document_id) REFERENCES documents(id)
)
''')
conn.commit()
conn.close()
print(f"SQLite table 'chunks' ensured in {db_path}")
def insert_document_data(db_path, original_filename, source_type, processed_text, processed_date):
"""
Insert the processed file data into the database.
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
    # Check whether a document with the same original filename already exists
cursor.execute("SELECT id FROM documents WHERE original_filename = ?", (original_filename,))
existing_doc = cursor.fetchone()
if existing_doc:
print(f"Document '{original_filename}' already exists in DB (ID: {existing_doc[0]}). Skipping insertion.")
conn.close()
return existing_doc[0]
else:
cursor.execute('''
INSERT INTO documents (original_filename, source_type, processed_text, processed_date)
VALUES (?, ?, ?, ?)
''', (original_filename, source_type, processed_text, processed_date))
conn.commit()
doc_id = cursor.lastrowid
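        # lastrowid is the AUTOINCREMENT id SQLite assigned to the row we just inserted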
conn.close()
print(f"Document '{original_filename}' saved to database: {db_path} with ID: {doc_id}")
return doc_id
def get_document_text_from_db(db_path, document_id=None, limit=None):
"""
Reads the processed text contents of one or more files from a SQLite database.
"""
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
query = "SELECT id, processed_text FROM documents"
params = []
if document_id:
query += " WHERE id = ?"
params.append(document_id)
if limit:
query += " LIMIT ?"
params.append(limit)
cursor.execute(query, params)
if document_id:
row = cursor.fetchone()
if row:
return {'id': row[0], 'processed_text': row[1]}
return None
else:
return [{'id': row[0], 'processed_text': row[1]} for row in cursor.fetchall()]
except sqlite3.Error as e:
print(f"Error reading from SQLite database: {e}")
return []
finally:
if conn:
conn.close()
def chunk_text(text, chunk_size=500, chunk_overlap=100): # Adjusted chunk size and overlap for potentially smaller chunks
"""
Splits the text using RecursiveCharacterTextSplitter.
"""
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
add_start_index=True
)
chunks = text_splitter.create_documents([text])
return [chunk.page_content for chunk in chunks]
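# Example usage (hypothetical): chunk_text(long_manual_text) returns a list of plain strings;
# the chunk metadata (including start_index) is discarded here because only page_content is kept.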
def insert_chunks_to_db(db_path, document_id, chunks):
"""
Inserts all text blocks from the specified file into the database.
Checks if chunks for this document_id already exist and skips if found.
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
current_time = datetime.datetime.now().isoformat()
# Check if chunks for this document_id already exist
cursor.execute("SELECT COUNT(*) FROM chunks WHERE document_id = ?", (document_id,))
if cursor.fetchone()[0] > 0:
print(f"Chunks for document ID {document_id} already exist. Skipping chunk insertion.")
conn.close()
return
data_to_insert = []
for i, chunk in enumerate(chunks):
data_to_insert.append((document_id, i, chunk, len(chunk), current_time))
cursor.executemany('''
INSERT INTO chunks (document_id, chunk_index, chunk_content, chunk_length, created_at)
VALUES (?, ?, ?, ?, ?)
''', data_to_insert)
conn.commit()
conn.close()
print(f"Saved {len(chunks)} chunks into the database for file ID {document_id}.")
def generate_embeddings(texts, model_name="togethercomputer/m2-bert-80M-32k-retrieval"):
"""
Generates text embeddings using Together AI's embedding model.
"""
try:
embeddings_model = TogetherEmbeddings(
model=model_name,
api_key=os.getenv("TOGETHER_API_KEY")
)
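        # embed_documents() sends the texts to Together AI and returns one embedding vector per input string, in the same order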
vectors = embeddings_model.embed_documents(texts)
print(f"Successfully generated embeddings for {len(texts)} text chunks using {model_name}.")
return vectors
except Exception as e:
print(f"Error generating embeddings: {e}")
return []
def get_chunks_from_db_for_embedding(db_path):
"""
Reads all split text chunks from the SQLite database, including their IDs.
"""
conn = None
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT id, document_id, chunk_index, chunk_content FROM chunks ORDER BY document_id, chunk_index")
rows = cursor.fetchall()
return [{'id': row[0], 'source_document_id': row[1], 'chunk_index': row[2], 'text': row[3]} for row in rows]
except sqlite3.Error as e:
print(f"Error reading chunks from SQLite database: {e}")
return []
finally:
if conn:
conn.close()
def load_chunks_to_vector_db(chunks_data, db_path="data/vector_db_chroma", collection_name="document_chunks", embeddings_model_name="togethercomputer/m2-bert-80M-32k-retrieval"):
"""
Loads text chunks and their embeddings into a ChromaDB vector database.
This function will now ADD chunks if they are new (based on their IDs).
"""
try:
client = chromadb.PersistentClient(path=db_path)
collection = client.get_or_create_collection(name=collection_name)
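        # get_or_create_collection() reuses the collection created on previous runs, which keeps re-running this setup script idempotent for the vector store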
# Check existing IDs in ChromaDB to avoid adding duplicates
existing_chroma_ids = set()
if collection.count() > 0:
# Fetching all existing IDs can be slow for very large collections.
# A more efficient approach for very large databases might involve
# querying a batch of IDs or checking after insertion.
# For now, this is simpler for demonstration.
try:
                all_ids_in_chroma = collection.get()['ids']  # get() with no filters returns the ids of every item in the collection
existing_chroma_ids = set(all_ids_in_chroma)
except Exception as e:
print(f"Warning: Could not retrieve all existing IDs from ChromaDB. May attempt to add duplicates. Error: {e}")
ids_to_add = []
documents_to_add = []
embeddings_to_add = []
metadatas_to_add = []
for item in chunks_data:
chunk_id_str = str(item['id'])
if chunk_id_str not in existing_chroma_ids:
ids_to_add.append(chunk_id_str)
documents_to_add.append(item['text'])
embeddings_to_add.append(item['embedding'])
metadatas_to_add.append({"source_document_id": item.get('source_document_id', 'unknown'),
"chunk_index": item.get('chunk_index', None)})
else:
print(f"Chunk ID {chunk_id_str} already exists in ChromaDB. Skipping.")
if ids_to_add:
collection.add(
embeddings=embeddings_to_add,
documents=documents_to_add,
metadatas=metadatas_to_add,
ids=ids_to_add
)
print(f"Successfully added {len(ids_to_add)} new text chunks into ChromaDB collection '{collection_name}'.")
else:
print(f"No new chunks to add to ChromaDB collection '{collection_name}'.")
except Exception as e:
print(f"Error loading data into ChromaDB: {e}")
if __name__ == "__main__":
# Define the directory containing your PDF files
current_dir = os.path.dirname(os.path.abspath(__file__))
pdf_input_directory = os.path.join(current_dir, "input")
#pdf_input_directory = "input" # Ensure this directory exists and contains your PDFs
db_directory = "database"
db_path = os.path.join(db_directory, "processed_documents.db")
vector_db_dir = "data/vector_db_chroma"
collection_name = "my_instrument_manual_chunks"
    embeddings_model_name = "togethercomputer/m2-bert-80M-32k-retrieval"  # keep this consistent with the embedding model used in main.py
# Create necessary directories
    os.makedirs(db_directory, exist_ok=True)
    os.makedirs(vector_db_dir, exist_ok=True)
    os.makedirs(pdf_input_directory, exist_ok=True)  # ensure the input directory exists so os.listdir() below does not fail on a fresh setup
print("--- Starting knowledge base setup ---")
    # 1. Ensure the SQLite tables exist
create_database_table(db_path)
create_chunks_table(db_path)
    # 2. Loop over every PDF file in the input directory
pdf_files = [f for f in os.listdir(pdf_input_directory) if f.lower().endswith('.pdf')]
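    # Only files ending in .pdf (case-insensitive) are picked up; subdirectories are not searched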
if not pdf_files:
print(f"No PDF files found in '{pdf_input_directory}'. Please place your PDF documents there.")
for pdf_filename in pdf_files:
pdf_full_path = os.path.join(pdf_input_directory, pdf_filename)
print(f"\n--- Processing PDF: {pdf_full_path} ---")
        # Check whether this file has already been processed into the documents table
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT id FROM documents WHERE original_filename = ?", (pdf_filename,))
existing_doc_id = cursor.fetchone()
conn.close()
if existing_doc_id:
doc_id = existing_doc_id[0]
print(f"Document '{pdf_filename}' already in 'documents' table with ID: {doc_id}. Skipping PDF extraction and text processing.")
# Still process chunks if they aren't in 'chunks' table or ChromaDB
else:
pdf_content = extract_text_from_pdf(pdf_full_path)
if pdf_content:
cleaned_text = clean_text_content(pdf_content)
final_processed_text = standardize_text_format(cleaned_text)
current_date = datetime.date.today().isoformat()
doc_id = insert_document_data(db_path, pdf_filename, "PDF", final_processed_text, current_date)
else:
print(f"No content extracted from {pdf_full_path}. Skipping document insertion for this PDF.")
continue # Move to the next PDF if no content
        # If we have a doc_id (whether newly inserted or already existing), run chunking and embedding
if doc_id is not None:
            # Read the processed text back from the SQLite documents table, chunk it, and store the chunks in the SQLite chunks table
document_from_db = get_document_text_from_db(db_path, document_id=doc_id)
if document_from_db:
full_text = document_from_db['processed_text']
print(f"--- Chunking document ID {doc_id} ('{pdf_filename}') ---")
chunks = chunk_text(full_text, chunk_size=500, chunk_overlap=100) # Re-evaluate chunk_size
if chunks:
insert_chunks_to_db(db_path, doc_id, chunks)
else:
print(f"Document ID {doc_id} unable to split any chunks.")
else:
print(f"Could not retrieve processed text for document ID {doc_id}.")
    # 3. Read every chunk from the SQLite chunks table, generate embeddings, and load them into ChromaDB.
    # We re-fetch all chunks from the DB here because chunks from multiple PDFs may have accumulated.
chunks_from_db_for_embedding = get_chunks_from_db_for_embedding(db_path)
if chunks_from_db_for_embedding:
# Filter out chunks that already have an embedding in ChromaDB if checking was efficient
# For simplicity, we'll try to generate embeddings for all chunks retrieved from SQLite
# and let load_chunks_to_vector_db handle duplicates in ChromaDB.
texts_to_embed = [item['text'] for item in chunks_from_db_for_embedding]
print(f"\n--- Generating embeddings for {len(texts_to_embed)} total chunks ---")
text_embeddings = generate_embeddings(texts_to_embed, model_name=embeddings_model_name)
if text_embeddings:
data_for_vector_db = []
for i, chunk_data in enumerate(chunks_from_db_for_embedding):
if i < len(text_embeddings):
chunk_data['embedding'] = text_embeddings[i]
data_for_vector_db.append(chunk_data)
else:
print(f"Warning: Missing embedding for chunk {chunk_data['id']}. Skipping this chunk for ChromaDB.")
print(f"--- Loading/Updating {len(data_for_vector_db)} chunks into ChromaDB ---")
load_chunks_to_vector_db(data_for_vector_db, db_path=vector_db_dir, collection_name=collection_name, embeddings_model_name=embeddings_model_name)
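            # Note: load_chunks_to_vector_db()'s db_path parameter is the ChromaDB directory (vector_db_dir), not the SQLite db_path used above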
else:
print("No embeddings generated. Skipping ChromaDB loading.")
else:
print("No text chunks read from the database for embedding and loading into ChromaDB.")
print("\n--- Knowledge base setup complete ---") |