llama-models / app /auth.py
deniskiplimo816's picture
Upload 27 files
293ab16 verified
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
# === πŸ›‘οΈ Auth Configuration ===
USE_RSA = False # Set to True to use RSA keys (RS256). False for HS256.
# --- HS256 Setup (default for local/testing) ---
SECRET_KEY = "your-super-secret-key" # πŸ›‘ Change this in production
ALGORITHM = "HS256"
# --- RS256 Setup (only used if USE_RSA=True) ---
RSA_PRIVATE_KEY = """
-----BEGIN RSA PRIVATE KEY-----
YOUR_RSA_PRIVATE_KEY_HERE
-----END RSA PRIVATE KEY-----
""".strip()
RSA_PUBLIC_KEY = """
-----BEGIN PUBLIC KEY-----
YOUR_RSA_PUBLIC_KEY_HERE
-----END PUBLIC KEY-----
""".strip()
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# === πŸ§‚ Password Hashing ===
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
# === πŸ§‘ Users ===
USERS_DB = {
"admin": {
"username": "admin",
"full_name": "Administrator",
"hashed_password": pwd_context.hash("admin123"),
"disabled": False,
},
"testuser": {
"username": "testuser",
"full_name": "Test User",
"hashed_password": pwd_context.hash("testpass"),
"disabled": False,
}
}
# === πŸ” Authentication ===
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
def authenticate_user(username: str, password: str) -> Optional[dict]:
user = USERS_DB.get(username)
if not user or not verify_password(password, user["hashed_password"]):
return None
return user
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
if USE_RSA:
return jwt.encode(to_encode, RSA_PRIVATE_KEY, algorithm="RS256")
return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")
def decode_access_token(token: str) -> dict:
try:
if USE_RSA:
return jwt.decode(token, RSA_PUBLIC_KEY, algorithms=["RS256"])
return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
def verify_token(token: str) -> dict:
"""
Decode and verify a JWT token.
"""
return decode_access_token(token)
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
payload = decode_access_token(token)
username: str = payload.get("sub")
if not username or username not in USERS_DB:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return USERS_DB[username]