"""HS256 JWT authentication provider for a single "owner" account."""

import secrets
from datetime import UTC, datetime, timedelta

import jwt
from fastapi import HTTPException

from app.auth.provider import AuthProvider, Identity

# Shared 401 response raised for every authentication failure, so callers
# cannot tell apart a missing header, a malformed token and an expired token.
_UNAUTHORIZED = HTTPException(
    status_code=401, detail={"detail": "Unauthorized", "code": "unauthorized"}
)


def _constant_time_equals(supplied: str, expected: str) -> bool:
    """Compare two strings without leaking where they first differ.

    ``secrets.compare_digest`` only accepts ``str`` arguments that are pure
    ASCII and raises ``TypeError`` otherwise, so both values are encoded to
    bytes first. ``surrogatepass`` keeps the encoding step from failing on
    lone surrogates, which can reach us through JSON-decoded request bodies.
    """
    return secrets.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8", "surrogatepass"),
    )


class JWTAuthProvider(AuthProvider):
    """Issue and validate HS256-signed JWTs for one configured owner login."""

    def __init__(
        self,
        secret_key: str,
        expiry_seconds: int,
        owner_username: str,
        owner_password: str,
    ) -> None:
        """Store the signing key, token lifetime and owner credentials.

        Args:
            secret_key: Key used to sign and verify tokens (HS256).
            expiry_seconds: Lifetime of an issued token, in seconds.
            owner_username: Username accepted by ``verify_credentials``.
            owner_password: Password accepted by ``verify_credentials``.
        """
        self._secret_key = secret_key
        self._expiry_seconds = expiry_seconds
        self._owner_username = owner_username
        self._owner_password = owner_password

    def create_token(self) -> str:
        """Return a signed JWT with ``sub="owner"``, ``iat`` and ``exp`` claims.

        ``exp`` is set ``expiry_seconds`` after the current UTC time.
        """
        now = datetime.now(tz=UTC)
        payload = {
            "sub": "owner",
            "iat": now,
            "exp": now + timedelta(seconds=self._expiry_seconds),
        }
        return jwt.encode(payload, self._secret_key, algorithm="HS256")

    def verify_credentials(self, username: str, password: str) -> bool:
        """Return ``True`` only if both values match the configured owner login.

        Both comparisons are always performed, and each runs in constant
        time, so the result does not reveal which of the two fields was wrong.
        Non-ASCII input is handled and simply compares as unequal or equal.
        """
        username_ok = _constant_time_equals(username, self._owner_username)
        password_ok = _constant_time_equals(password, self._owner_password)
        return username_ok and password_ok

    async def get_identity(self, authorization: str | None) -> Identity:
        """Validate a ``Bearer`` authorization header and return the owner identity.

        Args:
            authorization: Raw ``Authorization`` header value, or ``None``.

        Returns:
            ``Identity(id="owner", anonymous=False)`` when the token verifies.

        Raises:
            HTTPException: 401 if the header is missing, does not start with
                ``"Bearer "``, or carries a token that fails verification
                (including an expired one).
        """
        if not authorization or not authorization.startswith("Bearer "):
            raise _UNAUTHORIZED

        token = authorization.removeprefix("Bearer ")
        try:
            jwt.decode(token, self._secret_key, algorithms=["HS256"])
        except jwt.InvalidTokenError:
            # ExpiredSignatureError is a subclass of InvalidTokenError, so
            # this single clause also covers expired tokens. ``from None``
            # keeps the decoding error out of the exception chain.
            raise _UNAUTHORIZED from None

        return Identity(id="owner", anonymous=False)