from fastapi import FastAPI
from pydantic import BaseModel
from typing import List
from transformers import AutoTokenizer, AutoModel, BitsAndBytesConfig
import torch
import torch.nn.functional as F
import logging
import time

# ------------------- Logging Setup --------------------
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# ------------------- FastAPI Setup --------------------
app = FastAPI()

# ------------------- Model Config --------------------
MODEL_ID = "Qwen/Qwen3-Embedding-0.6B"
USE_GPU = torch.cuda.is_available()

# Left padding keeps the real tokens at the end of each sequence in a batch.
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, padding_side="left")

if USE_GPU:
    logging.info("🔋 Loading model on GPU with 4-bit quantization...")
    # Passing `load_in_4bit=True` directly is deprecated in recent
    # transformers releases; use a BitsAndBytesConfig instead
    # (requires the bitsandbytes package).
    model = AutoModel.from_pretrained(
        MODEL_ID,
        device_map="auto",
        torch_dtype=torch.float16,
        quantization_config=BitsAndBytesConfig(load_in_4bit=True),
    )
else:
    logging.info("🧠 Loading model on CPU...")
    model = AutoModel.from_pretrained(
        MODEL_ID,
        device_map="cpu",
        torch_dtype=torch.float32,
    )

model.eval()
device = next(model.parameters()).device

# ------------------- Data Schemas --------------------
class EmbedRequest(BaseModel):
    texts: List[str]

class EmbedResponse(BaseModel):
    embeddings: List[List[float]]

# ------------------- Pooling Function --------------------
def masked_mean_pooling(last_hidden_state, attention_mask):
    """Mean-pool token embeddings, ignoring padded positions."""
    mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
    masked_embeddings = last_hidden_state * mask
    summed = masked_embeddings.sum(dim=1)
    counts = mask.sum(dim=1)
    return summed / counts.clamp(min=1e-9)

# ------------------- API Endpoint --------------------
@app.post("/embed", response_model=EmbedResponse)
async def embed_texts(request: EmbedRequest):
    overall_start = time.perf_counter()
    logging.info(f"📩 Received request with {len(request.texts)} texts.")

    # Tokenization
    t0 = time.perf_counter()
    inputs = tokenizer(
        request.texts,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=32768,  # Qwen3-Embedding supports a 32k-token context
    )
    inputs = {k: v.to(device) for k, v in inputs.items()}
    t1 = time.perf_counter()
    logging.info(f"🧾 Tokenization took {t1 - t0:.3f} seconds.")

    # Model inference
    t2 = time.perf_counter()
    with torch.no_grad():
        outputs = model(**inputs)
    t3 = time.perf_counter()
    logging.info(f"🧠 Model inference took {t3 - t2:.3f} seconds.")

    # Pooling & L2 normalization (unit-length vectors simplify cosine similarity)
    t4 = time.perf_counter()
    pooled = masked_mean_pooling(outputs.last_hidden_state, inputs["attention_mask"])
    normalized = F.normalize(pooled, p=2, dim=1)
    t5 = time.perf_counter()
    logging.info(f"🌀 Pooling & normalization took {t5 - t4:.3f} seconds.")

    # Total
    overall_end = time.perf_counter()
    logging.info(f"✅ Total processing time: {overall_end - overall_start:.3f} seconds.")

    return {"embeddings": normalized.cpu().tolist()}
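
# ------------------- Running & Calling the Service --------------------
# A minimal usage sketch, not part of the original code: the module name
# ("embed_server"), host, and port are assumptions; adjust them to your setup.
#
#   uvicorn embed_server:app --host 0.0.0.0 --port 8000
#
#   curl -X POST http://localhost:8000/embed \
#        -H "Content-Type: application/json" \
#        -d '{"texts": ["hello world", "FastAPI embedding service"]}'

if __name__ == "__main__":
    # Convenience entry point so the file can also be launched directly with
    # `python embed_server.py` (assumes the uvicorn package is installed).
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)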