mickeywu520's picture
修正訂單查詢問題
68c8519
import logging
import os
from datetime import datetime
from typing import Dict, Any, List, Optional

from sqlalchemy import text, and_, or_
from sqlalchemy.orm import Session, joinedload

from backend.config import settings
from backend.database.connection import get_database_session, close_database_session
from backend.database.models import User, Product, PurchaseOrder, SalesOrder, LineMessage, Category
from backend.models.schemas import NLPAnalysisResult, DatabaseResult, QueryType
logger = logging.getLogger(__name__)
class DatabaseService:
"""資料庫服務"""
def __init__(self):
    """Build the table-name lookups used by the generic query handlers."""
    # Plain table-name -> ORM model lookup.
    self.model_mapping = dict(
        users=User,
        products=Product,
        purchase_orders=PurchaseOrder,
        sales_orders=SalesOrder,
        line_messages=LineMessage,
    )

    # Per-table metadata: the ORM model plus the field names listed as
    # "searchable" for that table.
    table_specs = (
        ("users", User, ["name", "email"]),
        ("purchase_orders", PurchaseOrder, ["po_number", "status"]),
        ("sales_orders", SalesOrder, ["so_number", "status"]),
        ("products", Product, ["productName", "productCode", "barcode"]),
        ("line_messages", LineMessage, ["message", "user_id"]),
    )
    self.table_schemas = {
        name: {"model": orm_model, "searchable": searchable}
        for name, orm_model, searchable in table_specs
    }
def execute_query(self, analysis_result: NLPAnalysisResult) -> DatabaseResult:
    """Route an analysed request to the handler matching its query type.

    Args:
        analysis_result: Object exposing ``query_type`` and ``parameters``.

    Returns:
        The handler's ``DatabaseResult``; a failed result when the query
        type is not one of the supported ones or when anything raises.
    """
    try:
        requested_type = analysis_result.query_type
        handler_args = analysis_result.parameters

        # Checked in order with ==, exactly like an if/elif chain.
        handlers = (
            (QueryType.SEARCH, self._execute_search),
            (QueryType.CREATE, self._execute_create),
            (QueryType.UPDATE, self._execute_update),
            (QueryType.DELETE, self._execute_delete),
            (QueryType.ANALYTICS, self._execute_analytics),
        )
        for supported_type, handler in handlers:
            if requested_type == supported_type:
                return handler(handler_args)

        return DatabaseResult(
            success=False,
            error="不支援的查詢類型"
        )
    except Exception as e:
        logger.error(f"資料庫查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"資料庫查詢失敗: {str(e)}"
        )
