"""Authenticated Ollama API server with an admin panel.

Wraps a local Ollama instance behind FastAPI, adding:
  * Bearer-token auth for regular API routes (``verify_token``)
  * A separate admin bearer token for key management (``verify_admin_token``)
  * Pydantic request/response models mirroring the Ollama HTTP API
"""

from fastapi import FastAPI, HTTPException, Depends, Request, Response
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import httpx
import json
import asyncio
import secrets
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
from auth import verify_token, verify_admin_token
from config import settings
import logging
from pydantic import BaseModel, Field

# Configure logging once (the original configured it twice, redundantly).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup/shutdown hook: wait (bounded) for Ollama before serving."""
    logger.info("🚀 Starting Authenticated Ollama API with Admin Panel...")

    # Poll Ollama's /api/tags until it answers, up to max_retries attempts.
    max_retries = 30
    for i in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"http://{settings.ollama_host}/api/tags")
                if response.status_code == 200:
                    logger.info("✅ Ollama is ready!")
                    break
        except Exception:
            logger.info(f"⏳ Waiting for Ollama... ({i+1}/{max_retries})")
            await asyncio.sleep(2)
    else:
        # for-else: loop exhausted without a successful break.
        logger.warning("⚠️ Ollama not responding, but continuing...")

    yield

    # Shutdown (if needed)
    logger.info("🛑 Shutting down...")


app = FastAPI(
    title="Authenticated Ollama API with Admin Panel",
    description="Secure Ollama API with Bearer Token Authentication and Admin Key Management",
    version="2.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan,
)

# Add CORS middleware.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# permissive; tighten the origin list if browsers ever call this API directly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# =============================================================================
# PYDANTIC MODELS FOR REQUEST BODIES
# =============================================================================


class GenerateRequest(BaseModel):
    """Body for POST /api/generate — mirrors Ollama's generate endpoint."""

    model: str = Field(..., description="Model name to use", example="hf.co/bartowski/Llama-3.2-1B-Instruct-GGUF:Q4_K_M")
    prompt: str = Field(..., description="Text prompt to generate from", example="Explain quantum computing in simple terms")
    stream: bool = Field(False, description="Whether to stream the response")
    options: Optional[Dict[str, Any]] = Field(None, description="Additional model options")
    template: Optional[str] = Field(None, description="Prompt template to use")
    context: Optional[List[int]] = Field(None, description="Context from previous conversation")
    raw: Optional[bool] = Field(None, description="Return raw response without formatting")


class ChatMessage(BaseModel):
    """A single message in a chat conversation."""

    role: str = Field(..., description="Role of the message", example="user")
    content: str = Field(..., description="Content of the message", example="Hello, how are you?")


class ChatRequest(BaseModel):
    """Body for POST /api/chat — mirrors Ollama's chat endpoint."""

    model: str = Field(..., description="Model name to use", example="hf.co/bartowski/Llama-3.2-1B-Instruct-GGUF:Q4_K_M")
    messages: List[ChatMessage] = Field(..., description="Array of chat messages")
    stream: bool = Field(False, description="Whether to stream the response")
    options: Optional[Dict[str, Any]] = Field(None, description="Additional model options")


class PullRequest(BaseModel):
    """Body for POST /api/pull — model download request."""

    name: str = Field(..., description="Model name to pull", example="hf.co/bartowski/Llama-3.2-1B-Instruct-GGUF:Q4_K_M")
    insecure: Optional[bool] = Field(False, description="Allow insecure connections")
    stream: Optional[bool] = Field(True, description="Stream pull progress")


class DeleteRequest(BaseModel):
    """Body for DELETE /api/delete — model removal request."""

    name: str = Field(..., description="Model name to delete", example="hf.co/bartowski/Llama-3.2-1B-Instruct-GGUF:Q4_K_M")


# =============================================================================
# RESPONSE MODELS
# =============================================================================


class GenerateResponse(BaseModel):
    """Non-streaming response shape returned by Ollama's generate endpoint."""

    model: str
    created_at: str
    response: str
    done: bool
    context: Optional[List[int]] = None
    total_duration: Optional[int] = None
    load_duration: Optional[int] = None
    prompt_eval_count: Optional[int] = None
    prompt_eval_duration: Optional[int] = None
    eval_count: Optional[int] = None
    eval_duration: Optional[int] = None


class ChatResponse(BaseModel):
    """Non-streaming response shape returned by Ollama's chat endpoint."""

    model: str
    created_at: str
    message: ChatMessage
    done: bool
    total_duration: Optional[int] = None
    load_duration: Optional[int] = None
    prompt_eval_count: Optional[int] = None
    prompt_eval_duration: Optional[int] = None
    eval_count: Optional[int] = None
    eval_duration: Optional[int] = None


