Spaces:
Sleeping
Sleeping
File size: 3,691 Bytes
293ab16 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
import os
import shutil
import tempfile
import logging
from typing import Optional, List
from fastapi import UploadFile, HTTPException
from app.embeddings import add_to_vector_store
# Directory where uploads are persisted; created at import time if missing.
UPLOAD_DIR = "uploaded_files"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Default size limit for uploaded content, in bytes (10 MiB).
MAX_FILE_SIZE = 10 * 1024 ** 2

# Accepted file extensions: lower-case, including the leading dot.
ALLOWED_EXTENSIONS = {".pdf", ".md", ".txt"}  # Customize as needed

# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)
def is_extension_allowed(filename: Optional[str]) -> bool:
    """Return True if *filename* ends with an extension in ALLOWED_EXTENSIONS.

    The extension is compared case-insensitively. A missing or empty
    filename (uploads are not required to carry one) is rejected with
    ``False`` instead of raising ``TypeError``.
    """
    if not filename:
        return False
    ext = os.path.splitext(filename)[1].lower()
    return ext in ALLOWED_EXTENSIONS
def save_upload_to_disk(file: UploadFile, upload_dir: Optional[str] = None) -> str:
    """
    Save UploadFile to disk under upload_dir (defaults to UPLOAD_DIR).

    Only the final path component of the client-supplied filename is used,
    so names such as "../../etc/passwd" cannot escape the target directory.
    A missing or degenerate name falls back to "upload".
    Prevent overwriting by appending a counter if needed.

    Returns the saved file path.
    """
    target_dir = upload_dir or UPLOAD_DIR
    os.makedirs(target_dir, exist_ok=True)

    # The filename comes from the client and must be treated as untrusted.
    filename = os.path.basename(file.filename or "")
    if filename in ("", ".", ".."):
        filename = "upload"

    path = os.path.join(target_dir, filename)
    base, ext = os.path.splitext(filename)
    counter = 1
    # Pick "<base>_<n><ext>" for the first n that is not taken yet.
    while os.path.exists(path):
        filename = f"{base}_{counter}{ext}"
        path = os.path.join(target_dir, filename)
        counter += 1

    with open(path, "wb") as f:
        # Rewind in case the stream was already consumed by an earlier reader.
        file.file.seek(0)
        shutil.copyfileobj(file.file, f)

    logger.info("📁 File saved to %s", path)
    return path
async def save_upload(file: UploadFile) -> str:
    """
    Saves uploaded file to disk and returns the saved file path.

    The file is written to UPLOAD_DIR under the final path component of the
    client-supplied filename (falling back to "upload" when the name is
    missing or degenerate), so the name cannot escape UPLOAD_DIR.
    An existing file with the same name is overwritten.
    """
    # The filename comes from the client and must be treated as untrusted.
    safe_name = os.path.basename(file.filename or "")
    if safe_name in ("", ".", ".."):
        safe_name = "upload"

    file_path = os.path.join(UPLOAD_DIR, safe_name)
    os.makedirs(UPLOAD_DIR, exist_ok=True)

    # Read first so a failed read does not leave an empty file behind.
    content = await file.read()
    with open(file_path, "wb") as out_file:
        out_file.write(content)

    logger.info("✅ Uploaded file saved: %s", file_path)
    return file_path
def save_upload_temp(file: UploadFile) -> str:
    """
    Save UploadFile to a temp file and return path.

    The file is created with tempfile.mkstemp, so the path is unique and
    cannot collide with, or be pre-planted by, another user of the shared
    temp directory. The original base name and extension are kept as the
    prefix and suffix of the generated name. The caller is responsible for
    deleting the file.
    """
    # Use only the final component of the untrusted client filename.
    safe_name = os.path.basename(file.filename or "")
    stem, ext = os.path.splitext(safe_name)
    prefix = f"{stem}_" if stem else "upload_"

    fd, temp_path = tempfile.mkstemp(prefix=prefix, suffix=ext)
    try:
        with os.fdopen(fd, "wb") as f:
            # Rewind in case the stream was already consumed.
            file.file.seek(0)
            shutil.copyfileobj(file.file, f)
    except Exception:
        # Do not leave a partial temp file behind when the copy fails.
        os.unlink(temp_path)
        raise

    logger.debug("📦 File saved temporarily at %s", temp_path)
    return temp_path
def read_file_content(file: UploadFile, max_size: int = MAX_FILE_SIZE) -> str:
    """
    Read file content as decoded string.

    At most ``max_size + 1`` bytes are read, so an oversized upload is
    detected without loading all of it into memory. Bytes that cannot be
    decoded with the default codec are silently dropped.

    Raises HTTPException (413) if file is too large.
    """
    file.file.seek(0)
    # One byte past the limit is enough to tell "too large" from "fits".
    content_bytes = file.file.read(max_size + 1)
    if len(content_bytes) > max_size:
        raise HTTPException(status_code=413, detail="File too large")
    return content_bytes.decode(errors="ignore")
def summarize_file_content(content: str, max_lines: int = 3) -> str:
    """
    Build a short preview of content.

    Leading and trailing whitespace is stripped, then the first max_lines
    lines are joined with newlines. When lines were left out, a final
    "..." line marks the truncation.
    """
    all_lines = content.strip().splitlines()
    preview = "\n".join(all_lines[:max_lines])
    if len(all_lines) <= max_lines:
        return preview
    return preview + "\n..."
def process_uploaded_file(file: UploadFile) -> str:
    """
    Read uploaded file content, validate type, add to vector store, return summary.

    Raises HTTPException:
      * 415 if the filename is missing or its extension is not allowed;
      * any HTTPException raised while reading (e.g. 413 for an oversized
        file) is passed through unchanged;
      * 500 for any other failure, chained to the original exception.
    """
    # A missing filename cannot be type-checked, so treat it as unsupported.
    if not file.filename or not is_extension_allowed(file.filename):
        raise HTTPException(status_code=415, detail="Unsupported file type")

    try:
        content = read_file_content(file)
        doc = {"page_content": content, "metadata": {"source": file.filename}}
        success = add_to_vector_store([doc], vector_store=None)  # Provide actual vector_store if required
        logger.info("Vector store add success: %s", success)
        return summarize_file_content(content)
    except HTTPException:
        # Keep deliberate HTTP errors (such as 413) instead of masking them as 500.
        raise
    except Exception as e:
        logger.error("❌ Error processing uploaded file: %s", e, exc_info=True)
        raise HTTPException(status_code=500, detail="Failed to process uploaded file") from e
|