import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.orm import Session
from starlette import status

from database import SessionLocal
from models import User

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix='/auth',
    tags=['auth'],
)

# NOTE(review): a signing key committed to source control should be treated
# as compromised. The literal is kept only as a backward-compatible fallback;
# set the JWT_SECRET_KEY environment variable in any real deployment.
_FALLBACK_SECRET_KEY = '3b004eeae34b43bd05226f210d9bdc2ad99abdd3c52bf32802906085b762ff55'
SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or _FALLBACK_SECRET_KEY
ALGORITHM = 'HS256'

bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
oauth2_bearer = OAuth2PasswordBearer(tokenUrl='auth/token')


def get_db():
    """Yield a database session and close it once the caller is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def _credentials_error() -> HTTPException:
    """Build the 401 response used for every token validation failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={'WWW-Authenticate': 'Bearer'},
    )


async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
    """Decode the bearer token and return the identity stored in its claims.

    Args:
        token: Encoded JWT supplied through the ``oauth2_bearer`` dependency.

    Returns:
        A dict with the keys ``username``, ``user_id`` and ``role`` read from
        the ``sub``, ``id`` and ``role`` claims. ``role`` may be ``None`` when
        the claim is absent.

    Raises:
        HTTPException: 401 when the token cannot be decoded or when the
            ``sub`` or ``id`` claim is missing.
    """
    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        logger.warning('JWT validation failed: %s', exc)
        raise _credentials_error() from exc

    username = payload.get('sub')
    user_id = payload.get('id')
    role = payload.get('role')
    if username is None or user_id is None:
        raise _credentials_error()
    return {'username': username, 'user_id': user_id, 'role': role}


db_dependency = Annotated[Session, Depends(get_db)]
user_dependency = Annotated[dict, Depends(get_current_user)]


def authenticate_user(username: str, password: str, db):
    """Look up a user by username and check the supplied password.

    Args:
        username: Username to search for in the ``User`` table.
        password: Plain-text password to verify against ``user.password``.
        db: Database session used for the lookup.

    Returns:
        The matching ``User`` row when the password verifies, otherwise
        ``False`` (unknown username or wrong password).
    """
    user = db.query(User).filter(User.username == username).first()
    if not user:
        return False
    if not bcrypt_context.verify(password, user.password):
        return False
    return user


def create_access_token(username: str, user_id: int, role: str, expires_delta: timedelta):
    """Create a signed JWT carrying the user's identity and an expiry.

    Args:
        username: Stored in the ``sub`` claim.
        user_id: Stored in the ``id`` claim.
        role: Stored in the ``role`` claim.
        expires_delta: Lifetime of the token, counted from the current UTC time.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    expires_at = datetime.now(timezone.utc) + expires_delta
    claims = {'sub': username, 'id': user_id, 'role': role, 'exp': expires_at}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
class CreateUser(BaseModel):
    """Request body accepted by the user-creation endpoint."""

    username: str
    email: str
    password: str  # plain text; hashed with bcrypt before it is stored


class Token(BaseModel):
    """Response body returned by the token endpoint."""

    access_token: str
    token_type: str


# Lifetime of the access tokens issued by ``get_token``.
_ACCESS_TOKEN_LIFETIME = timedelta(minutes=20)


@router.post('/user/create', status_code=status.HTTP_201_CREATED)
async def create_user(user: user_dependency, db: db_dependency, data: CreateUser):
    """Create a new user; only callers whose token role is ``admin`` may do so.

    Args:
        user: Identity dict produced by the ``get_current_user`` dependency.
        db: Database session.
        data: Username, email and plain-text password of the new user.

    Raises:
        HTTPException: 401 when no identity is available, 403 when the caller
            is authenticated but is not an admin, 409 when the database
            rejects the new row because of a constraint violation.
    """
    # Function-scope import keeps this block self-contained; sqlalchemy is
    # already a dependency of this module.
    from sqlalchemy.exc import IntegrityError

    if user is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail='Authentication failed')
    # The caller is authenticated but lacks permission: that is 403, not 401.
    if user['role'] != 'admin':
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN,
                            detail='Not authorized')

    # NOTE(review): every user created here is given the 'admin' role.
    # Confirm this is intended rather than a lower-privileged default.
    new_user = User(
        username=data.username,
        email=data.email,
        password=bcrypt_context.hash(data.password),
        role='admin',
    )
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # Leave the session clean and report a client error instead of a 500.
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail='User violates a database constraint '
                   '(e.g. duplicate username or email)',
        ) from exc


@router.post('/token', status_code=status.HTTP_200_OK, response_model=Token)
async def get_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: db_dependency,
):
    """Exchange a username and password for a bearer access token.

    Args:
        form_data: OAuth2 password-flow form carrying username and password.
        db: Database session used to look the user up.

    Returns:
        A dict with ``access_token`` and ``token_type`` (``'bearer'``).

    Raises:
        HTTPException: 401 when the credentials do not match a user.
    """
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={'WWW-Authenticate': 'Bearer'},
        )
    token = create_access_token(user.username, user.id, user.role, _ACCESS_TOKEN_LIFETIME)
    return {'access_token': token, 'token_type': 'bearer'}