"""Database service layer.

Provides :class:`DatabaseService`, which executes generic CRUD/analytics
requests produced by the NLP layer and exposes business-specific queries
(products, inventory, sales orders, summary statistics).
"""

import logging
import operator
import os
import re
from contextlib import contextmanager
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional, Tuple

from sqlalchemy.orm import Session, joinedload
from sqlalchemy import text, and_, or_

from backend.models.schemas import NLPAnalysisResult, DatabaseResult, QueryType
from backend.config import settings
from backend.database.connection import get_database_session, close_database_session
from backend.database.models import User, Product, PurchaseOrder, SalesOrder, LineMessage, Category

logger = logging.getLogger(__name__)

# Stock level at or below which a product is reported as low on stock.
DEFAULT_LOW_STOCK_THRESHOLD = 10

# Range operators accepted in search conditions, e.g. {"price": {"gte": 10}}.
_RANGE_OPERATORS = (
    ("gte", operator.ge),
    ("lte", operator.le),
    ("gt", operator.gt),
    ("lt", operator.lt),
)

# Order status code -> Chinese display label.
_STATUS_DISPLAY = {
    'DELIVERED': '已交付',
    'SHIPPED': '已出貨',
    'PROCESSING': '處理中',
    'CANCELLED': '已取消',
    'PENDING': '待處理',
    'CONFIRMED': '已確認',
    'COMPLETED': '已完成',
}

# Chinese display label -> order status code (inverse of the map above).
_STATUS_FROM_DISPLAY = {label: code for code, label in _STATUS_DISPLAY.items()}

# Chinese keywords recognised in free text, checked in this order.
_STATUS_KEYWORDS = (
    ('已交付', 'DELIVERED'),
    ('已出貨', 'SHIPPED'),
    ('出貨', 'SHIPPED'),
    ('交付', 'DELIVERED'),
    ('處理中', 'PROCESSING'),
    ('已取消', 'CANCELLED'),
    ('取消', 'CANCELLED'),
    ('待處理', 'PENDING'),
    ('已確認', 'CONFIRMED'),
    ('已完成', 'COMPLETED'),
)

# English status keywords recognised in free text (matched lower-case).
_ENGLISH_STATUS_KEYWORDS = (
    'delivered', 'shipped', 'processing', 'cancelled',
    'pending', 'confirmed', 'completed',
)

# Intent keywords used by process_natural_language_query.
_PRODUCT_KEYWORDS = ('商品', '產品', '貨品', '物品')
_INVENTORY_KEYWORDS = ('庫存', '存貨', '剩餘')
_ORDER_KEYWORDS = ('訂單', '訂購', '購買', 'so-')
_LOW_STOCK_KEYWORDS = ('低庫存', '缺貨', '不足')
_SUMMARY_KEYWORDS = ('統計', '摘要', '總計', '報表')

# Words stripped from a message to leave (roughly) the product name.
_PRODUCT_NOISE_WORDS = ('查詢', '搜尋', '找', '商品', '產品', '庫存', '有沒有', '請問')

# Order-number shapes: full "SO-20250706-001", bare "20250706-001", plain digits.
_SO_NUMBER_RE = re.compile(r'SO-\d{8}-\d{3}')
_DATED_NUMBER_RE = re.compile(r'\d{8}-\d{3}')
_PLAIN_NUMBER_RE = re.compile(r'\d{6,}')


def _serialize_value(value: Any) -> Any:
    """Convert a column value into something JSON-friendly.

    Values exposing ``isoformat`` (dates/datetimes) become ISO strings,
    primitives and ``None`` pass through, everything else is stringified.
    """
    if hasattr(value, 'isoformat'):
        return value.isoformat()
    if value is None or isinstance(value, (int, float, str, bool)):
        return value
    return str(value)


def _serialize_record(record: Any) -> Dict[str, Any]:
    """Return a dict of ``column name -> serialised value`` for a mapped row."""
    return {
        column.name: _serialize_value(getattr(record, column.name))
        for column in record.__table__.columns
    }


def _contains_any(haystack: str, needles) -> bool:
    """Return True when any of ``needles`` occurs in ``haystack``."""
    return any(needle in haystack for needle in needles)


