Files
gatekeeper/app/services/auth.py
ahtlon cbc2526c14 Squashed commit of the following:
commit a6a5de4a35
Author: ahtlon <git@ahtlon.de>
Date:   Fri May 15 20:47:45 2026 +0200

    Auth working :)

commit 6ad50df3c2
Author: ahtlon <git@ahtlon.de>
Date:   Fri May 8 13:17:32 2026 +0200

    Implement auth
    I basically copied this article https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt

commit 6243618abb
Author: ahtlon <git@ahtlon.de>
Date:   Fri May 8 12:18:56 2026 +0200

    Add auth deps
2026-05-15 22:01:57 +02:00

111 lines
4.1 KiB
Python

import os
import secrets
import string
from datetime import datetime, timedelta, timezone
from typing import Annotated

import jwt
from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from pwdlib import PasswordHash
from sqlmodel import Session, select

from ..model.models import UserDB, Token, TokenData, UserCreate
from ..services.database import *
# Key used to sign and verify the JWTs issued by this module.
# It is read from the GATEKEEPER_SECRET_KEY environment variable when that is
# set to a non-empty value. The literal below is only a fallback so existing
# setups keep working; it is committed to the repository and therefore must
# not be relied on outside development.
SECRET_KEY = (
    os.environ.get("GATEKEEPER_SECRET_KEY")
    or "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85"
)
# Signing algorithm handed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Lifetime, in minutes, of the tokens issued by the /token endpoint.
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Bearer-token dependency; "token" is the URL of the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
token_router = APIRouter(tags=["Token"])
# Shared hasher used by verify_password and get_password_hash.
password_hash = PasswordHash.recommended()
def verify_password(plain_password, hashed_password):
    """Check a plain-text password against a stored hash.

    Delegates to the module-level ``password_hash`` hasher and returns
    whatever its ``verify`` call returns.
    """
    outcome = password_hash.verify(plain_password, hashed_password)
    return outcome
def get_password_hash(password):
    """Hash *password* with the module-level ``password_hash`` hasher.

    Returns the value produced by the hasher's ``hash`` call.
    """
    digest = password_hash.hash(password)
    return digest
def get_user(db, username: str):
    """Fetch the first ``UserDB`` row whose ``name`` equals *username*.

    Args:
        db: Session object exposing ``exec``.
        username: Name to look up.

    Returns:
        The matching ``UserDB`` row.

    Raises:
        HTTPException: 404 when no row matches.
    """
    query = select(UserDB).where(UserDB.name == username)
    match = db.exec(query).first()
    if match is not None:
        return match
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Username not found in get_user, this shouldn't happen",
    )
def authenticate_user(db, username: str, password: str):
    """Look up *username* and check *password* against its stored hash.

    Args:
        db: Database session passed through to ``get_user``.
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        The user object when the credentials are valid, otherwise ``False``
        (unknown username or wrong password).

    Raises:
        HTTPException: Any non-404 error raised by ``get_user`` is re-raised.
    """
    try:
        user = get_user(db, username)
    except HTTPException as exc:
        # get_user reports a missing user as a 404. For a login attempt that
        # is just a failed authentication: returning False lets the caller
        # answer with the same error as for a wrong password instead of
        # revealing which usernames exist.
        if exc.status_code != status.HTTP_404_NOT_FOUND:
            raise
        return False
    if not user or not verify_password(password, user.passwordhash):
        return False
    return user
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode *data* as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed in the token; copied, never mutated.
        expires_delta: Token lifetime measured from now (UTC). When ``None``
            a default of 15 minutes is used.

    Returns:
        The value returned by ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    if expires_delta is None:
        # The previous code called ``expires_delta(minutes=15)`` here, i.e.
        # it tried to call None and raised TypeError whenever the default
        # lifetime was requested.
        expires_delta = timedelta(minutes=15)
    to_encode = data.copy()
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer *token* to the user it was issued for.

    Args:
        token: Encoded JWT supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The user returned by ``get_user`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that no longer exists. Other
            errors raised by ``get_user`` are re-raised unchanged.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"}
    )
    # Validate the token first so no database session is opened for
    # requests that are rejected anyway.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except InvalidTokenError as exc:
        raise credentials_exception from exc
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    token_data = TokenData(username=username)

    with Session(engine) as db:
        try:
            user = get_user(db, username=token_data.username)
        except HTTPException as exc:
            # get_user raises a 404 for an unknown name; a token naming a
            # missing user is an authentication failure, not "not found".
            if exc.status_code != status.HTTP_404_NOT_FOUND:
                raise
            raise credentials_exception from exc
    if user is None:
        raise credentials_exception
    return user
def create_first_user():
    """Create an ``admin`` user when the user table is empty.

    Looks for any existing ``UserDB`` row. If one exists, its name is printed
    and nothing is created. Otherwise a random 8-digit password is generated
    and printed, and an ``admin`` user with ``is_admin=True`` is stored via
    ``add_and_refresh``.

    Returns:
        The result of ``add_and_refresh`` when a user was created, else
        ``None``.
    """
    print("Checking for admin user")
    with Session(engine) as db:
        existing = db.exec(select(UserDB)).first()
        if existing is not None:
            print(f"Admin user already exists: {existing.name}")
            return None

        digits = [secrets.choice(string.digits) for _ in range(8)]
        password = "".join(digits)
        print("Creating first admin user with password", password)
        hashed = get_password_hash(password)
        admin = UserDB(name="admin", passwordhash=hashed, is_admin=True)
        return add_and_refresh(db, admin)
@token_router.post("/token")
def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Session = Depends(get_session)
) -> Token:
    """Exchange a username/password form for a bearer access token.

    Authenticates the submitted credentials with ``authenticate_user``. On
    success, returns a ``Token`` whose JWT has the user's name as ``sub`` and
    a lifetime of ``ACCESS_TOKEN_EXPIRE_MINUTES``.

    Raises:
        HTTPException: 401 when ``authenticate_user`` reports failure.
    """
    account = authenticate_user(db, form_data.username, form_data.password)
    if account:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        claims = {"sub": account.name}
        signed = create_access_token(data=claims, expires_delta=lifetime)
        return Token(access_token=signed, token_type="bearer")

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Incorrect username or pw",
        headers={"WWW-Authenticate": "Bearer"},
    )
@token_router.get("/test/login")
def test_login(
    current_user: Annotated[UserDB, Depends(get_current_user)],
) -> UserDB:
    """Return the user resolved by ``get_current_user`` for this request.

    Serves as a check that bearer-token authentication works end to end.
    """
    # NOTE(review): UserDB carries a ``passwordhash`` attribute; confirm the
    # serialized response does not expose it to clients.
    return current_user