forum-app/backend/routers/auth.py
2024-04-06 15:36:48 -04:00

102 lines
2.9 KiB
Python

import os
from datetime import timedelta, datetime, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from starlette import status

from models import User
from database import SessionLocal
# Router collecting the authentication endpoints; its routes are registered
# under the '/auth' prefix and grouped under the 'auth' tag.
router = APIRouter(prefix='/auth', tags=['auth'])
# Key used to sign and verify access tokens. It is read from the
# JWT_SECRET_KEY environment variable so the real secret does not have to live
# in source control; the literal is only a fallback that preserves the previous
# behaviour when the variable is unset or empty.
# NOTE(review): the fallback value is committed to the repository and should be
# treated as compromised - rotate it and remove the fallback once every
# deployment sets JWT_SECRET_KEY.
SECRET_KEY = (
    os.environ.get('JWT_SECRET_KEY')
    or '3b004eeae34b43bd05226f210d9bdc2ad99abdd3c52bf32802906085b762ff55'
)

# Algorithm name handed to jose when encoding and decoding tokens.
ALGORITHM = 'HS256'

# Hashing context used to hash new passwords and verify submitted ones.
bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto')

# Dependency that pulls the bearer token out of incoming requests.
oauth2_bearer = OAuth2PasswordBearer(tokenUrl='auth/token')
def get_db():
    """Yield a session from ``SessionLocal`` and close it afterwards.

    Written as a generator so it can be used with ``Depends``: the code after
    the ``yield`` runs once the consumer is done with the session.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Runs whether the consumer finished normally or raised.
        session.close()
# Annotated alias for handler parameters: declares the parameter as a Session
# and tells FastAPI to obtain it by calling get_db.
db_dependency = Annotated[Session, Depends(get_db)]
def authenticate_user(username: str, password: str, db):
    """Look up a user by username and check the supplied password.

    Args:
        username: Username to search for in the ``User`` table.
        password: Plain-text password to verify against the stored hash.
        db: Session used to run the lookup query.

    Returns:
        The matching ``User`` row when the password verifies, otherwise
        ``False`` (both for an unknown username and for a wrong password).
    """
    candidate = db.query(User).filter(User.username == username).first()
    if candidate and bcrypt_context.verify(password, candidate.password):
        return candidate
    return False
def create_access_token(username: str, user_id: int, expires_delta: timedelta):
    """Build a signed JWT identifying the given user.

    Args:
        username: Stored in the token's ``sub`` claim.
        user_id: Stored in the token's ``id`` claim.
        expires_delta: How long from now the token should remain valid.

    Returns:
        The token encoded with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Use an aware UTC timestamp. The ``exp`` claim is expressed in UTC, and
    # python-jose converts datetimes with ``utctimetuple()``, which treats a
    # naive value as if it were already UTC. A naive local ``datetime.now()``
    # therefore shifted the expiry by the server's UTC offset (tokens were
    # already expired on servers west of UTC).
    expires_at = datetime.now(timezone.utc) + expires_delta
    claims = {'sub': username, 'id': user_id, 'exp': expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
# Request body accepted by the user-creation endpoint.
class CreateUser(BaseModel):
    # Login name stored on the new User row.
    username: str
    # Contact address; declared as a plain string, so no format check is applied here.
    email: str
    # Plain-text password as submitted; it is hashed before being stored.
    password: str
# Response body returned by the token endpoint.
class Token(BaseModel):
    # The encoded JWT.
    access_token: str
    # Token scheme reported to the client; the token endpoint sends 'bearer'.
    token_type: str
async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
    """Decode a bearer token and return the identity it carries.

    Args:
        token: Encoded JWT supplied by the ``oauth2_bearer`` dependency.

    Returns:
        A dict with ``username`` (from the ``sub`` claim) and ``user_id``
        (from the ``id`` claim).

    Raises:
        HTTPException: 401 when the token cannot be decoded or either claim
            is missing.
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )

    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise unauthorized

    username: str = claims.get('sub')
    user_id: int = claims.get('id')
    if username is None or user_id is None:
        raise unauthorized

    return {'username': username, 'user_id': user_id}
# Registers a new account from the posted username, email and password.
# The password is stored as a bcrypt hash rather than in plain text.
# Responds 201 on success and 409 when the database rejects the new row.
@router.post('/user/create', status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency, data: CreateUser):
    # NOTE(review): every account created through this endpoint is given
    # role='admin'. Left unchanged because the valid role values are not
    # visible from this module - confirm this is intended for self-registration.
    new_user = User(
        username=data.username,
        email=data.email,
        password=bcrypt_context.hash(data.password),
        role='admin',
    )
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError:
        # A constraint violation (presumably a duplicate username or email, if
        # the model declares them unique) used to surface as an unhandled 500.
        # Roll back so the session is usable again and report a conflict.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Could not create user: a conflicting record already exists",
        ) from None
# Login endpoint: exchanges a form-posted username and password for a bearer
# token. Responds 401 when authenticate_user rejects the credentials.
@router.post('/token', status_code=status.HTTP_200_OK, response_model=Token)
async def get_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: db_dependency,
):
    account = authenticate_user(form_data.username, form_data.password, db)
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
        )

    # Issued tokens are valid for 20 minutes.
    lifetime = timedelta(minutes=20)
    access_token = create_access_token(account.username, account.id, lifetime)
    return {'access_token': access_token, 'token_type': 'bearer'}