mickeywu520's picture
first commit
cd9bca9
raw
history blame
27.8 kB
import logging
import operator
import os
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Any, List, Optional

from sqlalchemy import text, and_, or_
from sqlalchemy.orm import Session

from backend.config import settings
from backend.database.connection import get_database_session, close_database_session
from backend.database.models import User, Product, Order, OrderItem, LineMessage, UserSession
from backend.models.schemas import NLPAnalysisResult, DatabaseResult, QueryType
logger = logging.getLogger(__name__)
class DatabaseService:
    """Database service for generic CRUD/analytics queries and business lookups.

    Every query method opens its own session through ``get_database_session``
    and releases it with ``close_database_session``. Failures are logged and
    reported through the return value (``DatabaseResult(success=False, ...)``,
    ``False`` or ``None``) rather than raised to the caller.
    """

    # Comparison operators accepted inside a dict-valued search condition,
    # e.g. {"price": {"gte": 10, "lt": 50}}.
    _RANGE_OPERATORS = (
        ("gte", operator.ge),
        ("lte", operator.le),
        ("gt", operator.gt),
        ("lt", operator.lt),
    )

    # Keyword groups used to route a free-text message to a business query.
    # The same groups are stripped from the message when extracting a product
    # name, so the router and the extractor cannot drift apart.
    _FILLER_KEYWORDS = ('查詢', '搜尋', '找', '有沒有', '請問')
    _PRODUCT_KEYWORDS = ('商品', '產品', '貨品', '物品')
    _INVENTORY_KEYWORDS = ('庫存', '存貨', '剩餘')
    _ORDER_KEYWORDS = ('訂單', '訂購', '購買')
    _LOW_STOCK_KEYWORDS = ('低庫存', '缺貨', '不足')
    _SUMMARY_KEYWORDS = ('統計', '摘要', '總計', '報表')

    def __init__(self):
        """Register the ORM models that the generic query methods may touch."""
        self.model_mapping = {
            "users": User,
            "products": Product,
            "orders": Order,
            "order_items": OrderItem,
            "line_messages": LineMessage,
            "user_sessions": UserSession,
        }
        # Table registry: the ORM model plus the fields that are matched with
        # a case-insensitive substring search instead of strict equality.
        self.table_schemas = {
            "users": {
                "model": User,
                "searchable": ["name", "email", "display_name"],
            },
            "orders": {
                "model": Order,
                "searchable": ["order_id", "status"],
            },
            "products": {
                "model": Product,
                "searchable": ["name", "category", "description"],
            },
            "line_messages": {
                "model": LineMessage,
                "searchable": ["message", "user_id"],
            },
            "order_items": {
                "model": OrderItem,
                "searchable": [],
            },
            "user_sessions": {
                "model": UserSession,
                "searchable": ["user_id"],
            },
        }

    # ==================== Internal helpers ====================

    @staticmethod
    @contextmanager
    def _session_scope():
        """Yield a database session, rolling back on error and always closing it."""
        db = get_database_session()
        try:
            yield db
        except Exception:
            if db is not None:
                db.rollback()
            raise
        finally:
            if db is not None:
                close_database_session(db)

    @staticmethod
    def _serialize_value(value: Any) -> Any:
        """Convert one column value into a JSON-friendly Python value.

        Objects exposing ``isoformat`` are rendered with it, plain scalars and
        ``None`` pass through, and anything else is stringified.
        """
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        if value is None or isinstance(value, (int, float, str, bool)):
            return value
        return str(value)

    @classmethod
    def _serialize_row(cls, record: Any) -> Dict[str, Any]:
        """Convert an ORM record into a dict keyed by its table's column names."""
        return {
            column.name: cls._serialize_value(getattr(record, column.name))
            for column in record.__table__.columns
        }

    def _resolve_model(self, table_name: str) -> Any:
        """Return the ORM model registered for *table_name*, or ``None``."""
        return self.table_schemas.get(table_name, {}).get("model")

    @staticmethod
    def _unsupported_table(table_name: str) -> "DatabaseResult":
        """Build the failure result returned for an unregistered table name."""
        return DatabaseResult(
            success=False,
            error=f"不支援的資料表: {table_name}"
        )

    @staticmethod
    def _failure(log_label: str, error_label: str, exc: Exception) -> "DatabaseResult":
        """Log *exc* with its traceback and wrap it in a failed ``DatabaseResult``.

        Must be called from inside an ``except`` block so the traceback is
        available to the logger.
        """
        logger.exception("%s: %s", log_label, exc)
        return DatabaseResult(success=False, error=f"{error_label}: {exc}")

    @staticmethod
    def _filter_by_equality(query: Any, model: Any, conditions: Dict[str, Any]) -> Any:
        """Narrow *query* with one ``column == value`` filter per condition.

        Raises:
            ValueError: if a condition names an attribute the model does not
                have. Skipping such a filter would silently widen an UPDATE
                or DELETE, so it is rejected instead.
        """
        unknown = [field for field in conditions if not hasattr(model, field)]
        if unknown:
            raise ValueError(f"不支援的條件欄位: {', '.join(map(str, unknown))}")
        for field, value in conditions.items():
            query = query.filter(getattr(model, field) == value)
        return query

    # ==================== Generic query dispatch ====================

    def execute_query(self, analysis_result: "NLPAnalysisResult") -> "DatabaseResult":
        """Run the database operation described by an NLP analysis result.

        Args:
            analysis_result: Object exposing ``query_type`` and ``parameters``.

        Returns:
            The handler's ``DatabaseResult``; a failed result when the query
            type is not supported or the handler raises.
        """
        try:
            query_type = analysis_result.query_type
            parameters = analysis_result.parameters
            handlers = (
                (QueryType.SEARCH, self._execute_search),
                (QueryType.CREATE, self._execute_create),
                (QueryType.UPDATE, self._execute_update),
                (QueryType.DELETE, self._execute_delete),
                (QueryType.ANALYTICS, self._execute_analytics),
            )
            # Compared with ``==`` (not a dict lookup) so that equality
            # semantics of the query-type values are preserved.
            for supported_type, handler in handlers:
                if query_type == supported_type:
                    return handler(parameters)
            return DatabaseResult(
                success=False,
                error="不支援的查詢類型"
            )
        except Exception as e:
            return self._failure("資料庫查詢錯誤", "資料庫查詢失敗", e)

    def _execute_search(self, parameters: Dict[str, Any]) -> "DatabaseResult":
        """Run a filtered, paginated SELECT.

        Reads ``table`` (default ``"users"``), ``conditions`` (default ``{}``),
        ``limit`` (default 10) and ``offset`` (default 0) from *parameters*.
        A dict-valued condition is treated as a range (``gte``/``lte``/``gt``/
        ``lt``); a scalar on a "searchable" field becomes a case-insensitive
        substring match; any other scalar is an equality match. Conditions on
        attributes the model lacks are ignored.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions", {})
        limit = parameters.get("limit", 10)
        offset = parameters.get("offset", 0)
        try:
            model = self._resolve_model(table_name)
            if model is None:
                return self._unsupported_table(table_name)
            searchable_fields = self.table_schemas.get(table_name, {}).get("searchable", [])
            with self._session_scope() as db:
                query = db.query(model)
                for field, value in conditions.items():
                    if not hasattr(model, field):
                        continue
                    column = getattr(model, field)
                    if isinstance(value, dict):
                        for key, compare in self._RANGE_OPERATORS:
                            if key in value:
                                query = query.filter(compare(column, value[key]))
                    elif field in searchable_fields:
                        query = query.filter(column.ilike(f"%{value}%"))
                    else:
                        query = query.filter(column == value)
                records = query.offset(offset).limit(limit).all()
                # Serialize while the session is still open.
                data = [self._serialize_row(record) for record in records]
            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )
        except Exception as e:
            return self._failure("搜尋查詢錯誤", "搜尋失敗", e)

    def _execute_create(self, parameters: Dict[str, Any]) -> "DatabaseResult":
        """Insert one record built from ``parameters["data"]`` and return it.

        Reads ``table`` (default ``"users"``) and ``data`` (default ``{}``).
        The record is refreshed after commit, then serialized.
        """
        table_name = parameters.get("table", "users")
        data = parameters.get("data", {})
        try:
            model = self._resolve_model(table_name)
            if model is None:
                return self._unsupported_table(table_name)
            with self._session_scope() as db:
                new_record = model(**data)
                db.add(new_record)
                db.commit()
                db.refresh(new_record)
                result_data = self._serialize_row(new_record)
            return DatabaseResult(
                success=True,
                data=[result_data],
                count=1
            )
        except Exception as e:
            return self._failure("建立查詢錯誤", "建立失敗", e)

    def _execute_update(self, parameters: Dict[str, Any]) -> "DatabaseResult":
        """Update every row matching ``conditions`` with the values in ``data``.

        Reads ``table`` (default ``"users"``), ``conditions`` and ``data``
        (both default ``{}``). ``count`` is the row count reported by the
        UPDATE; ``data`` holds the rows that match the conditions afterwards.
        A condition naming an unknown field fails the whole operation.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions", {})
        data = parameters.get("data", {})
        try:
            model = self._resolve_model(table_name)
            if model is None:
                return self._unsupported_table(table_name)
            with self._session_scope() as db:
                # NOTE(review): empty conditions still update every row, as
                # before - confirm callers never send an unfiltered update.
                query = self._filter_by_equality(db.query(model), model, conditions)
                updated_count = query.update(data)
                db.commit()
                # Re-run the same filter to report the rows after the update.
                result_data = [self._serialize_row(record) for record in query.all()]
            return DatabaseResult(
                success=True,
                data=result_data,
                count=updated_count
            )
        except Exception as e:
            return self._failure("更新查詢錯誤", "更新失敗", e)

    def _execute_delete(self, parameters: Dict[str, Any]) -> "DatabaseResult":
        """Delete every row matching ``conditions`` and return what was removed.

        Reads ``table`` (default ``"users"``) and ``conditions`` (default
        ``{}``). Rows are serialized before the DELETE runs. A condition
        naming an unknown field fails the whole operation.
        """
        table_name = parameters.get("table", "users")
        conditions = parameters.get("conditions", {})
        try:
            model = self._resolve_model(table_name)
            if model is None:
                return self._unsupported_table(table_name)
            with self._session_scope() as db:
                # NOTE(review): empty conditions still delete every row, as
                # before - confirm callers never send an unfiltered delete.
                query = self._filter_by_equality(db.query(model), model, conditions)
                # Capture the rows first; they are gone after the DELETE.
                result_data = [self._serialize_row(record) for record in query.all()]
                deleted_count = query.delete()
                db.commit()
            return DatabaseResult(
                success=True,
                data=result_data,
                count=deleted_count
            )
        except Exception as e:
            return self._failure("刪除查詢錯誤", "刪除失敗", e)

    def _execute_analytics(self, parameters: Dict[str, Any]) -> "DatabaseResult":
        """Return the total row count of a table plus up to five sample rows.

        Reads ``table`` (default ``"users"``) from *parameters*.
        """
        table_name = parameters.get("table", "users")
        try:
            model = self._resolve_model(table_name)
            if model is None:
                return self._unsupported_table(table_name)
            with self._session_scope() as db:
                total_count = db.query(model).count()
                sample_data = [
                    self._serialize_row(record)
                    for record in db.query(model).limit(5).all()
                ]
            analytics_data = [{
                "table": table_name,
                "count": total_count,
                "data_sample": sample_data
            }]
            return DatabaseResult(
                success=True,
                data=analytics_data,
                count=1
            )
        except Exception as e:
            return self._failure("分析查詢錯誤", "分析失敗", e)

    # ==================== Message and user helpers ====================

    def save_message(self, user_id: str, message: str, message_type: str = "text") -> bool:
        """Persist one ``LineMessage`` row; return ``True`` on success, else ``False``."""
        try:
            with self._session_scope() as db:
                db.add(LineMessage(
                    user_id=user_id,
                    message=message,
                    message_type=message_type
                ))
                db.commit()
            return True
        except Exception as e:
            logger.exception("儲存訊息錯誤: %s", e)
            return False

    def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Return the first ``User`` whose ``user_id`` matches, as a dict.

        Returns ``None`` when no user matches or when the lookup fails.
        """
        try:
            with self._session_scope() as db:
                user = db.query(User).filter(User.user_id == user_id).first()
                if not user:
                    return None
                return self._serialize_row(user)
        except Exception as e:
            logger.exception("取得用戶資料錯誤: %s", e)
            return None

    def create_user_profile(self, user_data: Dict[str, Any]) -> bool:
        """Insert a ``User`` built from *user_data*; return ``True`` on success."""
        try:
            with self._session_scope() as db:
                db.add(User(**user_data))
                db.commit()
            return True
        except Exception as e:
            logger.exception("建立用戶資料錯誤: %s", e)
            return False

    # ==================== Business query methods ====================

    def search_products(self, query_text: Optional[str] = None, category: Optional[str] = None,
                        warehouse: Optional[str] = None, limit: int = 10) -> "DatabaseResult":
        """Search products by free text and/or category.

        Args:
            query_text: Case-insensitive substring matched against the product
                name or description. Skipped when empty.
            category: Case-insensitive substring matched against the category.
            warehouse: Accepted for interface compatibility; not applied to
                the query.
            limit: Maximum number of products returned.
        """
        try:
            with self._session_scope() as db:
                query = db.query(Product)
                if query_text:
                    pattern = f"%{query_text}%"
                    query = query.filter(or_(
                        Product.name.ilike(pattern),
                        Product.description.ilike(pattern)
                    ))
                if category:
                    query = query.filter(Product.category.ilike(f"%{category}%"))
                data = [
                    {
                        "id": product.id,
                        "name": product.name,
                        "description": product.description,
                        "price": float(product.price) if product.price else 0.0,
                        "category": product.category,
                        "stock": getattr(product, 'stock', 0),
                        "created_at": product.created_at.isoformat() if product.created_at else None
                    }
                    for product in query.limit(limit).all()
                ]
            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )
        except Exception as e:
            return self._failure("商品查詢錯誤", "商品查詢失敗", e)

    def check_inventory(self, product_name: Optional[str] = None,
                        category: Optional[str] = None) -> "DatabaseResult":
        """Report stock information for products matching a name and/or category.

        Both filters are case-insensitive substring matches and are skipped
        when empty; with no filter every product is returned.
        """
        try:
            with self._session_scope() as db:
                query = db.query(Product)
                if product_name:
                    query = query.filter(Product.name.ilike(f"%{product_name}%"))
                if category:
                    query = query.filter(Product.category.ilike(f"%{category}%"))
                inventory_data = [
                    {
                        "product_name": product.name,
                        "category": product.category,
                        "current_stock": getattr(product, 'stock', 0),
                        "price": float(product.price) if product.price else 0.0,
                        "description": product.description,
                        "last_updated": product.updated_at.isoformat() if product.updated_at else None
                    }
                    for product in query.all()
                ]
            return DatabaseResult(
                success=True,
                data=inventory_data,
                count=len(inventory_data)
            )
        except Exception as e:
            return self._failure("庫存查詢錯誤", "庫存查詢失敗", e)

    def search_orders(self, user_id: Optional[str] = None, status: Optional[str] = None,
                      limit: int = 10) -> "DatabaseResult":
        """List orders, optionally restricted to one user and/or a status substring."""
        try:
            with self._session_scope() as db:
                query = db.query(Order)
                if user_id:
                    query = query.filter(Order.user_id == user_id)
                if status:
                    query = query.filter(Order.status.ilike(f"%{status}%"))
                order_data = [
                    {
                        "order_id": order.order_id,
                        "user_id": order.user_id,
                        "status": order.status,
                        "total_amount": float(order.total_amount) if order.total_amount else 0.0,
                        "created_at": order.created_at.isoformat() if order.created_at else None,
                        "updated_at": order.updated_at.isoformat() if order.updated_at else None
                    }
                    for order in query.limit(limit).all()
                ]
            return DatabaseResult(
                success=True,
                data=order_data,
                count=len(order_data)
            )
        except Exception as e:
            return self._failure("訂單查詢錯誤", "訂單查詢失敗", e)

    def get_low_stock_products(self, threshold: int = 10) -> "DatabaseResult":
        """List products whose stock is at or below *threshold*.

        The stock filter is applied only when the ``Product`` model exposes a
        ``stock`` attribute; otherwise every product is returned with a
        reported stock of 0.
        """
        try:
            with self._session_scope() as db:
                query = db.query(Product)
                if hasattr(Product, 'stock'):
                    query = query.filter(Product.stock <= threshold)
                data = [
                    {
                        "product_name": product.name,
                        "current_stock": getattr(product, 'stock', 0),
                        "category": product.category,
                        "price": float(product.price) if product.price else 0.0,
                        "status": "庫存不足"
                    }
                    for product in query.all()
                ]
            return DatabaseResult(
                success=True,
                data=data,
                count=len(data)
            )
        except Exception as e:
            return self._failure("低庫存查詢錯誤", "低庫存查詢失敗", e)

    def get_business_summary(self, low_stock_threshold: int = 10) -> "DatabaseResult":
        """Return headline counts: products, orders, users and low-stock items.

        Args:
            low_stock_threshold: Products with stock at or below this value
                count as low-stock. Counted only when ``Product`` exposes a
                ``stock`` attribute; otherwise the low-stock count is 0.
        """
        try:
            with self._session_scope() as db:
                total_products = db.query(Product).count()
                total_orders = db.query(Order).count()
                total_users = db.query(User).count()
                low_stock_count = 0
                if hasattr(Product, 'stock'):
                    low_stock_count = (
                        db.query(Product)
                        .filter(Product.stock <= low_stock_threshold)
                        .count()
                    )
            summary_data = [{
                "total_products": total_products,
                "total_orders": total_orders,
                "total_users": total_users,
                "low_stock_items": low_stock_count,
                "report_date": datetime.now().isoformat()
            }]
            return DatabaseResult(
                success=True,
                data=summary_data,
                count=1
            )
        except Exception as e:
            return self._failure("業務摘要查詢錯誤", "業務摘要查詢失敗", e)

    # ==================== Natural-language entry point ====================

    @classmethod
    def _classify_intent(cls, message: str) -> str:
        """Map a free-text message to an intent label by keyword matching.

        Returns one of ``"low_stock"``, ``"summary"``, ``"inventory"``,
        ``"product"``, ``"orders"`` or ``"default"``. Specific intents are
        tested before the generic product keywords, so that e.g. "低庫存商品"
        is a low-stock report rather than an inventory lookup.
        """
        lowered = message.lower()

        def mentions(keywords) -> bool:
            return any(keyword in lowered for keyword in keywords)

        if mentions(cls._LOW_STOCK_KEYWORDS):
            return "low_stock"
        if mentions(cls._SUMMARY_KEYWORDS):
            return "summary"
        if mentions(cls._PRODUCT_KEYWORDS):
            return "inventory" if mentions(cls._INVENTORY_KEYWORDS) else "product"
        if mentions(cls._ORDER_KEYWORDS):
            return "orders"
        return "default"

    def process_natural_language_query(self, user_message: str,
                                       user_id: Optional[str] = None) -> "DatabaseResult":
        """Single entry point that routes a free-text message to a business query.

        Args:
            user_message: The raw user text.
            user_id: Used to restrict order lookups to one user.

        Returns:
            The ``DatabaseResult`` of the selected query. Messages matching no
            keyword group fall back to a product search on the whole message.
        """
        try:
            intent = self._classify_intent(user_message)
            if intent == "low_stock":
                return self.get_low_stock_products()
            if intent == "summary":
                return self.get_business_summary()
            if intent == "inventory":
                return self.check_inventory(
                    product_name=self._extract_product_name(user_message)
                )
            if intent == "product":
                return self.search_products(
                    query_text=self._extract_product_name(user_message)
                )
            if intent == "orders":
                return self.search_orders(user_id=user_id)
            return self.search_products(query_text=user_message)
        except Exception as e:
            return self._failure("自然語言查詢處理錯誤", "查詢處理失敗", e)

    def _extract_product_name(self, message: str) -> str:
        """Strip query, product and inventory keywords from *message*.

        What remains, with surrounding whitespace trimmed, is treated as the
        product name. The result may be an empty string.
        """
        cleaned_message = message
        keywords_to_remove = (
            *self._FILLER_KEYWORDS,
            *self._PRODUCT_KEYWORDS,
            *self._INVENTORY_KEYWORDS,
        )
        for keyword in keywords_to_remove:
            cleaned_message = cleaned_message.replace(keyword, '')
        return cleaned_message.strip()