import time
from datetime import datetime
import os, warnings, nltk, json, subprocess

import numpy as np
from nltk.stem import WordNetLemmatizer
from dotenv import load_dotenv
from sklearn.preprocessing import MinMaxScaler

os.environ['CURL_CA_BUNDLE'] = ""
warnings.filterwarnings('ignore')
nltk.download('wordnet')
load_dotenv()
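# A .env file providing HF_TOKEN is expected by load_dotenv() above; the token is used below to
# load the datasets and the prebuilt BM25 index from the Hugging Face Hub.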

from datasets import load_dataset
import bm25s
from bm25s.hf import BM25HF

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from schemas import *

from bs4 import BeautifulSoup
import requests

lemmatizer = WordNetLemmatizer()

# Load the spec metadata, spec contents, TDoc locations and the prebuilt BM25 index from the Hugging Face Hub
spec_metadatas = load_dataset("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF_TOKEN"])
spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
tdoc_locations = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
bm25_index = BM25HF.load_from_hub("OrganizedProgrammers/3GPPBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])

# Convert the "train" splits to plain lists so they can be scanned with simple loops
spec_metadatas = spec_metadatas["train"].to_list()
spec_contents = spec_contents["train"].to_list()
tdoc_locations = tdoc_locations["train"].to_list()
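
# Record layout assumed by the lookups below (inferred from how fields are accessed in this file):
#   spec_metadatas entries: id, title, type, version, working_group, scope
#   spec_contents entries:  doc_id, section, content
#   tdoc_locations entries: doc_id, url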


def get_docs_from_url(url):
    """Get list of documents/directories from a URL"""
    try:
        response = requests.get(url, verify=False, timeout=10)
        soup = BeautifulSoup(response.text, "html.parser")
        return [item.get_text() for item in soup.select("tr td a")]
    except Exception as e:
        print(f"Error accessing {url}: {e}")
        return []


def get_tdoc_url(doc_id):
    """Return the FTP URL for a TDoc, or a not-found message if it is unknown."""
    for tdoc in tdoc_locations:
        if tdoc["doc_id"] == doc_id:
            return tdoc["url"]
    # Returning a message (rather than None) keeps the "Document ... not found" checks below working
    return f"Document {doc_id} not found"


def get_spec_url(document):
    """Return the URL of the most recent archived version of a specification."""
    series = document.split(".")[0].zfill(2)
    url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{document}"
    versions = get_docs_from_url(url)
    return url + "/" + versions[-1] if versions else f"Specification {document} not found"
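
# Illustrative call (the spec number and returned file name are hypothetical):
#   get_spec_url("38.331") -> "https://www.3gpp.org/ftp/Specs/archive/38_series/38.331/<latest zip listed>"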


def get_document(spec_id: str, spec_title: str):
    """Return a flat list: a header followed by alternating section titles and section contents."""
    text = [f"{spec_id} - {spec_title}"]
    for section in spec_contents:
        if spec_id == section["doc_id"]:
            text.extend([section['section'], section['content']])
    return text
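
# Illustrative return value (titles and contents are hypothetical):
#   ["38.331 - Radio Resource Control (RRC)", "1 Scope", "...", "2 References", "..."]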


app = FastAPI(
    title="3GPP Document Finder Back-End",
    description="Backend for 3GPPDocFinder - searching technical documents & specifications on the 3GPP FTP server",
)
app.mount("/static", StaticFiles(directory="static"), name="static")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
def index():
    return FileResponse(os.path.join('templates', 'index.html'))


@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
    start_time = time.time()
    document = request.doc_id
    # IDs starting with a letter are treated as TDocs; numeric ones as specification numbers
    url = get_tdoc_url(document) if document[0].isalpha() else get_spec_url(document)
    if "Specification" in url or "Document" in url:
        raise HTTPException(status_code=404, detail=url)

    version = url.split("/")[-1].replace(".zip", "").split("-")[-1]
    scope = None
    for spec in spec_metadatas:
        if spec['id'] == document:
            scope = spec['scope']
            break

    return DocResponse(
        doc_id=document,
        version=version,
        url=url,
        search_time=time.time() - start_time,
        scope=scope
    )
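
# Example request (illustrative): POST /find with body {"doc_id": "38.331"} returns the archive URL,
# the version parsed from the zip file name, and the spec's scope when present in the metadata.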


@app.post("/batch", response_model=BatchDocResponse)
def find_multiple_documents(request: BatchDocRequest):
    start_time = time.time()
    documents = request.doc_ids
    results = {}
    missing = []

    for document in documents:
        url = get_tdoc_url(document) if document[0].isalpha() else get_spec_url(document)
        if "Specification" not in url and "Document" not in url:
            results[document] = url
        else:
            missing.append(document)

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )


@app.post("/search-spec", response_model=KeywordResponse)
def search_specification_by_keywords(request: KeywordRequest):
    start_time = time.time()
    case_sensitive = request.case_sensitive
    search_mode = request.search_mode
    working_group = request.working_group
    spec_type = request.spec_type
    # Keywords (and the searched text below) are lowercased only when the search is not case sensitive
    keywords = [kw if case_sensitive else kw.lower() for kw in request.keywords.split(",")]
    print(keywords)
    unique_specs = set()
    results = []

    if keywords == [""] and search_mode == "deep":
        raise HTTPException(status_code=400, detail="You must enter keywords in deep search mode!")

    for spec in spec_metadatas:
        valid = False
        if spec['id'] in unique_specs:
            continue
        if spec.get('type', None) is None or (spec_type is not None and spec["type"] != spec_type):
            continue
        if spec.get('working_group', None) is None or (working_group is not None and spec["working_group"] != working_group):
            continue

        if search_mode == "deep":
            contents = []
            doc = get_document(spec["id"], spec["title"])
            docValid = len(doc) > 1

        if request.mode == "and":
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}+-+{spec['working_group']}"
            if all(keyword in (string if case_sensitive else string.lower()) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x+1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if all(keyword in (section_content if case_sensitive else section_content.lower()) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})
        elif request.mode == "or":
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}+-+{spec['working_group']}"
            if any(keyword in (string if case_sensitive else string.lower()) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x+1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if any(keyword in (section_content if case_sensitive else section_content.lower()) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})

        if valid:
            spec_content = spec
            if search_mode == "deep":
                spec_content["contains"] = {k: v for d in contents for k, v in d.items()}
            results.append(spec_content)
        else:
            unique_specs.add(spec['id'])

    if len(results) > 0:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")


@app.post("/search-spec/experimental", response_model=KeywordResponse)
def bm25_search_specification(request: BM25KeywordRequest):
    start_time = time.time()
    working_group = request.working_group
    spec_type = request.spec_type
    threshold = request.threshold
    query = request.keywords

    results_out = []
    query_tokens = bm25s.tokenize(query)
    # Retrieve every document in the corpus so the scores can be normalized over the full range
    results, scores = bm25_index.retrieve(query_tokens, k=len(bm25_index.corpus))
    print("BM25 raw scores:", scores)

    def calculate_boosted_score(metadata, score, query):
        """Boost the BM25 score when the query mentions the spec ID or shares words with its title."""
        title = set(metadata['title'].lower().split())
        q = set(query.lower().split())
        spec_id_presence = 0.5 if metadata['id'].lower() in q else 0
        booster = len(q & title) * 0.5
        return score + spec_id_presence + booster

    spec_scores = {}
    spec_indices = {}
    spec_details = {}

    # Keep only the best-scoring chunk per specification
    for i in range(results.shape[1]):
        doc = results[0, i]
        score = scores[0, i]
        spec = doc["metadata"]["id"]

        boosted_score = calculate_boosted_score(doc['metadata'], score, query)

        if spec not in spec_scores or boosted_score > spec_scores[spec]:
            spec_scores[spec] = boosted_score
            spec_indices[spec] = i
            spec_details[spec] = {
                'original_score': score,
                'boosted_score': boosted_score,
                'doc': doc
            }

    def normalize_scores(scores_dict):
        """Min-max scale the boosted scores to [0, 1] so they can be compared against the threshold."""
        if not scores_dict:
            return {}

        scores_array = np.array(list(scores_dict.values())).reshape(-1, 1)
        scaler = MinMaxScaler()
        normalized_scores = scaler.fit_transform(scores_array).flatten()

        normalized_dict = {}
        for i, spec in enumerate(scores_dict.keys()):
            normalized_dict[spec] = normalized_scores[i]

        return normalized_dict

    normalized_scores = normalize_scores(spec_scores)

    for spec in spec_details:
        spec_details[spec]["normalized_score"] = normalized_scores[spec]

    # Rank specs by normalized score, best first
    unique_specs = sorted(normalized_scores.keys(), key=lambda x: normalized_scores[x], reverse=True)

    for rank, spec in enumerate(unique_specs, 1):
        details = spec_details[spec]
        metadata = details['doc']['metadata']
        if metadata.get('type', None) is None or (spec_type is not None and metadata["type"] != spec_type):
            continue
        if metadata.get('working_group', None) is None or (working_group is not None and metadata["working_group"] != working_group):
            continue
        # Results are sorted, so everything below the threshold can be dropped
        if details['normalized_score'] < threshold / 100:
            break
        results_out.append(metadata)

    if len(results_out) > 0:
        return KeywordResponse(
            results=results_out,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")
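

if __name__ == "__main__":
    # Minimal local entry point: a sketch assuming uvicorn is used to serve the app
    # (the original project may rely on an external runner instead).
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)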