Spaces:
Sleeping
Sleeping
File size: 3,213 Bytes
293ab16 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
# === Auth Configuration ===
# Signing-mode switch: True selects the RSA key pair (RS256),
# False selects the shared secret (HS256).
USE_RSA = False

# --- HS256 Setup (default for local/testing) ---
# The secret is taken from the JWT_SECRET_KEY environment variable when it is
# set and non-empty; otherwise the development placeholder below is used.
# Never deploy with the placeholder: anyone who knows it can forge tokens.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or "your-super-secret-key"
# NOTE(review): the token helpers in this file spell their algorithm names
# inline rather than reading ALGORITHM; it is kept for external importers.
ALGORITHM = "HS256"

# --- RS256 Setup (only used if USE_RSA=True) ---
# Placeholder PEM blocks; replace with a real key pair before enabling USE_RSA.
RSA_PRIVATE_KEY = """
-----BEGIN RSA PRIVATE KEY-----
YOUR_RSA_PRIVATE_KEY_HERE
-----END RSA PRIVATE KEY-----
""".strip()
RSA_PUBLIC_KEY = """
-----BEGIN PUBLIC KEY-----
YOUR_RSA_PUBLIC_KEY_HERE
-----END PUBLIC KEY-----
""".strip()

# Default access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# === Password Hashing ===
# Shared passlib context used by the hashing helpers and the user table;
# bcrypt is the only scheme configured.
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
)
def get_password_hash(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context``.

    Args:
        password: Plain-text password to hash.

    Returns:
        The value produced by ``pwd_context.hash``.
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain-text password against a stored hash.

    Args:
        plain_password: Candidate password as typed by the user.
        hashed_password: Previously stored hash to compare against.

    Returns:
        The result of ``pwd_context.verify`` for the pair.
    """
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
# === Users ===
# In-memory user table keyed by username. Passwords are hashed at import time
# from the literals below, so every record starts out enabled with a fresh hash.
# NOTE(review): these look like demo credentials -- confirm they are never
# shipped to a real deployment.
USERS_DB = {
    username: {
        "username": username,
        "full_name": full_name,
        "hashed_password": pwd_context.hash(password),
        "disabled": False,
    }
    for username, full_name, password in (
        ("admin", "Administrator", "admin123"),
        ("testuser", "Test User", "testpass"),
    )
}
# === Authentication ===
# Bearer-token dependency used by get_current_user; "/login" is advertised
# as the URL where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/login",
)
def authenticate_user(username: str, password: str) -> Optional[dict]:
    """Look up ``username`` in ``USERS_DB`` and check ``password`` against it.

    Args:
        username: Key to look up in ``USERS_DB``.
        password: Plain-text password to verify against the stored hash.

    Returns:
        The user record when the user exists and the password verifies,
        otherwise ``None``.
    """
    record = USERS_DB.get(username)
    # Short-circuit keeps the hash check from running for unknown users.
    if record and verify_password(password, record["hashed_password"]):
        return record
    return None
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. A copy is made, so the caller's dict is
            not modified.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. An explicit zero
            ``timedelta`` is honoured rather than replaced by the default.

    Returns:
        The encoded token, signed with ``RSA_PRIVATE_KEY`` (RS256) when
        ``USE_RSA`` is true and with ``SECRET_KEY`` (HS256) otherwise.
    """
    # Compare against None explicitly: timedelta(0) is falsy, so an
    # `expires_delta or default` test would silently swap in the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    claims = data.copy()
    # Timezone-aware UTC "now" avoids the deprecated naive datetime.utcnow().
    claims["exp"] = datetime.now(timezone.utc) + expires_delta

    if USE_RSA:
        return jwt.encode(claims, RSA_PRIVATE_KEY, algorithm="RS256")
    return jwt.encode(claims, SECRET_KEY, algorithm="HS256")
def decode_access_token(token: str) -> dict:
    """Decode ``token`` and return its claims.

    The token is checked against ``RSA_PUBLIC_KEY`` (RS256) when ``USE_RSA``
    is true and against ``SECRET_KEY`` (HS256) otherwise.

    Args:
        token: Encoded JWT string.

    Returns:
        The claims returned by ``jwt.decode``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            ``jwt.decode`` raises ``JWTError``. The original error is kept
            as ``__cause__`` for logging and debugging.
    """
    # Pick the key material once so there is a single decode call below.
    if USE_RSA:
        key, algorithms = RSA_PUBLIC_KEY, ["RS256"]
    else:
        key, algorithms = SECRET_KEY, ["HS256"]

    try:
        return jwt.decode(token, key, algorithms=algorithms)
    except JWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
def verify_token(token: str) -> dict:
    """Return the claims of ``token`` by delegating to ``decode_access_token``.

    This is a thin alias: validation and error behaviour are exactly those
    of ``decode_access_token``.
    """
    claims = decode_access_token(token)
    return claims
def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    """Resolve the bearer token to a record in ``USERS_DB``.

    Args:
        token: Bearer token supplied through the ``oauth2_scheme`` dependency.

    Returns:
        The ``USERS_DB`` entry whose key equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the ``sub`` claim is missing or empty, or
            names a user not present in ``USERS_DB``. Errors raised by
            ``decode_access_token`` propagate unchanged.
    """
    claims = decode_access_token(token)
    subject = claims.get("sub")

    # NOTE(review): the record's "disabled" flag is not consulted here --
    # confirm whether callers are expected to enforce it.
    if subject and subject in USERS_DB:
        return USERS_DB[subject]

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
|