Spaces:
Running
Running
''' | |
from fastapi import FastAPI, Query | |
from pydantic import BaseModel | |
import cloudscraper | |
from bs4 import BeautifulSoup | |
from transformers import pipeline | |
import torch | |
import re | |
import os | |
#os.environ["HF_HOME"] = "/home/user/huggingface" | |
#os.environ["TRANSFORMERS_CACHE"] = "/home/user/huggingface" | |
app = FastAPI() | |
class ThreadResponse(BaseModel): | |
question: str | |
replies: list[str] | |
def clean_text(text: str) -> str: | |
text = text.strip() | |
text = re.sub(r"\b\d+\s*likes?,?\s*\d*\s*replies?$", "", text, flags=re.IGNORECASE).strip() | |
return text | |
@app.get("/scrape", response_model=ThreadResponse) | |
def scrape(url: str = Query(...)): | |
scraper = cloudscraper.create_scraper() | |
response = scraper.get(url) | |
if response.status_code == 200: | |
soup = BeautifulSoup(response.content, 'html.parser') | |
comment_containers = soup.find_all('div', class_='post__content') | |
if comment_containers: | |
question = clean_text(comment_containers[0].get_text(strip=True, separator="\n")) | |
replies = [clean_text(comment.get_text(strip=True, separator="\n")) for comment in comment_containers[1:]] | |
return ThreadResponse(question=question, replies=replies) | |
return ThreadResponse(question="", replies=[]) | |
MODEL_NAME = "microsoft/phi-2" | |
# Load the text-generation pipeline once at startup | |
text_generator = pipeline( | |
"text-generation", | |
model=MODEL_NAME, | |
trust_remote_code=True, | |
device=0 if torch.cuda.is_available() else -1, # GPU if available, else CPU | |
) | |
class PromptRequest(BaseModel): | |
prompt: str | |
@app.post("/generate") | |
async def generate_text(request: PromptRequest): | |
# The model expects a string prompt, so pass request.prompt directly | |
outputs = text_generator( | |
request.prompt, | |
max_new_tokens=512, | |
temperature=0.7, | |
top_p=0.9, | |
do_sample=True, | |
num_return_sequences=1, | |
) | |
generated_text = outputs[0]['generated_text'] | |
# Optional: parse reasoning and content if your model uses special tags like </think> | |
if "</think>" in generated_text: | |
reasoning_content = generated_text.split("</think>")[0].strip() | |
content = generated_text.split("</think>")[1].strip() | |
else: | |
reasoning_content = "" | |
content = generated_text.strip() | |
return { | |
"reasoning_content": reasoning_content, | |
"generated_text": content | |
} | |
''' | |
from fastapi import FastAPI, Query, Path | |
from pydantic import BaseModel | |
import cloudscraper | |
from bs4 import BeautifulSoup | |
from transformers import AutoTokenizer, AutoModelForCausalLM, T5Tokenizer, T5ForConditionalGeneration, PegasusTokenizer, PegasusForConditionalGeneration | |
import torch | |
import re | |
from fastapi.responses import JSONResponse | |
from fastapi.requests import Request | |
from fastapi import status | |
from typing import List, Dict, Optional | |
from llama_cpp import Llama | |
# FastAPI application object for this module.
app = FastAPI()
# --- Data Models ---
# Result of scraping a thread page: the first post plus every later post.
class ThreadResponse(BaseModel):
    question: str  # cleaned text of the first scraped post ("" when nothing was scraped)
    replies: list[str]  # cleaned text of each remaining post, in page order
# Request body carrying the prompt that is handed to the generate_* functions.
class PromptRequest(BaseModel):
    prompt: str  # prompt text, passed to the selected model unchanged
# Result of a generation call, split around the "</think>" marker when present.
class GenerateResponse(BaseModel):
    reasoning_content: str  # text preceding "</think>", or "" when there is no marker
    generated_text: str  # remaining model output, or an "Error: ..." message
# Request model for the summarization flow.
class SummarizeRequest(BaseModel):
    replies: List[str]  # texts to summarize; each entry is summarized separately
    task: str  # expecting "summarisation" (summarize_thread compares it case-insensitively)
