|
import os |
|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from linebot import LineBotApi, WebhookHandler |
|
from linebot.exceptions import InvalidSignatureError |
|
from linebot.models import MessageEvent, TextMessage, TextSendMessage |
|
import logging |
|
from backend.services.nlp_service import NLPService |
|
from backend.services.database_service import DatabaseService |
|
from backend.services.message_router import MessageRouter |
|
from backend.config import settings |
|
from backend.database.connection import test_database_connection |
|
|
|
|
|
# Configure logging once at import time; INFO is the baseline level.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Application object that the route decorators below attach to.
app = FastAPI(version="1.0.0", title="LINE Bot API")

# CORS is fully open: any origin, method and header, with credentials enabled.
# NOTE(review): FastAPI's CORS documentation advises against combining
# wildcard origins with allow_credentials=True -- confirm this is intended
# before exposing the service to browsers.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)

# LINE client and webhook dispatcher, both built from project settings.
line_bot_api = LineBotApi(settings.LINE_CHANNEL_ACCESS_TOKEN)
handler = WebhookHandler(settings.LINE_CHANNEL_SECRET)

# Module-level service instances. Only message_router is referenced by the
# handlers in this part of the file; the construction order is kept as-is
# in case the constructors have side effects.
nlp_service = NLPService()
db_service = DatabaseService()
message_router = MessageRouter()
|
|
|
@app.get("/")
def greet_json():
    """Return a fixed JSON greeting confirming that the API is running."""
    greeting = {"Hello": "LINE Bot API is running!"}
    return greeting
|
|
|
@app.get("/health")
def health_check():
    """Report service status together with the database connectivity state.

    Returns:
        A dict with a constant ``"status"`` / ``"message"`` pair, a
        ``"database"`` field that is ``"connected"`` or ``"disconnected"``
        depending on ``test_database_connection()``, and a ``"database_url"``
        assembled from the configured host, port and database name.
    """
    if test_database_connection():
        db_state = "connected"
    else:
        db_state = "disconnected"

    # Built from host, port and database name only.
    target = f"postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"

    # "status" is always "healthy", even when the database probe fails.
    return {
        "status": "healthy",
        "message": "LINE Bot API is operational",
        "database": db_state,
        "database_url": target,
    }
|
|
|
@app.post("/webhook")
async def callback(request: Request):
    """LINE Bot webhook endpoint.

    Reads the raw request body and the ``X-Line-Signature`` header and passes
    both to the module-level ``handler``.

    Args:
        request: The incoming webhook request.

    Returns:
        ``{"status": "ok"}`` once ``handler.handle`` has returned.

    Raises:
        HTTPException: 400 when the body is not valid UTF-8 or when the
            handler raises ``InvalidSignatureError``.
    """
    signature = request.headers.get("X-Line-Signature", "")
    raw_body = await request.body()

    # An undecodable body is a client error; without this guard the
    # UnicodeDecodeError would surface as an internal server error.
    try:
        payload = raw_body.decode("utf-8")
    except UnicodeDecodeError:
        logger.error("Webhook body is not valid UTF-8")
        raise HTTPException(status_code=400, detail="Invalid request body")

    try:
        # NOTE(review): handle() is called synchronously inside this
        # coroutine, so the event loop waits until it returns -- consider
        # offloading to a worker thread if the handlers are slow.
        handler.handle(payload, signature)
    except InvalidSignatureError:
        logger.error("Invalid signature")
        raise HTTPException(status_code=400, detail="Invalid signature")

    return {"status": "ok"}
|
|
|
@handler.add(MessageEvent, message=TextMessage)
def handle_message(event):
    """Handle a LINE text message by routing it and replying with the result.

    Args:
        event: The message event passed in by the webhook handler. Its
            ``message.text``, ``source.user_id`` and ``reply_token``
            attributes are read.

    Exceptions are not propagated to the caller. A failure while routing or
    replying is logged with its traceback and answered with a generic
    apology; if sending that apology fails too, the second failure is logged
    instead of escaping into the webhook endpoint.
    """
    try:
        user_message = event.message.text
        user_id = event.source.user_id
        logger.info(f"收到訊息: {user_message} from {user_id}")

        response_data = message_router.route_message(user_message, user_id)
        line_bot_api.reply_message(
            event.reply_token,
            TextSendMessage(text=response_data["text"]),
        )
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"處理訊息時發生錯誤: {str(e)}")
        try:
            line_bot_api.reply_message(
                event.reply_token,
                TextSendMessage(text="抱歉,處理您的訊息時發生錯誤,請稍後再試。"),
            )
        except Exception:
            # The fallback can fail for the same reason as the first reply
            # (for example the reply call itself is what raised); swallow
            # it here so the webhook request still completes.
            logger.exception("Failed to send the fallback error reply")
