'''
Earlier single-model version of this app (microsoft/phi-2 via a transformers pipeline),
kept here commented out for reference:

from fastapi import FastAPI, Query
from pydantic import BaseModel
import cloudscraper
from bs4 import BeautifulSoup
from transformers import pipeline
import torch
import re
import os

#os.environ["HF_HOME"] = "/home/user/huggingface"
#os.environ["TRANSFORMERS_CACHE"] = "/home/user/huggingface"

app = FastAPI()

class ThreadResponse(BaseModel):
    question: str
    replies: list[str]

def clean_text(text: str) -> str:
    text = text.strip()
    text = re.sub(r"\b\d+\s*likes?,?\s*\d*\s*replies?$", "", text, flags=re.IGNORECASE).strip()
    return text

@app.get("/scrape", response_model=ThreadResponse)
def scrape(url: str = Query(...)):
    scraper = cloudscraper.create_scraper()
    response = scraper.get(url)
    if response.status_code == 200:
        soup = BeautifulSoup(response.content, 'html.parser')
        comment_containers = soup.find_all('div', class_='post__content')
        if comment_containers:
            question = clean_text(comment_containers[0].get_text(strip=True, separator="\n"))
            replies = [clean_text(comment.get_text(strip=True, separator="\n")) for comment in comment_containers[1:]]
            return ThreadResponse(question=question, replies=replies)
    return ThreadResponse(question="", replies=[])

MODEL_NAME = "microsoft/phi-2"

# Load the text-generation pipeline once at startup
text_generator = pipeline(
    "text-generation",
    model=MODEL_NAME,
    trust_remote_code=True,
    device=0 if torch.cuda.is_available() else -1,  # GPU if available, else CPU
)

class PromptRequest(BaseModel):
    prompt: str

@app.post("/generate")
async def generate_text(request: PromptRequest):
    # The model expects a string prompt, so pass request.prompt directly
    outputs = text_generator(
        request.prompt,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        num_return_sequences=1,
    )
    generated_text = outputs[0]['generated_text']
    # Optional: parse reasoning and content if your model uses special tags like </think>
    if "</think>" in generated_text:
        reasoning_content = generated_text.split("</think>")[0].strip()
        content = generated_text.split("</think>")[1].strip()
    else:
        reasoning_content = ""
        content = generated_text.strip()
    return {
        "reasoning_content": reasoning_content,
        "generated_text": content
    }
'''
from fastapi import FastAPI, Query, Path
from pydantic import BaseModel
import cloudscraper
from bs4 import BeautifulSoup
from transformers import AutoTokenizer, AutoModelForCausalLM, T5Tokenizer, T5ForConditionalGeneration, PegasusTokenizer, PegasusForConditionalGeneration
import torch
import re
from fastapi.responses import JSONResponse
from fastapi.requests import Request
from fastapi import status
from typing import List, Dict, Optional
from llama_cpp import Llama
app = FastAPI()
# --- Data Models ---
class ThreadResponse(BaseModel):
    question: str
    replies: list[str]

class PromptRequest(BaseModel):
    prompt: str

class GenerateResponse(BaseModel):
    reasoning_content: str
    generated_text: str

# New model for summarization request
class SummarizeRequest(BaseModel):
    replies: List[str]
    task: str  # expecting "summarisation"

# New model for summarization response
class SummarizeResponse(BaseModel):
    individual_summaries: Dict[int, Dict[str, str]]  # {index: {"reasoning": str, "summary": str}}
    combined_reasoning: str
    combined_summary: str
# --- Utility Functions ---
def clean_text(text: str) -> str:
    text = text.strip()
    # Strip trailing engagement counters such as "12 likes, 3 replies"
    text = re.sub(r"\b\d+\s*likes?,?\s*\d*\s*replies?$", "", text, flags=re.IGNORECASE).strip()
    return text
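# Quick illustration of what clean_text does (hypothetical inputs, not from the app):
#   clean_text("Totally agree with this.\n12 likes, 3 replies")  -> "Totally agree with this."
#   clean_text("  plain text  ")                                 -> "plain text"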
# --- Scraping Endpoint ---
@app.get("/scrape", response_model=ThreadResponse)
def scrape(url: str):
scraper = cloudscraper.create_scraper()
response = scraper.get(url)
if response.status_code == 200:
soup = BeautifulSoup(response.content, "html.parser")
comment_containers = soup.find_all("div", class_="post__content")
if comment_containers:
question = clean_text(comment_containers[0].get_text(strip=True, separator="\n"))
replies = [clean_text(comment.get_text(strip=True, separator="\n")) for comment in comment_containers[1:]]
return ThreadResponse(question=question, replies=replies)
return ThreadResponse(question="", replies=[])
# --- Load DeepSeek-R1-Distill-Qwen-1.5B Model & Tokenizer ---
deepseek_model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
deepseek_tokenizer = AutoTokenizer.from_pretrained(deepseek_model_name)
deepseek_model = AutoModelForCausalLM.from_pretrained(deepseek_model_name)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
deepseek_model = deepseek_model.to(device)
# --- Load T5-Large Model & Tokenizer ---
t5_model_name = "google-t5/t5-large"
t5_tokenizer = T5Tokenizer.from_pretrained(t5_model_name)
t5_model = T5ForConditionalGeneration.from_pretrained(t5_model_name)
t5_model = t5_model.to(device)
# --- Load Pegasus-Large Model & Tokenizer ---
pegasus_model_name = "google/pegasus-large"
pegasus_tokenizer = PegasusTokenizer.from_pretrained(pegasus_model_name)
pegasus_model = PegasusForConditionalGeneration.from_pretrained(pegasus_model_name)
pegasus_model = pegasus_model.to(device)
# --- Load Qwen3-0.6B Model & Tokenizer (Hugging Face) ---
qwen3_model_name = "Qwen/Qwen3-0.6B"
qwen3_tokenizer = AutoTokenizer.from_pretrained(qwen3_model_name)
qwen3_model = AutoModelForCausalLM.from_pretrained(qwen3_model_name)
qwen3_model = qwen3_model.to(device)
# --- Load Qwen3-0.6B GGUF Model via llama.cpp ---
qwen3_gguf_llm = Llama.from_pretrained(
    repo_id="unsloth/Qwen3-0.6B-GGUF",
    filename="Qwen3-0.6B-BF16.gguf",
)
# --- Generation Functions ---
def generate_deepseek(prompt: str) -> tuple[str, str]:
    inputs = deepseek_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    outputs = deepseek_model.generate(
        **inputs,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        num_return_sequences=1,
        pad_token_id=deepseek_tokenizer.eos_token_id,
    )
    generated_text = deepseek_tokenizer.decode(outputs[0], skip_special_tokens=True)
    if "</think>" in generated_text:
        reasoning_content, content = generated_text.split("</think>", 1)
        return reasoning_content.strip(), content.strip()
    else:
        return "", generated_text.strip()
def generate_t5(prompt: str) -> tuple[str, str]:
    inputs = t5_tokenizer.encode(prompt, return_tensors="pt", max_length=512, truncation=True).to(device)
    outputs = t5_model.generate(
        inputs,
        max_length=512,
        num_beams=4,
        repetition_penalty=2.5,
        length_penalty=1.0,
        early_stopping=True,
    )
    generated_text = t5_tokenizer.decode(outputs[0], skip_special_tokens=True)
    if "</think>" in generated_text:
        reasoning_content, content = generated_text.split("</think>", 1)
        return reasoning_content.strip(), content.strip()
    else:
        return "", generated_text.strip()
def generate_pegasus(prompt: str) -> tuple[str, str]:
    # Pegasus expects raw text input (no prefix needed)
    inputs = pegasus_tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=1024,
    ).to(device)
    outputs = pegasus_model.generate(
        **inputs,
        max_new_tokens=150,
        num_beams=4,
        length_penalty=2.0,
        early_stopping=True,
    )
    generated_text = pegasus_tokenizer.decode(outputs[0], skip_special_tokens=True)
    # Pegasus does not use <think> tags, so no reasoning extraction
    return "", generated_text.strip()
def generate_qwen3_hf(prompt: str) -> tuple[str, str]:
    inputs = qwen3_tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=1024,
    ).to(device)
    outputs = qwen3_model.generate(
        **inputs,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        num_return_sequences=1,
        pad_token_id=qwen3_tokenizer.eos_token_id,
    )
    generated_text = qwen3_tokenizer.decode(outputs[0], skip_special_tokens=True)
    if "</think>" in generated_text:
        reasoning_content, content = generated_text.split("</think>", 1)
        return reasoning_content.strip(), content.strip()
    else:
        return "", generated_text.strip()
def generate_qwen3_gguf(prompt: str, max_tokens: int = 256) -> tuple[str, str]:
    messages = [
        {"role": "user", "content": prompt}
    ]
    response = qwen3_gguf_llm.create_chat_completion(
        messages=messages,
        max_tokens=max_tokens,
    )
    generated_text = response['choices'][0]['message']['content']
    if "</think>" in generated_text:
        reasoning_content, content = generated_text.split("</think>", 1)
        # The closing </think> tag is kept in the returned reasoning here
        return reasoning_content.strip() + "</think>", content.strip()
    else:
        return "", generated_text.strip()
# --- API Endpoints ---
# Summarization endpoint
@app.post("/summarize_thread", response_model=SummarizeResponse)
async def summarize_thread(request: SummarizeRequest):
    if request.task.lower() != "summarisation":
        return JSONResponse(
            status_code=400,
            content={"error": "Unsupported task. Only 'summarisation' is supported."}
        )
    individual_summaries = {}
    combined_reasonings = []
    combined_summaries = []
    # Summarize each reply individually
    for idx, reply in enumerate(request.replies):
        reasoning, summary = generate_qwen3_gguf(reply, max_tokens=256)
        individual_summaries[idx] = {
            "reasoning": reasoning,
            "summary": summary
        }
        if reasoning:
            combined_reasonings.append(reasoning)
        combined_summaries.append(summary)
    # Combine all individual summaries into one text
    combined_summary_text = " ".join(combined_summaries)
    # Summarize the combined text to produce the final reasoning and summary
    # (recursively summarizing a very long combined text is an optional extension)
    final_reasoning, final_summary = generate_qwen3_gguf(combined_summary_text, max_tokens=256)
    # Append the final reasoning to the combined reasonings
    if final_reasoning:
        combined_reasonings.append(final_reasoning)
    return SummarizeResponse(
        individual_summaries=individual_summaries,
        combined_reasoning="\n\n".join(combined_reasonings).strip(),
        combined_summary=final_summary.strip()
    )
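# Example request body for the /summarize_thread endpoint above (illustrative values only):
#   {
#     "replies": ["First reply text...", "Second reply text..."],
#     "task": "summarisation"
#   }
# The response maps each reply index to its reasoning/summary pair and includes a
# combined summary produced by summarizing the concatenated per-reply summaries.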
@app.post("/generate/{model_name}", response_model=GenerateResponse)
async def generate(
request: PromptRequest,
model_name: str = Path(..., description="Model to use: 'deepseekr1-qwen', 't5-large', 'pegasus-large', 'qwen3-0.6b-hf', or 'qwen3-0.6b-gguf'")
):
if model_name == "deepseekr1-qwen":
reasoning, text = generate_deepseek(request.prompt)
elif model_name == "t5-large":
reasoning, text = generate_t5(request.prompt)
elif model_name == "pegasus-large":
reasoning, text = generate_pegasus(request.prompt)
elif model_name == "qwen3-0.6b-hf":
reasoning, text = generate_qwen3_hf(request.prompt)
elif model_name == "qwen3-0.6b-gguf":
reasoning, text = generate_qwen3_gguf(request.prompt)
else:
return GenerateResponse(reasoning_content="", generated_text=f"Error: Unknown model '{model_name}'.")
return GenerateResponse(reasoning_content=reasoning, generated_text=text)
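# Example call to the /generate endpoint above (hypothetical host and prompt, for illustration only):
#   curl -X POST http://localhost:8000/generate/qwen3-0.6b-gguf \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "Explain what this thread is about."}'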
# --- Global Exception Handler ---
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    print(f"Exception: {exc}")
    # Errors are returned with a 200 status and the message in the response body
    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={
            "reasoning_content": "",
            "generated_text": f"Error: {str(exc)}"
        }
    )
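# Minimal local-run sketch; assumes uvicorn is installed and that port 7860
# (the usual Hugging Face Space port) is free -- adjust host/port as needed.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)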