87 lines
4.0 KiB
Python
87 lines
4.0 KiB
Python
import logging
|
|
logger = logging.getLogger(__name__)
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlmodel import Session, select
|
|
from sqlalchemy.orm import selectinload
|
|
from typing import List
|
|
|
|
from ..model.models import *
|
|
from ..services.database import engine, get_session, add_and_refresh
|
|
from ..services.auth import auth_is_admin
|
|
import uuid as gen_uuid
|
|
|
|
aa_router = APIRouter(prefix="/aa", tags=["AccessAuth"])
|
|
|
|
@aa_router.post("/", response_model=AccessAuthorizationResponse)
|
|
def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate, admin: bool = Depends(auth_is_admin)):
|
|
logger.info(f"Creating accessauth with data: {aa}")
|
|
timetables = [Timetable.model_validate(t) for t in aa.timetables]
|
|
db_aa = AccessAuthorizationDB(
|
|
name=aa.name,
|
|
is_active=aa.is_active,
|
|
timetables=timetables
|
|
)
|
|
return add_and_refresh(db, db_aa)
|
|
|
|
@aa_router.get("/", response_model=List[AccessAuthorizationResponse])
|
|
def get_all_accessauths(db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
|
|
return db.exec(
|
|
select(AccessAuthorizationDB)
|
|
.options(selectinload(AccessAuthorizationDB.timetables))
|
|
).all()
|
|
|
|
@aa_router.get("/{aa_id}", response_model=AccessAuthorizationResponse)
|
|
def get_one_accessauth(*, db: Session = Depends(get_session), aa_id: int, admin: bool = Depends(auth_is_admin)):
|
|
db_aa = db.get(AccessAuthorizationDB, aa_id)
|
|
if db_aa is None:
|
|
raise HTTPException(status_code=404, detail="AA not found")
|
|
return db_aa
|
|
|
|
@aa_router.put("/assign/{group_id}/{aa_id}", response_model=GroupResponse)
|
|
def assign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_id: int, admin: bool = Depends(auth_is_admin)):
|
|
db_group = db.get(GroupDB, group_id)
|
|
if db_group is None:
|
|
raise HTTPException(status_code=404, detail="Group not found")
|
|
db_aa = db.get(AccessAuthorizationDB, aa_id)
|
|
if db_aa is None:
|
|
raise HTTPException(status_code=404, detail="AA not found")
|
|
if db_aa in db_group.accessauths:
|
|
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="AA already assigned to group")
|
|
db_group.accessauths.append(db_aa)
|
|
return add_and_refresh(db, db_group)
|
|
|
|
@aa_router.put("/unassign/{group_id}/{aa_id}", response_model=GroupResponse)
|
|
def unassign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_id: int, admin: bool = Depends(auth_is_admin)):
|
|
db_group = db.get(GroupDB, group_id)
|
|
if db_group is None:
|
|
raise HTTPException(status_code=404, detail="Group not found")
|
|
db_aa = db.get(AccessAuthorizationDB, aa_id)
|
|
if db_aa is None:
|
|
raise HTTPException(status_code=404, detail="AA not found")
|
|
if db_aa not in db_group.accessauths:
|
|
raise HTTPException(status_code=200, detail="AA not assigned to group")
|
|
db_group.accessauths.remove(db_aa)
|
|
return add_and_refresh(db, db_group)
|
|
|
|
@aa_router.patch("/{aa_id}", response_model=AccessAuthorizationResponse)
|
|
def change_accessauth(*, db: Session = Depends(get_session), aa_id: int, aa: AccessAuthorizationUpdate, admin: bool = Depends(auth_is_admin)):
|
|
db_aa = db.get(AccessAuthorizationDB, aa_id)
|
|
if db_aa is None:
|
|
raise HTTPException(status_code=404, detail="AccessAuthorization not found")
|
|
aa_data = aa.model_dump(exclude_unset=True)
|
|
if "timetables" in aa_data and aa_data["timetables"] is not None:
|
|
db_aa.timetables.clear()
|
|
timetables = [Timetable.model_validate(t) for t in aa_data["timetables"]]
|
|
db_aa.timetables = timetables
|
|
aa_data.pop("timetables")
|
|
db_aa.sqlmodel_update(aa_data)
|
|
return add_and_refresh(db, db_aa)
|
|
|
|
@aa_router.delete("/{aa_id}")
|
|
def delete_accessauth(*, db: Session = Depends(get_session), aa_id: int, admin: bool = Depends(auth_is_admin)):
|
|
db_aa = db.get(AccessAuthorizationDB, aa_id)
|
|
if db_aa is None:
|
|
raise HTTPException(status_code=404, detail="AccessAuthorization not found")
|
|
db.delete(db_aa)
|
|
db.commit()
|
|
return {"message": "AccessAuthorization deleted successfully"} |