# Response model for the summarization flow.
class SummarizeResponse(BaseModel):
    individual_summaries: Dict[int, Dict[str, str]]  # {index: {"reasoning": str, "summary": str}}
    combined_reasoning: str  # all non-empty reasoning texts joined by blank lines
    combined_summary: str  # summary produced from the joined per-reply summaries
# --- Utility Functions ---
def clean_text(text: str) -> str:
    """Return *text* stripped of whitespace and of a trailing like/reply counter.

    A suffix such as ``"3 likes, 2 replies"`` (case-insensitive, reply count
    optional) is removed only when it sits at the very end of the stripped
    text; whatever whitespace is left behind is stripped again.
    """
    trailing_counter = re.compile(
        r"\b\d+\s*likes?,?\s*\d*\s*replies?$",
        flags=re.IGNORECASE,
    )
    return trailing_counter.sub("", text.strip()).strip()
# --- Scraping Endpoint ---
def scrape(url: str):
    """Fetch a thread page and split it into a question and its replies.

    Every ``<div class="post__content">`` on the page is treated as one post:
    the first becomes the question, the rest become the replies. Each post's
    text is extracted with newline separators and passed through
    ``clean_text``.

    Args:
        url: Address of the page to fetch.

    Returns:
        A ``ThreadResponse``; its fields are empty when the response status
        is not 200 or when the page contains no matching posts.
    """
    page = cloudscraper.create_scraper().get(url)
    if page.status_code != 200:
        return ThreadResponse(question="", replies=[])

    posts = BeautifulSoup(page.content, "html.parser").find_all("div", class_="post__content")
    if not posts:
        return ThreadResponse(question="", replies=[])

    # Clean every post in page order, then peel off the first one.
    question, *replies = [
        clean_text(post.get_text(strip=True, separator="\n")) for post in posts
    ]
    return ThreadResponse(question=question, replies=replies)
# --- Device shared by all Hugging Face models below: GPU when available, else CPU ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# --- Load DeepSeek-R1-Distill-Qwen-1.5B Model & Tokenizer ---
deepseek_model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B"
deepseek_tokenizer = AutoTokenizer.from_pretrained(deepseek_model_name)
deepseek_model = AutoModelForCausalLM.from_pretrained(deepseek_model_name).to(device)

# --- Load T5-Large Model & Tokenizer ---
t5_model_name = "google-t5/t5-large"
t5_tokenizer = T5Tokenizer.from_pretrained(t5_model_name)
t5_model = T5ForConditionalGeneration.from_pretrained(t5_model_name).to(device)

# --- Load Pegasus-Large Model & Tokenizer ---
pegasus_model_name = "google/pegasus-large"
pegasus_tokenizer = PegasusTokenizer.from_pretrained(pegasus_model_name)
pegasus_model = PegasusForConditionalGeneration.from_pretrained(pegasus_model_name).to(device)

# --- Load Qwen3-0.6B Model & Tokenizer (transformers) ---
qwen3_model_name = "Qwen/Qwen3-0.6B"
qwen3_tokenizer = AutoTokenizer.from_pretrained(qwen3_model_name)
qwen3_model = AutoModelForCausalLM.from_pretrained(qwen3_model_name).to(device)

# --- Load Qwen3-0.6B GGUF weights through llama-cpp ---
qwen3_gguf_llm = Llama.from_pretrained(
    repo_id="unsloth/Qwen3-0.6B-GGUF",
    filename="Qwen3-0.6B-BF16.gguf",
)
# --- Generation Functions ---
def generate_deepseek(prompt: str) -> tuple[str, str]:
    """Generate text with the DeepSeek-R1 distilled model.

    The prompt is tokenized (truncated to 1024 tokens) and the model samples
    up to 512 new tokens with temperature 0.7 and top-p 0.9. The decoded
    output is then split at the first ``</think>`` marker.

    Args:
        prompt: Raw prompt text.

    Returns:
        ``(reasoning, answer)``: the stripped text before and after the first
        ``</think>``; ``reasoning`` is ``""`` when the marker is absent.
    """
    encoded = deepseek_tokenizer(prompt, return_tensors="pt", truncation=True, max_length=1024).to(device)
    sequences = deepseek_model.generate(
        **encoded,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        num_return_sequences=1,
        # The EOS token id doubles as the padding id for generation.
        pad_token_id=deepseek_tokenizer.eos_token_id,
    )
    # NOTE(review): the whole returned sequence is decoded; for a causal LM this
    # presumably still contains the prompt tokens — confirm that is intended.
    decoded = deepseek_tokenizer.decode(sequences[0], skip_special_tokens=True)
    reasoning, marker, answer = decoded.partition("</think>")
    if not marker:
        return "", decoded.strip()
    return reasoning.strip(), answer.strip()
