From 6ad50df3c27e8a8d08c4de3f2e778cb4e1fc1550 Mon Sep 17 00:00:00 2001 From: ahtlon Date: Fri, 8 May 2026 13:17:32 +0200 Subject: [PATCH] Implement auth I basically copied this article https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt --- app/main.py | 6 +++ app/model/models.py | 10 ++++- app/services/auth.py | 93 ++++++++++++++++++++++++++++++++++++++++++++ plan.txt | 9 ++++- 4 files changed, 116 insertions(+), 2 deletions(-) create mode 100644 app/services/auth.py diff --git a/app/main.py b/app/main.py index 81a34a6..2a783dd 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,10 @@ from fastapi import FastAPI +from fastapi.security import OAuth2PasswordBearer from .controllers import userManager, cardManager, groupManager, aaManager from .services.database import create_db_and_tables +from .services.auth import token_router + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") app = FastAPI() @app.on_event("startup") @@ -8,6 +12,8 @@ def on_startup(): create_db_and_tables() print("Database created and tables initialized.") + +app.include_router(token_router) app.include_router(userManager.user_router) app.include_router(groupManager.group_router) app.include_router(cardManager.card_router) diff --git a/app/model/models.py b/app/model/models.py index 9abd820..ea0c80e 100644 --- a/app/model/models.py +++ b/app/model/models.py @@ -18,7 +18,7 @@ class UserCreate(UserBase): class UserDB(UserBase, table=True): id: int | None = Field(default=None, primary_key=True) - password: str + passwordhash: str class UserUpdate(Base): name: str | None = None @@ -31,6 +31,14 @@ class AaGroupLink(Base, table=True): group_id: int | None = Field(default=None, foreign_key="groupdb.id", primary_key=True) accessauth_id: int | None = Field(default=None, foreign_key="accessauthorizationdb.id", primary_key=True) +#### Token +class Token(Base): + access_token: str + token_type: str + +class TokenData(Base): + username: str | None = None + #### Group class GroupBase(Base): name: str = Field(index=True, unique=True) diff --git a/app/services/auth.py b/app/services/auth.py new file mode 100644 index 0000000..89d8663 --- /dev/null +++ b/app/services/auth.py @@ -0,0 +1,93 @@ +from typing import Annotated +from datetime import datetime, timedelta, timezone +from fastapi import APIRouter, HTTPException, Depends, status +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from sqlmodel import Session +from pwdlib import PasswordHash +import jwt +from jwt.exceptions import InvalidTokenError +from ..model.models import UserDB, Token, TokenData +from ..services.database import engine, get_session + +SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85" #Encrypt and change later or store in env file or somehthing +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +token_router = APIRouter(tags=["Token"]) + +password_hash = PasswordHash.recommended() + +def verify_password(plain_password, hashed_password): + return password_hash.verify(plain_password, hashed_password) + +def get_password_hash(password): + return password_hash.hash(password) + +def get_user(db, username: str): + user = db.get(UserDB, username) + if user is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Username not found in get_user, this shouldn't happen") + return user(**user.dict()) + +def authenticate_user(db, username: str, password: str): + user = get_user(db, username) + if not user: + return False + if not verify_password(password, user.passwordhash): + return False + return user + +def create_access_token(data: dict, expires_delta: timedelta | None = None): + to_encode = data.copy() + if expires_delta: + expire = datetime.now(timezone.tzname("CET")) + expires_delta + else: + expire = datetime.now(timezone.tzname("CET")) + expires_delta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]): + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Could not validate credentials", + headers={"WWW-Authenticate": "Bearer"} + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username = payload.get("sub") + if username is None: + raise credentials_exception + token_data = TokenData(username=username) + except InvalidTokenError: + raise credentials_exception + user = get_user(db, username=token_data.username) + if user is None: + raise credentials_exception + return user + +@token_router.post("/token") +def login_for_access_token( + form_data: Annotated[OAuth2PasswordRequestForm, Depends()], + db: Session = Depends(get_session) +) -> Token: + user = authenticate_user(db, form_data.username, form_data.password) + if not user: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Incorrect username or pw", + headers={"WWW-Authenticate": "Bearer"} + ) + access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = create_access_token( + data={"sub": user.username}, expires_delta=access_token_expires + ) + return Token(access_token=access_token, token_type="bearer") + +@token_router.get("/test/login") +def test_login( + current_user: Annotated[UserDB, Depends(get_current_user)] +) -> UserDB: + return current_user \ No newline at end of file diff --git a/plan.txt b/plan.txt index 1b70376..62e3a8a 100644 --- a/plan.txt +++ b/plan.txt @@ -42,7 +42,14 @@ Plan: Erstelle gruppe Erstelle AA mit Timeslot Registriere karte -> gruppe - + + CRUD + create(post) + read(get)[all items] + read(get)[single item] + update(put/patch) + delete(delete) + zeitplan Hier bin ich nicht sicher, ich denke an cron style für wiederholende dinge aber das kann nur zeitpunkte und keine blöcke. Villeicht ne liste von cron zeiten [ "0 16 * * 2" "0 18 * * 2" ](Wäre dienstag 16-18 uhr) - Ist aber warscheinlich schwer zu parsen