# 3GPPDocFinder / app.py
import os
import time
import warnings

import nltk
import numpy as np
from dotenv import load_dotenv
from nltk.stem import WordNetLemmatizer
from sklearn.preprocessing import MinMaxScaler

# Disable TLS certificate verification for outgoing requests (the 3GPP FTP
# server is scraped with verify=False below) and silence the resulting warnings.
os.environ['CURL_CA_BUNDLE'] = ""
warnings.filterwarnings('ignore')

nltk.download('wordnet')
load_dotenv()
from datasets import load_dataset
import bm25s
from bm25s.hf import BM25HF
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from schemas import *
from bs4 import BeautifulSoup
import requests
lemmatizer = WordNetLemmatizer()
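# Load the specification metadata, full-text content and TDoc location tables,
# plus the prebuilt BM25 index, from the Hugging Face Hub (requires HF_TOKEN).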
spec_metadatas = load_dataset("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF_TOKEN"])
spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
tdoc_locations = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
bm25_index = BM25HF.load_from_hub("OrganizedProgrammers/3GPPBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])
spec_metadatas = spec_metadatas["train"].to_list()
spec_contents = spec_contents["train"].to_list()
tdoc_locations = tdoc_locations["train"].to_list()
def get_docs_from_url(url):
"""Get list of documents/directories from a URL"""
try:
response = requests.get(url, verify=False, timeout=10)
soup = BeautifulSoup(response.text, "html.parser")
return [item.get_text() for item in soup.select("tr td a")]
except Exception as e:
print(f"Error accessing {url}: {e}")
return []
def get_tdoc_url(doc_id):
    """Return the FTP URL recorded for a TDoc, or a 'not found' message."""
    for tdoc in tdoc_locations:
        if tdoc["doc_id"] == doc_id:
            return tdoc["url"]
    return f"Document {doc_id} not found"
def get_spec_url(document):
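    """Build the 3GPP FTP archive URL for a specification and return its latest version, or a 'not found' message."""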
series = document.split(".")[0].zfill(2)
url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{document}"
versions = get_docs_from_url(url)
return url + "/" + versions[-1] if versions != [] else f"Specification {document} not found"
def get_document(spec_id: str, spec_title: str):
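    """Return the spec as a flat list: a header line followed by alternating section titles and section contents."""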
text = [f"{spec_id} - {spec_title}"]
for section in spec_contents:
if spec_id == section["doc_id"]:
text.extend([section['section'], section['content']])
return text
app = FastAPI(title="3GPP Document Finder Back-End", description="Backend for 3GPPDocFinder - Searching technical documents & specifications from 3GPP FTP server")
app.mount("/static", StaticFiles(directory="static"), name="static")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def index():
return FileResponse(os.path.join('templates', 'index.html'))
@app.post("/find", response_model=DocResponse)
def find_document(request: DocRequest):
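    """Resolve a TDoc or specification identifier to its download URL (plus version and scope when available)."""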
start_time = time.time()
document = request.doc_id
url = get_tdoc_url(document) if document[0].isalpha() else get_spec_url(document)
if "Specification" in url or "Document" in url:
raise HTTPException(status_code=404, detail=url)
version = url.split("/")[-1].replace(".zip", "").split("-")[-1]
scope = None
for spec in spec_metadatas:
if spec['id'] == document:
scope = spec['scope']
break
return DocResponse(
doc_id=document,
version=version,
url=url,
search_time=time.time() - start_time,
scope=scope
)
@app.post("/batch", response_model=BatchDocResponse)
def find_multiple_documents(request: BatchDocRequest):
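    """Resolve several document identifiers in one call, reporting which ones could not be found."""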
start_time = time.time()
documents = request.doc_ids
results = {}
missing = []
for document in documents:
url = get_tdoc_url(document) if document[0].isalpha() else get_spec_url(document)
if "Specification" not in url and "Document" not in url:
results[document] = url
else:
missing.append(document)
return BatchDocResponse(
results=results,
missing=missing,
search_time=time.time()-start_time
)
@app.post("/search-spec", response_model=KeywordResponse)
def search_specification_by_keywords(request: KeywordRequest):
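    """Keyword search over specification metadata, optionally scanning the full section text when search_mode is 'deep'."""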
start_time = time.time()
    case_sensitive = request.case_sensitive
    search_mode = request.search_mode
    working_group = request.working_group
    spec_type = request.spec_type
    # Lowercase the keywords only when the search is case-insensitive
    keywords = [kw if case_sensitive else kw.lower() for kw in request.keywords.split(",")]
    print(keywords)
unique_specs = set()
results = []
if keywords == [""] and search_mode == "deep":
        raise HTTPException(status_code=400, detail="You must provide keywords when using deep search mode")
    for spec in spec_metadatas:
        valid = False
        if spec['id'] in unique_specs:
            continue
        if spec.get('type', None) is None or (spec_type is not None and spec["type"] != spec_type):
            continue
        if spec.get('working_group', None) is None or (working_group is not None and spec["working_group"] != working_group):
            continue

        # "and" requires every keyword to match; anything else falls back to "or" (at least one match)
        match_fn = all if request.mode == "and" else any

        if search_mode == "deep":
            contents = []
            doc = get_document(spec["id"], spec["title"])
            doc_valid = len(doc) > 1

        # Shallow match against the metadata summary string
        summary = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}+-+{spec['working_group']}"
        if match_fn(keyword in (summary if case_sensitive else summary.lower()) for keyword in keywords):
            valid = True

        # Deep match against every usable section of the specification text
        if search_mode == "deep" and doc_valid:
            for x in range(1, len(doc) - 1, 2):
                section_title = doc[x]
                section_content = doc[x + 1]
                if "reference" in section_title.lower() or "void" in section_title.lower() or "annex" in section_content.lower():
                    continue
                if match_fn(keyword in (section_content if case_sensitive else section_content.lower()) for keyword in keywords):
                    valid = True
                    contents.append({section_title: section_content})

        if valid:
            spec_content = dict(spec)
            if search_mode == "deep":
                spec_content["contains"] = {k: v for d in contents for k, v in d.items()}
            results.append(spec_content)
        unique_specs.add(spec['id'])
if len(results) > 0:
return KeywordResponse(
results=results,
search_time=time.time() - start_time
)
else:
raise HTTPException(status_code=404, detail="Specifications not found")
@app.post("/search-spec/experimental", response_model=KeywordResponse)
def bm25_search_specification(request: BM25KeywordRequest):
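    """Rank specifications against the query with the prebuilt BM25 index, boost and normalize the scores, then filter by type, working group and threshold."""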
start_time = time.time()
working_group = request.working_group
spec_type = request.spec_type
threshold = request.threshold
query = request.keywords
results_out = []
query_tokens = bm25s.tokenize(query)
results, scores = bm25_index.retrieve(query_tokens, k=len(bm25_index.corpus))
print("BM25 raw scores:", scores)
def calculate_boosted_score(metadata, score, query):
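        """Boost the raw BM25 score when the spec id appears in the query and by query/title word overlap."""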
title = set(metadata['title'].lower().split())
q = set(query.lower().split())
spec_id_presence = 0.5 if metadata['id'].lower() in q else 0
booster = len(q & title) * 0.5
return score + spec_id_presence + booster
spec_scores = {}
spec_indices = {}
spec_details = {}
for i in range(results.shape[1]):
doc = results[0, i]
score = scores[0, i]
spec = doc["metadata"]["id"]
boosted_score = calculate_boosted_score(doc['metadata'], score, query)
if spec not in spec_scores or boosted_score > spec_scores[spec]:
spec_scores[spec] = boosted_score
spec_indices[spec] = i
spec_details[spec] = {
'original_score': score,
'boosted_score': boosted_score,
'doc': doc
}
def normalize_scores(scores_dict):
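        """Min-max scale the per-spec scores to the [0, 1] range."""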
if not scores_dict:
return {}
scores_array = np.array(list(scores_dict.values())).reshape(-1, 1)
scaler = MinMaxScaler()
normalized_scores = scaler.fit_transform(scores_array).flatten()
normalized_dict = {}
for i, spec in enumerate(scores_dict.keys()):
normalized_dict[spec] = normalized_scores[i]
return normalized_dict
normalized_scores = normalize_scores(spec_scores)
for spec in spec_details:
spec_details[spec]["normalized_score"] = normalized_scores[spec]
unique_specs = sorted(normalized_scores.keys(), key=lambda x: normalized_scores[x], reverse=True)
    for spec in unique_specs:
details = spec_details[spec]
metadata = details['doc']['metadata']
if metadata.get('type', None) is None or (spec_type is not None and metadata["type"] != spec_type):
continue
if metadata.get('working_group', None) is None or (working_group is not None and metadata["working_group"] != working_group):
continue
if details['normalized_score'] < threshold / 100:
break
results_out.append(metadata)
if len(results_out) > 0:
return KeywordResponse(
results=results_out,
search_time=time.time() - start_time
)
else:
raise HTTPException(status_code=404, detail="Specifications not found")
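
# Local development entry point: a convenience sketch, assuming uvicorn is installed.
# Port 7860 is the Hugging Face Spaces default; the deployed Space may start the app
# differently (e.g. via its Dockerfile), so adjust as needed.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)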