def generate_t5(prompt: str) -> tuple[str, str]:
    """Generate text with the T5-Large model using beam search.

    The prompt is encoded (truncated to 512 tokens) and decoded with 4 beams,
    a repetition penalty of 2.5 and an output length cap of 512.

    Args:
        prompt: Raw prompt text.

    Returns:
        ``(reasoning, answer)``: the stripped text before and after the first
        ``</think>``; ``reasoning`` is ``""`` when the marker is absent.
    """
    input_ids = t5_tokenizer.encode(prompt, return_tensors="pt", max_length=512, truncation=True).to(device)
    sequences = t5_model.generate(
        input_ids,
        max_length=512,
        num_beams=4,
        repetition_penalty=2.5,
        length_penalty=1.0,
        early_stopping=True,
    )
    decoded = t5_tokenizer.decode(sequences[0], skip_special_tokens=True)
    reasoning, marker, answer = decoded.partition("</think>")
    if not marker:
        return "", decoded.strip()
    return reasoning.strip(), answer.strip()
# --- Generation Functions (continued) ---
def generate_pegasus(prompt: str) -> tuple[str, str]:
    """Generate text with the Pegasus-Large model using beam search.

    The prompt is tokenized (truncated to 1024 tokens) and decoded with
    4 beams, a length penalty of 2.0 and at most 150 new tokens.

    Args:
        prompt: Raw input text.

    Returns:
        ``("", text)``: the first element is always empty because no
        ``</think>`` split is performed here; ``text`` is the stripped output.
    """
    # Pegasus expects raw text input (no prefix needed)
    encoded = pegasus_tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=1024,
    ).to(device)
    sequences = pegasus_model.generate(
        **encoded,
        max_new_tokens=150,
        num_beams=4,
        length_penalty=2.0,
        early_stopping=True,
    )
    decoded = pegasus_tokenizer.decode(sequences[0], skip_special_tokens=True)
    # Pegasus does not use <think> tags, so no reasoning extraction
    return "", decoded.strip()
def generate_qwen3(prompt: str) -> tuple[str, str]:
    """Generate text with the transformers build of Qwen3-0.6B.

    The prompt is tokenized (truncated to 1024 tokens) and the model samples
    up to 512 new tokens with temperature 0.7 and top-p 0.9. The decoded
    output is then split at the first ``</think>`` marker.

    Args:
        prompt: Raw prompt text.

    Returns:
        ``(reasoning, answer)``: the stripped text before and after the first
        ``</think>``; ``reasoning`` is ``""`` when the marker is absent.
    """
    encoded = qwen3_tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=1024,
    ).to(device)
    sequences = qwen3_model.generate(
        **encoded,
        max_new_tokens=512,
        temperature=0.7,
        top_p=0.9,
        do_sample=True,
        num_return_sequences=1,
        # The EOS token id doubles as the padding id for generation.
        pad_token_id=qwen3_tokenizer.eos_token_id,
    )
    # NOTE(review): the whole returned sequence is decoded; for a causal LM this
    # presumably still contains the prompt tokens — confirm that is intended.
    decoded = qwen3_tokenizer.decode(sequences[0], skip_special_tokens=True)
    reasoning, marker, answer = decoded.partition("</think>")
    if not marker:
        return "", decoded.strip()
    return reasoning.strip(), answer.strip()
def generate_qwen3_gguf(prompt: str, max_tokens: int = 256) -> tuple[str, str]:
    """Generate a chat completion with the GGUF build of Qwen3-0.6B.

    The prompt is sent as a single user message and the reply is split at the
    first ``</think>`` marker.

    Args:
        prompt: Text of the user message.
        max_tokens: Completion length limit forwarded to the model.

    Returns:
        ``(reasoning, answer)``. When the marker is present, ``reasoning`` is
        the stripped text before it with ``"</think>"`` appended again, and
        ``answer`` is the stripped text after it. Otherwise ``reasoning`` is
        ``""`` and ``answer`` is the whole stripped reply.
    """
    completion = qwen3_gguf_llm.create_chat_completion(
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
    )
    reply = completion['choices'][0]['message']['content']
    reasoning, marker, answer = reply.partition("</think>")
    if not marker:
        return "", reply.strip()
    # Unlike the other generators, the closing tag is kept on the reasoning text.
    return reasoning.strip() + "</think>", answer.strip()
