|
import os |
|
from typing import Dict, Any, List, Optional |
|
from sqlalchemy.orm import Session |
|
from sqlalchemy import text, and_, or_ |
|
from backend.models.schemas import NLPAnalysisResult, DatabaseResult, QueryType |
|
from backend.config import settings |
|
from backend.database.connection import get_database_session, close_database_session |
|
from backend.database.models import User, Product, PurchaseOrder, SalesOrder, LineMessage, Category |
|
from datetime import datetime |
|
import logging |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class DatabaseService: |
|
"""資料庫服務""" |
|
|
|
def __init__(self): |
|
self.model_mapping = { |
|
"users": User, |
|
"products": Product, |
|
"purchase_orders": PurchaseOrder, |
|
"sales_orders": SalesOrder, |
|
"line_messages": LineMessage |
|
} |
|
|
|
|
|
self.table_schemas = { |
|
"users": { |
|
"model": User, |
|
"searchable": ["name", "email"] |
|
}, |
|
"purchase_orders": { |
|
"model": PurchaseOrder, |
|
"searchable": ["po_number", "status"] |
|
}, |
|
"sales_orders": { |
|
"model": SalesOrder, |
|
"searchable": ["so_number", "status"] |
|
}, |
|
"products": { |
|
"model": Product, |
|
"searchable": ["productName", "productCode", "barcode"] |
|
}, |
|
"line_messages": { |
|
"model": LineMessage, |
|
"searchable": ["message", "user_id"] |
|
} |
|
} |
|
|
|
def execute_query(self, analysis_result: NLPAnalysisResult) -> DatabaseResult: |
|
"""執行資料庫查詢""" |
|
try: |
|
query_type = analysis_result.query_type |
|
parameters = analysis_result.parameters |
|
|
|
if query_type == QueryType.SEARCH: |
|
return self._execute_search(parameters) |
|
elif query_type == QueryType.CREATE: |
|
return self._execute_create(parameters) |
|
elif query_type == QueryType.UPDATE: |
|
return self._execute_update(parameters) |
|
elif query_type == QueryType.DELETE: |
|
return self._execute_delete(parameters) |
|
elif query_type == QueryType.ANALYTICS: |
|
return self._execute_analytics(parameters) |
|
else: |
|
return DatabaseResult( |
|
success=False, |
|
error="不支援的查詢類型" |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"資料庫查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"資料庫查詢失敗: {str(e)}" |
|
) |
|
|
|
def _execute_search(self, parameters: Dict[str, Any]) -> DatabaseResult: |
|
"""執行搜尋查詢""" |
|
table_name = parameters.get("table", "users") |
|
conditions = parameters.get("conditions", {}) |
|
limit = parameters.get("limit", 10) |
|
offset = parameters.get("offset", 0) |
|
|
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
model = self.table_schemas.get(table_name, {}).get("model") |
|
if not model: |
|
return DatabaseResult( |
|
success=False, |
|
error=f"不支援的資料表: {table_name}" |
|
) |
|
|
|
|
|
query = db.query(model) |
|
|
|
|
|
for field, value in conditions.items(): |
|
if hasattr(model, field): |
|
column = getattr(model, field) |
|
|
|
if isinstance(value, dict): |
|
|
|
if "gte" in value: |
|
query = query.filter(column >= value["gte"]) |
|
if "lte" in value: |
|
query = query.filter(column <= value["lte"]) |
|
if "gt" in value: |
|
query = query.filter(column > value["gt"]) |
|
if "lt" in value: |
|
query = query.filter(column < value["lt"]) |
|
else: |
|
|
|
searchable_fields = self.table_schemas.get(table_name, {}).get("searchable", []) |
|
if field in searchable_fields: |
|
query = query.filter(column.ilike(f"%{value}%")) |
|
else: |
|
query = query.filter(column == value) |
|
|
|
|
|
query = query.offset(offset).limit(limit) |
|
|
|
|
|
results = query.all() |
|
|
|
|
|
data = [] |
|
for result in results: |
|
item = {} |
|
for column in result.__table__.columns: |
|
value = getattr(result, column.name) |
|
|
|
if hasattr(value, 'isoformat'): |
|
item[column.name] = value.isoformat() |
|
elif isinstance(value, (int, float, str, bool)) or value is None: |
|
item[column.name] = value |
|
else: |
|
item[column.name] = str(value) |
|
data.append(item) |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=data, |
|
count=len(data) |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"搜尋查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"搜尋失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def _execute_create(self, parameters: Dict[str, Any]) -> DatabaseResult: |
|
"""執行建立查詢""" |
|
table_name = parameters.get("table", "users") |
|
data = parameters.get("data", {}) |
|
|
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
model = self.table_schemas.get(table_name, {}).get("model") |
|
if not model: |
|
return DatabaseResult( |
|
success=False, |
|
error=f"不支援的資料表: {table_name}" |
|
) |
|
|
|
|
|
new_record = model(**data) |
|
db.add(new_record) |
|
db.commit() |
|
db.refresh(new_record) |
|
|
|
|
|
result_data = {} |
|
for column in new_record.__table__.columns: |
|
value = getattr(new_record, column.name) |
|
if hasattr(value, 'isoformat'): |
|
result_data[column.name] = value.isoformat() |
|
elif isinstance(value, (int, float, str, bool)) or value is None: |
|
result_data[column.name] = value |
|
else: |
|
result_data[column.name] = str(value) |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=[result_data], |
|
count=1 |
|
) |
|
|
|
except Exception as e: |
|
if db: |
|
db.rollback() |
|
logger.error(f"建立查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"建立失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def _execute_update(self, parameters: Dict[str, Any]) -> DatabaseResult: |
|
"""執行更新查詢""" |
|
table_name = parameters.get("table", "users") |
|
conditions = parameters.get("conditions", {}) |
|
data = parameters.get("data", {}) |
|
|
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
model = self.table_schemas.get(table_name, {}).get("model") |
|
if not model: |
|
return DatabaseResult( |
|
success=False, |
|
error=f"不支援的資料表: {table_name}" |
|
) |
|
|
|
|
|
query = db.query(model) |
|
|
|
|
|
for field, value in conditions.items(): |
|
if hasattr(model, field): |
|
column = getattr(model, field) |
|
query = query.filter(column == value) |
|
|
|
|
|
updated_count = query.update(data) |
|
db.commit() |
|
|
|
|
|
updated_records = query.all() |
|
|
|
|
|
result_data = [] |
|
for record in updated_records: |
|
item = {} |
|
for column in record.__table__.columns: |
|
value = getattr(record, column.name) |
|
if hasattr(value, 'isoformat'): |
|
item[column.name] = value.isoformat() |
|
elif isinstance(value, (int, float, str, bool)) or value is None: |
|
item[column.name] = value |
|
else: |
|
item[column.name] = str(value) |
|
result_data.append(item) |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=result_data, |
|
count=updated_count |
|
) |
|
|
|
except Exception as e: |
|
if db: |
|
db.rollback() |
|
logger.error(f"更新查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"更新失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def _execute_delete(self, parameters: Dict[str, Any]) -> DatabaseResult: |
|
"""執行刪除查詢""" |
|
table_name = parameters.get("table", "users") |
|
conditions = parameters.get("conditions", {}) |
|
|
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
model = self.table_schemas.get(table_name, {}).get("model") |
|
if not model: |
|
return DatabaseResult( |
|
success=False, |
|
error=f"不支援的資料表: {table_name}" |
|
) |
|
|
|
|
|
query = db.query(model) |
|
|
|
|
|
for field, value in conditions.items(): |
|
if hasattr(model, field): |
|
column = getattr(model, field) |
|
query = query.filter(column == value) |
|
|
|
|
|
records_to_delete = query.all() |
|
|
|
|
|
result_data = [] |
|
for record in records_to_delete: |
|
item = {} |
|
for column in record.__table__.columns: |
|
value = getattr(record, column.name) |
|
if hasattr(value, 'isoformat'): |
|
item[column.name] = value.isoformat() |
|
elif isinstance(value, (int, float, str, bool)) or value is None: |
|
item[column.name] = value |
|
else: |
|
item[column.name] = str(value) |
|
result_data.append(item) |
|
|
|
|
|
deleted_count = query.delete() |
|
db.commit() |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=result_data, |
|
count=deleted_count |
|
) |
|
|
|
except Exception as e: |
|
if db: |
|
db.rollback() |
|
logger.error(f"刪除查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"刪除失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def _execute_analytics(self, parameters: Dict[str, Any]) -> DatabaseResult: |
|
"""執行分析查詢""" |
|
table_name = parameters.get("table", "users") |
|
|
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
model = self.table_schemas.get(table_name, {}).get("model") |
|
if not model: |
|
return DatabaseResult( |
|
success=False, |
|
error=f"不支援的資料表: {table_name}" |
|
) |
|
|
|
|
|
total_count = db.query(model).count() |
|
|
|
|
|
sample_records = db.query(model).limit(5).all() |
|
|
|
|
|
sample_data = [] |
|
for record in sample_records: |
|
item = {} |
|
for column in record.__table__.columns: |
|
value = getattr(record, column.name) |
|
if hasattr(value, 'isoformat'): |
|
item[column.name] = value.isoformat() |
|
elif isinstance(value, (int, float, str, bool)) or value is None: |
|
item[column.name] = value |
|
else: |
|
item[column.name] = str(value) |
|
sample_data.append(item) |
|
|
|
analytics_data = [{ |
|
"table": table_name, |
|
"count": total_count, |
|
"data_sample": sample_data |
|
}] |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=analytics_data, |
|
count=1 |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"分析查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"分析失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def save_message(self, user_id: str, message: str, message_type: str = "text") -> bool: |
|
"""儲存訊息記錄 - 暫時停用,僅記錄到日誌""" |
|
try: |
|
|
|
logger.info(f"訊息記錄 - 用戶: {user_id[:10]}..., 類型: {message_type}, 內容: {message[:50]}...") |
|
return True |
|
|
|
except Exception as e: |
|
logger.error(f"訊息記錄錯誤: {str(e)}") |
|
return False |
|
|
|
def get_user_profile(self, user_id: str) -> Optional[Dict[str, Any]]: |
|
"""取得用戶資料""" |
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
user = db.query(User).filter(User.user_id == user_id).first() |
|
if not user: |
|
return None |
|
|
|
|
|
user_data = {} |
|
for column in user.__table__.columns: |
|
value = getattr(user, column.name) |
|
if hasattr(value, 'isoformat'): |
|
user_data[column.name] = value.isoformat() |
|
elif isinstance(value, (int, float, str, bool)) or value is None: |
|
user_data[column.name] = value |
|
else: |
|
user_data[column.name] = str(value) |
|
|
|
return user_data |
|
|
|
except Exception as e: |
|
logger.error(f"取得用戶資料錯誤: {str(e)}") |
|
return None |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def create_user_profile(self, user_data: Dict[str, Any]) -> bool: |
|
"""建立用戶資料""" |
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
new_user = User(**user_data) |
|
db.add(new_user) |
|
db.commit() |
|
return True |
|
|
|
except Exception as e: |
|
if db: |
|
db.rollback() |
|
logger.error(f"建立用戶資料錯誤: {str(e)}") |
|
return False |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
|
|
|
|
def search_products(self, query_text: str = None, category: str = None, |
|
warehouse: str = None, limit: int = 10) -> DatabaseResult: |
|
"""商品查詢 - 支援自然語言查詢""" |
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
query = db.query(Product) |
|
|
|
|
|
if query_text: |
|
search_filter = or_( |
|
Product.productName.ilike(f"%{query_text}%"), |
|
Product.productCode.ilike(f"%{query_text}%") |
|
) |
|
query = query.filter(search_filter) |
|
|
|
|
|
if category: |
|
query = query.filter(Product.category.ilike(f"%{category}%")) |
|
|
|
|
|
products = query.limit(limit).all() |
|
|
|
|
|
data = [] |
|
for product in products: |
|
product_data = { |
|
"id": product.id, |
|
"product_code": product.productCode, |
|
"product_name": product.productName, |
|
"unit": product.unit, |
|
"warehouse": product.warehouse, |
|
"stock": product.stock, |
|
"category_name": product.category.name if product.category else None, |
|
"created_at": product.createdAt.isoformat() if product.createdAt else None |
|
} |
|
data.append(product_data) |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=data, |
|
count=len(data) |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"商品查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"商品查詢失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def check_inventory(self, product_name: str = None, category: str = None) -> DatabaseResult: |
|
"""庫存查詢""" |
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
query = db.query(Product) |
|
|
|
|
|
if product_name: |
|
query = query.filter(Product.productName.ilike(f"%{product_name}%")) |
|
|
|
|
|
if category: |
|
query = query.join(Product.category).filter(Category.name.ilike(f"%{category}%")) |
|
|
|
products = query.all() |
|
|
|
|
|
inventory_data = [] |
|
for product in products: |
|
stock_info = { |
|
"product_name": product.productName, |
|
"product_code": product.productCode, |
|
"category": product.category.name if product.category else None, |
|
"current_stock": product.stock, |
|
"unit": product.unit, |
|
"warehouse": product.warehouse, |
|
"last_updated": product.updatedAt.isoformat() if product.updatedAt else None |
|
} |
|
inventory_data.append(stock_info) |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=inventory_data, |
|
count=len(inventory_data) |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"庫存查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"庫存查詢失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def search_orders(self, user_id: str = None, status: str = None, order_number: str = None, |
|
customer_id: str = None, date_from: str = None, date_to: str = None, |
|
limit: int = 10) -> DatabaseResult: |
|
"""增強的訂單查詢功能""" |
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
query = db.query(SalesOrder).options( |
|
joinedload(SalesOrder.customer), |
|
joinedload(SalesOrder.salesperson) |
|
) |
|
|
|
|
|
if order_number: |
|
if not order_number.startswith('SO-'): |
|
|
|
order_number = f"SO-{order_number}" |
|
query = query.filter(SalesOrder.so_number.ilike(f"%{order_number}%")) |
|
|
|
|
|
if user_id: |
|
query = query.filter(SalesOrder.salesperson_id == user_id) |
|
|
|
|
|
if customer_id: |
|
query = query.filter(SalesOrder.customer_id == customer_id) |
|
|
|
|
|
if status: |
|
status_mapping = { |
|
'已交付': 'DELIVERED', |
|
'已出貨': 'SHIPPED', |
|
'處理中': 'PROCESSING', |
|
'已取消': 'CANCELLED', |
|
'待處理': 'PENDING' |
|
} |
|
|
|
english_status = status_mapping.get(status, status) |
|
query = query.filter(SalesOrder.status.ilike(f"%{english_status}%")) |
|
|
|
|
|
if date_from: |
|
query = query.filter(SalesOrder.sales_date >= date_from) |
|
if date_to: |
|
query = query.filter(SalesOrder.sales_date <= date_to) |
|
|
|
|
|
query = query.order_by(SalesOrder.sales_date.desc()) |
|
|
|
orders = query.limit(limit).all() |
|
|
|
|
|
order_data = [] |
|
for order in orders: |
|
|
|
status_display = self._get_order_status_display(order.status) |
|
|
|
order_info = { |
|
"order_id": order.so_number, |
|
"sales_date": order.sales_date.isoformat() if order.sales_date else None, |
|
"customer_id": order.customer_id, |
|
"customer_name": order.customer.customerName if order.customer else "未知客戶", |
|
"salesperson_id": order.salesperson_id, |
|
"salesperson_name": order.salesperson.name if order.salesperson else "未指定", |
|
"status": order.status, |
|
"status_display": status_display, |
|
"payment_term": order.payment_term, |
|
"subtotal": float(order.subtotal) if order.subtotal else 0.0, |
|
"tax_amount": float(order.tax_amount) if order.tax_amount else 0.0, |
|
"discount_amount": float(order.discount_amount) if order.discount_amount else 0.0, |
|
"total_amount": float(order.total_amount) if order.total_amount else 0.0, |
|
"notes": order.notes or "", |
|
"created_at": order.created_at.isoformat() if order.created_at else None, |
|
"updated_at": order.updated_at.isoformat() if order.updated_at else None |
|
} |
|
order_data.append(order_info) |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=order_data, |
|
count=len(order_data) |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"訂單查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"訂單查詢失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def _get_order_status_display(self, status) -> str: |
|
"""將訂單狀態轉換為中文顯示""" |
|
status_mapping = { |
|
'DELIVERED': '已交付', |
|
'SHIPPED': '已出貨', |
|
'PROCESSING': '處理中', |
|
'CANCELLED': '已取消', |
|
'PENDING': '待處理', |
|
'CONFIRMED': '已確認', |
|
'COMPLETED': '已完成' |
|
} |
|
|
|
if hasattr(status, 'value'): |
|
return status_mapping.get(status.value, str(status.value)) |
|
else: |
|
return status_mapping.get(str(status), str(status)) |
|
|
|
def get_low_stock_products(self, threshold: int = 10) -> DatabaseResult: |
|
"""低庫存商品查詢""" |
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
|
|
query = db.query(Product) |
|
if hasattr(Product, 'stock'): |
|
query = query.filter(Product.stock <= threshold) |
|
|
|
low_stock_products = query.all() |
|
|
|
data = [] |
|
for product in low_stock_products: |
|
product_data = { |
|
"product_name": product.productName, |
|
"product_code": product.productCode, |
|
"current_stock": product.stock, |
|
"unit": product.unit, |
|
"warehouse": product.warehouse, |
|
"category": product.category.name if product.category else None, |
|
"status": "庫存不足" |
|
} |
|
data.append(product_data) |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=data, |
|
count=len(data) |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"低庫存查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"低庫存查詢失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def get_business_summary(self) -> DatabaseResult: |
|
"""業務摘要統計""" |
|
db = None |
|
try: |
|
db = get_database_session() |
|
|
|
|
|
total_products = db.query(Product).count() |
|
total_sales_orders = db.query(SalesOrder).count() |
|
total_purchase_orders = db.query(PurchaseOrder).count() |
|
total_users = db.query(User).count() |
|
|
|
|
|
low_stock_count = db.query(Product).filter(Product.stock <= 10).count() |
|
|
|
|
|
summary_data = [{ |
|
"total_products": total_products, |
|
"total_sales_orders": total_sales_orders, |
|
"total_purchase_orders": total_purchase_orders, |
|
"total_users": total_users, |
|
"low_stock_items": low_stock_count, |
|
"report_date": datetime.now().isoformat() |
|
}] |
|
|
|
return DatabaseResult( |
|
success=True, |
|
data=summary_data, |
|
count=1 |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"業務摘要查詢錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"業務摘要查詢失敗: {str(e)}" |
|
) |
|
finally: |
|
if db: |
|
close_database_session(db) |
|
|
|
def process_natural_language_query(self, user_message: str, user_id: str = None) -> DatabaseResult: |
|
"""處理自然語言查詢的統一入口""" |
|
try: |
|
|
|
message_lower = user_message.lower() |
|
|
|
|
|
if any(keyword in message_lower for keyword in ['商品', '產品', '貨品', '物品']): |
|
if any(keyword in message_lower for keyword in ['庫存', '存貨', '剩餘']): |
|
|
|
product_name = self._extract_product_name(user_message) |
|
return self.check_inventory(product_name=product_name) |
|
else: |
|
|
|
product_name = self._extract_product_name(user_message) |
|
return self.search_products(query_text=product_name) |
|
|
|
|
|
elif any(keyword in message_lower for keyword in ['訂單', '訂購', '購買', 'so-']): |
|
|
|
order_number = self._extract_order_number(user_message) |
|
|
|
status = self._extract_order_status(user_message) |
|
return self.search_orders(user_id=user_id, order_number=order_number, status=status) |
|
|
|
|
|
elif any(keyword in message_lower for keyword in ['低庫存', '缺貨', '不足']): |
|
return self.get_low_stock_products() |
|
|
|
|
|
elif any(keyword in message_lower for keyword in ['統計', '摘要', '總計', '報表']): |
|
return self.get_business_summary() |
|
|
|
else: |
|
|
|
return self.search_products(query_text=user_message) |
|
|
|
except Exception as e: |
|
logger.error(f"自然語言查詢處理錯誤: {str(e)}") |
|
return DatabaseResult( |
|
success=False, |
|
error=f"查詢處理失敗: {str(e)}" |
|
) |
|
|
|
def _extract_product_name(self, message: str) -> str: |
|
"""從訊息中提取商品名稱""" |
|
|
|
|
|
keywords_to_remove = ['查詢', '搜尋', '找', '商品', '產品', '庫存', '有沒有', '請問'] |
|
|
|
cleaned_message = message |
|
for keyword in keywords_to_remove: |
|
cleaned_message = cleaned_message.replace(keyword, '') |
|
|
|
return cleaned_message.strip() |
|
|
|
def _extract_order_number(self, message: str) -> str: |
|
"""從訊息中提取訂單編號""" |
|
import re |
|
|
|
|
|
so_pattern = r'SO-\d{8}-\d{3}' |
|
match = re.search(so_pattern, message.upper()) |
|
if match: |
|
return match.group() |
|
|
|
|
|
date_pattern = r'\d{8}-\d{3}' |
|
match = re.search(date_pattern, message) |
|
if match: |
|
return f"SO-{match.group()}" |
|
|
|
|
|
number_pattern = r'\d{6,}' |
|
match = re.search(number_pattern, message) |
|
if match: |
|
return match.group() |
|
|
|
return None |
|
|
|
def _extract_order_status(self, message: str) -> str: |
|
"""從訊息中提取訂單狀態""" |
|
message_lower = message.lower() |
|
|
|
|
|
status_keywords = { |
|
'已交付': 'DELIVERED', |
|
'已出貨': 'SHIPPED', |
|
'出貨': 'SHIPPED', |
|
'交付': 'DELIVERED', |
|
'處理中': 'PROCESSING', |
|
'已取消': 'CANCELLED', |
|
'取消': 'CANCELLED', |
|
'待處理': 'PENDING', |
|
'已確認': 'CONFIRMED', |
|
'已完成': 'COMPLETED' |
|
} |
|
|
|
for chinese, english in status_keywords.items(): |
|
if chinese in message_lower: |
|
return english |
|
|
|
|
|
english_keywords = ['delivered', 'shipped', 'processing', 'cancelled', 'pending', 'confirmed', 'completed'] |
|
for keyword in english_keywords: |
|
if keyword in message_lower: |
|
return keyword.upper() |
|
|
|
return None |