"""Authentication routes: user registration and JWT access-token issuance."""

from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from jose import jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.orm import Session
from starlette import status

from models import User
from database import SessionLocal

router = APIRouter()

# NOTE(review): the signing key is committed to source control. Anyone with
# read access to the repository can forge tokens; consider loading it from
# the environment or a secrets store and rotating this value.
SECRET_KEY = '3b004eeae34b43bd05226f210d9bdc2ad99abdd3c52bf32802906085b762ff55'
ALGORITHM = 'HS256'

# Lifetime of the tokens issued by ``/auth/token``.
ACCESS_TOKEN_EXPIRE_MINUTES = 20

bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto')


def get_db():
    """Yield a database session and close it once the caller is done with it."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


db_dependency = Annotated[Session, Depends(get_db)]


def authenticate_user(username: str, password: str, db: Session):
    """Check a username/password pair against the stored users.

    Args:
        username: Username to look up.
        password: Plain-text password to verify against the stored hash.
        db: Session used to query ``User``.

    Returns:
        The matching ``User`` when the username exists and the password
        verifies against ``user.password``; ``False`` otherwise.
    """
    user = db.query(User).filter(User.username == username).first()
    if not user:
        # Spend roughly the time a real verification would take so that the
        # response time does not reveal whether the username exists.
        bcrypt_context.dummy_verify()
        return False
    if not bcrypt_context.verify(password, user.password):
        return False
    return user


def create_access_token(username: str, user_id: int, expires_delta: timedelta):
    """Build a signed JWT for the given user.

    Args:
        username: Stored in the ``sub`` claim.
        user_id: Stored in the ``id`` claim.
        expires_delta: How long from now the token stays valid.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Use an aware UTC timestamp: a naive local time would be interpreted as
    # UTC when converted to the numeric ``exp`` claim, shifting the expiry by
    # the server's UTC offset.
    expire = datetime.now(timezone.utc) + expires_delta
    claims = {'sub': username, 'id': user_id, 'exp': expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


class CreateUser(BaseModel):
    """Request body accepted by ``POST /auth/create``."""

    username: str
    email: str
    password: str


class Token(BaseModel):
    """Response body returned by ``POST /auth/token``."""

    access_token: str
    token_type: str


@router.post('/auth/create', status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency, data: CreateUser):
    """Persist a new user with a bcrypt-hashed password.

    Responds with 201 and no body.
    """
    # NOTE(review): every account created through this endpoint is given the
    # 'admin' role. Confirm this is intended before exposing it publicly.
    create_user_model = User(
        username=data.username,
        email=data.email,
        password=bcrypt_context.hash(data.password),
        role='admin'
    )
    db.add(create_user_model)
    db.commit()


@router.post('/auth/token', status_code=status.HTTP_200_OK, response_model=Token)
async def get_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: db_dependency
):
    """Exchange form-encoded credentials for a bearer access token.

    Raises:
        HTTPException: 401 when the credentials do not match a user.
    """
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        # Returning a plain string here would fail ``response_model=Token``
        # validation and surface as a 500; signal the failure explicitly.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='authentication failed',
            headers={'WWW-Authenticate': 'Bearer'},
        )
    token = create_access_token(
        user.username,
        user.id,
        timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    return {'access_token': token, 'token_type': 'bearer'}