|
|
|
|
|
@app.post("/chat")
async def direct_chat(request: dict):
    """Direct chat API.

    Args:
        request: JSON body; ``"message"`` (default ``""``) and ``"user_id"``
            (default ``"api_user"``) are read from it.

    Returns:
        Whatever ``message_router._handle_chat_mode`` returns, or a
        ``{"success": False, ...}`` dict when the message is empty or any
        exception is raised.
    """
    try:
        text = request.get("message", "")
        sender = request.get("user_id", "api_user")
        if text:
            return message_router._handle_chat_mode(text, sender)
        # Falsy message: reject without calling the router.
        return {"success": False, "error": "訊息內容不能為空"}
    except Exception as exc:
        detail = str(exc)
        logger.error(f"直接聊天 API 錯誤: {detail}")
        return {
            "success": False,
            "error": detail,
            "text": "聊天服務暫時無法使用",
        }
|
|
|
@app.post("/search")
async def direct_search(request: dict):
    """Direct search API.

    Args:
        request: JSON body; ``"message"`` (default ``""``) and ``"user_id"``
            (default ``"api_user"``) are read from it.

    Returns:
        Whatever ``message_router._handle_search_mode`` returns, or a
        ``{"success": False, ...}`` dict when the query is empty or any
        exception is raised.
    """
    try:
        query = request.get("message", "")
        caller = request.get("user_id", "api_user")

        # Guard clause: a falsy query never reaches the router.
        if not query:
            rejection = {"success": False, "error": "查詢內容不能為空"}
            return rejection

        return message_router._handle_search_mode(query, caller)
    except Exception as err:
        reason = str(err)
        logger.error(f"直接搜尋 API 錯誤: {reason}")
        failure = {"success": False, "error": reason}
        failure["text"] = "搜尋服務暫時無法使用"
        return failure
|
|
|
@app.post("/route")
async def smart_route(request: dict):
    """Smart routing API.

    Args:
        request: JSON body; ``"message"`` (default ``""``) and ``"user_id"``
            (default ``"api_user"``) are read from it.

    Returns:
        Whatever ``message_router.route_message`` returns, or a
        ``{"success": False, ...}`` dict when the message is empty or any
        exception is raised.
    """
    try:
        incoming = request.get("message", "")
        origin = request.get("user_id", "api_user")

        if incoming:
            outcome = message_router.route_message(incoming, origin)
        else:
            # Falsy message: answer directly without calling the router.
            outcome = {"success": False, "error": "訊息內容不能為空"}
        return outcome
    except Exception as problem:
        description = str(problem)
        logger.error(f"智能路由 API 錯誤: {description}")
        return {
            "success": False,
            "error": description,
            "text": "路由服務暫時無法使用",
        }
|
|
|
@app.post("/product-query")
async def product_query(request: dict):
    """Dedicated product query API - uses Pydantic AI.

    Args:
        request: JSON body; ``"message"`` (default ``""``) and ``"user_id"``
            (default ``"api_user"``) are read from it.

    Returns:
        Whatever ``ProductQueryService.process_product_query_sync`` returns,
        or a ``{"success": False, ...}`` dict when the query is empty, the
        service reports itself unavailable, or any exception is raised.
    """
    unavailable_text = "商品查詢服務暫時無法使用"
    try:
        query = request.get("message", "")
        caller = request.get("user_id", "api_user")
        if not query:
            return {"success": False, "error": "查詢內容不能為空"}

        # Imported inside the try block so that a failing import is reported
        # through the same error payload as any other exception.
        from backend.services.pydantic_ai_service import ProductQueryService

        # A new service object is created for every request.
        service = ProductQueryService()
        if service.is_available():
            return service.process_product_query_sync(query, caller)

        return {
            "success": False,
            "error": "商品查詢服務不可用,請檢查 GROQ_API_KEY 設定",
            "text": unavailable_text,
        }
    except Exception as exc:
        detail = str(exc)
        logger.error(f"商品查詢 API 錯誤: {detail}")
        return {
            "success": False,
            "error": detail,
            "text": unavailable_text,
        }
|
|
|
@app.get("/stats")
async def get_stats():
    """Return the routing statistics reported by the message router.

    Returns:
        ``{"success": True, "data": <statistics>}`` on success, otherwise
        ``{"success": False, "error": <exception text>}``.
    """
    try:
        payload = {
            "success": True,
            "data": message_router.get_route_statistics(),
        }
    except Exception as exc:
        logger.error(f"統計 API 錯誤: {str(exc)}")
        payload = {"success": False, "error": str(exc)}
    return payload
|
|
|
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # Serve the app on every interface at port 7860.
    uvicorn.run(app, port=7860, host="0.0.0.0")