def _execute_search(self, parameters: Dict[str, Any]) -> DatabaseResult:
    """Run a filtered, paginated SELECT against one of the supported tables.

    Args:
        parameters: Query description. Recognised keys:
            ``table`` (default ``"users"``), ``conditions`` (field -> value,
            or field -> ``{"gte"/"lte"/"gt"/"lt": bound}`` for ranges),
            ``limit`` (default 10) and ``offset`` (default 0).

    Returns:
        A ``DatabaseResult`` whose ``data`` is a list of row dictionaries
        on success, or ``success=False`` with an error message otherwise.
    """
    table_name = parameters.get("table", "users")
    # An explicit None is treated like a missing key. Otherwise a None
    # limit would remove the row cap and None conditions would fail the
    # whole search.
    conditions = parameters.get("conditions") or {}
    limit = parameters.get("limit")
    if limit is None:
        limit = 10
    offset = parameters.get("offset")
    if offset is None:
        offset = 0

    def to_plain(value):
        # Values with isoformat() (dates/datetimes) become ISO strings,
        # primitives pass through, anything else is stringified.
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        if value is None or isinstance(value, (int, float, str, bool)):
            return value
        return str(value)

    db = None
    try:
        db = get_database_session()

        # Resolve the ORM model for the requested table.
        schema = self.table_schemas.get(table_name, {})
        model = schema.get("model")
        if not model:
            return DatabaseResult(
                success=False,
                error=f"不支援的資料表: {table_name}"
            )
        searchable_fields = schema.get("searchable", [])

        query = db.query(model)

        for field, value in conditions.items():
            if not hasattr(model, field):
                # Conditions on attributes the model lacks are skipped.
                continue
            column = getattr(model, field)

            if isinstance(value, dict):
                # Range query: every bound that is present is applied.
                if "gte" in value:
                    query = query.filter(column >= value["gte"])
                if "lte" in value:
                    query = query.filter(column <= value["lte"])
                if "gt" in value:
                    query = query.filter(column > value["gt"])
                if "lt" in value:
                    query = query.filter(column < value["lt"])
            elif field in searchable_fields and value is not None:
                # Substring match for searchable text fields.
                query = query.filter(column.ilike(f"%{value}%"))
            else:
                # Exact match. A None value lands here instead of being
                # formatted into the pattern '%None%'; SQLAlchemy renders
                # `column == None` as IS NULL.
                query = query.filter(column == value)

        results = query.offset(offset).limit(limit).all()

        data = [
            {
                column.name: to_plain(getattr(result, column.name))
                for column in result.__table__.columns
            }
            for result in results
        ]

        return DatabaseResult(
            success=True,
            data=data,
            count=len(data)
        )
    except Exception as e:
        logger.error(f"搜尋查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"搜尋失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def _execute_create(self, parameters: Dict[str, Any]) -> DatabaseResult:
    """Insert one row into the requested table and return it as a dict.

    Args:
        parameters: ``table`` (default ``"users"``) and ``data``, the
            keyword arguments passed to the model constructor.

    Returns:
        A ``DatabaseResult`` holding the refreshed row on success; on any
        error the transaction is rolled back and a failed result returned.
    """
    table_name = parameters.get("table", "users")
    field_values = parameters.get("data", {})

    def as_plain(raw):
        # isoformat()-capable values -> ISO string; primitives unchanged;
        # everything else -> str().
        if hasattr(raw, 'isoformat'):
            return raw.isoformat()
        if raw is None or isinstance(raw, (int, float, str, bool)):
            return raw
        return str(raw)

    db = None
    try:
        db = get_database_session()

        orm_model = self.table_schemas.get(table_name, {}).get("model")
        if not orm_model:
            return DatabaseResult(
                success=False,
                error=f"不支援的資料表: {table_name}"
            )

        # Persist, then refresh so database-populated values are loaded.
        created = orm_model(**field_values)
        db.add(created)
        db.commit()
        db.refresh(created)

        row = {
            col.name: as_plain(getattr(created, col.name))
            for col in created.__table__.columns
        }
        return DatabaseResult(
            success=True,
            data=[row],
            count=1
        )
    except Exception as e:
        if db:
            db.rollback()
        logger.error(f"建立查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"建立失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def _execute_update(self, parameters: Dict[str, Any]) -> DatabaseResult:
    """Update the rows matching ``conditions`` with the values in ``data``.

    The update is refused when no data is given, when no conditions are
    given, or when a condition names an attribute the model does not have.
    Silently dropping such a condition would widen the UPDATE to rows the
    caller never targeted.

    Args:
        parameters: ``table`` (default ``"users"``), ``conditions``
            (field -> exact value) and ``data`` (field -> new value).

    Returns:
        A ``DatabaseResult`` with the updated rows and the number of rows
        changed, or a failed result (after rollback) on error.
    """
    table_name = parameters.get("table", "users")
    conditions = parameters.get("conditions") or {}
    data = parameters.get("data") or {}

    def to_plain(value):
        # isoformat()-capable values -> ISO string; primitives unchanged;
        # everything else -> str().
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        if value is None or isinstance(value, (int, float, str, bool)):
            return value
        return str(value)

    db = None
    try:
        db = get_database_session()

        model = self.table_schemas.get(table_name, {}).get("model")
        if not model:
            return DatabaseResult(
                success=False,
                error=f"不支援的資料表: {table_name}"
            )

        if not data:
            return DatabaseResult(
                success=False,
                error="更新失敗: 未提供要更新的資料"
            )
        if not conditions:
            return DatabaseResult(
                success=False,
                error="更新失敗: 未提供更新條件,已拒絕更新整張資料表"
            )
        unknown_fields = [str(field) for field in conditions if not hasattr(model, field)]
        if unknown_fields:
            return DatabaseResult(
                success=False,
                error=f"更新失敗: 不支援的條件欄位: {', '.join(unknown_fields)}"
            )

        query = db.query(model)
        for field, value in conditions.items():
            query = query.filter(getattr(model, field) == value)

        # Load the target rows BEFORE updating. Re-running the same filter
        # afterwards would miss rows when the update changed a filtered column.
        targets = query.all()

        # The loaded objects are refreshed explicitly below, so no
        # in-session synchronisation is needed for the bulk UPDATE.
        updated_count = query.update(data, synchronize_session=False)
        db.commit()

        result_data = []
        for record in targets:
            db.refresh(record)  # reload the post-update column values
            result_data.append({
                column.name: to_plain(getattr(record, column.name))
                for column in record.__table__.columns
            })

        return DatabaseResult(
            success=True,
            data=result_data,
            count=updated_count
        )
    except Exception as e:
        if db:
            db.rollback()
        logger.error(f"更新查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"更新失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def _execute_delete(self, parameters: Dict[str, Any]) -> DatabaseResult:
    """Delete the rows matching ``conditions`` and return what was removed.

    The delete is refused when no conditions are given or when a condition
    names an attribute the model does not have. Silently dropping such a
    condition would turn a targeted DELETE into a whole-table delete.

    Args:
        parameters: ``table`` (default ``"users"``) and ``conditions``
            (field -> exact value).

    Returns:
        A ``DatabaseResult`` with a snapshot of the deleted rows and the
        number of rows deleted, or a failed result (after rollback) on error.
    """
    table_name = parameters.get("table", "users")
    conditions = parameters.get("conditions") or {}

    def to_plain(value):
        # isoformat()-capable values -> ISO string; primitives unchanged;
        # everything else -> str().
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        if value is None or isinstance(value, (int, float, str, bool)):
            return value
        return str(value)

    db = None
    try:
        db = get_database_session()

        model = self.table_schemas.get(table_name, {}).get("model")
        if not model:
            return DatabaseResult(
                success=False,
                error=f"不支援的資料表: {table_name}"
            )

        if not conditions:
            return DatabaseResult(
                success=False,
                error="刪除失敗: 未提供刪除條件,已拒絕刪除整張資料表"
            )
        unknown_fields = [str(field) for field in conditions if not hasattr(model, field)]
        if unknown_fields:
            return DatabaseResult(
                success=False,
                error=f"刪除失敗: 不支援的條件欄位: {', '.join(unknown_fields)}"
            )

        query = db.query(model)
        for field, value in conditions.items():
            query = query.filter(getattr(model, field) == value)

        # Snapshot the rows first so the caller can see what was removed.
        result_data = [
            {
                column.name: to_plain(getattr(record, column.name))
                for column in record.__table__.columns
            }
            for record in query.all()
        ]

        deleted_count = query.delete()
        db.commit()

        return DatabaseResult(
            success=True,
            data=result_data,
            count=deleted_count
        )
    except Exception as e:
        if db:
            db.rollback()
        logger.error(f"刪除查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"刪除失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def _execute_analytics(self, parameters: Dict[str, Any]) -> DatabaseResult:
    """Return the row count of a table together with up to five sample rows.

    Args:
        parameters: Only ``table`` (default ``"users"``) is read.

    Returns:
        A ``DatabaseResult`` whose single data entry has the keys
        ``table``, ``count`` and ``data_sample``; a failed result on error.
    """
    table_name = parameters.get("table", "users")

    def as_plain(raw):
        # isoformat()-capable values -> ISO string; primitives unchanged;
        # everything else -> str().
        if hasattr(raw, 'isoformat'):
            return raw.isoformat()
        if raw is None or isinstance(raw, (int, float, str, bool)):
            return raw
        return str(raw)

    db = None
    try:
        db = get_database_session()

        orm_model = self.table_schemas.get(table_name, {}).get("model")
        if not orm_model:
            return DatabaseResult(
                success=False,
                error=f"不支援的資料表: {table_name}"
            )

        # Total number of rows, then a small sample of them.
        row_total = db.query(orm_model).count()
        sample_rows = db.query(orm_model).limit(5).all()

        samples = [
            {col.name: as_plain(getattr(row, col.name)) for col in row.__table__.columns}
            for row in sample_rows
        ]

        report = {
            "table": table_name,
            "count": row_total,
            "data_sample": samples
        }
        return DatabaseResult(
            success=True,
            data=[report],
            count=1
        )
    except Exception as e:
        logger.error(f"分析查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"分析失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def save_message(self, user_id: str, message: str, message_type: str = "text") -> bool:
    """Record a message by logging it; nothing is written to the database.

    Only the first 10 characters of ``user_id`` and the first 50 characters
    of ``message`` are included in the log line.

    Returns:
        True when the log line was emitted, False if building it raised.
    """
    try:
        # Database persistence is disabled for now; log only.
        logger.info(f"訊息記錄 - 用戶: {user_id[:10]}..., 類型: {message_type}, 內容: {message[:50]}...")
    except Exception as e:
        logger.error(f"訊息記錄錯誤: {str(e)}")
        return False
    else:
        return True
def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the first ``User`` row whose ``user_id`` equals the given id.

    Returns:
        The row as a dictionary of column name -> plain value, or ``None``
        when no row matches or the lookup raises.
    """
    def as_plain(raw):
        # isoformat()-capable values -> ISO string; primitives unchanged;
        # everything else -> str().
        if hasattr(raw, 'isoformat'):
            return raw.isoformat()
        if raw is None or isinstance(raw, (int, float, str, bool)):
            return raw
        return str(raw)

    db = None
    try:
        db = get_database_session()
        found = db.query(User).filter(User.user_id == user_id).first()
        if not found:
            return None

        return {
            col.name: as_plain(getattr(found, col.name))
            for col in found.__table__.columns
        }
    except Exception as e:
        logger.error(f"取得用戶資料錯誤: {str(e)}")
        return None
    finally:
        if db:
            close_database_session(db)
def create_user_profile(self, user_data: Dict[str, Any]) -> bool:
    """Insert a ``User`` row built from ``user_data``.

    Returns:
        True once the insert has been committed; False (after a rollback)
        when anything raises.
    """
    db = None
    try:
        db = get_database_session()
        db.add(User(**user_data))
        db.commit()
    except Exception as e:
        if db:
            db.rollback()
        logger.error(f"建立用戶資料錯誤: {str(e)}")
        return False
    else:
        return True
    finally:
        if db:
            close_database_session(db)
# ==================== 業務專用查詢方法 ====================
def search_products(self, query_text: str = None, category: str = None,
                    warehouse: str = None, limit: int = 10) -> DatabaseResult:
    """Search products by name/code substring and, optionally, category name.

    Args:
        query_text: Substring matched (case-insensitively) against the
            product name and product code.
        category: Substring matched against the related category's name.
        warehouse: Accepted for interface compatibility; it is currently
            not applied to the query.
        limit: Maximum number of products returned.

    Returns:
        A ``DatabaseResult`` with one dictionary per product, or a failed
        result when the query raises.
    """
    db = None
    try:
        db = get_database_session()

        query = db.query(Product)

        # Free-text search over name and code.
        if query_text:
            pattern = f"%{query_text}%"
            query = query.filter(or_(
                Product.productName.ilike(pattern),
                Product.productCode.ilike(pattern),
            ))

        # Category filter. Product.category is a relationship (its `.name`
        # is read below), so the text match has to go through the joined
        # Category row, the same way check_inventory does it.
        if category:
            query = query.join(Product.category).filter(Category.name.ilike(f"%{category}%"))

        products = query.limit(limit).all()

        data = [
            {
                "id": product.id,
                "product_code": product.productCode,
                "product_name": product.productName,
                "unit": product.unit,
                "warehouse": product.warehouse,
                "stock": product.stock,
                "category_name": product.category.name if product.category else None,
                "created_at": product.createdAt.isoformat() if product.createdAt else None
            }
            for product in products
        ]

        return DatabaseResult(
            success=True,
            data=data,
            count=len(data)
        )
    except Exception as e:
        logger.error(f"商品查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"商品查詢失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def check_inventory(self, product_name: str = None, category: str = None) -> DatabaseResult:
    """List stock information for products, optionally filtered.

    Args:
        product_name: Substring matched (case-insensitively) against the
            product name.
        category: Substring matched against the joined category's name.

    Returns:
        A ``DatabaseResult`` with one stock dictionary per product, or a
        failed result when the query raises.
    """
    db = None
    try:
        db = get_database_session()

        stock_query = db.query(Product)

        # Filter by product name.
        if product_name:
            name_pattern = f"%{product_name}%"
            stock_query = stock_query.filter(Product.productName.ilike(name_pattern))

        # Filter by category through the relationship join.
        if category:
            category_pattern = f"%{category}%"
            stock_query = stock_query.join(Product.category)
            stock_query = stock_query.filter(Category.name.ilike(category_pattern))

        inventory_data = [
            {
                "product_name": item.productName,
                "product_code": item.productCode,
                "category": item.category.name if item.category else None,
                "current_stock": item.stock,
                "unit": item.unit,
                "warehouse": item.warehouse,
                "last_updated": item.updatedAt.isoformat() if item.updatedAt else None
            }
            for item in stock_query.all()
        ]

        return DatabaseResult(
            success=True,
            data=inventory_data,
            count=len(inventory_data)
        )
    except Exception as e:
        logger.error(f"庫存查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"庫存查詢失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def search_orders(self, user_id: str = None, status: str = None, order_number: str = None,
                  customer_id: str = None, date_from: str = None, date_to: str = None,
                  limit: int = 10) -> DatabaseResult:
    """Search sales orders with optional filters, newest sales date first.

    Args:
        user_id: Matched exactly against the order's ``salesperson_id``.
        status: Status filter; the Chinese labels in the mapping below are
            translated to their English codes, anything else is used as
            given. Matching is a case-insensitive substring match.
        order_number: Full or partial order number; ``SO-`` is prepended
            when the value does not already start with it (any letter case).
        customer_id: Matched exactly against the order's ``customer_id``.
        date_from: Inclusive lower bound on ``sales_date``.
        date_to: Inclusive upper bound on ``sales_date``.
        limit: Maximum number of orders returned.

    Returns:
        A ``DatabaseResult`` with one dictionary per order, or a failed
        result when the query raises.
    """
    # Chinese label -> status code; kept in step with the labels produced
    # by _get_order_status_display.
    status_mapping = {
        '已交付': 'DELIVERED',
        '已出貨': 'SHIPPED',
        '處理中': 'PROCESSING',
        '已取消': 'CANCELLED',
        '待處理': 'PENDING',
        '已確認': 'CONFIRMED',
        '已完成': 'COMPLETED'
    }

    db = None
    try:
        db = get_database_session()

        # SalesOrder is the main order table; customer and salesperson are
        # eager-loaded because both are read for every returned row.
        query = db.query(SalesOrder).options(
            joinedload(SalesOrder.customer),
            joinedload(SalesOrder.salesperson)
        )

        # Order-number filter (SO- format).
        if order_number:
            # Compare case-insensitively so "so-123" is not turned into
            # "SO-so-123".
            if not order_number.upper().startswith('SO-'):
                order_number = f"SO-{order_number}"
            query = query.filter(SalesOrder.so_number.ilike(f"%{order_number}%"))

        # Salesperson filter.
        if user_id:
            query = query.filter(SalesOrder.salesperson_id == user_id)

        # Customer filter.
        if customer_id:
            query = query.filter(SalesOrder.customer_id == customer_id)

        # Status filter (Chinese label or English code).
        if status:
            english_status = status_mapping.get(status, status)
            query = query.filter(SalesOrder.status.ilike(f"%{english_status}%"))

        # Date-range filter.
        if date_from:
            query = query.filter(SalesOrder.sales_date >= date_from)
        if date_to:
            query = query.filter(SalesOrder.sales_date <= date_to)

        # Newest first.
        query = query.order_by(SalesOrder.sales_date.desc())
        orders = query.limit(limit).all()

        order_data = []
        for order in orders:
            order_data.append({
                "order_id": order.so_number,
                "sales_date": order.sales_date.isoformat() if order.sales_date else None,
                "customer_id": order.customer_id,
                "customer_name": order.customer.customerName if order.customer else "未知客戶",
                "salesperson_id": order.salesperson_id,
                "salesperson_name": order.salesperson.name if order.salesperson else "未指定",
                "status": order.status,
                # Chinese display label for the status.
                "status_display": self._get_order_status_display(order.status),
                "payment_term": order.payment_term,
                "subtotal": float(order.subtotal) if order.subtotal else 0.0,
                "tax_amount": float(order.tax_amount) if order.tax_amount else 0.0,
                "discount_amount": float(order.discount_amount) if order.discount_amount else 0.0,
                "total_amount": float(order.total_amount) if order.total_amount else 0.0,
                "notes": order.notes or "",
                "created_at": order.created_at.isoformat() if order.created_at else None,
                "updated_at": order.updated_at.isoformat() if order.updated_at else None
            })

        return DatabaseResult(
            success=True,
            data=order_data,
            count=len(order_data)
        )
    except Exception as e:
        logger.error(f"訂單查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"訂單查詢失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def _get_order_status_display(self, status) -> str:
"""將訂單狀態轉換為中文顯示"""
status_mapping = {
'DELIVERED': '已交付',
'SHIPPED': '已出貨',
'PROCESSING': '處理中',
'CANCELLED': '已取消',
'PENDING': '待處理',
'CONFIRMED': '已確認',
'COMPLETED': '已完成'
}
if hasattr(status, 'value'):
return status_mapping.get(status.value, str(status.value))
else:
return status_mapping.get(str(status), str(status))
def get_low_stock_products(self, threshold: int = 10) -> DatabaseResult:
    """List products whose stock is at or below ``threshold``.

    Returns:
        A ``DatabaseResult`` with one dictionary per product, each tagged
        with the status "庫存不足"; a failed result when the query raises.
    """
    db = None
    try:
        db = get_database_session()

        low_stock_query = db.query(Product)
        # The stock filter is only applied when the model exposes a
        # `stock` attribute.
        if hasattr(Product, 'stock'):
            low_stock_query = low_stock_query.filter(Product.stock <= threshold)

        data = [
            {
                "product_name": item.productName,
                "product_code": item.productCode,
                "current_stock": item.stock,
                "unit": item.unit,
                "warehouse": item.warehouse,
                "category": item.category.name if item.category else None,
                "status": "庫存不足"
            }
            for item in low_stock_query.all()
        ]

        return DatabaseResult(
            success=True,
            data=data,
            count=len(data)
        )
    except Exception as e:
        logger.error(f"低庫存查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"低庫存查詢失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def get_business_summary(self, low_stock_threshold: int = 10) -> DatabaseResult:
    """Return headline counts for products, orders, users and low stock.

    Args:
        low_stock_threshold: Products whose stock is at or below this value
            are counted as low-stock items. Defaults to 10, the value that
            was previously hard-coded and the default of
            ``get_low_stock_products``.

    Returns:
        A ``DatabaseResult`` with a single summary dictionary, or a failed
        result when a query raises.
    """
    db = None
    try:
        db = get_database_session()

        # Basic totals.
        total_products = db.query(Product).count()
        total_sales_orders = db.query(SalesOrder).count()
        total_purchase_orders = db.query(PurchaseOrder).count()
        total_users = db.query(User).count()

        # Number of products at or below the low-stock threshold.
        low_stock_count = db.query(Product).filter(Product.stock <= low_stock_threshold).count()

        summary_data = [{
            "total_products": total_products,
            "total_sales_orders": total_sales_orders,
            "total_purchase_orders": total_purchase_orders,
            "total_users": total_users,
            "low_stock_items": low_stock_count,
            "report_date": datetime.now().isoformat()
        }]

        return DatabaseResult(
            success=True,
            data=summary_data,
            count=1
        )
    except Exception as e:
        logger.error(f"業務摘要查詢錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"業務摘要查詢失敗: {str(e)}"
        )
    finally:
        if db:
            close_database_session(db)
def process_natural_language_query(self, user_message: str, user_id: str = None) -> DatabaseResult:
    """Single entry point that routes a free-text message to a query method.

    Intent is decided by keyword matching on the lower-cased message, in
    this order: low stock, product / inventory, orders, business summary.
    Anything else falls back to a product search on the whole message.

    Args:
        user_message: The raw text typed by the user.
        user_id: Passed to the order search as the salesperson filter.

    Returns:
        The ``DatabaseResult`` of the selected query method, or a failed
        result when routing itself raises.
    """
    try:
        message_lower = user_message.lower()

        def mentions(*keywords):
            return any(keyword in message_lower for keyword in keywords)

        # Low-stock intent is checked first. '低庫存' contains '庫存', so a
        # message such as "低庫存商品" would otherwise be taken by the
        # product/inventory branch and searched as a product named "低".
        if mentions('低庫存', '缺貨', '不足'):
            return self.get_low_stock_products()

        # Product queries.
        if mentions('商品', '產品', '貨品', '物品'):
            product_name = self._extract_product_name(user_message)
            if mentions('庫存', '存貨', '剩餘'):
                # Inventory lookup.
                return self.check_inventory(product_name=product_name)
            # Plain product search.
            return self.search_products(query_text=product_name)

        # Order queries.
        if mentions('訂單', '訂購', '購買', 'so-'):
            order_number = self._extract_order_number(user_message)
            status = self._extract_order_status(user_message)
            return self.search_orders(user_id=user_id, order_number=order_number, status=status)

        # Summary / statistics queries.
        if mentions('統計', '摘要', '總計', '報表'):
            return self.get_business_summary()

        # Default: treat the whole message as a product search.
        return self.search_products(query_text=user_message)
    except Exception as e:
        logger.error(f"自然語言查詢處理錯誤: {str(e)}")
        return DatabaseResult(
            success=False,
            error=f"查詢處理失敗: {str(e)}"
        )
def _extract_product_name(self, message: str) -> str:
"""從訊息中提取商品名稱"""
# 簡單的商品名稱提取邏輯
# 移除常見的查詢關鍵字
keywords_to_remove = ['查詢', '搜尋', '找', '商品', '產品', '庫存', '有沒有', '請問']
cleaned_message = message
for keyword in keywords_to_remove:
cleaned_message = cleaned_message.replace(keyword, '')
return cleaned_message.strip()
def _extract_order_number(self, message: str) -> str:
"""從訊息中提取訂單編號"""
import re
# 尋找 SO- 格式的訂單編號
so_pattern = r'SO-\d{8}-\d{3}'
match = re.search(so_pattern, message.upper())
if match:
return match.group()
# 尋找日期格式的編號 (20250706-001)
date_pattern = r'\d{8}-\d{3}'
match = re.search(date_pattern, message)
if match:
return f"SO-{match.group()}"
# 尋找純數字編號
number_pattern = r'\d{6,}'
match = re.search(number_pattern, message)
if match:
return match.group()
return None
def _extract_order_status(self, message: str) -> str:
"""從訊息中提取訂單狀態"""
message_lower = message.lower()
# 中文狀態關鍵字
status_keywords = {
'已交付': 'DELIVERED',
'已出貨': 'SHIPPED',
'出貨': 'SHIPPED',
'交付': 'DELIVERED',
'處理中': 'PROCESSING',
'已取消': 'CANCELLED',
'取消': 'CANCELLED',
'待處理': 'PENDING',
'已確認': 'CONFIRMED',
'已完成': 'COMPLETED'
}
for chinese, english in status_keywords.items():
if chinese in message_lower:
return english
# 英文狀態關鍵字
english_keywords = ['delivered', 'shipped', 'processing', 'cancelled', 'pending', 'confirmed', 'completed']
for keyword in english_keywords:
if keyword in message_lower:
return keyword.upper()
return None