|
from fastapi import APIRouter, UploadFile, File, HTTPException |
|
import os |
|
import shutil |
|
import uuid |
|
from bs4 import BeautifulSoup |
|
from PyPDF2 import PdfReader |
|
|
|
# Router under which this module's endpoints are registered, tagged "files".
router = APIRouter(tags=["files"])

# Maps a session id (or a pre-calculated document id) to the path of the file
# stored for it. Filled at import time for the pre-calculated documents and by
# upload_file for every accepted upload.
session_files = {}

# Root directory that holds one sub-directory per session. The path is
# relative, so it resolves against the process working directory; it is
# created at import time when missing.
UPLOAD_ROOT = "uploaded_files"
os.makedirs(UPLOAD_ROOT, exist_ok=True)
|
|
|
def validate_pdf(file_path: str) -> bool:
    """Check whether the file at ``file_path`` parses as a non-empty PDF.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when ``PdfReader`` opens the file and reports at least one page;
        False when it reports no pages or when reading the file raises.
    """
    try:
        reader = PdfReader(file_path)
        return len(reader.pages) > 0
    except Exception:
        # Any parser or I/O failure means "not a usable PDF". Unlike a bare
        # ``except:``, this still lets KeyboardInterrupt and SystemExit
        # propagate instead of being reported as an invalid file.
        return False
|
|
|
def validate_markdown(file_path: str) -> bool:
    """Heuristically check whether the file at ``file_path`` is Markdown.

    The file must be readable as UTF-8 and contain at least one character
    commonly used by Markdown syntax (``#``, ``-``, ``*``, a backtick, ``[``
    or ``>``).

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when the file decodes as UTF-8 and contains one of the marker
        characters; False otherwise, including when the file cannot be
        opened or decoded.
    """
    markdown_markers = ("#", "-", "*", "`", "[", ">")
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers
        # UnicodeDecodeError (not UTF-8) and malformed paths.
        return False
    # An empty file contains no marker, so no separate emptiness check is
    # needed.
    return any(marker in content for marker in markdown_markers)
|
|
|
def validate_html(file_path: str) -> bool:
    """Check whether the file at ``file_path`` contains HTML markup.

    ``html.parser`` is lenient and accepts arbitrary (even empty) text, so a
    successful parse alone proves nothing. The file is therefore considered
    valid only when the parsed document contains at least one tag.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when the file decodes as UTF-8 and parses to at least one tag;
        False otherwise, including when the file cannot be opened, decoded
        or parsed.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            markup = f.read()
        soup = BeautifulSoup(markup, "html.parser")
        # find(True) matches any tag but never a bare text node.
        return soup.find(True) is not None
    except Exception:
        # Any I/O, decoding or parser failure means "not usable HTML". Unlike
        # a bare ``except:``, KeyboardInterrupt and SystemExit still propagate.
        return False
|
|
|
def validate_txt(file_path: str) -> bool:
    """Check whether the file at ``file_path`` is non-blank UTF-8 text.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        True when the file decodes as UTF-8 and contains at least one
        non-whitespace character; False otherwise, including when the file
        cannot be opened or decoded.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError covers
        # UnicodeDecodeError (not UTF-8) and malformed paths.
        return False
    return bool(content.strip())
|
|
|
|
|
# Ids of the documents that ship pre-processed with the application. Each one
# is looked up under UPLOAD_ROOT/<doc_id> at import time and, when a supported
# file is found, registered in session_files under the document id.
precalculated_docs = ["the-bitter-lesson", "hurricane-faq", "pokemon-guide"]

for doc_id in precalculated_docs:
    doc_dir = os.path.join(UPLOAD_ROOT, doc_id)
    # isdir (rather than exists) so a stray regular file with the same name
    # cannot make os.listdir raise during import.
    if not os.path.isdir(doc_dir):
        continue

    # Prefer the "uploaded_files" sub-directory (the layout upload_file
    # creates); fall back to the document directory itself.
    doc_files_dir = os.path.join(doc_dir, "uploaded_files")
    search_dir = doc_files_dir if os.path.isdir(doc_files_dir) else doc_dir

    # os.listdir returns entries in arbitrary order; sorting makes the chosen
    # file deterministic when several supported files are present.
    for filename in sorted(os.listdir(search_dir)):
        if filename.endswith((".pdf", ".txt", ".html", ".md")):
            file_path = os.path.join(search_dir, filename)
            session_files[doc_id] = file_path
            print(f"Added pre-calculated document to session_files: {doc_id} -> {file_path}")
            break
|
|
|
# Maps each accepted (lower-case) file extension to the validator that checks
# an uploaded file of that type.
_VALIDATORS_BY_EXTENSION = {
    ".pdf": validate_pdf,
    ".md": validate_markdown,
    ".html": validate_html,
    ".txt": validate_txt,
}


@router.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """
    Upload a file to the server and generate a session ID

    The file is stored as ``document<ext>`` under
    ``UPLOAD_ROOT/<session_id>/uploaded_files``, validated according to its
    extension and, when valid, registered in ``session_files``.

    Args:
        file: The file to upload

    Returns:
        Dictionary with filename, status and session_id

    Raises:
        HTTPException: 400 when the extension is not supported or when the
            content fails validation for its type.
    """
    # Derive the extension once, lower-cased, so "REPORT.PDF" is accepted and
    # a missing filename is rejected with a 400 instead of an AttributeError.
    file_extension = os.path.splitext(file.filename or "")[1].lower()
    validator = _VALIDATORS_BY_EXTENSION.get(file_extension)
    if validator is None:
        raise HTTPException(status_code=400, detail="Only PDF, TXT, HTML and MD files are accepted")

    session_id = str(uuid.uuid4())

    session_dir = os.path.join(UPLOAD_ROOT, session_id)
    uploaded_files_dir = os.path.join(session_dir, "uploaded_files")
    os.makedirs(uploaded_files_dir, exist_ok=True)

    # The client-supplied name is never used on disk; only its extension is.
    standardized_filename = f"document{file_extension}"
    file_path = os.path.join(uploaded_files_dir, standardized_filename)

    is_valid = False
    try:
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        is_valid = validator(file_path)
    finally:
        if not is_valid:
            # Remove the whole session directory, not just the file, so a
            # rejected or failed upload leaves nothing behind on disk.
            shutil.rmtree(session_dir, ignore_errors=True)

    if not is_valid:
        raise HTTPException(status_code=400, detail=f"Invalid {file_extension[1:].upper()} file")

    session_files[session_id] = file_path

    return {"filename": standardized_filename, "status": "uploaded", "session_id": session_id}