forum-app/backend/routers/auth.py
agatha 2db6e18d5b feat: add authentication for administration (#2)
Reviewed-on: #2
Co-authored-by: agatha <agatha@juggalol.com>
Co-committed-by: agatha <agatha@juggalol.com>
2024-04-06 19:56:48 +00:00

110 lines
3.4 KiB
Python

import logging
import os
from datetime import timedelta, datetime, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.orm import Session
from starlette import status

from database import SessionLocal
from models import User
# Router for the authentication endpoints; every route declared on it is
# served under the '/auth' prefix and tagged 'auth'.
router = APIRouter(prefix='/auth', tags=['auth'])
# Key used to sign and verify access tokens.
# SECURITY: a signing key committed to source control lets anyone with
# repository access forge tokens. Deployments should set JWT_SECRET_KEY in the
# environment; the literal below is kept only as a backward-compatible
# fallback for existing setups and should be rotated/removed.
SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or (
    '3b004eeae34b43bd05226f210d9bdc2ad99abdd3c52bf32802906085b762ff55'
)
# JWT signing algorithm (HMAC with SHA-256), used for both encode and decode.
ALGORITHM = 'HS256'
# Password hashing context: bcrypt is the only configured scheme.
bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
# Extracts the bearer token from incoming requests; 'auth/token' corresponds
# to this router's '/auth' prefix plus its '/token' route.
oauth2_bearer = OAuth2PasswordBearer(tokenUrl='auth/token')
def get_db():
    """Yield a database session and close it once the consumer is finished.

    Written as a generator so it can be used as a dependency: the code after
    the ``yield`` runs as cleanup, even if the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, whether or not the request succeeded.
        session.close()
async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
    """Decode the bearer token and return the identity claims it carries.

    Args:
        token: Encoded JWT extracted from the request by ``oauth2_bearer``.

    Returns:
        A dict with the keys ``username``, ``user_id`` and ``role``, taken
        from the token's ``sub``, ``id`` and ``role`` claims. ``role`` is
        ``None`` when the token carries no such claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded/verified, or when
            the ``sub`` or ``id`` claim is missing.
    """
    # One response for every failure mode, so the client cannot tell which
    # check rejected the token. The WWW-Authenticate header tells the client
    # which scheme to use on a 401.
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail='Could not validate credentials',
        headers={'WWW-Authenticate': 'Bearer'},
    )

    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Record the reason server-side instead of printing to stdout.
        logging.getLogger(__name__).warning('JWT validation failed: %s', exc)
        raise credentials_error from exc

    username: str = payload.get('sub')
    user_id: int = payload.get('id')
    role: str = payload.get('role')
    if username is None or user_id is None:
        raise credentials_error
    return {'username': username, 'user_id': user_id, 'role': role}
# Annotated alias for route parameters that need a database session; the
# value is supplied by get_db through Depends.
db_dependency = Annotated[Session, Depends(get_db)]
# Annotated alias for route parameters that need the authenticated caller; the
# value is the claims dict returned by get_current_user.
user_dependency = Annotated[dict, Depends(get_current_user)]
def authenticate_user(username: str, password: str, db):
    """Check a username/password pair against the stored users.

    Args:
        username: Login name to look up.
        password: Plain-text password supplied by the client.
        db: Database session used to query ``User``.

    Returns:
        The matching ``User`` row when the password verifies, otherwise
        ``False`` (unknown username or wrong password).
    """
    user = db.query(User).filter(User.username == username).first()
    if not user:
        # Spend roughly the time of a real hash check so response timing does
        # not reveal whether the username exists.
        bcrypt_context.dummy_verify()
        return False
    if not bcrypt_context.verify(password, user.password):
        return False
    return user
def create_access_token(username: str, user_id: int, role: str, expires_delta: timedelta):
    """Build a signed JWT carrying the user's identity and an expiry time.

    Args:
        username: Stored in the ``sub`` claim.
        user_id: Stored in the ``id`` claim.
        role: Stored in the ``role`` claim.
        expires_delta: How long from now (UTC) the token stays valid.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    expires_at = datetime.now(timezone.utc) + expires_delta
    claims = {
        'sub': username,
        'id': user_id,
        'role': role,
        'exp': expires_at,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# Request body accepted by the user-creation endpoint.
class CreateUser(BaseModel):
    # Login name for the new account.
    username: str
    # Contact address; typed as a plain string here.
    email: str
    # Plain-text password as sent by the client; hashed before it is stored.
    password: str
# Response body returned by the token endpoint.
class Token(BaseModel):
    # The encoded JWT.
    access_token: str
    # Token scheme; the token endpoint sets this to 'bearer'.
    token_type: str
@router.post('/user/create', status_code=status.HTTP_201_CREATED)
async def create_user(user: user_dependency, db: db_dependency, data: CreateUser):
    """Create a new user account; only callers with the 'admin' role may do so.

    Args:
        user: Claims dict of the authenticated caller.
        db: Database session.
        data: Username, email and plain-text password for the new account.

    Raises:
        HTTPException: 401 when no authenticated caller is available, 403 when
            the caller is authenticated but does not have the 'admin' role.
    """
    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Authentication failed')
    # The caller is authenticated at this point, so lacking the role is a
    # permission failure (403), not an authentication failure (401).
    if user['role'] != 'admin':
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Not authorized')

    create_user_model = User(
        username=data.username,
        email=data.email,
        # Only the bcrypt hash is stored, never the plain-text password.
        password=bcrypt_context.hash(data.password),
        # NOTE(review): every account created here gets the 'admin' role;
        # confirm this is intended.
        role='admin',
    )
    db.add(create_user_model)
    db.commit()
@router.post('/token', status_code=status.HTTP_200_OK, response_model=Token)
async def get_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: db_dependency,
):
    """Exchange a username/password form for a bearer access token.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        db: Database session.

    Returns:
        A dict with ``access_token`` and ``token_type`` ('bearer'). The token
        is issued with a 20-minute lifetime.

    Raises:
        HTTPException: 401 when the credentials do not authenticate.
    """
    account = authenticate_user(form_data.username, form_data.password, db)
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
        )

    lifetime = timedelta(minutes=20)
    access_token = create_access_token(
        username=account.username,
        user_id=account.id,
        role=account.role,
        expires_delta=lifetime,
    )
    return {'access_token': access_token, 'token_type': 'bearer'}