Secure all endpoints behind auth

This commit is contained in:
2026-05-15 22:19:41 +02:00
parent cbc2526c14
commit 5941f38d2a
6 changed files with 33 additions and 20 deletions

View File

@@ -2,13 +2,13 @@
Status: WIP - not prod ready
Dev with `nix develop`
sync python deps with `uv sync`
start dev server `uv run fastapi dev`
Swagger UI @ http://127.0.0.1:8000/docs
start prod server `uv run fastapi run`
Issues:
- `nix run` currently broken
- no auth
- no door state
- no door operations
- card system is dummy until I get hardware

View File

@@ -5,12 +5,13 @@ from typing import List
from ..model.models import *
from ..services.database import engine, get_session, add_and_refresh
from ..services.auth import auth_is_admin
import uuid as gen_uuid
aa_router = APIRouter(prefix="/aa", tags=["AccessAuth"])
@aa_router.post("/", response_model=AccessAuthorizationResponse)
def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate):
def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate, admin: bool = Depends(auth_is_admin)):
print("Creating accessauth with data: ", aa)
timetables = [Timetable.model_validate(t) for t in aa.timetables]
db_aa = AccessAuthorizationDB(
@@ -21,21 +22,21 @@ def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizatio
return add_and_refresh(db, db_aa)
@aa_router.get("/", response_model=List[AccessAuthorizationResponse])
def get_all_accessauths(db: Session = Depends(get_session)):
def get_all_accessauths(db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
return db.exec(
select(AccessAuthorizationDB)
.options(selectinload(AccessAuthorizationDB.timetables))
).all()
@aa_router.get("/{aa_id}", response_model=AccessAuthorizationResponse)
def get_one_accessauth(*, db: Session = Depends(get_session), aa_id: int):
def get_one_accessauth(*, db: Session = Depends(get_session), aa_id: int, admin: bool = Depends(auth_is_admin)):
db_aa = db.get(AccessAuthorizationDB, aa_id)
if db_aa is None:
raise HTTPException(status_code=404, detail="AA not found")
return db_aa
@aa_router.put("/assign/{group_id}/{aa_id}", response_model=GroupResponse)
def assign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_id: int):
def assign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_id: int, admin: bool = Depends(auth_is_admin)):
db_group = db.get(GroupDB, group_id)
if db_group is None:
raise HTTPException(status_code=404, detail="Group not found")
@@ -48,7 +49,7 @@ def assign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_i
return add_and_refresh(db, db_group)
@aa_router.put("/unassign/{group_id}/{aa_id}", response_model=GroupResponse)
def unassign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_id: int):
def unassign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_id: int, admin: bool = Depends(auth_is_admin)):
db_group = db.get(GroupDB, group_id)
if db_group is None:
raise HTTPException(status_code=404, detail="Group not found")
@@ -61,7 +62,7 @@ def unassign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa
return add_and_refresh(db, db_group)
@aa_router.patch("/{aa_id}", response_model=AccessAuthorizationResponse)
def change_accessauth(*, db: Session = Depends(get_session), aa_id: int, aa: AccessAuthorizationUpdate):
def change_accessauth(*, db: Session = Depends(get_session), aa_id: int, aa: AccessAuthorizationUpdate, admin: bool = Depends(auth_is_admin)):
db_aa = db.get(AccessAuthorizationDB, aa_id)
if db_aa is None:
raise HTTPException(status_code=404, detail="AccessAuthorization not found")
@@ -70,7 +71,7 @@ def change_accessauth(*, db: Session = Depends(get_session), aa_id: int, aa: Acc
return add_and_refresh(db, db_aa)
@aa_router.delete("/{aa_id}")
def delete_accessauth(*, db: Session = Depends(get_session), aa_id: int):
def delete_accessauth(*, db: Session = Depends(get_session), aa_id: int, admin: bool = Depends(auth_is_admin)):
db_aa = db.get(AccessAuthorizationDB, aa_id)
if db_aa is None:
raise HTTPException(status_code=404, detail="AccessAuthorization not found")

View File

@@ -4,6 +4,7 @@ from typing import List
from ..model.models import Card
from ..services.database import engine, get_session, add_and_refresh
from ..services.auth import auth_is_admin
import uuid as gen_uuid
card_router = APIRouter(prefix="/cards", tags=["Card"])
@@ -14,12 +15,12 @@ def register_card(group_id: int):
return card
@card_router.post("/{group_id}", response_model=Card)
def add_card(*, db: Session = Depends(get_session), group_id: int):
def add_card(*, db: Session = Depends(get_session), group_id: int, admin: bool = Depends(auth_is_admin)):
card = register_card(group_id)
return add_and_refresh(db, card)
@card_router.delete("/{card_id}")
def del_card(*, db: Session = Depends(get_session), card_id: int):
def del_card(*, db: Session = Depends(get_session), card_id: int, admin: bool = Depends(auth_is_admin)):
card = db.get(Card, card_id)
if card is None:
raise HTTPException(status_code=404, detail="Card not found")
@@ -28,7 +29,7 @@ def del_card(*, db: Session = Depends(get_session), card_id: int):
return {"message": "Card deleted successfully"}
##TBH not a big fan of having creation using group_id but deletion using card_id
@card_router.get("/{group_id}", response_model=List[Card])
def get_cards(*, db: Session = Depends(get_session), group_id: int):
def get_cards(*, db: Session = Depends(get_session), group_id: int, admin: bool = Depends(auth_is_admin)):
cards = db.exec(select(Card).where(Card.group_id == group_id)).all()
return cards

View File

@@ -4,21 +4,22 @@ from typing import List
from ..model.models import GroupDB, GroupResponse, GroupCreate
from ..services.database import engine, get_session, add_and_refresh
from ..services.auth import auth_is_admin
group_router = APIRouter(prefix="/groups", tags=["Group"])
@group_router.get("/", response_model=List[GroupResponse])
def get_groups(*, db: Session = Depends(get_session)):
def get_groups(*, db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
groups = db.exec(select(GroupDB)).all()
return groups
@group_router.post("/", response_model=GroupResponse)
def create_group(*, db: Session = Depends(get_session), group: GroupCreate):
def create_group(*, db: Session = Depends(get_session), group: GroupCreate, admin: bool = Depends(auth_is_admin)):
db_group = GroupDB.model_validate(group)
return add_and_refresh(db, db_group)
@group_router.delete("/{group_id}")
def delete_group(*, db: Session = Depends(get_session), group_id: int):
def delete_group(*, db: Session = Depends(get_session), group_id: int, admin: bool = Depends(auth_is_admin)):
db_group = db.get(GroupDB, group_id)
if db_group is None:
raise HTTPException(status_code=404, detail="Group not found")

View File

@@ -4,31 +4,31 @@ from typing import List
from ..model.models import UserResponse, UserCreate, UserDB, UserUpdate
from ..services.database import engine, get_session, add_and_refresh
from ..services.auth import get_password_hash, get_current_user
from ..services.auth import get_password_hash, get_current_user, auth_is_admin
user_router = APIRouter(tags=["Users"])
@user_router.post("/users/", response_model=UserResponse)
def create_user(*, db: Session = Depends(get_session), user: UserCreate):
def create_user(*, db: Session = Depends(get_session), user: UserCreate, admin: bool = Depends(auth_is_admin)):
print("creating user with data ", user)
hashed_password = {"passwordhash": get_password_hash(user.password)}
db_user = UserDB.model_validate(user, update=hashed_password)
return add_and_refresh(db, db_user)
@user_router.get("/users/", response_model=List[UserResponse])
def read_users(*, db: Session = Depends(get_session)):
def read_users(*, db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
users = db.exec(select(UserDB)).all()
return users
@user_router.get("/users/{user_id}", response_model=UserResponse)
def read_user(*, db: Session = Depends(get_session), user_id: int):
def read_user(*, db: Session = Depends(get_session), user_id: int, admin: bool = Depends(auth_is_admin)):
db_user = db.get(UserDB, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
return db_user
@user_router.patch("/users/{user_id}", response_model=UserResponse)
def update_user(*, db: Session = Depends(get_session), user_id: int, user: UserUpdate):
def update_user(*, db: Session = Depends(get_session), user_id: int, user: UserUpdate, admin: bool = Depends(auth_is_admin)):
db_user = db.get(UserDB, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")
@@ -41,7 +41,7 @@ def update_user(*, db: Session = Depends(get_session), user_id: int, user: UserU
return add_and_refresh(db, db_user)
@user_router.delete("/users/{user_id}")
def delete_user(*, db: Session = Depends(get_session), user_id: int):
def delete_user(*, db: Session = Depends(get_session), user_id: int, admin: bool = Depends(auth_is_admin)):
db_user = db.get(UserDB, user_id)
if db_user is None:
raise HTTPException(status_code=404, detail="User not found")

View File

@@ -70,6 +70,16 @@ def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
raise credentials_exception
return user
def auth_is_admin(token: str = Depends(oauth2_scheme)):
user = get_current_user(token=token)
if not user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to perform this action",
headers={"WWW-Authenticate": "Bearer"}
)
return True
def create_first_user():
print("Checking for admin user")
with Session(engine) as db: