import time
from datetime import datetime
import os, warnings, nltk, json, subprocess
import numpy as np
from nltk.stem import WordNetLemmatizer
from dotenv import load_dotenv
from sklearn.preprocessing import MinMaxScaler

# Disable TLS verification for outgoing requests (the 3GPP FTP server is
# fetched with verify=False below) and silence the resulting warnings.
os.environ['CURL_CA_BUNDLE'] = ""
warnings.filterwarnings('ignore')
nltk.download('wordnet')
load_dotenv()

from datasets import load_dataset
import bm25s
from bm25s.hf import BM25HF
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from schemas import *
from bs4 import BeautifulSoup
import requests

lemmatizer = WordNetLemmatizer()

# Load spec metadata, spec contents, TDoc locations and the prebuilt BM25 index
# from the Hugging Face Hub (requires HF_TOKEN in the environment).
spec_metadatas = load_dataset("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF_TOKEN"])
spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
tdoc_locations = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
bm25_index = BM25HF.load_from_hub("OrganizedProgrammers/3GPPBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])

spec_metadatas = spec_metadatas["train"].to_list()
spec_contents = spec_contents["train"].to_list()
tdoc_locations = tdoc_locations["train"].to_list()


def get_docs_from_url(url):
    """Get the list of documents/directories listed at a 3GPP FTP URL."""
    try:
        response = requests.get(url, verify=False, timeout=10)
        soup = BeautifulSoup(response.text, "html.parser")
        return [item.get_text() for item in soup.select("tr td a")]
    except Exception as e:
        print(f"Error accessing {url}: {e}")
        return []


def get_tdoc_url(doc_id):
    """Return the stored URL for a TDoc, or a 'not found' message."""
    for tdoc in tdoc_locations:
        if tdoc["doc_id"] == doc_id:
            return tdoc["url"]
    # Return a sentinel message instead of None so callers can test for "Document".
    return f"Document {doc_id} not found"


def get_spec_url(document):
    """Build the FTP archive URL for a specification and pick its latest version."""
    series = document.split(".")[0].zfill(2)
    url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{document}"
    versions = get_docs_from_url(url)
    if versions:
        return url + "/" + versions[-1]
    return f"Specification {document} not found"


def get_document(spec_id: str, spec_title: str):
    """Return a flat list: header line, then alternating section titles and contents."""
    text = [f"{spec_id} - {spec_title}"]
    for section in spec_contents:
        if spec_id == section["doc_id"]:
            text.extend([section['section'], section['content']])
    return text


app = FastAPI(
    title="3GPP Document Finder Back-End",
    description="Backend for 3GPPDocFinder - Searching technical documents & specifications from 3GPP FTP server"
)
app.mount("/static", StaticFiles(directory="static"), name="static")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
def index():
    return FileResponse(os.path.join('templates', 'index.html'))


@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
    start_time = time.time()
    document = request.doc_id
    # TDoc IDs start with a letter; specification numbers start with a digit.
    url = get_tdoc_url(document) if document[0].isalpha() else get_spec_url(document)
    if "Specification" in url or "Document" in url:
        raise HTTPException(status_code=404, detail=url)
    version = url.split("/")[-1].replace(".zip", "").split("-")[-1]
    scope = None
    for spec in spec_metadatas:
        if spec['id'] == document:
            scope = spec['scope']
            break
    return DocResponse(
        doc_id=document,
        version=version,
        url=url,
        search_time=time.time() - start_time,
        scope=scope
    )


@app.post("/batch", response_model=BatchDocResponse)
def find_multiple_documents(request: BatchDocRequest):
    start_time = time.time()
    documents = request.doc_ids
    results = {}
    missing = []
    for document in documents:
        url = get_tdoc_url(document) if document[0].isalpha() else get_spec_url(document)
        if "Specification" not in url and "Document" not in url:
            results[document] = url
        else:
            missing.append(document)
    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )


@app.post("/search-spec", response_model=KeywordResponse)
def search_specification_by_keywords(request: KeywordRequest):
    start_time = time.time()
    boolSensitiveCase = request.case_sensitive
    search_mode = request.search_mode
    working_group = request.working_group
    spec_type = request.spec_type
    # Lowercase the keywords up front when the search is case-insensitive.
    keywords = [string if boolSensitiveCase else string.lower() for string in request.keywords.split(",")]
    print(keywords)
    unique_specs = set()
    results = []

    if keywords == [""] and search_mode == "deep":
        raise HTTPException(status_code=400, detail="You must enter keywords in deep search mode!")

    for spec in spec_metadatas:
        valid = False
        if spec['id'] in unique_specs:
            continue
        if spec.get('type', None) is None or (spec_type is not None and spec["type"] != spec_type):
            continue
        if spec.get('working_group', None) is None or (working_group is not None and spec["working_group"] != working_group):
            continue

        if search_mode == "deep":
            contents = []
            doc = get_document(spec["id"], spec["title"])
            docValid = len(doc) > 1

        if request.mode == "and":
            # Every keyword must match the metadata string (and, in deep mode, a section).
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}+-+{spec['working_group']}"
            if all(keyword in (string if boolSensitiveCase else string.lower()) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x + 1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if all(keyword in (section_content if boolSensitiveCase else section_content.lower()) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})
        elif request.mode == "or":
            # At least one keyword must match the metadata string (and, in deep mode, a section).
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}+-+{spec['working_group']}"
            if any(keyword in (string if boolSensitiveCase else string.lower()) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x + 1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if any(keyword in (section_content if boolSensitiveCase else section_content.lower()) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})

        if valid:
            spec_content = spec
            if search_mode == "deep":
                spec_content["contains"] = {k: v for d in contents for k, v in d.items()}
            results.append(spec_content)
        else:
            unique_specs.add(spec['id'])

    if len(results) > 0:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")


@app.post("/search-spec/experimental", response_model=KeywordResponse)
def bm25_search_specification(request: BM25KeywordRequest):
    start_time = time.time()
    working_group = request.working_group
    spec_type = request.spec_type
    threshold = request.threshold
    query = request.keywords
    results_out = []

    # Retrieve every document in the corpus so scores can be normalized globally.
    query_tokens = bm25s.tokenize(query)
    results, scores = bm25_index.retrieve(query_tokens, k=len(bm25_index.corpus))
    print("BM25 raw scores:", scores)

    def calculate_boosted_score(metadata, score, query):
        # Boost the raw BM25 score when the query mentions the spec ID or words from its title.
        title = set(metadata['title'].lower().split())
        q = set(query.lower().split())
        spec_id_presence = 0.5 if metadata['id'].lower() in q else 0
        booster = len(q & title) * 0.5
        return score + spec_id_presence + booster
    spec_scores = {}
    spec_indices = {}
    spec_details = {}

    # Keep only the best-scoring retrieved chunk per specification.
    for i in range(results.shape[1]):
        doc = results[0, i]
        score = scores[0, i]
        spec = doc["metadata"]["id"]
        boosted_score = calculate_boosted_score(doc['metadata'], score, query)
        if spec not in spec_scores or boosted_score > spec_scores[spec]:
            spec_scores[spec] = boosted_score
            spec_indices[spec] = i
            spec_details[spec] = {
                'original_score': score,
                'boosted_score': boosted_score,
                'doc': doc
            }

    def normalize_scores(scores_dict):
        # Min-max scale the boosted scores to [0, 1] so they can be compared to the threshold.
        if not scores_dict:
            return {}
        scores_array = np.array(list(scores_dict.values())).reshape(-1, 1)
        scaler = MinMaxScaler()
        normalized_scores = scaler.fit_transform(scores_array).flatten()
        normalized_dict = {}
        for i, spec in enumerate(scores_dict.keys()):
            normalized_dict[spec] = normalized_scores[i]
        return normalized_dict

    normalized_scores = normalize_scores(spec_scores)
    for spec in spec_details:
        spec_details[spec]["normalized_score"] = normalized_scores[spec]

    unique_specs = sorted(normalized_scores.keys(), key=lambda x: normalized_scores[x], reverse=True)

    for rank, spec in enumerate(unique_specs, 1):
        details = spec_details[spec]
        metadata = details['doc']['metadata']
        if metadata.get('type', None) is None or (spec_type is not None and metadata["type"] != spec_type):
            continue
        if metadata.get('working_group', None) is None or (working_group is not None and metadata["working_group"] != working_group):
            continue
        # Results are sorted by descending score, so stop at the first one below the threshold.
        if details['normalized_score'] < threshold / 100:
            break
        results_out.append(metadata)

    if len(results_out) > 0:
        return KeywordResponse(
            results=results_out,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")
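
# Minimal local entry point, added as a sketch: the original file does not show how the
# app is served, so the uvicorn call, host, and port below are assumptions rather than
# part of the project's documented setup.
if __name__ == "__main__":
    import uvicorn  # assumes uvicorn is installed alongside FastAPI

    uvicorn.run(app, host="0.0.0.0", port=8000)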