@contextmanager
def _session_scope() -> Iterator[Any]:
    """Open a database session, roll back on error, and always close it.

    The rollback also runs for read-only callers; it leaves the session in
    a clean state before it is handed back to ``close_database_session``.
    """
    db = get_database_session()
    try:
        yield db
    except Exception:
        if db is not None:
            db.rollback()
        raise
    finally:
        if db is not None:
            close_database_session(db)


class DatabaseService:
    """Database service: generic query execution plus business queries."""

    def __init__(self):
        # Table name -> ORM model.
        self.model_mapping = {
            "users": User,
            "products": Product,
            "purchase_orders": PurchaseOrder,
            "sales_orders": SalesOrder,
            "line_messages": LineMessage
        }

        # Table definitions: the ORM model and the fields that are matched
        # with a case-insensitive substring search instead of equality.
        self.table_schemas = {
            "users": {
                "model": User,
                "searchable": ["name", "email"]
            },
            "purchase_orders": {
                "model": PurchaseOrder,
                "searchable": ["po_number", "status"]
            },
            "sales_orders": {
                "model": SalesOrder,
                "searchable": ["so_number", "status"]
            },
            "products": {
                "model": Product,
                "searchable": ["productName", "productCode", "barcode"]
            },
            "line_messages": {
                "model": LineMessage,
                "searchable": ["message", "user_id"]
            }
        }

    # ==================== Internal helpers ====================

    def _resolve_model(self, table_name: str) -> Any:
        """Return the ORM model registered for ``table_name`` or ``None``."""
        return self.table_schemas.get(table_name, {}).get("model")

    @staticmethod
    def _unsupported_table(table_name: str) -> DatabaseResult:
        """Build the failure result for an unknown table name."""
        return DatabaseResult(
            success=False,
            error=f"不支援的資料表: {table_name}"
        )

    @staticmethod
    def _partition_conditions(model: Any, conditions: Dict[str, Any]) -> Tuple[List[Tuple[str, Any, Any]], List[str]]:
        """Split conditions into usable ``(field, column, value)`` and unknown fields."""
        known = []
        unknown = []
        for field, value in conditions.items():
            if hasattr(model, field):
                known.append((field, getattr(model, field), value))
            else:
                unknown.append(str(field))
        return known, unknown

    @classmethod
    def _require_write_conditions(cls, model: Any, conditions: Dict[str, Any]) -> List[Tuple[str, Any, Any]]:
        """Validate the conditions of an UPDATE/DELETE.

        Raises:
            ValueError: if a condition names an unknown field or no condition
                is given. Ignoring either would silently widen the statement
                to rows the caller never asked to touch.
        """
        known, unknown = cls._partition_conditions(model, conditions)
        if unknown:
            raise ValueError(f"不支援的條件欄位: {', '.join(unknown)}")
        if not known:
            raise ValueError("未提供有效的條件")
        return known

    # ==================== Generic query execution ====================

    def execute_query(self, analysis_result: NLPAnalysisResult) -> DatabaseResult:
        """Dispatch an analysed request to the matching handler.

        Args:
            analysis_result: carries ``query_type`` and ``parameters``.

        Returns:
            The handler's result, or a failure result for an unsupported
            query type or an unexpected error.
        """
        try:
            query_type = analysis_result.query_type
            parameters = analysis_result.parameters or {}

            if query_type == QueryType.SEARCH:
                return self._execute_search(parameters)
            elif query_type == QueryType.CREATE:
                return self._execute_create(parameters)
            elif query_type == QueryType.UPDATE:
                return self._execute_update(parameters)
            elif query_type == QueryType.DELETE:
                return self._execute_delete(parameters)
            elif query_type == QueryType.ANALYTICS:
                return self._execute_analytics(parameters)
            else:
                return DatabaseResult(
                    success=False,
                    error="不支援的查詢類型"
                )
        except Exception as e:
            logger.error("資料庫查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"資料庫查詢失敗: {str(e)}"
            )

    def _execute_search(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Run a filtered, paginated SELECT.

        Reads ``table`` (default ``"users"``), ``conditions``, ``limit``
        (default 10) and ``offset`` (default 0) from ``parameters``.
        A dict-valued condition is treated as a range (``gte``/``lte``/
        ``gt``/``lt``); a scalar is matched by substring for the table's
        searchable fields and by equality otherwise. Conditions naming an
        attribute the model does not have are ignored.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions") or {}
        limit = parameters.get("limit", 10)
        offset = parameters.get("offset", 0)

        try:
            model = self._resolve_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            fuzzy_fields = self.table_schemas.get(table_name, {}).get("searchable", [])
            known, _unknown = self._partition_conditions(model, conditions)

            with _session_scope() as db:
                query = db.query(model)

                for field, column, value in known:
                    if isinstance(value, dict):
                        # Range query: apply every bound that is present.
                        for key, compare in _RANGE_OPERATORS:
                            if key in value:
                                query = query.filter(compare(column, value[key]))
                    elif field in fuzzy_fields:
                        query = query.filter(column.ilike(f"%{value}%"))
                    else:
                        query = query.filter(column == value)

                rows = query.offset(offset).limit(limit).all()
                data = [_serialize_record(row) for row in rows]

            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )
        except Exception as e:
            logger.error("搜尋查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"搜尋失敗: {str(e)}"
            )

    def _execute_create(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Insert one row built from ``parameters["data"]`` and return it."""
        table_name = parameters.get("table", "users")
        data = parameters.get("data") or {}

        try:
            model = self._resolve_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            with _session_scope() as db:
                new_record = model(**data)
                db.add(new_record)
                db.commit()
                # Reload so database-generated values are included.
                db.refresh(new_record)
                result_data = _serialize_record(new_record)

            return DatabaseResult(
                success=True,
                data=[result_data],
                count=1
            )
        except Exception as e:
            logger.error("建立查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"建立失敗: {str(e)}"
            )

    def _execute_update(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Update the rows matching ``conditions`` with ``data``.

        The request is rejected when ``data`` is empty, when no condition is
        given, or when a condition names an unknown field, so that a
        malformed request can never update the whole table.

        Returns:
            On success, the rows that still match the conditions after the
            update (re-queried) and the number of rows updated.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions") or {}
        data = parameters.get("data") or {}

        try:
            model = self._resolve_model(table_name)
            if not model:
                return self._unsupported_table(table_name)
            if not data:
                raise ValueError("未提供更新資料")
            known = self._require_write_conditions(model, conditions)

            with _session_scope() as db:
                query = db.query(model)
                for _field, column, value in known:
                    query = query.filter(column == value)

                updated_count = query.update(data)
                db.commit()

                # NOTE: rows whose filtered column was itself changed by the
                # update no longer match and are therefore not returned.
                result_data = [_serialize_record(row) for row in query.all()]

            return DatabaseResult(
                success=True,
                data=result_data,
                count=updated_count
            )
        except Exception as e:
            logger.error("更新查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"更新失敗: {str(e)}"
            )

    def _execute_delete(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Delete the rows matching ``conditions`` and return what was removed.

        The request is rejected when no condition is given or when a
        condition names an unknown field, so that a malformed request can
        never delete the whole table.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions") or {}

        try:
            model = self._resolve_model(table_name)
            if not model:
                return self._unsupported_table(table_name)
            known = self._require_write_conditions(model, conditions)

            with _session_scope() as db:
                query = db.query(model)
                for _field, column, value in known:
                    query = query.filter(column == value)

                # Snapshot the rows first so they can be reported back.
                result_data = [_serialize_record(row) for row in query.all()]

                deleted_count = query.delete()
                db.commit()

            return DatabaseResult(
                success=True,
                data=result_data,
                count=deleted_count
            )
        except Exception as e:
            logger.error("刪除查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"刪除失敗: {str(e)}"
            )

    def _execute_analytics(self, parameters: Dict[str, Any]) -> DatabaseResult:
        """Return the row count of a table together with up to 5 sample rows."""
        table_name = parameters.get("table", "users")

        try:
            model = self._resolve_model(table_name)
            if not model:
                return self._unsupported_table(table_name)

            with _session_scope() as db:
                total_count = db.query(model).count()
                sample_data = [
                    _serialize_record(row)
                    for row in db.query(model).limit(5).all()
                ]

            analytics_data = [{
                "table": table_name,
                "count": total_count,
                "data_sample": sample_data
            }]

            return DatabaseResult(
                success=True,
                data=analytics_data,
                count=1
            )
        except Exception as e:
            logger.error("分析查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"分析失敗: {str(e)}"
            )

    # ==================== Messages and user profiles ====================

    def save_message(self, user_id: str, message: str, message_type: str = "text") -> bool:
        """Record a message.

        Database persistence is currently disabled; the message is only
        written (truncated) to the log.

        Returns:
            True when the log entry was written, False on error.
        """
        try:
            logger.info(f"訊息記錄 - 用戶: {user_id[:10]}..., 類型: {message_type}, 內容: {message[:50]}...")
            return True
        except Exception as e:
            logger.error("訊息記錄錯誤: %s", e)
            return False

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the user row whose ``user_id`` matches, or None.

        None is returned both when no such user exists and when the lookup
        fails (the error is logged).
        """
        try:
            with _session_scope() as db:
                user = db.query(User).filter(User.user_id == user_id).first()
                if not user:
                    return None
                return _serialize_record(user)
        except Exception as e:
            logger.error("取得用戶資料錯誤: %s", e)
            return None

    def create_user_profile(self, user_data: Dict[str, Any]) -> bool:
        """Insert a user built from ``user_data``; return True on success."""
        try:
            with _session_scope() as db:
                db.add(User(**user_data))
                db.commit()
            return True
        except Exception as e:
            logger.error("建立用戶資料錯誤: %s", e)
            return False

    # ==================== Business-specific queries ====================

    def search_products(self, query_text: Optional[str] = None, category: Optional[str] = None,
                        warehouse: Optional[str] = None, limit: int = 10) -> DatabaseResult:
        """Search products by name/code substring and optional category name.

        Args:
            query_text: substring matched against product name and code.
            category: substring matched against the related category's name.
            warehouse: accepted for interface compatibility; no filter is
                applied for it. NOTE(review): confirm the type of
                ``Product.warehouse`` before adding one.
            limit: maximum number of products returned.
        """
        try:
            with _session_scope() as db:
                query = db.query(Product)

                if query_text:
                    query = query.filter(or_(
                        Product.productName.ilike(f"%{query_text}%"),
                        Product.productCode.ilike(f"%{query_text}%")
                    ))

                # Product.category is a relationship, so the name filter has
                # to go through a join (same approach as check_inventory).
                if category:
                    query = query.join(Product.category).filter(Category.name.ilike(f"%{category}%"))

                data = [
                    {
                        "id": product.id,
                        "product_code": product.productCode,
                        "product_name": product.productName,
                        "unit": product.unit,
                        "warehouse": product.warehouse,
                        "stock": product.stock,
                        "category_name": product.category.name if product.category else None,
                        "created_at": product.createdAt.isoformat() if product.createdAt else None
                    }
                    for product in query.limit(limit).all()
                ]

            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )
        except Exception as e:
            logger.error("商品查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"商品查詢失敗: {str(e)}"
            )

    def check_inventory(self, product_name: Optional[str] = None, category: Optional[str] = None) -> DatabaseResult:
        """Report stock levels, optionally filtered by product and category name."""
        try:
            with _session_scope() as db:
                query = db.query(Product)

                if product_name:
                    query = query.filter(Product.productName.ilike(f"%{product_name}%"))

                # Category filter goes through the relationship join.
                if category:
                    query = query.join(Product.category).filter(Category.name.ilike(f"%{category}%"))

                inventory_data = [
                    {
                        "product_name": product.productName,
                        "product_code": product.productCode,
                        "category": product.category.name if product.category else None,
                        "current_stock": product.stock,
                        "unit": product.unit,
                        "warehouse": product.warehouse,
                        "last_updated": product.updatedAt.isoformat() if product.updatedAt else None
                    }
                    for product in query.all()
                ]

            return DatabaseResult(
                success=True,
                data=inventory_data,
                count=len(inventory_data)
            )
        except Exception as e:
            logger.error("庫存查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"庫存查詢失敗: {str(e)}"
            )

    def search_orders(self, user_id: Optional[str] = None, status: Optional[str] = None,
                      order_number: Optional[str] = None, customer_id: Optional[str] = None,
                      date_from: Optional[str] = None, date_to: Optional[str] = None,
                      limit: int = 10) -> DatabaseResult:
        """Search sales orders, newest first.

        Args:
            user_id: matched against the order's salesperson id.
            status: status code or its Chinese label; matched by substring.
            order_number: full or partial number; ``SO-`` is prepended when
                missing.
            customer_id: matched against the order's customer id.
            date_from: inclusive lower bound on the sales date.
            date_to: inclusive upper bound on the sales date.
            limit: maximum number of orders returned.
        """
        try:
            with _session_scope() as db:
                # Eager-load customer and salesperson; both are read below.
                query = db.query(SalesOrder).options(
                    joinedload(SalesOrder.customer),
                    joinedload(SalesOrder.salesperson)
                )

                if order_number:
                    # Complete a partial number with the SO- prefix.
                    if not order_number.upper().startswith('SO-'):
                        order_number = f"SO-{order_number}"
                    query = query.filter(SalesOrder.so_number.ilike(f"%{order_number}%"))

                if user_id:
                    query = query.filter(SalesOrder.salesperson_id == user_id)

                if customer_id:
                    query = query.filter(SalesOrder.customer_id == customer_id)

                if status:
                    # Accept a Chinese label by translating it to its code.
                    english_status = _STATUS_FROM_DISPLAY.get(status, status)
                    query = query.filter(SalesOrder.status.ilike(f"%{english_status}%"))

                if date_from:
                    query = query.filter(SalesOrder.sales_date >= date_from)
                if date_to:
                    query = query.filter(SalesOrder.sales_date <= date_to)

                query = query.order_by(SalesOrder.sales_date.desc())

                order_data = [
                    {
                        "order_id": order.so_number,
                        "sales_date": order.sales_date.isoformat() if order.sales_date else None,
                        "customer_id": order.customer_id,
                        "customer_name": order.customer.customerName if order.customer else "未知客戶",
                        "salesperson_id": order.salesperson_id,
                        "salesperson_name": order.salesperson.name if order.salesperson else "未指定",
                        "status": order.status,
                        "status_display": self._get_order_status_display(order.status),
                        "payment_term": order.payment_term,
                        "subtotal": float(order.subtotal) if order.subtotal else 0.0,
                        "tax_amount": float(order.tax_amount) if order.tax_amount else 0.0,
                        "discount_amount": float(order.discount_amount) if order.discount_amount else 0.0,
                        "total_amount": float(order.total_amount) if order.total_amount else 0.0,
                        "notes": order.notes or "",
                        "created_at": order.created_at.isoformat() if order.created_at else None,
                        "updated_at": order.updated_at.isoformat() if order.updated_at else None
                    }
                    for order in query.limit(limit).all()
                ]

            return DatabaseResult(
                success=True,
                data=order_data,
                count=len(order_data)
            )
        except Exception as e:
            logger.error("訂單查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"訂單查詢失敗: {str(e)}"
            )

    def _get_order_status_display(self, status) -> str:
        """Translate an order status (string or enum-like) to its Chinese label.

        Unknown statuses are returned as their string form.
        """
        # Enum-like objects carry the code in ``.value``.
        code = status.value if hasattr(status, 'value') else status
        return _STATUS_DISPLAY.get(code if hasattr(status, 'value') else str(code), str(code))

    def get_low_stock_products(self, threshold: int = DEFAULT_LOW_STOCK_THRESHOLD) -> DatabaseResult:
        """List products whose stock is at or below ``threshold``."""
        try:
            with _session_scope() as db:
                query = db.query(Product)
                # The filter is only applied when the model exposes ``stock``.
                if hasattr(Product, 'stock'):
                    query = query.filter(Product.stock <= threshold)

                data = [
                    {
                        "product_name": product.productName,
                        "product_code": product.productCode,
                        "current_stock": product.stock,
                        "unit": product.unit,
                        "warehouse": product.warehouse,
                        "category": product.category.name if product.category else None,
                        "status": "庫存不足"
                    }
                    for product in query.all()
                ]

            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )
        except Exception as e:
            logger.error("低庫存查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"低庫存查詢失敗: {str(e)}"
            )

    def get_business_summary(self, low_stock_threshold: int = DEFAULT_LOW_STOCK_THRESHOLD) -> DatabaseResult:
        """Return headline counts: products, orders, users and low-stock items.

        Args:
            low_stock_threshold: stock level at or below which a product is
                counted as low on stock.
        """
        try:
            with _session_scope() as db:
                summary_data = [{
                    "total_products": db.query(Product).count(),
                    "total_sales_orders": db.query(SalesOrder).count(),
                    "total_purchase_orders": db.query(PurchaseOrder).count(),
                    "total_users": db.query(User).count(),
                    "low_stock_items": db.query(Product).filter(Product.stock <= low_stock_threshold).count(),
                    "report_date": datetime.now().isoformat()
                }]

            return DatabaseResult(
                success=True,
                data=summary_data,
                count=1
            )
        except Exception as e:
            logger.error("業務摘要查詢錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"業務摘要查詢失敗: {str(e)}"
            )

    # ==================== Natural-language entry point ====================

    def process_natural_language_query(self, user_message: str, user_id: Optional[str] = None) -> DatabaseResult:
        """Route a free-text message to a business query by keyword matching.

        Order of precedence: product keywords (low stock, then inventory,
        then product search), order keywords, low-stock keywords, summary
        keywords; anything else falls back to a product search on the whole
        message.
        """
        try:
            message_lower = user_message.lower()

            if _contains_any(message_lower, _PRODUCT_KEYWORDS):
                # A message such as "商品庫存不足" asks for the low-stock
                # list, not for a product named after the leftover words.
                if _contains_any(message_lower, _LOW_STOCK_KEYWORDS):
                    return self.get_low_stock_products()
                product_name = self._extract_product_name(user_message)
                if _contains_any(message_lower, _INVENTORY_KEYWORDS):
                    return self.check_inventory(product_name=product_name)
                return self.search_products(query_text=product_name)

            if _contains_any(message_lower, _ORDER_KEYWORDS):
                return self.search_orders(
                    user_id=user_id,
                    order_number=self._extract_order_number(user_message),
                    status=self._extract_order_status(user_message)
                )

            if _contains_any(message_lower, _LOW_STOCK_KEYWORDS):
                return self.get_low_stock_products()

            if _contains_any(message_lower, _SUMMARY_KEYWORDS):
                return self.get_business_summary()

            # Default: treat the whole message as a product search.
            return self.search_products(query_text=user_message)
        except Exception as e:
            logger.error("自然語言查詢處理錯誤: %s", e)
            return DatabaseResult(
                success=False,
                error=f"查詢處理失敗: {str(e)}"
            )

    def _extract_product_name(self, message: str) -> str:
        """Strip common query words from ``message`` and return what is left."""
        cleaned_message = message
        for keyword in _PRODUCT_NOISE_WORDS:
            cleaned_message = cleaned_message.replace(keyword, '')
        return cleaned_message.strip()

    def _extract_order_number(self, message: str) -> Optional[str]:
        """Extract an order number from ``message``.

        Tries, in order: a full ``SO-YYYYMMDD-NNN`` number (case-insensitive),
        a bare ``YYYYMMDD-NNN`` (returned with ``SO-`` prepended), then any
        run of six or more digits (returned as is). Returns None when
        nothing matches.
        """
        match = _SO_NUMBER_RE.search(message.upper())
        if match:
            return match.group()

        match = _DATED_NUMBER_RE.search(message)
        if match:
            return f"SO-{match.group()}"

        match = _PLAIN_NUMBER_RE.search(message)
        if match:
            return match.group()

        return None

    def _extract_order_status(self, message: str) -> Optional[str]:
        """Return the status code mentioned in ``message``, or None.

        Chinese keywords are checked first, then English ones.
        """
        message_lower = message.lower()

        for chinese, english in _STATUS_KEYWORDS:
            if chinese in message_lower:
                return english

        for keyword in _ENGLISH_STATUS_KEYWORDS:
            if keyword in message_lower:
                return keyword.upper()

        return None