"""Authentication routes: user creation and JWT access-token issuance."""

from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.orm import Session
from starlette import status

from models import User
from database import SessionLocal

router = APIRouter()

# NOTE(review): the signing key is committed to source. Anyone who can read
# the repository can forge tokens; consider loading it from the environment
# or a secret store and rotating this value.
SECRET_KEY = '3b004eeae34b43bd05226f210d9bdc2ad99abdd3c52bf32802906085b762ff55'
ALGORITHM = 'HS256'

bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto')

# NOTE(review): tokenUrl is 'validate', but the token-issuing route declared
# in this module is '/auth/token'. Confirm that a 'validate' route exists
# elsewhere; otherwise this should presumably be 'auth/token'.
oauth2_bearer = OAuth2PasswordBearer(tokenUrl='validate')


def get_db():
    """Yield a new ``SessionLocal`` session and always close it afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


db_dependency = Annotated[Session, Depends(get_db)]


def authenticate_user(username: str, password: str, db):
    """Look up a user by username and check the supplied password.

    Args:
        username: Username to search for in the ``User`` table.
        password: Plain-text password to verify against the stored hash.
        db: Session used to run the query.

    Returns:
        The matching ``User`` row when the password verifies, otherwise
        ``False`` (unknown user or wrong password).
    """
    user = db.query(User).filter(User.username == username).first()
    if not user or not bcrypt_context.verify(password, user.password):
        return False
    return user


def create_access_token(username: str, user_id: int, expires_delta: timedelta):
    """Build a signed JWT carrying the user's name and id.

    Args:
        username: Stored in the ``sub`` claim.
        user_id: Stored in the ``id`` claim.
        expires_delta: Lifetime of the token, counted from now.

    Returns:
        The encoded token signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Use an aware UTC timestamp: a naive local datetime would be read as if
    # it were already UTC when converted to the numeric ``exp`` claim, which
    # shifts the expiry by the host's UTC offset.
    expire = datetime.now(timezone.utc) + expires_delta
    claims = {'sub': username, 'id': user_id, 'exp': expire}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


class CreateUser(BaseModel):
    """Request body for creating a user."""

    username: str
    email: str
    password: str


class Token(BaseModel):
    """Response body returned when a token is issued."""

    access_token: str
    token_type: str


async def get_current_user(token: Annotated[str, Depends(oauth2_bearer)]):
    """Decode the bearer token and return the identity it carries.

    Args:
        token: Bearer token supplied through ``oauth2_bearer``.

    Returns:
        A dict with ``username`` (from the ``sub`` claim) and ``user_id``
        (from the ``id`` claim).

    Raises:
        HTTPException: 401 when the token cannot be decoded or when either
            claim is missing.
    """
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as err:
        raise credentials_error from err

    username = payload.get('sub')
    user_id = payload.get('id')
    if username is None or user_id is None:
        raise credentials_error
    return {'username': username, 'user_id': user_id}


@router.post('/auth/create', status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency, data: CreateUser):
    """Create a user with a bcrypt-hashed password."""
    # NOTE(review): every account is created with role='admin' and this route
    # declares no authentication dependency — confirm this is intended.
    create_user_model = User(
        username=data.username,
        email=data.email,
        password=bcrypt_context.hash(data.password),
        role='admin',
    )
    db.add(create_user_model)
    db.commit()


@router.post('/auth/token', status_code=status.HTTP_200_OK, response_model=Token)
async def get_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: db_dependency,
):
    """Exchange a username and password for a 20-minute bearer token."""
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        # Returning a bare string here would not match the declared ``Token``
        # response model; report the failure as a proper 401 instead.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="authentication failed",
            headers={'WWW-Authenticate': 'Bearer'},
        )
    token = create_access_token(user.username, user.id, timedelta(minutes=20))
    return {'access_token': token, 'token_type': 'bearer'}