Spaces:
Sleeping
Sleeping
import os | |
import shutil | |
import tempfile | |
import logging | |
from typing import Optional, List | |
from fastapi import UploadFile, HTTPException | |
from app.embeddings import add_to_vector_store | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Upload policy: size cap in bytes (10 MiB) and the permitted file
# extensions (lower-case, leading dot included). Adjust as needed.
MAX_FILE_SIZE = 10 * 1024 * 1024
ALLOWED_EXTENSIONS = {".txt", ".md", ".pdf"}

# Default destination for persisted uploads; created at import time so the
# save helpers can rely on it existing.
UPLOAD_DIR = "uploaded_files"
os.makedirs(UPLOAD_DIR, exist_ok=True)
def is_extension_allowed(
    filename: Optional[str],
    allowed_extensions: Optional[set] = None,
) -> bool:
    """Return True if the extension of ``filename`` is permitted.

    The extension is lower-cased before the lookup, so entries in the
    allowed set are expected to be lower-case and to include the leading
    dot (e.g. ".txt").

    Args:
        filename: Name of the uploaded file. ``None`` and the empty string
            are rejected instead of raising.
        allowed_extensions: Optional override for the permitted extensions.
            When omitted, the module-level ALLOWED_EXTENSIONS is used.

    Returns:
        True when the extension is in the permitted set, False otherwise.
    """
    # A missing name has no extension to validate; treat it as not allowed.
    if not filename:
        return False
    permitted = ALLOWED_EXTENSIONS if allowed_extensions is None else allowed_extensions
    ext = os.path.splitext(filename)[1].lower()
    return ext in permitted
def save_upload_to_disk(file: UploadFile, upload_dir: Optional[str] = None) -> str:
    """
    Save UploadFile to disk under upload_dir (defaults to UPLOAD_DIR).

    Only the final path component of the client-supplied filename is used,
    so names such as "../../etc/passwd" or absolute paths cannot escape the
    target directory. Existing files are never overwritten: a numeric
    suffix ("name_1.ext", "name_2.ext", ...) is appended until an unused
    name is found.

    Args:
        file: The upload to persist; its stream is rewound before copying.
        upload_dir: Destination directory. Falls back to UPLOAD_DIR when
            None or empty. Created if it does not exist.

    Returns:
        The path of the file that was written.
    """
    target_dir = upload_dir or UPLOAD_DIR
    os.makedirs(target_dir, exist_ok=True)

    # The filename comes from the client: normalise Windows separators and
    # keep only the last component to block directory traversal.
    raw_name = (file.filename or "").replace("\\", "/")
    filename = os.path.basename(raw_name)
    if filename in ("", ".", ".."):
        filename = "upload"

    base, ext = os.path.splitext(filename)
    candidate = filename
    counter = 1
    while True:
        path = os.path.join(target_dir, candidate)
        try:
            # "x" fails if the file already exists, so the existence check
            # and the creation are one atomic step (no check-then-open race).
            out = open(path, "xb")
        except FileExistsError:
            candidate = f"{base}_{counter}{ext}"
            counter += 1
            continue
        break

    with out:
        file.file.seek(0)
        shutil.copyfileobj(file.file, out)
    logger.info(f"π File saved to {path}")
    return path
async def save_upload(file: UploadFile) -> str:
    """
    Saves uploaded file to disk and returns the saved file path.

    The file is written into UPLOAD_DIR under the final path component of
    the client-supplied filename; directory parts are discarded so the
    write cannot land outside UPLOAD_DIR. An existing file with the same
    name is overwritten.

    Args:
        file: The upload to persist. Its content is read with
            ``await file.read()`` from the stream's current position.

    Returns:
        The path of the file that was written.
    """
    # The filename comes from the client: normalise Windows separators and
    # keep only the last component to block directory traversal.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload"

    file_path = os.path.join(UPLOAD_DIR, safe_name)
    os.makedirs(UPLOAD_DIR, exist_ok=True)

    # Read before opening the destination so a failed read does not leave
    # behind an empty or truncated file.
    content = await file.read()
    with open(file_path, "wb") as out_file:
        out_file.write(content)
    logger.info(f"β Uploaded file saved: {file_path}")
    return file_path
