"""Database service layer: generic CRUD dispatch plus business-specific queries."""

import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional

from sqlalchemy import and_, or_, text
from sqlalchemy.orm import Session

from backend.models.schemas import NLPAnalysisResult, DatabaseResult, QueryType
from backend.config import settings
from backend.database.connection import get_database_session, close_database_session
from backend.database.models import (
    User,
    Product,
    Order,
    OrderItem,
    LineMessage,
    UserSession,
)

logger = logging.getLogger(__name__)


class DatabaseService:
    """Run NLP-derived and business-specific queries against the database.

    Every public method opens its own session through
    ``get_database_session`` and releases it with ``close_database_session``
    in a ``finally`` clause. Failures are logged and reported through the
    return value (a failed ``DatabaseResult``, ``False`` or ``None``) rather
    than raised to the caller.
    """

    def __init__(self):
        # Table name -> ORM model.
        self.model_mapping = {
            "users": User,
            "products": Product,
            "orders": Order,
            "order_items": OrderItem,
            "line_messages": LineMessage,
            "user_sessions": UserSession,
        }

        # Table structure: the ORM model plus the columns that are matched
        # with a case-insensitive substring search instead of equality.
        self.table_schemas = {
            "users": {
                "model": User,
                "searchable": ["name", "email", "display_name"],
            },
            "orders": {
                "model": Order,
                "searchable": ["order_id", "status"],
            },
            "products": {
                "model": Product,
                "searchable": ["name", "category", "description"],
            },
            "line_messages": {
                "model": LineMessage,
                "searchable": ["message", "user_id"],
            },
            "order_items": {
                "model": OrderItem,
                "searchable": [],
            },
            "user_sessions": {
                "model": UserSession,
                "searchable": ["user_id"],
            },
        }

    # ==================== Internal helpers ====================

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Convert a column value into a JSON-friendly value.

        Values exposing ``isoformat`` (dates/datetimes) become ISO strings,
        primitives and ``None`` pass through, anything else is stringified.
        """
        if hasattr(value, "isoformat"):
            return value.isoformat()
        if value is None or isinstance(value, (int, float, str, bool)):
            return value
        return str(value)

    @classmethod
    def _record_to_dict(cls, record: Any) -> Dict[str, Any]:
        """Serialize every table column of an ORM record into a dict."""
        return {
            column.name: cls._serialize_value(getattr(record, column.name))
            for column in record.__table__.columns
        }

    def _get_model(self, table_name: str) -> Any:
        """Return the ORM model registered for ``table_name``, or ``None``."""
        return self.table_schemas.get(table_name, {}).get("model")

    @staticmethod
    def _unsupported_table(table_name: str) -> DatabaseResult:
        """Build the failure result for an unknown table name."""
        return DatabaseResult(
            success=False,
            error=f"不支援的資料表: {table_name}"
        )

    @staticmethod
    def _apply_equality_filters(query: Any, model: Any, conditions: Dict[str, Any]) -> Any:
        """Add one ``attribute == value`` filter per condition.

        Used by the destructive operations (update/delete). An unknown field
        raises ``ValueError`` instead of being skipped: silently dropping a
        misspelled condition would widen the statement to more rows than the
        caller asked for.
        """
        for field, value in conditions.items():
            if not hasattr(model, field):
                raise ValueError(f"不支援的欄位: {field}")
            query = query.filter(getattr(model, field) == value)
        return query

    # ==================== Generic query dispatch ====================

    def execute_query(self, analysis_result: NLPAnalysisResult) -> DatabaseResult:
        """Dispatch an NLP analysis result to the matching CRUD/analytics handler.

        Args:
            analysis_result: Carries ``query_type`` and the ``parameters``
                dict handed to the handler.

        Returns:
            The handler's ``DatabaseResult``; a failed result for an
            unsupported query type or an unexpected error.
        """
        try:
            query_type = analysis_result.query_type
            parameters = analysis_result.parameters

            if query_type == QueryType.SEARCH:
                return self._execute_search(parameters)
            elif query_type == QueryType.CREATE:
                return self._execute_create(parameters)
            elif query_type == QueryType.UPDATE:
                return self._execute_update(parameters)
            elif query_type == QueryType.DELETE:
                return self._execute_delete(parameters)
            elif query_type == QueryType.ANALYTICS:
                return self._execute_analytics(parameters)
            else:
                return DatabaseResult(
                    success=False,
                    error="不支援的查詢類型"
                )
        except Exception as e:
            logger.error(f"資料庫查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"資料庫查詢失敗: {e}"
            )

    def _execute_search(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Run a filtered, paginated SELECT.

        Reads ``table`` (default ``"users"``), ``conditions`` (default
        ``{}``), ``limit`` (default 10) and ``offset`` (default 0) from
        ``parameters``. A dict condition value is treated as a range with any
        of the keys ``gte``/``lte``/``gt``/``lt``. Other values use a
        case-insensitive substring match on the table's searchable columns and
        equality elsewhere. Condition fields that are not model attributes are
        ignored.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions", {})
        limit = parameters.get("limit", 10)
        offset = parameters.get("offset", 0)

        db = None
        try:
            db = get_database_session()

            model = self._get_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            searchable_fields = self.table_schemas.get(table_name, {}).get("searchable", [])
            query = db.query(model)

            for field, value in conditions.items():
                if not hasattr(model, field):
                    continue
                column = getattr(model, field)

                if isinstance(value, dict):
                    # Range query: each bound that is present adds a filter.
                    if "gte" in value:
                        query = query.filter(column >= value["gte"])
                    if "lte" in value:
                        query = query.filter(column <= value["lte"])
                    if "gt" in value:
                        query = query.filter(column > value["gt"])
                    if "lt" in value:
                        query = query.filter(column < value["lt"])
                elif field in searchable_fields and value is not None:
                    query = query.filter(column.ilike(f"%{value}%"))
                else:
                    # Equality; a None value compiles to IS NULL rather than
                    # a substring search for the literal text "None".
                    query = query.filter(column == value)

            results = query.offset(offset).limit(limit).all()
            data = [self._record_to_dict(result) for result in results]

            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )

        except Exception as e:
            logger.error(f"搜尋查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"搜尋失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def _execute_create(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Insert one record built from ``parameters["data"]``.

        Returns the stored row (re-read after commit) as the single element
        of ``data``. Rolls back and returns a failed result on any error.
        """
        table_name = parameters.get("table", "users")
        data = parameters.get("data", {})

        db = None
        try:
            db = get_database_session()

            model = self._get_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            new_record = model(**data)
            db.add(new_record)
            db.commit()
            # Pick up values populated during the insert.
            db.refresh(new_record)

            return DatabaseResult(
                success=True,
                data=[self._record_to_dict(new_record)],
                count=1
            )

        except Exception as e:
            if db:
                db.rollback()
            logger.error(f"建立查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"建立失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def _execute_update(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Update every row matching ``conditions`` with ``data``.

        Returns the affected rows in their post-update state; ``count`` is
        the row count reported by the UPDATE statement. An unknown condition
        field fails the operation.

        NOTE(review): empty ``conditions`` still updates every row of the
        table, as before - confirm that callers rely on this.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions", {})
        data = parameters.get("data", {})

        db = None
        try:
            db = get_database_session()

            model = self._get_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            query = self._apply_equality_filters(db.query(model), model, conditions)

            # Load the target rows BEFORE updating. Re-running the filter
            # afterwards misses every row whose filtered column was changed
            # by the update itself.
            records = query.all()

            # The loaded objects are refreshed explicitly below, so no
            # in-session synchronization is needed.
            updated_count = query.update(data, synchronize_session=False)
            db.commit()

            result_data = []
            for record in records:
                db.refresh(record)
                result_data.append(self._record_to_dict(record))

            return DatabaseResult(
                success=True,
                data=result_data,
                count=updated_count
            )

        except Exception as e:
            if db:
                db.rollback()
            logger.error(f"更新查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"更新失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def _execute_delete(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Delete every row matching ``conditions``.

        Returns a snapshot of the rows taken just before deletion; ``count``
        is the row count reported by the DELETE statement. An unknown
        condition field fails the operation.

        NOTE(review): empty ``conditions`` still deletes every row of the
        table, as before - confirm that callers rely on this.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions", {})

        db = None
        try:
            db = get_database_session()

            model = self._get_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            query = self._apply_equality_filters(db.query(model), model, conditions)

            # Snapshot the rows first so they can be reported to the caller.
            result_data = [self._record_to_dict(record) for record in query.all()]

            deleted_count = query.delete()
            db.commit()

            return DatabaseResult(
                success=True,
                data=result_data,
                count=deleted_count
            )

        except Exception as e:
            if db:
                db.rollback()
            logger.error(f"刪除查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"刪除失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def _execute_analytics(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Return the total row count of a table plus up to five sample rows."""
        table_name = parameters.get("table", "users")

        db = None
        try:
            db = get_database_session()

            model = self._get_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            total_count = db.query(model).count()
            sample_records = db.query(model).limit(5).all()
            sample_data = [self._record_to_dict(record) for record in sample_records]

            analytics_data = [{
                "table": table_name,
                "count": total_count,
                "data_sample": sample_data
            }]

            return DatabaseResult(
                success=True,
                data=analytics_data,
                count=1
            )

        except Exception as e:
            logger.error(f"分析查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"分析失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    # ==================== Message / user helpers ====================

    def save_message(self, user_id: str, message: str, message_type: str = "text") -> bool:
        """Persist one message record; return ``True`` on success, ``False`` on error."""
        db = None
        try:
            db = get_database_session()

            new_message = LineMessage(
                user_id=user_id,
                message=message,
                message_type=message_type
            )
            db.add(new_message)
            db.commit()
            return True

        except Exception as e:
            if db:
                db.rollback()
            logger.error(f"儲存訊息錯誤: {e}")
            return False
        finally:
            if db:
                close_database_session(db)

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the user row matching ``user_id`` as a dict.

        Returns ``None`` when no such user exists or when the lookup fails.
        """
        db = None
        try:
            db = get_database_session()

            user = db.query(User).filter(User.user_id == user_id).first()
            if not user:
                return None

            return self._record_to_dict(user)

        except Exception as e:
            logger.error(f"取得用戶資料錯誤: {e}")
            return None
        finally:
            if db:
                close_database_session(db)

    def create_user_profile(self, user_data: Dict[str, Any]) -> bool:
        """Insert a user built from ``user_data``; return ``True`` on success."""
        db = None
        try:
            db = get_database_session()

            new_user = User(**user_data)
            db.add(new_user)
            db.commit()
            return True

        except Exception as e:
            if db:
                db.rollback()
            logger.error(f"建立用戶資料錯誤: {e}")
            return False
        finally:
            if db:
                close_database_session(db)

    # ==================== Business-specific queries ====================

    def search_products(self, query_text: Optional[str] = None, category: Optional[str] = None,
                        warehouse: Optional[str] = None, limit: int = 10) -> DatabaseResult:
        """Search products by free text and/or category.

        Args:
            query_text: Case-insensitive substring matched against the
                product name or description.
            category: Case-insensitive substring matched against the category.
            warehouse: Accepted for interface compatibility; no filter is
                applied for it.
            limit: Maximum number of products returned.
        """
        db = None
        try:
            db = get_database_session()

            query = db.query(Product)

            if query_text:
                pattern = f"%{query_text}%"
                query = query.filter(
                    or_(
                        Product.name.ilike(pattern),
                        Product.description.ilike(pattern)
                    )
                )

            if category:
                query = query.filter(Product.category.ilike(f"%{category}%"))

            products = query.limit(limit).all()

            data = [
                {
                    "id": product.id,
                    "name": product.name,
                    "description": product.description,
                    "price": float(product.price) if product.price else 0.0,
                    "category": product.category,
                    "stock": getattr(product, 'stock', 0),
                    "created_at": product.created_at.isoformat() if product.created_at else None
                }
                for product in products
            ]

            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )

        except Exception as e:
            logger.error(f"商品查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"商品查詢失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def check_inventory(self, product_name: Optional[str] = None,
                        category: Optional[str] = None) -> DatabaseResult:
        """Report stock information for products matching name and/or category.

        Both filters are case-insensitive substring matches; with neither
        given, every product is returned.
        """
        db = None
        try:
            db = get_database_session()

            query = db.query(Product)

            if product_name:
                query = query.filter(Product.name.ilike(f"%{product_name}%"))

            if category:
                query = query.filter(Product.category.ilike(f"%{category}%"))

            products = query.all()

            inventory_data = [
                {
                    "product_name": product.name,
                    "category": product.category,
                    "current_stock": getattr(product, 'stock', 0),
                    "price": float(product.price) if product.price else 0.0,
                    "description": product.description,
                    "last_updated": product.updated_at.isoformat() if product.updated_at else None
                }
                for product in products
            ]

            return DatabaseResult(
                success=True,
                data=inventory_data,
                count=len(inventory_data)
            )

        except Exception as e:
            logger.error(f"庫存查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"庫存查詢失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def search_orders(self, user_id: Optional[str] = None, status: Optional[str] = None,
                      limit: int = 10) -> DatabaseResult:
        """List orders, optionally filtered by exact user id and status substring."""
        db = None
        try:
            db = get_database_session()

            query = db.query(Order)

            if user_id:
                query = query.filter(Order.user_id == user_id)

            if status:
                query = query.filter(Order.status.ilike(f"%{status}%"))

            orders = query.limit(limit).all()

            order_data = [
                {
                    "order_id": order.order_id,
                    "user_id": order.user_id,
                    "status": order.status,
                    "total_amount": float(order.total_amount) if order.total_amount else 0.0,
                    "created_at": order.created_at.isoformat() if order.created_at else None,
                    "updated_at": order.updated_at.isoformat() if order.updated_at else None
                }
                for order in orders
            ]

            return DatabaseResult(
                success=True,
                data=order_data,
                count=len(order_data)
            )

        except Exception as e:
            logger.error(f"訂單查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"訂單查詢失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def get_low_stock_products(self, threshold: int = 10) -> DatabaseResult:
        """List products whose stock is at or below ``threshold``.

        The stock filter is only applied when the ``Product`` model defines a
        ``stock`` attribute; otherwise every product is returned with a
        reported stock of 0.
        """
        db = None
        try:
            db = get_database_session()

            query = db.query(Product)
            if hasattr(Product, 'stock'):
                query = query.filter(Product.stock <= threshold)

            low_stock_products = query.all()

            data = [
                {
                    "product_name": product.name,
                    "current_stock": getattr(product, 'stock', 0),
                    "category": product.category,
                    "price": float(product.price) if product.price else 0.0,
                    "status": "庫存不足"
                }
                for product in low_stock_products
            ]

            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )

        except Exception as e:
            logger.error(f"低庫存查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"低庫存查詢失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def get_business_summary(self, low_stock_threshold: int = 10) -> DatabaseResult:
        """Return headline counts for products, orders, users and low-stock items.

        Args:
            low_stock_threshold: Products with stock at or below this value
                count as low-stock (only evaluated when ``Product`` defines a
                ``stock`` attribute).
        """
        db = None
        try:
            db = get_database_session()

            total_products = db.query(Product).count()
            total_orders = db.query(Order).count()
            total_users = db.query(User).count()

            low_stock_count = 0
            if hasattr(Product, 'stock'):
                low_stock_count = (
                    db.query(Product)
                    .filter(Product.stock <= low_stock_threshold)
                    .count()
                )

            summary_data = [{
                "total_products": total_products,
                "total_orders": total_orders,
                "total_users": total_users,
                "low_stock_items": low_stock_count,
                "report_date": datetime.now().isoformat()
            }]

            return DatabaseResult(
                success=True,
                data=summary_data,
                count=1
            )

        except Exception as e:
            logger.error(f"業務摘要查詢錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"業務摘要查詢失敗: {e}"
            )
        finally:
            if db:
                close_database_session(db)

    def process_natural_language_query(self, user_message: str,
                                       user_id: Optional[str] = None) -> DatabaseResult:
        """Route a free-text message to a business query by keyword matching.

        Intents are tested in this order: low stock, product (inventory or
        general search), orders, summary statistics. A message matching none
        of them falls back to a product search on the raw message.
        """
        try:
            message_lower = user_message.lower()

            def mentions(keywords: List[str]) -> bool:
                return any(keyword in message_lower for keyword in keywords)

            # Low stock is tested first: '低庫存' contains the inventory
            # keyword '庫存', so testing the product/inventory branch first
            # would swallow low-stock requests that also mention a product.
            if mentions(['低庫存', '缺貨', '不足']):
                return self.get_low_stock_products()

            if mentions(['商品', '產品', '貨品', '物品']):
                product_name = self._extract_product_name(user_message)
                if mentions(['庫存', '存貨', '剩餘']):
                    return self.check_inventory(product_name=product_name)
                return self.search_products(query_text=product_name)

            if mentions(['訂單', '訂購', '購買']):
                return self.search_orders(user_id=user_id)

            if mentions(['統計', '摘要', '總計', '報表']):
                return self.get_business_summary()

            # Default: treat the whole message as a product search.
            return self.search_products(query_text=user_message)

        except Exception as e:
            logger.error(f"自然語言查詢處理錯誤: {e}")
            return DatabaseResult(
                success=False,
                error=f"查詢處理失敗: {e}"
            )

    def _extract_product_name(self, message: str) -> str:
        """Strip common query keywords from ``message`` and return the remainder.

        The result may be an empty string when the message consists only of
        keywords and whitespace.
        """
        keywords_to_remove = ['查詢', '搜尋', '找', '商品', '產品', '庫存', '有沒有', '請問']

        cleaned_message = message
        for keyword in keywords_to_remove:
            cleaned_message = cleaned_message.replace(keyword, '')

        return cleaned_message.strip()