# embiddings_qwen / app.py
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
from transformers import AutoTokenizer, AutoModel, BitsAndBytesConfig
import torch
import torch.nn.functional as F
import logging
import time
# ------------------- Logging Setup --------------------
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(message)s",
level=logging.INFO
)
# ------------------- FastAPI Setup --------------------
app = FastAPI()
# ------------------- Model Config --------------------
MODEL_ID = "Qwen/Qwen3-Embedding-0.6B"
USE_GPU = torch.cuda.is_available()
# Left padding keeps every sequence's final position a real token, matching
# Qwen's recommended last-token pooling (see the sketch further below).
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, padding_side='left')
if USE_GPU:
    logging.info("🔋 Loading model on GPU with 4-bit quantization...")
    # Passing load_in_4bit directly to from_pretrained is deprecated in recent
    # transformers; an explicit BitsAndBytesConfig is the supported route
    # (requires the bitsandbytes package at runtime).
    model = AutoModel.from_pretrained(
        MODEL_ID,
        device_map="auto",
        torch_dtype=torch.float16,
        quantization_config=BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16
        )
    )
else:
logging.info("🧠 Loading model on CPU...")
model = AutoModel.from_pretrained(
MODEL_ID,
device_map="cpu",
torch_dtype=torch.float32
)
model.eval()
device = next(model.parameters()).device
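# Record where the weights actually landed (device_map="auto" may shard them).
logging.info(f"✅ Model ready on device: {device}")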
# ------------------- Data Schemas --------------------
class EmbedRequest(BaseModel):
texts: List[str]
class EmbedResponse(BaseModel):
embeddings: List[List[float]]
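# Example wire format (values illustrative):
#   request:  {"texts": ["first document", "second document"]}
#   response: {"embeddings": [[0.012, -0.034, ...], [0.056, ...]]}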
# ------------------- Pooling Function --------------------
def masked_mean_pooling(last_hidden_state, attention_mask):
    """Mean-pool token embeddings, counting only real (non-padding) tokens."""
    # Broadcast the (batch, seq) mask to (batch, seq, hidden) so padded
    # positions contribute zero to the sum.
    mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
    masked_embeddings = last_hidden_state * mask
    summed = masked_embeddings.sum(dim=1)
    counts = mask.sum(dim=1)
    # Clamp guards against division by zero for an all-padding row.
    return summed / counts.clamp(min=1e-9)
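# The Qwen3-Embedding model card's own examples use last-token pooling with
# left padding (which is why the tokenizer above sets padding_side='left').
# A minimal sketch of that variant, as an optional drop-in replacement for
# masked_mean_pooling; the function name here is ours, not Qwen's:
def last_token_pooling(last_hidden_state, attention_mask):
    # With left padding, every sequence's final position is a real token.
    left_padded = (attention_mask[:, -1].sum() == attention_mask.shape[0])
    if left_padded:
        return last_hidden_state[:, -1]
    # Fallback for right padding: pick each sequence's last non-pad token.
    lengths = attention_mask.sum(dim=1) - 1
    batch_idx = torch.arange(last_hidden_state.shape[0], device=last_hidden_state.device)
    return last_hidden_state[batch_idx, lengths]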
# ------------------- API Endpoint --------------------
@app.post("/embed", response_model=EmbedResponse)
async def embed_texts(request: EmbedRequest):
overall_start = time.perf_counter()
logging.info(f"📩 Received request with {len(request.texts)} texts.")
# Tokenization
t0 = time.perf_counter()
inputs = tokenizer(
request.texts,
return_tensors="pt",
padding=True,
truncation=True,
        max_length=32768  # Qwen3-Embedding supports sequences up to 32K tokens
)
inputs = {k: v.to(device) for k, v in inputs.items()}
t1 = time.perf_counter()
logging.info(f"🧾 Tokenization took {t1 - t0:.3f} seconds.")
# Model inference
t2 = time.perf_counter()
with torch.no_grad():
outputs = model(**inputs)
t3 = time.perf_counter()
logging.info(f"🧠 Model inference took {t3 - t2:.3f} seconds.")
# Pooling
t4 = time.perf_counter()
    pooled = masked_mean_pooling(outputs.last_hidden_state, inputs['attention_mask'])
    # L2-normalize so cosine similarity between embeddings is a plain dot product.
    normalized = F.normalize(pooled, p=2, dim=1)
t5 = time.perf_counter()
logging.info(f"🌀 Pooling & normalization took {t5 - t4:.3f} seconds.")
# Total
overall_end = time.perf_counter()
logging.info(f"✅ Total processing time: {overall_end - overall_start:.3f} seconds.")
return {"embeddings": normalized.cpu().tolist()}