# --- New summarization endpoint ---
async def summarize_thread(request: SummarizeRequest):
    """Summarize each reply on its own, then summarize the joined summaries.

    Args:
        request: Replies to process plus a task name, which must equal
            "summarisation" (compared case-insensitively).

    Returns:
        A ``SummarizeResponse`` holding the per-reply results keyed by reply
        index, every non-empty reasoning text joined by blank lines, and the
        final summary. A 400 ``JSONResponse`` is returned instead when the
        task is not supported.
    """
    if request.task.lower() != "summarisation":
        return JSONResponse(
            status_code=400,
            content={"error": "Unsupported task. Only 'summarisation' is supported."}
        )

    # First pass: one model call per reply, in request order.
    per_reply = [generate_qwen3_gguf(reply, max_tokens=256) for reply in request.replies]

    individual_summaries = {
        position: {"reasoning": thoughts, "summary": digest}
        for position, (thoughts, digest) in enumerate(per_reply)
    }
    reasoning_parts = [thoughts for thoughts, _ in per_reply if thoughts]

    # Second pass: the space-joined per-reply summaries are fed back to the model
    # to obtain the final reasoning and summary.
    merged_summaries = " ".join(digest for _, digest in per_reply)
    final_reasoning, final_summary = generate_qwen3_gguf(merged_summaries, max_tokens=256)
    if final_reasoning:
        reasoning_parts.append(final_reasoning)

    return SummarizeResponse(
        individual_summaries=individual_summaries,
        combined_reasoning="\n\n".join(reasoning_parts).strip(),
        combined_summary=final_summary.strip()
    )
async def generate(
    request: PromptRequest,
    model_name: str = Path(..., description="Model to use: 'deepseekr1-qwen', 't5-large', 'pegasus-large', 'qwen3-0.6b-hf', or 'qwen3-0.6b-gguf'")
):
    """Run the prompt through the model selected by *model_name*.

    Args:
        request: Body carrying the prompt text.
        model_name: One of 'deepseekr1-qwen', 't5-large', 'pegasus-large',
            'qwen3-0.6b-hf' or 'qwen3-0.6b-gguf'.

    Returns:
        A ``GenerateResponse`` with the reasoning and generated text. For an
        unknown model name the response carries an ``"Error: ..."`` message in
        ``generated_text`` rather than raising.
    """
    # NOTE(review): `Path(...)` implies a `{model_name}` path parameter, but no
    # route decorator is visible on this function — confirm how it is registered.
    generators = {
        "deepseekr1-qwen": generate_deepseek,
        "t5-large": generate_t5,
        "pegasus-large": generate_pegasus,
        # Fixed: this entry used to call `generate_qwen3_hf`, which is not
        # defined; the transformers-based Qwen3 generator is `generate_qwen3`.
        "qwen3-0.6b-hf": generate_qwen3,
        "qwen3-0.6b-gguf": generate_qwen3_gguf,
    }
    backend = generators.get(model_name)
    if backend is None:
        return GenerateResponse(reasoning_content="", generated_text=f"Error: Unknown model '{model_name}'.")

    reasoning, text = backend(request.prompt)
    return GenerateResponse(reasoning_content=reasoning, generated_text=text)
# --- Global Exception Handler ---
async def global_exception_handler(request: Request, exc: Exception):
    """Report an exception as a JSON payload shaped like a generation result.

    The exception is printed, then returned with HTTP status 200: the error
    text goes into ``generated_text`` (prefixed with ``"Error: "``) and
    ``reasoning_content`` is left empty.

    Args:
        request: The request being handled (not used here).
        exc: The exception to report.
    """
    print(f"Exception: {exc}")
    error_payload = {
        "reasoning_content": "",
        "generated_text": f"Error: {exc!s}",
    }
    return JSONResponse(status_code=status.HTTP_200_OK, content=error_payload)