File size: 3,213 Bytes
293ab16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext

# === πŸ›‘οΈ Auth Configuration ===
USE_RSA = False  # Set to True to use RSA keys (RS256). False for HS256.

# --- HS256 Setup (default for local/testing) ---
SECRET_KEY = "your-super-secret-key"  # πŸ›‘ Change this in production
ALGORITHM = "HS256"

# --- RS256 Setup (only used if USE_RSA=True) ---
RSA_PRIVATE_KEY = """
-----BEGIN RSA PRIVATE KEY-----
YOUR_RSA_PRIVATE_KEY_HERE
-----END RSA PRIVATE KEY-----
""".strip()

RSA_PUBLIC_KEY = """
-----BEGIN PUBLIC KEY-----
YOUR_RSA_PUBLIC_KEY_HERE
-----END PUBLIC KEY-----
""".strip()

ACCESS_TOKEN_EXPIRE_MINUTES = 60

# === πŸ§‚ Password Hashing ===
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

# === πŸ§‘ Users ===
USERS_DB = {
    "admin": {
        "username": "admin",
        "full_name": "Administrator",
        "hashed_password": pwd_context.hash("admin123"),
        "disabled": False,
    },
    "testuser": {
        "username": "testuser",
        "full_name": "Test User",
        "hashed_password": pwd_context.hash("testpass"),
        "disabled": False,
    }
}

# === πŸ” Authentication ===
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")

def authenticate_user(username: str, password: str) -> Optional[dict]:
    user = USERS_DB.get(username)
    if not user or not verify_password(password, user["hashed_password"]):
        return None
    return user

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    if USE_RSA:
        return jwt.encode(to_encode, RSA_PRIVATE_KEY, algorithm="RS256")
    return jwt.encode(to_encode, SECRET_KEY, algorithm="HS256")

def decode_access_token(token: str) -> dict:
    try:
        if USE_RSA:
            return jwt.decode(token, RSA_PUBLIC_KEY, algorithms=["RS256"])
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        )
def verify_token(token: str) -> dict:
    """
    Decode and verify a JWT token.
    """
    return decode_access_token(token)

def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    payload = decode_access_token(token)
    username: str = payload.get("sub")
    if not username or username not in USERS_DB:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return USERS_DB[username]