Fix get_current_user and auth_is_admin creating their own db session instead of getting from get_session

This commit is contained in:
2026-05-16 17:53:42 +02:00
parent 6daf2345be
commit 46e883200e
2 changed files with 27 additions and 22 deletions

View File

@@ -48,28 +48,33 @@ def create_access_token(data: dict, expires_delta: timedelta | None = None):
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
with Session(engine) as db:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: Session = Depends(get_session),
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if username is None:
raise credentials_exception
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user
def auth_is_admin(token: str = Depends(oauth2_scheme)):
user = get_current_user(token=token)
def auth_is_admin(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_session),
):
user = get_current_user(token=token, db=db)
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,

View File

@@ -117,7 +117,7 @@ def test_auth_is_admin(db_session, admin_user, regular_user):
admin_token = create_access_token(data={"sub": admin_user.name})
# Admin should pass
result = auth_is_admin(token=admin_token)
result = auth_is_admin(token=admin_token, db=db_session)
assert result is True
# Create token for regular user
@@ -125,7 +125,7 @@ def test_auth_is_admin(db_session, admin_user, regular_user):
# Regular user should fail
with pytest.raises(HTTPException) as exc_info:
auth_is_admin(token=user_token)
auth_is_admin(token=user_token, db=db_session)
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN