Implement auth

I basically copied this article https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt
This commit is contained in:
2026-05-08 13:17:32 +02:00
parent 6243618abb
commit 6ad50df3c2
4 changed files with 116 additions and 2 deletions

View File

@@ -1,6 +1,10 @@
from fastapi import FastAPI
from fastapi.security import OAuth2PasswordBearer
from .controllers import userManager, cardManager, groupManager, aaManager
from .services.database import create_db_and_tables
from .services.auth import token_router
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
@app.on_event("startup")
@@ -8,6 +12,8 @@ def on_startup():
create_db_and_tables()
print("Database created and tables initialized.")
app.include_router(token_router)
app.include_router(userManager.user_router)
app.include_router(groupManager.group_router)
app.include_router(cardManager.card_router)

View File

@@ -18,7 +18,7 @@ class UserCreate(UserBase):
class UserDB(UserBase, table=True):
id: int | None = Field(default=None, primary_key=True)
password: str
passwordhash: str
class UserUpdate(Base):
name: str | None = None
@@ -31,6 +31,14 @@ class AaGroupLink(Base, table=True):
group_id: int | None = Field(default=None, foreign_key="groupdb.id", primary_key=True)
accessauth_id: int | None = Field(default=None, foreign_key="accessauthorizationdb.id", primary_key=True)
#### Token
class Token(Base):
access_token: str
token_type: str
class TokenData(Base):
username: str | None = None
#### Group
class GroupBase(Base):
name: str = Field(index=True, unique=True)

93
app/services/auth.py Normal file
View File

@@ -0,0 +1,93 @@
from typing import Annotated
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlmodel import Session
from pwdlib import PasswordHash
import jwt
from jwt.exceptions import InvalidTokenError
from ..model.models import UserDB, Token, TokenData
from ..services.database import engine, get_session
SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85" #Encrypt and change later or store in env file or somehthing
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
token_router = APIRouter(tags=["Token"])
password_hash = PasswordHash.recommended()
def verify_password(plain_password, hashed_password):
return password_hash.verify(plain_password, hashed_password)
def get_password_hash(password):
return password_hash.hash(password)
def get_user(db, username: str):
user = db.get(UserDB, username)
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Username not found in get_user, this shouldn't happen")
return user(**user.dict())
def authenticate_user(db, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.passwordhash):
return False
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.tzname("CET")) + expires_delta
else:
expire = datetime.now(timezone.tzname("CET")) + expires_delta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user
@token_router.post("/token")
def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: Session = Depends(get_session)
) -> Token:
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or pw",
headers={"WWW-Authenticate": "Bearer"}
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@token_router.get("/test/login")
def test_login(
current_user: Annotated[UserDB, Depends(get_current_user)]
) -> UserDB:
return current_user

View File

@@ -42,7 +42,14 @@ Plan:
Erstelle gruppe
Erstelle AA mit Timeslot
Registriere karte -> gruppe
CRUD
create(post)
read(get)[all items]
read(get)[single item]
update(put/patch)
delete(delete)
zeitplan
Hier bin ich nicht sicher, ich denke an cron style für wiederholende dinge aber das kann nur zeitpunkte und keine blöcke.
Villeicht ne liste von cron zeiten [ "0 16 * * 2" "0 18 * * 2" ](Wäre dienstag 16-18 uhr) - Ist aber warscheinlich schwer zu parsen