feat: add authentication for administration #2
@ -5,7 +5,7 @@ from sqlalchemy import Column, Integer, String, ForeignKey
|
||||
class Post(Base):
|
||||
__tablename__ = 'posts'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
thread_id = Column(Integer, ForeignKey("threads.id"))
|
||||
author = Column(String)
|
||||
title = Column(String)
|
||||
@ -15,7 +15,17 @@ class Post(Base):
|
||||
class Thread(Base):
|
||||
__tablename__ = 'threads'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
author = Column(String)
|
||||
title = Column(String)
|
||||
content = Column(String)
|
||||
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = 'users'
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
username = Column(String, unique=True)
|
||||
email = Column(String, unique=True)
|
||||
password = Column(String)
|
||||
role = Column(String)
|
||||
|
@ -1,3 +1,7 @@
|
||||
fastapi
|
||||
uvicorn[standard]
|
||||
sqlalchemy
|
||||
passlib[bcrypt]
|
||||
pydantic
|
||||
starlette
|
||||
python-multipart
|
@ -1,8 +1,65 @@
|
||||
from fastapi import APIRouter
|
||||
from typing import Annotated
|
||||
from fastapi import APIRouter, Depends
|
||||
from pydantic import BaseModel
|
||||
from passlib.context import CryptContext
|
||||
from sqlalchemy.orm import Session
|
||||
from starlette import status
|
||||
from fastapi.security import OAuth2PasswordRequestForm
|
||||
|
||||
from models import User
|
||||
from database import SessionLocal
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto')
|
||||
|
||||
@router.get('/auth/')
|
||||
async def get_user():
|
||||
return {'user': 'authenticated'}
|
||||
|
||||
def get_db():
|
||||
db = SessionLocal()
|
||||
try:
|
||||
yield db
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
db_dependency = Annotated[Session, Depends(get_db)]
|
||||
|
||||
|
||||
def authenticate_user(username: str, password: str, db):
|
||||
user = db.query(User).filter(User.username == username).first()
|
||||
if not user:
|
||||
return False
|
||||
if not bcrypt_context.verify(password, user.password):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class CreateUser(BaseModel):
|
||||
username: str
|
||||
email: str
|
||||
password: str
|
||||
|
||||
|
||||
@router.post('/auth/create', status_code=status.HTTP_201_CREATED)
|
||||
async def create_user(db: db_dependency, data: CreateUser):
|
||||
create_user_model = User(
|
||||
username=data.username,
|
||||
email=data.email,
|
||||
password=bcrypt_context.hash(data.password),
|
||||
role='admin'
|
||||
)
|
||||
|
||||
db.add(create_user_model)
|
||||
db.commit()
|
||||
|
||||
|
||||
@router.post('/auth/token', status_code=status.HTTP_200_OK)
|
||||
async def get_token(
|
||||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
|
||||
db: db_dependency
|
||||
):
|
||||
user = authenticate_user(form_data.username, form_data.password, db)
|
||||
if user:
|
||||
return "you good fam"
|
||||
|
||||
return "failed authentication"
|
||||
|
Loading…
Reference in New Issue
Block a user