# api_server.py
# Role: API server that serves the trained models as a real service
# POST /predict - image classification
# GET /health - server health check
# The full ensemble is required to reach the 70.61% accuracy figure
# Serving the ensemble is only possible through a Python API server
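# Example requests against a running server (a sketch; "sample.jpg" and the
# localhost:8000 address are assumptions, adjust them to your deployment):
#   curl http://localhost:8000/health
#   curl -X POST http://localhost:8000/predict -F "file=@sample.jpg"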
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
from PIL import Image
import io
import os
from datetime import datetime
import json
from typing import List, Dict, Any
import logging
# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
MODEL_PATH = "models/mega_ensemble_80"  # local model directory
HUGGINGFACE_REPO = "bihan3876/my_model"  # Hugging Face repository
# Class labels (Korean): furniture, household goods, electronics/books, hobbies/games, fashion/beauty
CLASS_NAMES = ["가구", "생활용품", "전자기기_도서", "취미_게임", "패션_뷰티"]
IMG_SIZE = (224, 224)
# Whether to download models from the Hugging Face Hub
USE_HUGGINGFACE = os.getenv("USE_HUGGINGFACE", "false").lower() == "true"
# Whether to run in lightweight mode (single TensorFlow Lite model)
USE_LIGHTWEIGHT = os.getenv("USE_LIGHTWEIGHT", "false").lower() == "true"
LIGHTWEIGHT_MODEL_PATH = "models/serving/model_optimized.tflite"

# Create the FastAPI app
app = FastAPI(
    title="AI Product Classification API",
    description="Automatic classification of secondhand-market listings with an AI model that reached 70.61% accuracy",
    version="1.0.0"
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global state (loaded models and model metadata)
models = {}
model_info = {}

def download_from_huggingface():
    """Download the models from the Hugging Face Hub."""
    try:
        from huggingface_hub import snapshot_download
        logger.info(f"Downloading models from Hugging Face: {HUGGINGFACE_REPO}")
        # Download the ensemble models (349MB)
        local_dir = snapshot_download(
            repo_id=HUGGINGFACE_REPO,
            cache_dir="./cache",
            local_dir="./models_hf"
        )
        logger.info(f"Model download finished: {local_dir}")
        return local_dir
    except ImportError:
        logger.error("The huggingface_hub package is not installed.")
        logger.error("Install it with: pip install huggingface_hub")
        return None
    except Exception as e:
        logger.error(f"Hugging Face download failed: {e}")
        return None

def load_lightweight_model():
    """Load the lightweight TensorFlow Lite model."""
    global models, model_info
    try:
        # Resolve the TensorFlow Lite model path
        if USE_HUGGINGFACE:
            from huggingface_hub import snapshot_download
            # Download only the lightweight model from Hugging Face (24MB)
            local_dir = snapshot_download(
                repo_id=HUGGINGFACE_REPO,
                cache_dir="./cache",
                local_dir="./models_hf",
                allow_patterns=["models/serving/model_optimized.tflite"]
            )
            tflite_path = os.path.join(local_dir, "models", "serving", "model_optimized.tflite")
        else:
            tflite_path = LIGHTWEIGHT_MODEL_PATH

        if not os.path.exists(tflite_path):
            raise FileNotFoundError(f"TensorFlow Lite model not found: {tflite_path}")

        # Load the TensorFlow Lite interpreter
        interpreter = tf.lite.Interpreter(model_path=tflite_path)
        interpreter.allocate_tensors()
        models["tflite"] = interpreter

        model_info = {
            "total_models": 1,
            "model_names": ["TensorFlow_Lite"],
            "accuracy": 62.0,  # estimated accuracy
            "classes": CLASS_NAMES,
            "input_shape": [224, 224, 3],
            "ensemble_method": "single_model"
        }
        logger.info("🚀 Lightweight model loaded: TensorFlow Lite")
        return True
    except Exception as e:
        logger.error(f"❌ Failed to load the lightweight model: {e}")
        return False

def load_ensemble_models():
    """Load the ensemble models."""
    global models, model_info
    try:
        # Lightweight mode: load the TensorFlow Lite model instead
        if USE_LIGHTWEIGHT:
            return load_lightweight_model()

        # Download the models first when the Hugging Face Hub is enabled
        if USE_HUGGINGFACE:
            hf_path = download_from_huggingface()
            if hf_path:
                model_path = os.path.join(hf_path, "models", "ensemble")
            else:
                logger.warning("Hugging Face download failed, falling back to local models")
                model_path = MODEL_PATH
        else:
            model_path = MODEL_PATH

        logger.info(f"Loading models from: {model_path}")
        if not os.path.exists(model_path):
            raise FileNotFoundError(f"Model directory does not exist: {model_path}")

        # Find the model files
        model_files = [f for f in os.listdir(model_path) if f.endswith('.keras')]
        if not model_files:
            raise FileNotFoundError("No model files found")

        # Load each model (use a separate variable so model_path is not overwritten inside the loop)
        for model_file in model_files:
            model_name = model_file.replace('.keras', '').replace('_best', '')
            model_file_path = os.path.join(model_path, model_file)
            try:
                model = load_model(model_file_path)
                models[model_name] = model
                logger.info(f"✅ Loaded {model_name}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to load {model_name}: {e}")

        if not models:
            raise RuntimeError("No models were loaded")

        # Model metadata
        model_info = {
            "total_models": len(models),
            "model_names": list(models.keys()),
            "accuracy": 70.61,
            "classes": CLASS_NAMES,
            "input_shape": [224, 224, 3],
            "ensemble_method": "soft_voting"
        }
        logger.info(f"🎯 Ensemble loading complete: {len(models)} models")
        return True
    except Exception as e:
        logger.error(f"❌ Model loading failed: {e}")
        return False

def preprocess_image(image_bytes: bytes) -> np.ndarray:
    """Preprocess an uploaded image into a model-ready batch."""
    try:
        # Open the image with PIL
        image = Image.open(io.BytesIO(image_bytes))
        # Convert to RGB
        if image.mode != 'RGB':
            image = image.convert('RGB')
        # Resize to the model input size
        image = image.resize(IMG_SIZE)
        # Convert to a numpy array and normalize to [0, 1]
        image_array = np.array(image) / 255.0
        # Add the batch dimension
        image_array = np.expand_dims(image_array, axis=0)
        return image_array
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Image preprocessing failed: {str(e)}")

def ensemble_predict(image_array: np.ndarray) -> Dict[str, Any]:
    """Soft-voting ensemble prediction (also handles the single TensorFlow Lite model)."""
    try:
        predictions = []
        # Predict with every loaded model
        for model_name, model in models.items():
            if isinstance(model, tf.lite.Interpreter):
                # TensorFlow Lite path: feed the input tensor and read back the output tensor
                input_details = model.get_input_details()
                output_details = model.get_output_details()
                model.set_tensor(input_details[0]['index'], image_array.astype(np.float32))
                model.invoke()
                pred = model.get_tensor(output_details[0]['index'])
            else:
                pred = model.predict(image_array, verbose=0)
            predictions.append(pred[0])

        # Soft voting (mean of the per-model probabilities)
        ensemble_pred = np.mean(predictions, axis=0)

        # Pick the top class
        predicted_class_idx = int(np.argmax(ensemble_pred))
        predicted_class = CLASS_NAMES[predicted_class_idx]
        confidence = float(ensemble_pred[predicted_class_idx])

        # Per-class probabilities
        probabilities = {
            CLASS_NAMES[i]: float(ensemble_pred[i])
            for i in range(len(CLASS_NAMES))
        }

        return {
            "predicted_class": predicted_class,
            "confidence": confidence,
            "probabilities": probabilities,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Prediction failed: {str(e)}")

@app.on_event("startup")
async def startup_event():
    """Load the models when the server starts."""
    logger.info("🚀 Starting the AI classification server...")
    if not load_ensemble_models():
        logger.error("❌ The server cannot start because model loading failed")
        raise RuntimeError("Model loading failed")
    logger.info("✅ AI classification server is ready!")
@app.get("/")
async def root():
"""๋ฃจํŠธ ์—”๋“œํฌ์ธํŠธ"""
return {
"message": "AI ์ƒํ’ˆ ๋ถ„๋ฅ˜ API",
"version": "1.0.0",
"accuracy": "70.61%",
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""ํ—ฌ์Šค ์ฒดํฌ"""
return {
"status": "healthy",
"models_loaded": len(models),
"timestamp": datetime.now().isoformat()
}
@app.get("/model-info")
async def get_model_info():
"""๋ชจ๋ธ ์ •๋ณด ์กฐํšŒ"""
return {
"success": True,
"data": model_info
}
@app.get("/classes")
async def get_classes():
"""์ง€์› ํด๋ž˜์Šค ๋ชฉ๋ก"""
return {
"success": True,
"classes": CLASS_NAMES,
"total_classes": len(CLASS_NAMES)
}
@app.post("/predict")
async def predict_image(
file: UploadFile = File(...),
return_probabilities: bool = True
):
"""๋‹จ์ผ ์ด๋ฏธ์ง€ ๋ถ„๋ฅ˜"""
try:
# ํŒŒ์ผ ๊ฒ€์ฆ
if not file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="์ด๋ฏธ์ง€ ํŒŒ์ผ๋งŒ ์—…๋กœ๋“œ ๊ฐ€๋Šฅํ•ฉ๋‹ˆ๋‹ค")
# ์ด๋ฏธ์ง€ ์ฝ๊ธฐ
image_bytes = await file.read()
# ์ „์ฒ˜๋ฆฌ
image_array = preprocess_image(image_bytes)
# ์˜ˆ์ธก
result = ensemble_predict(image_array)
# ์‘๋‹ต ๊ตฌ์„ฑ
response_data = {
"predicted_class": result["predicted_class"],
"confidence": result["confidence"],
"timestamp": result["timestamp"]
}
if return_probabilities:
response_data["probabilities"] = result["probabilities"]
return {
"success": True,
"data": response_data,
"message": "๋ถ„๋ฅ˜ ์™„๋ฃŒ"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"์˜ˆ์ธก ์˜ค๋ฅ˜: {e}")
return JSONResponse(
status_code=500,
content={
"success": False,
"message": f"์˜ˆ์ธก ์‹คํŒจ: {str(e)}"
}
)
@app.post("/batch-predict")
async def batch_predict_images(
files: List[UploadFile] = File(...),
return_probabilities: bool = True
):
"""๋ฐฐ์น˜ ์ด๋ฏธ์ง€ ๋ถ„๋ฅ˜"""
try:
results = []
for i, file in enumerate(files):
try:
if not file.content_type.startswith('image/'):
results.append({
"filename": file.filename,
"success": False,
"message": "์ด๋ฏธ์ง€ ํŒŒ์ผ์ด ์•„๋‹™๋‹ˆ๋‹ค"
})
continue
# ์ด๋ฏธ์ง€ ์ฒ˜๋ฆฌ
image_bytes = await file.read()
image_array = preprocess_image(image_bytes)
result = ensemble_predict(image_array)
# ๊ฒฐ๊ณผ ์ถ”๊ฐ€
batch_result = {
"filename": file.filename,
"success": True,
"predicted_class": result["predicted_class"],
"confidence": result["confidence"]
}
if return_probabilities:
batch_result["probabilities"] = result["probabilities"]
results.append(batch_result)
except Exception as e:
results.append({
"filename": file.filename,
"success": False,
"message": str(e)
})
return {
"success": True,
"total_files": len(files),
"results": results,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"๋ฐฐ์น˜ ์˜ˆ์ธก ์˜ค๋ฅ˜: {e}")
return JSONResponse(
status_code=500,
content={
"success": False,
"message": f"๋ฐฐ์น˜ ์˜ˆ์ธก ์‹คํŒจ: {str(e)}"
}
)

if __name__ == "__main__":
    print("🚀 Starting the AI product classification server!")
    print("📊 Model: 70.61% mega ensemble")
    print("🌐 API docs: http://localhost:8000/docs")
    print("🔍 Health check: http://localhost:8000/health")
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )
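
# Example invocations (a sketch; the shell env-var syntax and paths are deployment assumptions):
#   python api_server.py                           # full 70.61% ensemble loaded from MODEL_PATH
#   USE_HUGGINGFACE=true python api_server.py      # download the models from bihan3876/my_model first
#   USE_LIGHTWEIGHT=true python api_server.py      # single TensorFlow Lite model (~62% estimated accuracy)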