agatha
676ececfad
Release version 3.0 Reviewed-on: #4 Co-authored-by: agatha <agatha@juggalol.com> Co-committed-by: agatha <agatha@juggalol.com>
110 lines
3.4 KiB
Python
110 lines
3.4 KiB
Python
import logging
import os
from datetime import timedelta, datetime, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer
from jose import jwt, JWTError
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from starlette import status

from models import User
from database import SessionLocal
|
|
|
|
# Router for this module: every route registered on it is served under the
# '/auth' prefix and tagged 'auth'.
router = APIRouter(prefix='/auth', tags=['auth'])
|
|
|
|
# Key used to sign and verify the JWTs issued by this module.
#
# The value is taken from the SECRET_KEY environment variable when it is set
# to a non-empty string. The literal below is only a fallback that keeps
# existing deployments (and tokens they already issued) working.
# NOTE(review): the fallback key is committed to source control and must be
# treated as compromised - set SECRET_KEY in the environment for any real
# deployment and rotate away from this value.
SECRET_KEY = (
    os.environ.get('SECRET_KEY')
    or '3b004eeae34b43bd05226f210d9bdc2ad99abdd3c52bf32802906085b762ff55'
)

# JWT signing algorithm used for both encoding and decoding.
ALGORITHM = 'HS256'
|
|
|
|
# Password hashing/verification context restricted to the bcrypt scheme.
bcrypt_context = CryptContext(
    schemes=['bcrypt'],
    deprecated='auto',
)

# Dependency that extracts the bearer token from incoming requests; the token
# endpoint is advertised as 'auth/token'.
oauth2_bearer = OAuth2PasswordBearer(
    tokenUrl='auth/token',
)
|
|
|
|
|
|
def get_db():
    """Yield a fresh ``SessionLocal`` session and close it afterwards.

    Written as a generator so the ``finally`` clause closes the session once
    the consumer is done with it, including when the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
|
|
async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
    """Decode the bearer token and return the identity it carries.

    Args:
        token: Encoded JWT supplied by the ``oauth2_bearer`` dependency.

    Returns:
        A dict with the keys ``'username'`` (claim ``sub``), ``'user_id'``
        (claim ``id``) and ``'role'`` (claim ``role``; ``None`` when the
        claim is absent).

    Raises:
        HTTPException: 401 when the token cannot be decoded/verified, or when
            the ``sub`` or ``id`` claim is missing.
    """
    # Only jwt.decode can raise JWTError, so it is the only call guarded here.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Log through the logging framework instead of printing to stdout.
        logging.getLogger(__name__).warning('JWT validation failed: %s', exc)
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            # The leading digit distinguishes this failure branch from the
            # missing-claims branch below; kept unchanged for existing clients.
            detail="2Could not validate credentials",
            headers={'WWW-Authenticate': 'Bearer'},
        ) from exc

    username: str = payload.get('sub')
    user_id: int = payload.get('id')
    role: str = payload.get('role')

    if username is None or user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="1Could not validate credentials",
            headers={'WWW-Authenticate': 'Bearer'},
        )

    return {'username': username, 'user_id': user_id, 'role': role}
|
|
|
|
|
|
# Annotated aliases used in route signatures.

# Injects the session yielded by get_db.
db_dependency = Annotated[
    Session,
    Depends(get_db),
]

# Injects the identity dict returned by get_current_user.
user_dependency = Annotated[
    dict,
    Depends(get_current_user),
]
|
|
|
|
|
|
def authenticate_user(username: str, password: str, db):
    """Look up ``username`` and check ``password`` against its stored hash.

    Args:
        username: Name matched against ``User.username``.
        password: Plain-text password to verify.
        db: Session object used to run the query.

    Returns:
        The matching ``User`` row when the password verifies, otherwise
        ``False`` (unknown user or wrong password).
    """
    candidate = db.query(User).filter(User.username == username).first()
    # Short-circuit: the hash is only verified when a row was found.
    if candidate and bcrypt_context.verify(password, candidate.password):
        return candidate
    return False
|
|
|
|
|
|
def create_access_token(username: str, user_id: int, role: str, expires_delta: timedelta):
    """Build a signed JWT for the given user.

    Args:
        username: Stored in the ``sub`` claim.
        user_id: Stored in the ``id`` claim.
        role: Stored in the ``role`` claim.
        expires_delta: Lifetime of the token, added to the current UTC time
            to produce the ``exp`` claim.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    expires_at = datetime.now(timezone.utc) + expires_delta
    claims = {
        'sub': username,
        'id': user_id,
        'role': role,
        'exp': expires_at,
    }
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
|
|
|
|
|
|
# Request body accepted by the user-creation route.
class CreateUser(BaseModel):
    # Login name for the new account.
    username: str
    # Contact address; declared as a plain string.
    email: str
    # Plain-text password as submitted; the route hashes it before storing.
    password: str
|
|
|
|
|
|
# Response body returned by the token route.
class Token(BaseModel):
    # The encoded JWT.
    access_token: str
    # Token scheme; the token route sets this to 'bearer'.
    token_type: str
|
|
|
|
|
|
# POST /auth/user/create - create a new user account.
#
# Parameters:
#   user: identity dict resolved from the caller's bearer token.
#   db:   database session for this request.
#   data: CreateUser payload (username, email, plain-text password).
#
# Responds 201 with an empty body on success.
# Raises HTTPException:
#   401 - no authenticated identity was supplied.
#   403 - the caller is authenticated but its role is not 'admin'.
#   409 - the database rejected the row with an integrity error.
@router.post('/user/create', status_code=status.HTTP_201_CREATED)
async def create_user(user: user_dependency, db: db_dependency, data: CreateUser):
    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Authentication failed')

    if user['role'] != 'admin':
        # The caller is authenticated at this point, so a missing privilege is
        # a 403 Forbidden rather than a 401 Unauthorized.
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Not authorized')

    new_user = User(
        username=data.username,
        email=data.email,
        # Only the bcrypt hash is stored, never the submitted password.
        password=bcrypt_context.hash(data.password),
        # NOTE(review): every account created here gets the 'admin' role -
        # confirm this is intended.
        role='admin',
    )

    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # Leave the session usable and report a conflict instead of
        # letting the error surface as an unhandled 500.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail='User could not be created: conflicts with an existing record',
        ) from exc
|
|
|
|
|
|
# POST /auth/token - exchange a username/password form for a bearer token.
#
# Parameters:
#   form_data: OAuth2 password form carrying the username and password.
#   db:        database session for this request.
#
# Returns a dict matching the Token model; the token is issued with a
# 20-minute lifetime.
# Raises HTTPException 401 when authenticate_user rejects the credentials.
@router.post('/token', status_code=status.HTTP_200_OK, response_model=Token)
async def get_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: db_dependency,
):
    account = authenticate_user(form_data.username, form_data.password, db)
    if account:
        lifetime = timedelta(minutes=20)
        access_token = create_access_token(
            account.username,
            account.id,
            account.role,
            lifetime,
        )
        return {'access_token': access_token, 'token_type': 'bearer'}

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )
|