def save_upload_temp(file: UploadFile) -> str:
    """
    Save UploadFile to a temp file and return path.

    The temp file is created with ``tempfile.mkstemp``, so its name is
    unique and it is created exclusively by this process. The sanitised
    upload name is used as the prefix and its extension as the suffix, so
    the extension is preserved for callers that dispatch on it.

    The file is not deleted automatically; the caller is responsible for
    removing it.

    Args:
        file: The upload to copy; its stream is rewound before copying.

    Returns:
        The path of the newly created temp file.
    """
    # The filename comes from the client: normalise Windows separators and
    # keep only the last component so it cannot inject directory parts.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_name = os.path.basename(raw_name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload"
    stem, ext = os.path.splitext(safe_name)

    # A unique name avoids clobbering pre-existing temp files and avoids
    # collisions between concurrent uploads that share a filename.
    fd, temp_path = tempfile.mkstemp(prefix=f"{stem}_", suffix=ext)
    with os.fdopen(fd, "wb") as f:
        file.file.seek(0)
        shutil.copyfileobj(file.file, f)
    logger.debug(f"π¦ File saved temporarily at {temp_path}")
    return temp_path
def read_file_content(file: UploadFile, max_size: int = MAX_FILE_SIZE) -> str:
    """
    Read file content as decoded string.

    Args:
        file: The upload to read; its stream is rewound first.
        max_size: Maximum accepted size in bytes.

    Returns:
        The content decoded with the default codec; bytes that cannot be
        decoded are dropped (``errors="ignore"``).

    Raises:
        HTTPException: 413 if the file holds more than ``max_size`` bytes.
    """
    file.file.seek(0)
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without first loading the whole file into memory.
    content_bytes = file.file.read(max_size + 1)
    if len(content_bytes) > max_size:
        raise HTTPException(status_code=413, detail="File too large")
    return content_bytes.decode(errors="ignore")
def summarize_file_content(content: str, max_lines: int = 3) -> str:
    """
    Build a short preview of ``content``.

    Leading and trailing whitespace is stripped, then the first
    ``max_lines`` lines are joined with newlines. When lines were left out,
    a final "..." line marks the truncation.
    """
    all_lines = content.strip().splitlines()
    preview = "\n".join(all_lines[:max_lines])
    was_truncated = len(all_lines) > max_lines
    return f"{preview}\n..." if was_truncated else preview
def process_uploaded_file(file: UploadFile) -> str:
    """
    Read uploaded file content, validate type, add to vector store, return summary.

    Args:
        file: The upload to process.

    Returns:
        A short preview of the file content, as produced by
        summarize_file_content.

    Raises:
        HTTPException: 415 if the filename is missing or its extension is
            not allowed; any HTTPException raised while reading (e.g. 413
            for an oversized file) is propagated unchanged; 500 for any
            other failure.
    """
    # A missing filename cannot be validated, so it is rejected the same way
    # as an unsupported extension.
    if not file.filename or not is_extension_allowed(file.filename):
        raise HTTPException(status_code=415, detail="Unsupported file type")
    try:
        content = read_file_content(file)
        doc = {"page_content": content, "metadata": {"source": file.filename}}
        # Provide actual vector_store if required
        success = add_to_vector_store([doc], vector_store=None)
        logger.info(f"Vector store add success: {success}")
        return summarize_file_content(content)
    except HTTPException:
        # Deliberate HTTP errors (such as 413 from read_file_content) must
        # reach the client as-is instead of being masked as a 500.
        raise
    except Exception as e:
        logger.error(f"β Error processing uploaded file: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to process uploaded file") from e