@app.get("/")
async def root():
    """Root endpoint with API information"""
    return {
        "message": "Authenticated Ollama API Server with Admin Panel",
        "status": "running",
        "version": "2.0.0",
        "features": [
            "Bearer Token Authentication",
            "Admin Key Management",
            "Llama 3.2 1B Model",
            "Complete Ollama API Wrapper",
        ],
        "endpoints": {
            "docs": "/docs",
            "health": "/health",
            "api": "/api/*",
            "admin": "/admin/*",
        },
        "authentication": {
            "api_operations": "Bearer Token (API Key) required",
            "admin_operations": "Bearer Token (Admin Key) required",
        },
    }


@app.get("/health")
async def health_check():
    """Public health check endpoint (no authentication required)"""
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"http://{settings.ollama_host}/api/tags")
            ollama_status = "healthy" if response.status_code == 200 else "unhealthy"
    except Exception:
        ollama_status = "unreachable"

    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": {
            "fastapi": "healthy",
            "ollama": ollama_status,
            "authentication": "active",
        },
    }


# =============================================================================
# REGULAR API ENDPOINTS (Require API Key)
# =============================================================================


@app.get("/api/tags")
async def list_models(token: str = Depends(verify_token)):
    """List all available models"""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"http://{settings.ollama_host}/api/tags")
            return response.json()
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"Ollama service unavailable: {str(e)}")


@app.post("/api/generate", response_model=GenerateResponse)
async def generate_text(request: GenerateRequest, token: str = Depends(verify_token)):
    """Generate text using Ollama models"""
    # NOTE(review): response.json() only works for non-streamed replies; if a
    # client sets stream=True, Ollama returns NDJSON and this parse will fail.
    # Consider forcing stream=False or returning a StreamingResponse.
    try:
        # Convert Pydantic model to dict
        body = request.dict(exclude_unset=True)

        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"http://{settings.ollama_host}/api/generate",
                json=body,
            )

            if response.status_code != 200:
                raise HTTPException(status_code=response.status_code, detail=response.text)

            return response.json()
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"Ollama service unavailable: {str(e)}")


@app.post("/api/chat", response_model=ChatResponse)
async def chat(request: ChatRequest, token: str = Depends(verify_token)):
    """Chat with Ollama models"""
    try:
        # Convert Pydantic model to dict
        body = request.dict(exclude_unset=True)

        async with httpx.AsyncClient(timeout=300.0) as client:
            response = await client.post(
                f"http://{settings.ollama_host}/api/chat",
                json=body,
            )

            if response.status_code != 200:
                raise HTTPException(status_code=response.status_code, detail=response.text)

            return response.json()
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"Ollama service unavailable: {str(e)}")


@app.post("/api/pull")
async def pull_model(request: PullRequest, token: str = Depends(verify_token)):
    """Pull a model from Ollama library"""
    try:
        # Convert Pydantic model to dict
        body = request.dict(exclude_unset=True)

        async with httpx.AsyncClient(timeout=600.0) as client:
            response = await client.post(
                f"http://{settings.ollama_host}/api/pull",
                json=body,
            )

            if response.status_code != 200:
                raise HTTPException(status_code=response.status_code, detail=response.text)

            return response.json()
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"Ollama service unavailable: {str(e)}")


@app.delete("/api/delete")
async def delete_model(request: DeleteRequest, token: str = Depends(verify_token)):
    """Delete a model"""
    try:
        # Convert Pydantic model to dict
        body = request.dict(exclude_unset=True)

        async with httpx.AsyncClient(timeout=30.0) as client:
            # BUGFIX: httpx's client.delete() does not accept a json= body;
            # a DELETE with a payload must go through client.request().
            response = await client.request(
                "DELETE",
                f"http://{settings.ollama_host}/api/delete",
                json=body,
            )

            if response.status_code != 200:
                raise HTTPException(status_code=response.status_code, detail=response.text)

            return {"message": f"Model {request.name} deleted successfully"}
    except httpx.RequestError as e:
        raise HTTPException(status_code=503, detail=f"Ollama service unavailable: {str(e)}")


@app.get("/auth/test")
async def test_auth(token: str = Depends(verify_token)):
    """Test authentication endpoint"""
    return {
        "message": "🎉 Authentication successful!",
        "token_valid": True,
        "token_type": "api_key",
        "timestamp": datetime.now().isoformat(),
        "access_level": "user",
    }


# =============================================================================
# ADMIN ENDPOINTS (Require Admin Key)
# =============================================================================


@app.get("/admin/info")
async def admin_info(admin_token: str = Depends(verify_admin_token)):
    """Get admin panel information"""
    return {
        "message": "🔧 Admin access granted",
        "admin_endpoints": [
            "GET /admin/api-key - Retrieve current API key",
            "GET /admin/key-info - Get API key information",
            "POST /admin/rotate-key - Generate new API key",
            "GET /admin/logs - View recent access logs",
            "GET /admin/stats - Get usage statistics",
        ],
        "timestamp": datetime.now().isoformat(),
        "access_level": "administrator",
    }


@app.get("/admin/api-key")
async def get_api_key(admin_token: str = Depends(verify_admin_token)):
    """
    🔑 Retrieve the current API key (Admin only)

    ⚠️ This endpoint requires admin authentication
    """
    logger.warning(f"🔑 API KEY RETRIEVED via admin endpoint at {datetime.now().isoformat()}")

    return {
        "api_key": settings.api_key,
        "message": "Current API key retrieved successfully",
        "warning": "🚨 Keep this key secure and do not share it",
        "expires": "Never (until manually rotated)",
        "length": len(settings.api_key),
        "retrieved_at": datetime.now().isoformat(),
    }


@app.get("/admin/key-info")
async def get_key_info(admin_token: str = Depends(verify_admin_token)):
    """
    Get API key information without exposing the actual key
    """
    key_prefix = settings.api_key[:8] + "..." + settings.api_key[-4:]

    return {
        "key_preview": key_prefix,
        "key_length": len(settings.api_key),
        "status": "active",
        "created": "At server startup",
        "type": "url_safe_base64",
        "admin_access": "enabled",
        "last_checked": datetime.now().isoformat(),
    }


@app.post("/admin/rotate-key")
async def rotate_api_key(admin_token: str = Depends(verify_admin_token)):
    """
    🔄 Generate a new API key (Admin only)

    ⚠️ This will invalidate all existing API keys immediately
    """
    old_key_prefix = settings.api_key[:8] + "..." + settings.api_key[-4:]
    new_key = secrets.token_urlsafe(32)

    # Update the key (in-memory only; a restart reverts to the configured key).
    old_key = settings.api_key
    settings.api_key = new_key

    # Log the rotation with security info (only previews, never the full old key).
    logger.warning(f"🔄 API KEY ROTATED: {datetime.now().isoformat()}")
    logger.warning(f"   Old key: {old_key_prefix}")
    logger.warning(f"   New key: {new_key[:8]}...{new_key[-4:]}")

    return {
        "message": "✅ API key rotated successfully",
        "old_key_preview": old_key_prefix,
        "new_api_key": new_key,
        "rotated_at": datetime.now().isoformat(),
        "warning": "🚨 Update all clients with the new key immediately",
        "action_required": "All existing API tokens are now invalid",
    }


@app.get("/admin/logs")
async def get_admin_logs(admin_token: str = Depends(verify_admin_token)):
    """
    📋 Get recent server logs (Admin only)
    """
    # In a real implementation, you'd read actual log files
    # This is a simplified example showing what would be logged
    sample_logs = [
        {
            "timestamp": datetime.now().isoformat(),
            "level": "INFO",
            "message": "Admin logs accessed",
        },
        {
            "timestamp": (datetime.now() - timedelta(minutes=5)).isoformat(),
            "level": "INFO",
            "message": "API key authentication successful",
        },
        {
            "timestamp": (datetime.now() - timedelta(minutes=10)).isoformat(),
            "level": "WARNING",
            "message": "Failed authentication attempt detected",
        },
    ]

    # NOTE(review): this intentionally exposes both keys to an authenticated
    # admin; consider whether full-key disclosure here is acceptable.
    startup_info = {
        "server_start": "Server started successfully",
        "api_key": f"API Key: {settings.api_key}",
        "admin_key": f"Admin Key: {settings.admin_key}",
        "ollama_status": "Ollama service connected",
    }

    return {
        "message": "Recent server activity",
        "startup_info": startup_info,
        "recent_logs": sample_logs,
        "warning": "🚨 These logs contain sensitive authentication information",
        "retrieved_at": datetime.now().isoformat(),
    }


@app.get("/admin/stats")
async def get_usage_stats(admin_token: str = Depends(verify_admin_token)):
    """
    📊 Get usage statistics (Admin only)
    """
    # In a real implementation, you'd track actual usage
    return {
        "server_uptime": "Running since startup",
        "authentication": {
            "api_key_status": "active",
            "admin_key_status": "active",
            "failed_attempts": "Check logs for details",
        },
        "endpoints": {
            "total_endpoints": 15,
            "public_endpoints": 2,
            "authenticated_endpoints": 8,
            "admin_endpoints": 5,
        },
        "models": {
            "default_model": "hf.co/bartowski/Llama-3.2-1B-Instruct-GGUF:Q4_K_M",
            "status": "loaded",
        },
        "generated_at": datetime.now().isoformat(),
    }


@app.get("/admin/test")
async def test_admin_auth(admin_token: str = Depends(verify_admin_token)):
    """Test admin authentication"""
    return {
        "message": "🔧 Admin authentication successful!",
        "token_valid": True,
        "token_type": "admin_key",
        "timestamp": datetime.now().isoformat(),
        "access_level": "administrator",
        "permissions": ["key_management", "logs_access", "stats_viewing"],
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app:app",
        host=settings.app_host,
        port=settings.app_port,
        reload=False,
        log_level="info",
    )