Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
9160b312c7
|
|||
|
07669fc1fc
|
|||
|
94d428c9d0
|
|||
|
a4bedba628
|
|||
|
e6a248529b
|
|||
|
713d41e81c
|
|||
|
6ec3bc5f14
|
|||
|
9e6510f465
|
|||
|
5a6cd970dd
|
|||
|
3d1893d84e
|
|||
|
495535a6de
|
|||
|
30b86dad5e
|
|||
|
92eef82e0a
|
@@ -1,4 +1,6 @@
|
|||||||
from fastapi import APIRouter, Depends, HTTPException
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
from fastapi import APIRouter, Depends, HTTPException, status
|
||||||
from sqlmodel import Session, select
|
from sqlmodel import Session, select
|
||||||
from sqlalchemy.orm import selectinload
|
from sqlalchemy.orm import selectinload
|
||||||
from typing import List
|
from typing import List
|
||||||
@@ -12,7 +14,7 @@ aa_router = APIRouter(prefix="/aa", tags=["AccessAuth"])
|
|||||||
|
|
||||||
@aa_router.post("/", response_model=AccessAuthorizationResponse)
|
@aa_router.post("/", response_model=AccessAuthorizationResponse)
|
||||||
def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate, admin: bool = Depends(auth_is_admin)):
|
def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate, admin: bool = Depends(auth_is_admin)):
|
||||||
print("Creating accessauth with data: ", aa)
|
logger.info(f"Creating accessauth with data: {aa}")
|
||||||
timetables = [Timetable.model_validate(t) for t in aa.timetables]
|
timetables = [Timetable.model_validate(t) for t in aa.timetables]
|
||||||
db_aa = AccessAuthorizationDB(
|
db_aa = AccessAuthorizationDB(
|
||||||
name=aa.name,
|
name=aa.name,
|
||||||
@@ -44,7 +46,7 @@ def assign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_i
|
|||||||
if db_aa is None:
|
if db_aa is None:
|
||||||
raise HTTPException(status_code=404, detail="AA not found")
|
raise HTTPException(status_code=404, detail="AA not found")
|
||||||
if db_aa in db_group.accessauths:
|
if db_aa in db_group.accessauths:
|
||||||
raise HTTPException(status_code=200, detail="AA already assigned to group")
|
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="AA already assigned to group")
|
||||||
db_group.accessauths.append(db_aa)
|
db_group.accessauths.append(db_aa)
|
||||||
return add_and_refresh(db, db_group)
|
return add_and_refresh(db, db_group)
|
||||||
|
|
||||||
@@ -66,7 +68,12 @@ def change_accessauth(*, db: Session = Depends(get_session), aa_id: int, aa: Acc
|
|||||||
db_aa = db.get(AccessAuthorizationDB, aa_id)
|
db_aa = db.get(AccessAuthorizationDB, aa_id)
|
||||||
if db_aa is None:
|
if db_aa is None:
|
||||||
raise HTTPException(status_code=404, detail="AccessAuthorization not found")
|
raise HTTPException(status_code=404, detail="AccessAuthorization not found")
|
||||||
aa_data = aa.dict(exclude_unset=True)
|
aa_data = aa.model_dump(exclude_unset=True)
|
||||||
|
if "timetables" in aa_data and aa_data["timetables"] is not None:
|
||||||
|
db_aa.timetables.clear()
|
||||||
|
timetables = [Timetable.model_validate(t) for t in aa_data["timetables"]]
|
||||||
|
db_aa.timetables = timetables
|
||||||
|
aa_data.pop("timetables")
|
||||||
db_aa.sqlmodel_update(aa_data)
|
db_aa.sqlmodel_update(aa_data)
|
||||||
return add_and_refresh(db, db_aa)
|
return add_and_refresh(db, db_aa)
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,9 @@
|
|||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
from fastapi import APIRouter, Depends, HTTPException, status
|
from fastapi import APIRouter, Depends, HTTPException, status
|
||||||
from sqlmodel import Session, select
|
from sqlmodel import Session, select
|
||||||
from typing import List
|
from typing import List
|
||||||
|
from sqlalchemy.exc import NoResultFound
|
||||||
|
|
||||||
from ..model.models import Card
|
from ..model.models import Card
|
||||||
from ..services.database import engine, get_session, add_and_refresh
|
from ..services.database import engine, get_session, add_and_refresh
|
||||||
@@ -13,7 +16,7 @@ card_router = APIRouter(prefix="/cards", tags=["Card"])
|
|||||||
def register_card(group_id: int):
|
def register_card(group_id: int):
|
||||||
key = WriteNewCard()
|
key = WriteNewCard()
|
||||||
if key == None:
|
if key == None:
|
||||||
print("No card registered. Check logs!")
|
logger.info("No card registered. Check logs!")
|
||||||
raise HTTPException(status.HTTP_417_EXPECTATION_FAILED, detail="No card registered. Check logs!")
|
raise HTTPException(status.HTTP_417_EXPECTATION_FAILED, detail="No card registered. Check logs!")
|
||||||
card = Card(group_id=group_id, uuid=key)
|
card = Card(group_id=group_id, uuid=key)
|
||||||
return card
|
return card
|
||||||
@@ -26,13 +29,16 @@ def add_card(*, db: Session = Depends(get_session), group_id: int, admin: bool =
|
|||||||
@card_router.get("/delete")
|
@card_router.get("/delete")
|
||||||
def del_card(*, db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
|
def del_card(*, db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
|
||||||
key = DeleteCard()
|
key = DeleteCard()
|
||||||
# card = db.get(Card, card_id)
|
logger.info(key)
|
||||||
# if card is None:
|
try:
|
||||||
# raise HTTPException(status_code=404, detail="Card not found")
|
card = db.exec(select(Card).where(Card.uuid == key)).one()
|
||||||
# db.delete(card)
|
except NoResultFound:
|
||||||
# db.commit()
|
logger.info(f"The key:'{key}' was not found in db!")
|
||||||
# return {"message": "Card deleted successfully"}
|
raise HTTPException(status_code=500, detail="Key on card not found in DB. Please tell an admin about this. KEY={key}")
|
||||||
##TBH not a big fan of having creation using group_id but deletion using card_id
|
db.delete(card)
|
||||||
|
db.commit()
|
||||||
|
return {"message": "Card deleted successfully"}
|
||||||
|
|
||||||
@card_router.get("/{group_id}", response_model=List[Card])
|
@card_router.get("/{group_id}", response_model=List[Card])
|
||||||
def get_cards(*, db: Session = Depends(get_session), group_id: int, admin: bool = Depends(auth_is_admin)):
|
def get_cards(*, db: Session = Depends(get_session), group_id: int, admin: bool = Depends(auth_is_admin)):
|
||||||
cards = db.exec(select(Card).where(Card.group_id == group_id)).all()
|
cards = db.exec(select(Card).where(Card.group_id == group_id)).all()
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
from fastapi import APIRouter, HTTPException, Depends
|
from fastapi import APIRouter, HTTPException, Depends
|
||||||
from sqlmodel import Session, select
|
from sqlmodel import Session, select
|
||||||
from typing import List
|
from typing import List
|
||||||
@@ -10,7 +12,7 @@ user_router = APIRouter(tags=["Users"])
|
|||||||
|
|
||||||
@user_router.post("/users/", response_model=UserResponse)
|
@user_router.post("/users/", response_model=UserResponse)
|
||||||
def create_user(*, db: Session = Depends(get_session), user: UserCreate, admin: bool = Depends(auth_is_admin)):
|
def create_user(*, db: Session = Depends(get_session), user: UserCreate, admin: bool = Depends(auth_is_admin)):
|
||||||
print("creating user with data ", user)
|
logger.info(f"creating user with data: {user}")
|
||||||
hashed_password = {"passwordhash": get_password_hash(user.password)}
|
hashed_password = {"passwordhash": get_password_hash(user.password)}
|
||||||
db_user = UserDB.model_validate(user, update=hashed_password)
|
db_user = UserDB.model_validate(user, update=hashed_password)
|
||||||
return add_and_refresh(db, db_user)
|
return add_and_refresh(db, db_user)
|
||||||
|
|||||||
12
app/main.py
12
app/main.py
@@ -1,22 +1,26 @@
|
|||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
from fastapi.security import OAuth2PasswordBearer
|
from fastapi.security import OAuth2PasswordBearer
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
|
||||||
from .controllers import userManager, cardManager, groupManager, aaManager, doorManager
|
from .controllers import userManager, cardManager, groupManager, aaManager, doorManager
|
||||||
from .services.database import create_db_and_tables, get_session
|
from .services.database import create_db_and_tables, get_db_session
|
||||||
from .services.auth import token_router, create_first_user
|
from .services.auth import token_router, create_first_user
|
||||||
from app.services.scanner import BackgroundScanner
|
from app.services.scanner import BackgroundScanner
|
||||||
|
|
||||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||||
scanner = BackgroundScanner(db=get_session())
|
scanner = BackgroundScanner(db=get_db_session())
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(app: FastAPI):
|
async def lifespan(app: FastAPI):
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
create_db_and_tables()
|
create_db_and_tables()
|
||||||
create_first_user()
|
create_first_user(db=get_db_session())
|
||||||
print("Database created and tables initialized.")
|
logger.info("Database created and tables initialized.")
|
||||||
scanner.start()
|
scanner.start()
|
||||||
yield
|
yield
|
||||||
#scanner.stop()
|
#scanner.stop()
|
||||||
|
|||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
from typing import Annotated
|
from typing import Annotated
|
||||||
from datetime import datetime, timedelta, timezone
|
from datetime import datetime, timedelta, timezone
|
||||||
from fastapi import APIRouter, HTTPException, Depends, status
|
from fastapi import APIRouter, HTTPException, Depends, status
|
||||||
@@ -12,7 +14,7 @@ import secrets, string
|
|||||||
|
|
||||||
SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85" #Encrypt and change later or store in env file or somehthing
|
SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85" #Encrypt and change later or store in env file or somehthing
|
||||||
ALGORITHM = "HS256"
|
ALGORITHM = "HS256"
|
||||||
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
ACCESS_TOKEN_EXPIRE_MINUTES = 120
|
||||||
|
|
||||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||||
|
|
||||||
@@ -83,20 +85,19 @@ def auth_is_admin(
|
|||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def create_first_user():
|
def create_first_user(db: Session):
|
||||||
print("Checking for admin user")
|
logger.info("Checking for admin user")
|
||||||
with Session(engine) as db:
|
|
||||||
admin_user = db.exec(select(UserDB)).first()
|
admin_user = db.exec(select(UserDB)).first()
|
||||||
if admin_user is None:
|
if admin_user is None:
|
||||||
password = ''.join(secrets.choice(string.digits) for i in range(8))
|
password = ''.join(secrets.choice(string.digits) for i in range(8))
|
||||||
print("Creating first admin user with password", password)
|
logger.info(f"Creating first admin user with password: {password}")
|
||||||
user = UserDB(
|
user = UserDB(
|
||||||
name="admin",
|
name="admin",
|
||||||
passwordhash=get_password_hash(password),
|
passwordhash=get_password_hash(password),
|
||||||
is_admin=True
|
is_admin=True
|
||||||
)
|
)
|
||||||
return add_and_refresh(db, user)
|
return add_and_refresh(db, user)
|
||||||
print(f"Admin user already exists: {admin_user.name}")
|
logger.info(f"Admin user already exists: {admin_user.name}")
|
||||||
|
|
||||||
|
|
||||||
@token_router.post("/token")
|
@token_router.post("/token")
|
||||||
|
|||||||
@@ -13,6 +13,9 @@ def get_session():
|
|||||||
with Session(engine) as db:
|
with Session(engine) as db:
|
||||||
yield db
|
yield db
|
||||||
|
|
||||||
|
def get_db_session():
|
||||||
|
return Session(engine)
|
||||||
|
|
||||||
def add_and_refresh(db: Session, obj):
|
def add_and_refresh(db: Session, obj):
|
||||||
db.add(obj)
|
db.add(obj)
|
||||||
db.commit()
|
db.commit()
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
|
import logging
|
||||||
import paho.mqtt.client as mqttClient
|
logger = logging.getLogger(__name__)
|
||||||
import paho.mqtt.publish as publish
|
|
||||||
|
|
||||||
from sqlmodel import select
|
from sqlmodel import select
|
||||||
from fastapi import Depends, HTTPException, status
|
from fastapi import Depends, HTTPException, status
|
||||||
from sqlalchemy.orm import selectinload
|
from sqlalchemy.orm import selectinload
|
||||||
@@ -12,43 +10,43 @@ from app.services.database import Session, get_session
|
|||||||
from app.model.models import *
|
from app.model.models import *
|
||||||
|
|
||||||
doorIsOpen = True
|
doorIsOpen = True
|
||||||
client = mqttClient.Client(client_id="", userdata=None, protocol=mqttClient.MQTTv5)
|
|
||||||
client.tls_set(tls_version=mqttClient.ssl.PROTOCOL_TLS)
|
|
||||||
client.username_pw_set("username", "passwort")
|
|
||||||
#client.connect("host", port=8883)
|
|
||||||
# I think this could also be gpio controlled
|
# I think this could also be gpio controlled
|
||||||
#See: https://github.com/technyon/nuki_hub#gpio-lock-control-optional
|
#See: https://github.com/technyon/nuki_hub#gpio-lock-control-optional
|
||||||
|
|
||||||
def openDoor():
|
def openDoor():
|
||||||
|
global doorIsOpen
|
||||||
doorIsOpen = True
|
doorIsOpen = True
|
||||||
publish.single(topic="/lock/action", payload="unlock")
|
logger.info("Still needs gpio out")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def closeDoor():
|
def closeDoor():
|
||||||
|
global doorIsOpen
|
||||||
doorIsOpen = False
|
doorIsOpen = False
|
||||||
publish.single(topic="/lock/action", payload="lock")
|
logger.info("Still needs gpio out")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def isDoorOpen():
|
def isDoorOpen():
|
||||||
return doorIsOpen
|
return doorIsOpen
|
||||||
|
|
||||||
def checkAccess(uuid: str, db: Session = Depends(get_session)):
|
def checkAccess(uuid: str, db: Session):
|
||||||
try:
|
try:
|
||||||
current_weekday = datetime.datetime.weekday(datetime.date.today())
|
current_weekday = datetime.datetime.weekday(datetime.date.today())
|
||||||
current_time = datetime.datetime.now()
|
current_time = datetime.datetime.now()
|
||||||
card = db.exec(select(Card).where(Card.uuid == uuid)).one()
|
card = db.exec(select(Card).where(Card.uuid == uuid)).one()
|
||||||
for auth in card.group.accessauths:
|
for auth in card.group.accessauths:
|
||||||
print(f"checking auth: {auth.name}")
|
logger.info(f"checking auth: {auth.name}")
|
||||||
for timetable in auth.timetables:
|
for timetable in auth.timetables:
|
||||||
print(f" checking timetable {timetable.id}")
|
logger.info(f" checking timetable {timetable.id}")
|
||||||
print(f" comparing weekday: CUR:{current_weekday} TT:{timetable.weekday}")
|
logger.info(f" comparing weekday: CUR:{current_weekday} TT:{timetable.weekday}")
|
||||||
if current_weekday == timetable.weekday:
|
if current_weekday == timetable.weekday:
|
||||||
starttime = datetime.datetime.combine(datetime.date.today(), timetable.starttime)
|
starttime = datetime.datetime.combine(datetime.date.today(), timetable.starttime)
|
||||||
endtime = starttime + datetime.timedelta(minutes=timetable.duration)
|
endtime = starttime + datetime.timedelta(minutes=timetable.duration)
|
||||||
print(f" comparing time: Start:{starttime} Current:{current_time} End:{endtime}")
|
logger.info(f" comparing time: Start:{starttime} Current:{current_time} End:{endtime}")
|
||||||
if starttime < current_time < endtime:
|
if starttime < current_time < endtime:
|
||||||
|
logger.info("Access Valid!")
|
||||||
return True
|
return True
|
||||||
|
logger.info("No more auths found")
|
||||||
return False
|
return False
|
||||||
except exc.NoResultFound:
|
except exc.NoResultFound:
|
||||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
|
raise Exception("No Access with that key found, this might be a db error")
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,6 @@
|
|||||||
import logging
|
import logging
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import os
|
import os
|
||||||
@@ -18,6 +20,8 @@ from desfire.enums import DESFireCommunicationMode, DESFireFileType, DESFireKeyS
|
|||||||
from desfire.schemas import FilePermissions, FileSettings, KeySettings
|
from desfire.schemas import FilePermissions, FileSettings, KeySettings
|
||||||
import desfire.exceptions as desExceptions
|
import desfire.exceptions as desExceptions
|
||||||
|
|
||||||
|
from app.services.door import openDoor, closeDoor, isDoorOpen, checkAccess
|
||||||
|
|
||||||
|
|
||||||
#ENV vars
|
#ENV vars
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
@@ -32,10 +36,6 @@ MIFARE_ACL_WRITE_BASE_KEY_ID = 0x2
|
|||||||
MIFARE_SYS_ID = "FF0000" # 3 bytes, can essentially be anything
|
MIFARE_SYS_ID = "FF0000" # 3 bytes, can essentially be anything
|
||||||
MIFARE_ENCRYPTED_FILE_ID = 0x1
|
MIFARE_ENCRYPTED_FILE_ID = 0x1
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def checkForKey():
|
def checkForKey():
|
||||||
if MIFARE_APP_MASTER_KEY == None:
|
if MIFARE_APP_MASTER_KEY == None:
|
||||||
logger.critical("NO MASTER KEY LOADED")
|
logger.critical("NO MASTER KEY LOADED")
|
||||||
@@ -44,34 +44,80 @@ def checkForKey():
|
|||||||
def getCardService(timeout: int = 10):
|
def getCardService(timeout: int = 10):
|
||||||
cardtype = AnyCardType()
|
cardtype = AnyCardType()
|
||||||
cardrequest = CardRequest(timeout=timeout, cardType=cardtype)
|
cardrequest = CardRequest(timeout=timeout, cardType=cardtype)
|
||||||
print("Please present DESfire tag...")
|
|
||||||
try:
|
|
||||||
cardservice = cardrequest.waitforcard()
|
cardservice = cardrequest.waitforcard()
|
||||||
except CardRequestTimeoutException:
|
cardservice.connection.connect()
|
||||||
logger.error("No tag detected within the timeout.")
|
|
||||||
raise Exception
|
|
||||||
return cardservice
|
return cardservice
|
||||||
|
|
||||||
|
def readFileOnCard(desfire: DESFire):
|
||||||
|
#create keys
|
||||||
|
#desfire = DESFire(PCSCDevice(cardservice.connection.component))
|
||||||
|
aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
|
||||||
|
keysettings = desfire.get_key_setting()
|
||||||
|
desKey = DESFireKey(keysettings, "00" * 8)
|
||||||
|
|
||||||
|
# Get real UID
|
||||||
|
desfire.authenticate(0x0, desKey)
|
||||||
|
#To get the uid you have to auth with an empty (default) key
|
||||||
|
uid = desfire.get_real_uid()
|
||||||
|
applications = desfire.get_application_ids()
|
||||||
|
try:
|
||||||
|
assert len(applications) == 1
|
||||||
|
assert applications[0] == get_list(MIFARE_APP_ID)
|
||||||
|
except AssertionError:
|
||||||
|
logger.error("No application found!")
|
||||||
|
time.sleep(4)
|
||||||
|
return
|
||||||
|
#Then use the key derivation with that uid, the appid, the sysid
|
||||||
|
diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
|
||||||
|
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
|
||||||
|
|
||||||
|
#Log in with derived read key
|
||||||
|
logger.info("Start auth")
|
||||||
|
aes_app_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
|
||||||
|
desfire.select_application(MIFARE_APP_ID)
|
||||||
|
|
||||||
|
desfire.authenticate(MIFARE_ACL_READ_BASE_KEY_ID, aes_app_read_key)
|
||||||
|
|
||||||
|
logger.info("Read data")
|
||||||
|
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
|
||||||
|
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
|
||||||
|
#convert list of int to str
|
||||||
|
rdata = to_hex_string(rdata).replace(" ", "").lower()
|
||||||
|
logger.info(f"Data on card: {rdata}")
|
||||||
|
return rdata
|
||||||
|
|
||||||
def DeleteCard():
|
def DeleteCard():
|
||||||
try:
|
try:
|
||||||
checkForKey()
|
checkForKey()
|
||||||
from app.main import scanner as scannerThread
|
from app.main import scanner as scannerThread
|
||||||
scannerThread.stop()
|
scannerThread.stop()
|
||||||
cardservice = getCardService(15)
|
cardservice = getCardService(15)
|
||||||
cardservice.connection.connect()
|
|
||||||
|
|
||||||
# Create Desfire object
|
# Create Desfire object
|
||||||
desfire = DESFire(PCSCDevice(cardservice.connection.component))
|
desfire = DESFire(PCSCDevice(cardservice.connection.component))
|
||||||
|
|
||||||
|
rdata = readFileOnCard(desfire=desfire)
|
||||||
|
|
||||||
# Create Key objects
|
# Create Key objects
|
||||||
AES_NULL_KEY_DATA = "00" * 8
|
aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
|
||||||
aes_keysettings = KeySettings(
|
des_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_2K3DES)
|
||||||
key_type=DESFireKeyType.DF_KEY_AES,
|
desKey = DESFireKey(des_keysettings, "00" * 8)
|
||||||
)
|
|
||||||
key_settings = desfire.get_key_setting()
|
|
||||||
aes_null_key = DESFireKey(key_settings, AES_NULL_KEY_DATA)
|
|
||||||
aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
|
aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
|
||||||
|
aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
|
||||||
|
|
||||||
|
desfire.select_application(0x0)
|
||||||
|
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
logger.info("Auth1")
|
||||||
|
desfire.authenticate(0x0, desKey)
|
||||||
|
except:
|
||||||
|
logger.info("Auth2")
|
||||||
desfire.authenticate(0x0, aes_null_key)
|
desfire.authenticate(0x0, aes_null_key)
|
||||||
|
except:
|
||||||
|
logger.info("Auth3")
|
||||||
|
desfire.authenticate(0x0, aes_master_key)
|
||||||
|
|
||||||
applications = desfire.get_application_ids()
|
applications = desfire.get_application_ids()
|
||||||
logger.debug(f"Applications: {applications}")
|
logger.debug(f"Applications: {applications}")
|
||||||
if len(applications) == 0:
|
if len(applications) == 0:
|
||||||
@@ -79,13 +125,15 @@ def DeleteCard():
|
|||||||
|
|
||||||
desfire.select_application(MIFARE_APP_ID)
|
desfire.select_application(MIFARE_APP_ID)
|
||||||
desfire.authenticate(0x0, aes_master_key)
|
desfire.authenticate(0x0, aes_master_key)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
desfire.delete_application(MIFARE_APP_ID)
|
desfire.delete_application(MIFARE_APP_ID)
|
||||||
|
logger.info("App deleted!")
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
scannerThread.start()
|
scannerThread.start()
|
||||||
|
return rdata
|
||||||
except Exception as e:
|
except(Exception, AssertionError) as e:
|
||||||
logger.error(f"Error in deletion function: {e}", exc_info=True)
|
logger.error(f"Error in deletion function: {e}", exc_info=True)
|
||||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error: {e}")
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error: {e}")
|
||||||
|
|
||||||
@@ -95,43 +143,29 @@ def WriteNewCard():
|
|||||||
from app.main import scanner as scannerThread
|
from app.main import scanner as scannerThread
|
||||||
scannerThread.stop()
|
scannerThread.stop()
|
||||||
|
|
||||||
cardtype = AnyCardType()
|
cardservice = getCardService(20)
|
||||||
cardrequest = CardRequest(timeout=10, cardType=cardtype)
|
|
||||||
print("Please present DESfire tag...")
|
|
||||||
try:
|
|
||||||
cardservice = cardrequest.waitforcard()
|
|
||||||
except CardRequestTimeoutException:
|
|
||||||
logger.error("No tag detected within the timeout.")
|
|
||||||
return None
|
|
||||||
|
|
||||||
cardservice.connection.connect()
|
|
||||||
|
|
||||||
# Create Desfire object
|
|
||||||
desfire = DESFire(PCSCDevice(cardservice.connection.component))
|
desfire = DESFire(PCSCDevice(cardservice.connection.component))
|
||||||
|
|
||||||
# Create Key objects
|
# Create Key objects
|
||||||
AES_NULL_KEY_DATA = "00" * 16
|
aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
|
||||||
aes_keysettings = KeySettings(
|
aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
|
||||||
key_type=DESFireKeyType.DF_KEY_AES,
|
|
||||||
)
|
|
||||||
aes_null_key = DESFireKey(aes_keysettings, AES_NULL_KEY_DATA)
|
|
||||||
aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
|
aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
|
||||||
keysetting = desfire.get_key_setting()
|
desKey = DESFireKey(desfire.get_key_setting(), "00" * 8)
|
||||||
desKey = DESFireKey(keysetting, "00" * 8)
|
|
||||||
|
|
||||||
# Authenticate with default DES key
|
# Authenticate with default DES key
|
||||||
print("Authenticating with default DES key...")
|
logger.info("Authenticating with default DES key...")
|
||||||
desfire.authenticate(0x0, desKey)
|
desfire.authenticate(0x0, desKey)
|
||||||
|
|
||||||
#get uid
|
#get uid
|
||||||
uid = desfire.get_real_uid()
|
uid = desfire.get_real_uid()
|
||||||
|
|
||||||
# Set default key
|
# Set default key
|
||||||
print("Setting default key...")
|
logger.info("Setting default key...")
|
||||||
desfire.change_default_key(aes_null_key, 0x0)
|
desfire.change_default_key(aes_null_key, 0x0)
|
||||||
|
|
||||||
# Create application
|
# Create application
|
||||||
print("Creating application...")
|
logger.info("Creating application...")
|
||||||
app_settings = KeySettings(
|
app_settings = KeySettings(
|
||||||
settings=[
|
settings=[
|
||||||
DESFireKeySettings.KS_ALLOW_CHANGE_MK,
|
DESFireKeySettings.KS_ALLOW_CHANGE_MK,
|
||||||
@@ -147,34 +181,34 @@ def WriteNewCard():
|
|||||||
applications = desfire.get_application_ids()
|
applications = desfire.get_application_ids()
|
||||||
assert len(applications) == 1
|
assert len(applications) == 1
|
||||||
assert applications[0] == get_list(MIFARE_APP_ID)
|
assert applications[0] == get_list(MIFARE_APP_ID)
|
||||||
print(" - Application created successfully.")
|
logger.info(" - Application created successfully.")
|
||||||
|
|
||||||
# Select application
|
# Select application
|
||||||
desfire.select_application(MIFARE_APP_ID)
|
desfire.select_application(MIFARE_APP_ID)
|
||||||
#Auth again as 0key
|
|
||||||
aes_null_auth_key = DESFireKey(aes_keysettings, AES_NULL_KEY_DATA)
|
|
||||||
desfire.authenticate(0x0, aes_null_auth_key)
|
|
||||||
# Authenticate with AES key, as this has been set as the default key
|
|
||||||
aes_app_mk = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
|
|
||||||
desfire.change_key(0x0, aes_null_key, aes_app_mk, 0x1)
|
|
||||||
|
|
||||||
print("new key auth")
|
#recreate key object
|
||||||
desfire.authenticate(0x0, aes_app_mk)
|
desfire.authenticate(0x0, aes_null_key)
|
||||||
|
desfire.change_key(0x0, aes_null_key, aes_master_key, 0x1)
|
||||||
|
|
||||||
|
logger.info("new key auth")
|
||||||
|
desfire.authenticate(0x0, aes_master_key)
|
||||||
|
|
||||||
|
aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
|
||||||
|
|
||||||
#generate div data
|
#generate div data
|
||||||
diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
|
diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
|
||||||
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
|
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
|
||||||
write_div_key_bytes = diversify_key(get_list(MIFARE_ACL_WRITE_BASE_KEY), diversification_data, pad_to_32=False)
|
write_div_key_bytes = diversify_key(get_list(MIFARE_ACL_WRITE_BASE_KEY), diversification_data, pad_to_32=False)
|
||||||
|
|
||||||
print("Changing file read key...")
|
logger.info("Changing file read key...")
|
||||||
aes_file_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
|
aes_file_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
|
||||||
desfire.change_key(MIFARE_ACL_READ_BASE_KEY_ID, aes_null_key, aes_file_read_key, 0x1)
|
desfire.change_key(MIFARE_ACL_READ_BASE_KEY_ID, aes_null_key, aes_file_read_key, 0x1)
|
||||||
|
|
||||||
print("Changing file write key...")
|
logger.info("Changing file write key...")
|
||||||
aes_file_write_key = DESFireKey(aes_keysettings, write_div_key_bytes)
|
aes_file_write_key = DESFireKey(aes_keysettings, write_div_key_bytes)
|
||||||
desfire.change_key(MIFARE_ACL_WRITE_BASE_KEY_ID, aes_null_key, aes_file_write_key, 0x1)
|
desfire.change_key(MIFARE_ACL_WRITE_BASE_KEY_ID, aes_null_key, aes_file_write_key, 0x1)
|
||||||
|
|
||||||
print("Create encrypted file containing UUID...")
|
logger.info("Create encrypted file containing key...")
|
||||||
file_settings = FileSettings(
|
file_settings = FileSettings(
|
||||||
file_size=16,
|
file_size=16,
|
||||||
encryption=DESFireCommunicationMode.ENCRYPTED,
|
encryption=DESFireCommunicationMode.ENCRYPTED,
|
||||||
@@ -185,25 +219,16 @@ def WriteNewCard():
|
|||||||
file_type=DESFireFileType.MDFT_STANDARD_DATA_FILE,
|
file_type=DESFireFileType.MDFT_STANDARD_DATA_FILE,
|
||||||
)
|
)
|
||||||
desfire.create_standard_file(MIFARE_ENCRYPTED_FILE_ID, file_settings)
|
desfire.create_standard_file(MIFARE_ENCRYPTED_FILE_ID, file_settings)
|
||||||
|
|
||||||
print("Read and verify file settings again...")
|
|
||||||
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
|
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
|
||||||
assert file_data.file_size == 16
|
|
||||||
assert file_data.encryption == DESFireCommunicationMode.ENCRYPTED
|
|
||||||
assert file_data.permissions is not None
|
|
||||||
assert file_data.permissions.read_access == MIFARE_ACL_READ_BASE_KEY_ID
|
|
||||||
assert file_data.permissions.write_access == MIFARE_ACL_WRITE_BASE_KEY_ID
|
|
||||||
assert file_data.file_type == DESFireFileType.MDFT_STANDARD_DATA_FILE
|
|
||||||
print(" - File created successfully.")
|
|
||||||
|
|
||||||
print("Writing UID to encrypted file...")
|
logger.info("Writing UID to encrypted file...")
|
||||||
key = secrets.token_hex(16)
|
key = secrets.token_hex(16)
|
||||||
desfire.write_file_data(MIFARE_ENCRYPTED_FILE_ID, 0x0, file_data.encryption, get_list(key))
|
desfire.write_file_data(MIFARE_ENCRYPTED_FILE_ID, 0x0, file_data.encryption, get_list(key))
|
||||||
|
|
||||||
print("Reading from encrypted file...")
|
logger.info("Reading from encrypted file...")
|
||||||
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
|
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
|
||||||
assert rdata == get_list(key)
|
assert rdata == get_list(key)
|
||||||
print(" - Data written successfully.")
|
logger.info(" - Data written successfully.")
|
||||||
scannerThread.start()
|
scannerThread.start()
|
||||||
return key
|
return key
|
||||||
|
|
||||||
@@ -238,12 +263,11 @@ class BackgroundScanner:
|
|||||||
try:
|
try:
|
||||||
card_content = self._read_card()
|
card_content = self._read_card()
|
||||||
if card_content:
|
if card_content:
|
||||||
logger.info(to_hex_string(card_content))
|
self._check_db(card_content)
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
logger.debug("READY after success")
|
logger.debug("READY after success")
|
||||||
#self._check_db(card_content)
|
|
||||||
else:
|
else:
|
||||||
time.sleep(0.5)
|
time.sleep(0.1)
|
||||||
logger.debug("READY after timout")
|
logger.debug("READY after timout")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -251,53 +275,24 @@ class BackgroundScanner:
|
|||||||
time.sleep(6)
|
time.sleep(6)
|
||||||
|
|
||||||
def _read_card(self):
|
def _read_card(self):
|
||||||
cardtype = AnyCardType()
|
time.sleep(0.5)
|
||||||
cardrequest = CardRequest(timeout=5, cardType=cardtype)
|
|
||||||
try:
|
try:
|
||||||
cardservice = cardrequest.waitforcard()
|
cardservice = getCardService(3)
|
||||||
except CardRequestTimeoutException:
|
except CardRequestTimeoutException:
|
||||||
logger.debug("No tag detected within the timeout.")
|
logger.debug("No tag detected within the timeout.")
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
|
||||||
cardservice.connection.connect()
|
|
||||||
|
|
||||||
# Create Desfire object
|
# Create Desfire object
|
||||||
desfire = DESFire(PCSCDevice(cardservice.connection.component))
|
desfire = DESFire(PCSCDevice(cardservice.connection.component))
|
||||||
aes_keysettings = KeySettings(
|
|
||||||
key_type=DESFireKeyType.DF_KEY_AES,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Get real UID
|
|
||||||
mk = DESFireKey(desfire.get_key_setting(), "00" * 8)
|
|
||||||
desfire.authenticate(0x0, mk)
|
|
||||||
#To get the uid you have to auth with an empty (default) key
|
|
||||||
uid = desfire.get_real_uid()
|
|
||||||
applications = desfire.get_application_ids()
|
|
||||||
try:
|
try:
|
||||||
assert len(applications) == 1
|
rdata = readFileOnCard(desfire=desfire)
|
||||||
assert applications[0] == get_list(MIFARE_APP_ID)
|
|
||||||
except AssertionError:
|
|
||||||
logger.error("No application found!")
|
|
||||||
time.sleep(4)
|
|
||||||
return None
|
|
||||||
#Then use the key derivation with that uid, the appid, the sysid
|
|
||||||
diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
|
|
||||||
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
|
|
||||||
|
|
||||||
#Log in with derived read key
|
|
||||||
logger.info("Start auth")
|
|
||||||
aes_app_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
|
|
||||||
desfire.select_application(MIFARE_APP_ID)
|
|
||||||
|
|
||||||
desfire.authenticate(MIFARE_ACL_READ_BASE_KEY_ID, aes_app_read_key)
|
|
||||||
|
|
||||||
logger.info("Read data")
|
|
||||||
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
|
|
||||||
logger.info(f"File settings: {file_data}")
|
|
||||||
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
|
|
||||||
return rdata
|
return rdata
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"something went wrong: {e}")
|
logger.error(f"something went wrong: {e}")
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
|
def _check_db(self, key):
|
||||||
|
check = checkAccess(key, self.db)
|
||||||
|
if check == True:
|
||||||
|
openDoor()
|
||||||
|
else:
|
||||||
|
logger.info("Access denied!")
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ dependencies = [
|
|||||||
"sqlmodel>=0.0.38",
|
"sqlmodel>=0.0.38",
|
||||||
"poetry>=2.3.4",
|
"poetry>=2.3.4",
|
||||||
"python-desfire",
|
"python-desfire",
|
||||||
"paho-mqtt>=2.1.0",
|
|
||||||
"pyjwt[crypto]>=2.12.1",
|
"pyjwt[crypto]>=2.12.1",
|
||||||
"pwdlib[argon2]>=0.3.0",
|
"pwdlib[argon2]>=0.3.0",
|
||||||
"pytest>=9.0.3",
|
"pytest>=9.0.3",
|
||||||
|
|||||||
@@ -4,13 +4,6 @@ def test_app_startup(client):
|
|||||||
# Application should respond (even if it's a 404)
|
# Application should respond (even if it's a 404)
|
||||||
assert response.status_code in [404, 200]
|
assert response.status_code in [404, 200]
|
||||||
|
|
||||||
|
|
||||||
def test_health_check(client):
|
|
||||||
"""Test basic health check endpoint if it exists."""
|
|
||||||
# Note: This would require adding a health check endpoint
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def test_router_includes():
|
def test_router_includes():
|
||||||
"""Test that all routers are included in the app."""
|
"""Test that all routers are included in the app."""
|
||||||
from app.main import app
|
from app.main import app
|
||||||
|
|||||||
@@ -75,8 +75,9 @@ def test_assign_already_assigned_access_auth(client, auth_headers, test_group, t
|
|||||||
f"/aa/assign/{test_group.id}/{test_aa.id}",
|
f"/aa/assign/{test_group.id}/{test_aa.id}",
|
||||||
headers=auth_headers
|
headers=auth_headers
|
||||||
)
|
)
|
||||||
# According to the code, this returns 200 with "already assigned" message
|
# According to the code, this returns 409 with "already assigned" message
|
||||||
assert response.status_code == 200
|
assert response.status_code == 409
|
||||||
|
assert "already assigned" in response.json()["detail"].lower()
|
||||||
|
|
||||||
|
|
||||||
def test_unassign_access_auth_from_group(client, auth_headers, test_group, test_aa):
|
def test_unassign_access_auth_from_group(client, auth_headers, test_group, test_aa):
|
||||||
@@ -147,6 +148,11 @@ def test_update_access_auth_with_timetables(client, auth_headers, test_aa):
|
|||||||
headers=auth_headers
|
headers=auth_headers
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
jresponse = response.json()
|
||||||
|
assert len(jresponse["timetables"]) == 1
|
||||||
|
assert jresponse["timetables"][0]["weekday"] == 5
|
||||||
|
assert jresponse["timetables"][0]["starttime"] == "10:00:00"
|
||||||
|
assert jresponse["timetables"][0]["duration"] == 120
|
||||||
|
|
||||||
|
|
||||||
def test_update_nonexistent_access_auth(client, auth_headers):
|
def test_update_nonexistent_access_auth(client, auth_headers):
|
||||||
|
|||||||
@@ -131,7 +131,6 @@ def test_auth_is_admin(db_session, admin_user, regular_user):
|
|||||||
|
|
||||||
def test_create_first_user(db_session):
|
def test_create_first_user(db_session):
|
||||||
"""Test automatic creation of first admin user."""
|
"""Test automatic creation of first admin user."""
|
||||||
#Currently broken because this uses the prod db because of how i wrote the create_first_user function
|
|
||||||
# Clear any existing users
|
# Clear any existing users
|
||||||
from sqlmodel import select
|
from sqlmodel import select
|
||||||
db_session.exec(select(UserDB)).all()
|
db_session.exec(select(UserDB)).all()
|
||||||
@@ -140,7 +139,7 @@ def test_create_first_user(db_session):
|
|||||||
db_session.commit()
|
db_session.commit()
|
||||||
|
|
||||||
# Create first user
|
# Create first user
|
||||||
result = create_first_user()
|
result = create_first_user(db=db_session)
|
||||||
assert result is not None
|
assert result is not None
|
||||||
assert result.name == "admin"
|
assert result.name == "admin"
|
||||||
assert result.is_admin is True
|
assert result.is_admin is True
|
||||||
@@ -151,7 +150,7 @@ def test_create_first_user(db_session):
|
|||||||
assert user.is_admin is True
|
assert user.is_admin is True
|
||||||
|
|
||||||
# Test that it doesn't create another admin if one exists
|
# Test that it doesn't create another admin if one exists
|
||||||
second_result = create_first_user()
|
second_result = create_first_user(db=db_session)
|
||||||
assert second_result is None # Should print "Admin user already exists"
|
assert second_result is None # Should print "Admin user already exists"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,41 +1,6 @@
|
|||||||
import pytest
|
import pytest
|
||||||
from fastapi import status
|
from fastapi import status
|
||||||
|
|
||||||
|
|
||||||
def test_add_card(client, auth_headers, test_group):
|
|
||||||
"""Test adding a card to a group."""
|
|
||||||
response = client.post(f"/cards/{test_group.id}", headers=auth_headers)
|
|
||||||
assert response.status_code == 200
|
|
||||||
|
|
||||||
data = response.json()
|
|
||||||
assert "id" in data
|
|
||||||
assert "uuid" in data
|
|
||||||
assert data["group_id"] == test_group.id
|
|
||||||
assert len(data["uuid"]) > 0 # UUID should be generated
|
|
||||||
|
|
||||||
|
|
||||||
def test_add_card_to_nonexistent_group(client, auth_headers):
|
|
||||||
"""Test adding a card to a non-existent group."""
|
|
||||||
response = client.post("/cards/99999", headers=auth_headers)
|
|
||||||
# This might succeed and create a card with a non-existent group_id
|
|
||||||
# or fail depending on foreign key constraints
|
|
||||||
# For now, let's assume it might fail
|
|
||||||
# assert response.status_code == 404
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_card(client, auth_headers, test_card):
|
|
||||||
"""Test deleting a card."""
|
|
||||||
response = client.delete(f"/cards/{test_card.id}", headers=auth_headers)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert "deleted successfully" in response.json()["message"].lower()
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_nonexistent_card(client, auth_headers):
|
|
||||||
"""Test deleting a non-existent card."""
|
|
||||||
response = client.delete("/cards/99999", headers=auth_headers)
|
|
||||||
assert response.status_code == 404
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_cards_for_group(client, auth_headers, test_group, test_card):
|
def test_get_cards_for_group(client, auth_headers, test_group, test_card):
|
||||||
"""Test getting all cards for a group."""
|
"""Test getting all cards for a group."""
|
||||||
response = client.get(f"/cards/{test_group.id}", headers=auth_headers)
|
response = client.get(f"/cards/{test_group.id}", headers=auth_headers)
|
||||||
|
|||||||
61
test/test_services/test_door.py
Normal file
61
test/test_services/test_door.py
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import pytest
|
||||||
|
import datetime
|
||||||
|
from app.services.door import checkAccess
|
||||||
|
from app.model.models import Card, GroupDB, AccessAuthorizationDB, Timetable
|
||||||
|
|
||||||
|
def test_check_access_with_valid_timetable(db_session):
|
||||||
|
# Setup: create card with valid access
|
||||||
|
group = GroupDB(name="Test Group")
|
||||||
|
db_session.add(group)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
card = Card(uuid="test-uuid-123", group_id=group.id)
|
||||||
|
db_session.add(card)
|
||||||
|
|
||||||
|
timetable = Timetable(
|
||||||
|
weekday=datetime.datetime.weekday(datetime.date.today()),
|
||||||
|
starttime=datetime.datetime.now().time(),
|
||||||
|
duration=120 # 2 hours
|
||||||
|
)
|
||||||
|
db_session.add(timetable)
|
||||||
|
|
||||||
|
aa = AccessAuthorizationDB(name="Test AA", is_active=True)
|
||||||
|
db_session.add(aa)
|
||||||
|
aa.timetables = [timetable]
|
||||||
|
group.accessauths = [aa]
|
||||||
|
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Test: access should be granted within time window
|
||||||
|
result = checkAccess("test-uuid-123", db_session)
|
||||||
|
assert result == True
|
||||||
|
|
||||||
|
def test_check_access_outside_hours(db_session):
|
||||||
|
# Test when current time is outside valid hours
|
||||||
|
group = GroupDB(name="Test Group")
|
||||||
|
db_session.add(group)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
card = Card(uuid="test-uuid-123", group_id=group.id)
|
||||||
|
db_session.add(card)
|
||||||
|
|
||||||
|
timetable = Timetable(
|
||||||
|
weekday=datetime.datetime.weekday(datetime.date.today()),
|
||||||
|
starttime=datetime.time(1, 0),
|
||||||
|
duration=1 # 2 hours
|
||||||
|
)
|
||||||
|
db_session.add(timetable)
|
||||||
|
|
||||||
|
aa = AccessAuthorizationDB(name="Test AA", is_active=True)
|
||||||
|
db_session.add(aa)
|
||||||
|
aa.timetables = [timetable]
|
||||||
|
group.accessauths = [aa]
|
||||||
|
|
||||||
|
db_session.commit()
|
||||||
|
result = checkAccess("test-uuid-123", db_session)
|
||||||
|
assert result == False
|
||||||
|
|
||||||
|
def test_check_access_invalid_card(db_session):
|
||||||
|
# Should raise exception for non-existent card
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
checkAccess("non-existent-uuid", db_session)
|
||||||
11
uv.lock
generated
11
uv.lock
generated
@@ -609,7 +609,6 @@ version = "0.1.0"
|
|||||||
source = { virtual = "." }
|
source = { virtual = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "fastapi", extra = ["standard"] },
|
{ name = "fastapi", extra = ["standard"] },
|
||||||
{ name = "paho-mqtt" },
|
|
||||||
{ name = "poetry" },
|
{ name = "poetry" },
|
||||||
{ name = "pwdlib", extra = ["argon2"] },
|
{ name = "pwdlib", extra = ["argon2"] },
|
||||||
{ name = "pyjwt", extra = ["crypto"] },
|
{ name = "pyjwt", extra = ["crypto"] },
|
||||||
@@ -625,7 +624,6 @@ dependencies = [
|
|||||||
[package.metadata]
|
[package.metadata]
|
||||||
requires-dist = [
|
requires-dist = [
|
||||||
{ name = "fastapi", extras = ["standard"], specifier = ">=0.135.3" },
|
{ name = "fastapi", extras = ["standard"], specifier = ">=0.135.3" },
|
||||||
{ name = "paho-mqtt", specifier = ">=2.1.0" },
|
|
||||||
{ name = "poetry", specifier = ">=2.3.4" },
|
{ name = "poetry", specifier = ">=2.3.4" },
|
||||||
{ name = "pwdlib", extras = ["argon2"], specifier = ">=0.3.0" },
|
{ name = "pwdlib", extras = ["argon2"], specifier = ">=0.3.0" },
|
||||||
{ name = "pyjwt", extras = ["crypto"], specifier = ">=2.12.1" },
|
{ name = "pyjwt", extras = ["crypto"], specifier = ">=2.12.1" },
|
||||||
@@ -952,15 +950,6 @@ wheels = [
|
|||||||
{ url = "https://files.pythonhosted.org/packages/7a/c2/920ef838e2f0028c8262f16101ec09ebd5969864e5a64c4c05fad0617c56/packaging-26.1-py3-none-any.whl", hash = "sha256:5d9c0669c6285e491e0ced2eee587eaf67b670d94a19e94e3984a481aba6802f", size = 95831, upload-time = "2026-04-14T21:12:47.56Z" },
|
{ url = "https://files.pythonhosted.org/packages/7a/c2/920ef838e2f0028c8262f16101ec09ebd5969864e5a64c4c05fad0617c56/packaging-26.1-py3-none-any.whl", hash = "sha256:5d9c0669c6285e491e0ced2eee587eaf67b670d94a19e94e3984a481aba6802f", size = 95831, upload-time = "2026-04-14T21:12:47.56Z" },
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "paho-mqtt"
|
|
||||||
version = "2.1.0"
|
|
||||||
source = { registry = "https://pypi.org/simple" }
|
|
||||||
sdist = { url = "https://files.pythonhosted.org/packages/39/15/0a6214e76d4d32e7f663b109cf71fb22561c2be0f701d67f93950cd40542/paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834", size = 148848, upload-time = "2024-04-29T19:52:55.591Z" }
|
|
||||||
wheels = [
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219, upload-time = "2024-04-29T19:52:48.345Z" },
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pbs-installer"
|
name = "pbs-installer"
|
||||||
version = "2026.4.7"
|
version = "2026.4.7"
|
||||||
|
|||||||
Reference in New Issue
Block a user