Auth working :)

This commit is contained in:
2026-05-15 20:47:45 +02:00
parent 6ad50df3c2
commit a6a5de4a35
3 changed files with 53 additions and 31 deletions

View File

@@ -4,13 +4,15 @@ from typing import List
from ..model.models import UserResponse, UserCreate, UserDB, UserUpdate
from ..services.database import engine, get_session, add_and_refresh
from ..services.auth import get_password_hash, get_current_user
user_router = APIRouter(tags=["Users"])
@user_router.post("/users/", response_model=UserResponse)
def create_user(*, db: Session = Depends(get_session), user: UserCreate):
print("creating user with data ", user)
db_user = UserDB.model_validate(user)
hashed_password = {"passwordhash": get_password_hash(user.password)}
db_user = UserDB.model_validate(user, update=hashed_password)
return add_and_refresh(db, db_user)
@user_router.get("/users/", response_model=List[UserResponse])
@@ -31,11 +33,12 @@ def update_user(*, db: Session = Depends(get_session), user_id: int, user: UserU
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
user_data = user.model_dump(exclude_unset=True)
db_user.sqlmodel_update(user_data)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
hashed_password = {}
if "password" in user_data:
password = user_data["password"]
hashed_password = {"passwordhash": get_password_hash(password)}
db_user.sqlmodel_update(user_data, update=hashed_password)
return add_and_refresh(db, db_user)
@user_router.delete("/users/{user_id}")
def delete_user(*, db: Session = Depends(get_session), user_id: int):

View File

@@ -2,7 +2,7 @@ from fastapi import FastAPI
from fastapi.security import OAuth2PasswordBearer
from .controllers import userManager, cardManager, groupManager, aaManager
from .services.database import create_db_and_tables
from .services.auth import token_router
from .services.auth import token_router, create_first_user
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@@ -10,6 +10,7 @@ app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
create_first_user()
print("Database created and tables initialized.")

View File

@@ -2,12 +2,13 @@ from typing import Annotated
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlmodel import Session
from sqlmodel import Session, select
from pwdlib import PasswordHash
import jwt
from jwt.exceptions import InvalidTokenError
from ..model.models import UserDB, Token, TokenData
from ..services.database import engine, get_session
from ..model.models import UserDB, Token, TokenData, UserCreate
from ..services.database import *
import secrets, string
SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85" #Encrypt and change later or store in env file or somehthing
ALGORITHM = "HS256"
@@ -26,10 +27,10 @@ def get_password_hash(password):
return password_hash.hash(password)
def get_user(db, username: str):
user = db.get(UserDB, username)
user = db.exec(select(UserDB).where(UserDB.name == username)).first()
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Username not found in get_user, this shouldn't happen")
return user(**user.dict())
return user
def authenticate_user(db, username: str, password: str):
user = get_user(db, username)
@@ -42,31 +43,48 @@ def authenticate_user(db, username: str, password: str):
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.tzname("CET")) + expires_delta
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.tzname("CET")) + expires_delta(minutes=15)
expire = datetime.now(timezone.utc) + expires_delta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
with Session(engine) as db:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user
def create_first_user():
print("Checking for admin user")
with Session(engine) as db:
admin_user = db.exec(select(UserDB)).first()
if admin_user is None:
password = ''.join(secrets.choice(string.digits) for i in range(8))
print("Creating first admin user with password", password)
user = UserDB(
name="admin",
passwordhash=get_password_hash(password),
is_admin=True
)
return add_and_refresh(db, user)
print(f"Admin user already exists: {admin_user.name}")
@token_router.post("/token")
def login_for_access_token(
@@ -82,7 +100,7 @@ def login_for_access_token(
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
data={"sub": user.name}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")