13 Commits
door ... master

Author SHA1 Message Date
9160b312c7 Fix the things i broke using logger.info 2026-05-23 20:35:12 +02:00
07669fc1fc I don't know how to test hardware functions 2026-05-23 20:27:16 +02:00
94d428c9d0 Change all instances of print() to logger.info() 2026-05-23 20:17:43 +02:00
a4bedba628 Remove mqtt 2026-05-23 20:09:12 +02:00
e6a248529b Fix test test_create_first_user() 2026-05-23 20:08:10 +02:00
713d41e81c Add door.py tests 2026-05-23 19:49:02 +02:00
6ec3bc5f14 Improve assign_accessauth test and change already assigned to 409 2026-05-23 18:58:59 +02:00
9e6510f465 Remove unused test 2026-05-23 18:58:20 +02:00
5a6cd970dd Fix change_accessauth() not changing timetables
Also fix the test not actually checking the response :/
2026-05-23 18:41:50 +02:00
3d1893d84e Fix door check db session 2026-05-23 17:32:28 +02:00
495535a6de Scanner write-read-delete workflow working! 2026-05-23 00:30:03 +02:00
30b86dad5e Increase login time 2026-05-22 23:14:55 +02:00
92eef82e0a Slight improvements 2026-05-22 21:56:30 +02:00
15 changed files with 245 additions and 217 deletions

View File

@@ -1,4 +1,6 @@
from fastapi import APIRouter, Depends, HTTPException import logging
logger = logging.getLogger(__name__)
from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select from sqlmodel import Session, select
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
from typing import List from typing import List
@@ -12,7 +14,7 @@ aa_router = APIRouter(prefix="/aa", tags=["AccessAuth"])
@aa_router.post("/", response_model=AccessAuthorizationResponse) @aa_router.post("/", response_model=AccessAuthorizationResponse)
def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate, admin: bool = Depends(auth_is_admin)): def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate, admin: bool = Depends(auth_is_admin)):
print("Creating accessauth with data: ", aa) logger.info(f"Creating accessauth with data: {aa}")
timetables = [Timetable.model_validate(t) for t in aa.timetables] timetables = [Timetable.model_validate(t) for t in aa.timetables]
db_aa = AccessAuthorizationDB( db_aa = AccessAuthorizationDB(
name=aa.name, name=aa.name,
@@ -44,7 +46,7 @@ def assign_accessauth(*, db: Session = Depends(get_session), group_id: int, aa_i
if db_aa is None: if db_aa is None:
raise HTTPException(status_code=404, detail="AA not found") raise HTTPException(status_code=404, detail="AA not found")
if db_aa in db_group.accessauths: if db_aa in db_group.accessauths:
raise HTTPException(status_code=200, detail="AA already assigned to group") raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="AA already assigned to group")
db_group.accessauths.append(db_aa) db_group.accessauths.append(db_aa)
return add_and_refresh(db, db_group) return add_and_refresh(db, db_group)
@@ -66,7 +68,12 @@ def change_accessauth(*, db: Session = Depends(get_session), aa_id: int, aa: Acc
db_aa = db.get(AccessAuthorizationDB, aa_id) db_aa = db.get(AccessAuthorizationDB, aa_id)
if db_aa is None: if db_aa is None:
raise HTTPException(status_code=404, detail="AccessAuthorization not found") raise HTTPException(status_code=404, detail="AccessAuthorization not found")
aa_data = aa.dict(exclude_unset=True) aa_data = aa.model_dump(exclude_unset=True)
if "timetables" in aa_data and aa_data["timetables"] is not None:
db_aa.timetables.clear()
timetables = [Timetable.model_validate(t) for t in aa_data["timetables"]]
db_aa.timetables = timetables
aa_data.pop("timetables")
db_aa.sqlmodel_update(aa_data) db_aa.sqlmodel_update(aa_data)
return add_and_refresh(db, db_aa) return add_and_refresh(db, db_aa)

View File

@@ -1,6 +1,9 @@
import logging
logger = logging.getLogger(__name__)
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import Session, select from sqlmodel import Session, select
from typing import List from typing import List
from sqlalchemy.exc import NoResultFound
from ..model.models import Card from ..model.models import Card
from ..services.database import engine, get_session, add_and_refresh from ..services.database import engine, get_session, add_and_refresh
@@ -13,7 +16,7 @@ card_router = APIRouter(prefix="/cards", tags=["Card"])
def register_card(group_id: int): def register_card(group_id: int):
key = WriteNewCard() key = WriteNewCard()
if key == None: if key == None:
print("No card registered. Check logs!") logger.info("No card registered. Check logs!")
raise HTTPException(status.HTTP_417_EXPECTATION_FAILED, detail="No card registered. Check logs!") raise HTTPException(status.HTTP_417_EXPECTATION_FAILED, detail="No card registered. Check logs!")
card = Card(group_id=group_id, uuid=key) card = Card(group_id=group_id, uuid=key)
return card return card
@@ -26,13 +29,16 @@ def add_card(*, db: Session = Depends(get_session), group_id: int, admin: bool =
@card_router.get("/delete") @card_router.get("/delete")
def del_card(*, db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)): def del_card(*, db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
key = DeleteCard() key = DeleteCard()
# card = db.get(Card, card_id) logger.info(key)
# if card is None: try:
# raise HTTPException(status_code=404, detail="Card not found") card = db.exec(select(Card).where(Card.uuid == key)).one()
# db.delete(card) except NoResultFound:
# db.commit() logger.info(f"The key:'{key}' was not found in db!")
# return {"message": "Card deleted successfully"} raise HTTPException(status_code=500, detail="Key on card not found in DB. Please tell an admin about this. KEY={key}")
##TBH not a big fan of having creation using group_id but deletion using card_id db.delete(card)
db.commit()
return {"message": "Card deleted successfully"}
@card_router.get("/{group_id}", response_model=List[Card]) @card_router.get("/{group_id}", response_model=List[Card])
def get_cards(*, db: Session = Depends(get_session), group_id: int, admin: bool = Depends(auth_is_admin)): def get_cards(*, db: Session = Depends(get_session), group_id: int, admin: bool = Depends(auth_is_admin)):
cards = db.exec(select(Card).where(Card.group_id == group_id)).all() cards = db.exec(select(Card).where(Card.group_id == group_id)).all()

View File

@@ -1,3 +1,5 @@
import logging
logger = logging.getLogger(__name__)
from fastapi import APIRouter, HTTPException, Depends from fastapi import APIRouter, HTTPException, Depends
from sqlmodel import Session, select from sqlmodel import Session, select
from typing import List from typing import List
@@ -10,7 +12,7 @@ user_router = APIRouter(tags=["Users"])
@user_router.post("/users/", response_model=UserResponse) @user_router.post("/users/", response_model=UserResponse)
def create_user(*, db: Session = Depends(get_session), user: UserCreate, admin: bool = Depends(auth_is_admin)): def create_user(*, db: Session = Depends(get_session), user: UserCreate, admin: bool = Depends(auth_is_admin)):
print("creating user with data ", user) logger.info(f"creating user with data: {user}")
hashed_password = {"passwordhash": get_password_hash(user.password)} hashed_password = {"passwordhash": get_password_hash(user.password)}
db_user = UserDB.model_validate(user, update=hashed_password) db_user = UserDB.model_validate(user, update=hashed_password)
return add_and_refresh(db, db_user) return add_and_refresh(db, db_user)

View File

@@ -1,22 +1,26 @@
import logging
logger = logging.getLogger(__name__)
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dotenv import load_dotenv from dotenv import load_dotenv
from .controllers import userManager, cardManager, groupManager, aaManager, doorManager from .controllers import userManager, cardManager, groupManager, aaManager, doorManager
from .services.database import create_db_and_tables, get_session from .services.database import create_db_and_tables, get_db_session
from .services.auth import token_router, create_first_user from .services.auth import token_router, create_first_user
from app.services.scanner import BackgroundScanner from app.services.scanner import BackgroundScanner
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
scanner = BackgroundScanner(db=get_session()) scanner = BackgroundScanner(db=get_db_session())
logging.basicConfig(level=logging.INFO)
@asynccontextmanager @asynccontextmanager
async def lifespan(app: FastAPI): async def lifespan(app: FastAPI):
load_dotenv() load_dotenv()
create_db_and_tables() create_db_and_tables()
create_first_user() create_first_user(db=get_db_session())
print("Database created and tables initialized.") logger.info("Database created and tables initialized.")
scanner.start() scanner.start()
yield yield
#scanner.stop() #scanner.stop()

View File

@@ -1,3 +1,5 @@
import logging
logger = logging.getLogger(__name__)
from typing import Annotated from typing import Annotated
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, HTTPException, Depends, status from fastapi import APIRouter, HTTPException, Depends, status
@@ -12,7 +14,7 @@ import secrets, string
SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85" #Encrypt and change later or store in env file or somehthing SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85" #Encrypt and change later or store in env file or somehthing
ALGORITHM = "HS256" ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 ACCESS_TOKEN_EXPIRE_MINUTES = 120
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@@ -83,20 +85,19 @@ def auth_is_admin(
) )
return True return True
def create_first_user(): def create_first_user(db: Session):
print("Checking for admin user") logger.info("Checking for admin user")
with Session(engine) as db:
admin_user = db.exec(select(UserDB)).first() admin_user = db.exec(select(UserDB)).first()
if admin_user is None: if admin_user is None:
password = ''.join(secrets.choice(string.digits) for i in range(8)) password = ''.join(secrets.choice(string.digits) for i in range(8))
print("Creating first admin user with password", password) logger.info(f"Creating first admin user with password: {password}")
user = UserDB( user = UserDB(
name="admin", name="admin",
passwordhash=get_password_hash(password), passwordhash=get_password_hash(password),
is_admin=True is_admin=True
) )
return add_and_refresh(db, user) return add_and_refresh(db, user)
print(f"Admin user already exists: {admin_user.name}") logger.info(f"Admin user already exists: {admin_user.name}")
@token_router.post("/token") @token_router.post("/token")

View File

@@ -13,6 +13,9 @@ def get_session():
with Session(engine) as db: with Session(engine) as db:
yield db yield db
def get_db_session():
return Session(engine)
def add_and_refresh(db: Session, obj): def add_and_refresh(db: Session, obj):
db.add(obj) db.add(obj)
db.commit() db.commit()

View File

@@ -1,7 +1,5 @@
import logging
import paho.mqtt.client as mqttClient logger = logging.getLogger(__name__)
import paho.mqtt.publish as publish
from sqlmodel import select from sqlmodel import select
from fastapi import Depends, HTTPException, status from fastapi import Depends, HTTPException, status
from sqlalchemy.orm import selectinload from sqlalchemy.orm import selectinload
@@ -12,43 +10,43 @@ from app.services.database import Session, get_session
from app.model.models import * from app.model.models import *
doorIsOpen = True doorIsOpen = True
client = mqttClient.Client(client_id="", userdata=None, protocol=mqttClient.MQTTv5)
client.tls_set(tls_version=mqttClient.ssl.PROTOCOL_TLS)
client.username_pw_set("username", "passwort")
#client.connect("host", port=8883)
# I think this could also be gpio controlled # I think this could also be gpio controlled
#See: https://github.com/technyon/nuki_hub#gpio-lock-control-optional #See: https://github.com/technyon/nuki_hub#gpio-lock-control-optional
def openDoor(): def openDoor():
global doorIsOpen
doorIsOpen = True doorIsOpen = True
publish.single(topic="/lock/action", payload="unlock") logger.info("Still needs gpio out")
pass pass
def closeDoor(): def closeDoor():
global doorIsOpen
doorIsOpen = False doorIsOpen = False
publish.single(topic="/lock/action", payload="lock") logger.info("Still needs gpio out")
pass pass
def isDoorOpen(): def isDoorOpen():
return doorIsOpen return doorIsOpen
def checkAccess(uuid: str, db: Session = Depends(get_session)): def checkAccess(uuid: str, db: Session):
try: try:
current_weekday = datetime.datetime.weekday(datetime.date.today()) current_weekday = datetime.datetime.weekday(datetime.date.today())
current_time = datetime.datetime.now() current_time = datetime.datetime.now()
card = db.exec(select(Card).where(Card.uuid == uuid)).one() card = db.exec(select(Card).where(Card.uuid == uuid)).one()
for auth in card.group.accessauths: for auth in card.group.accessauths:
print(f"checking auth: {auth.name}") logger.info(f"checking auth: {auth.name}")
for timetable in auth.timetables: for timetable in auth.timetables:
print(f" checking timetable {timetable.id}") logger.info(f" checking timetable {timetable.id}")
print(f" comparing weekday: CUR:{current_weekday} TT:{timetable.weekday}") logger.info(f" comparing weekday: CUR:{current_weekday} TT:{timetable.weekday}")
if current_weekday == timetable.weekday: if current_weekday == timetable.weekday:
starttime = datetime.datetime.combine(datetime.date.today(), timetable.starttime) starttime = datetime.datetime.combine(datetime.date.today(), timetable.starttime)
endtime = starttime + datetime.timedelta(minutes=timetable.duration) endtime = starttime + datetime.timedelta(minutes=timetable.duration)
print(f" comparing time: Start:{starttime} Current:{current_time} End:{endtime}") logger.info(f" comparing time: Start:{starttime} Current:{current_time} End:{endtime}")
if starttime < current_time < endtime: if starttime < current_time < endtime:
logger.info("Access Valid!")
return True return True
logger.info("No more auths found")
return False return False
except exc.NoResultFound: except exc.NoResultFound:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise Exception("No Access with that key found, this might be a db error")

View File

@@ -1,4 +1,6 @@
import logging import logging
logger = logging.getLogger(__name__)
import threading import threading
import time import time
import os import os
@@ -18,6 +20,8 @@ from desfire.enums import DESFireCommunicationMode, DESFireFileType, DESFireKeyS
from desfire.schemas import FilePermissions, FileSettings, KeySettings from desfire.schemas import FilePermissions, FileSettings, KeySettings
import desfire.exceptions as desExceptions import desfire.exceptions as desExceptions
from app.services.door import openDoor, closeDoor, isDoorOpen, checkAccess
#ENV vars #ENV vars
load_dotenv() load_dotenv()
@@ -32,10 +36,6 @@ MIFARE_ACL_WRITE_BASE_KEY_ID = 0x2
MIFARE_SYS_ID = "FF0000" # 3 bytes, can essentially be anything MIFARE_SYS_ID = "FF0000" # 3 bytes, can essentially be anything
MIFARE_ENCRYPTED_FILE_ID = 0x1 MIFARE_ENCRYPTED_FILE_ID = 0x1
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def checkForKey(): def checkForKey():
if MIFARE_APP_MASTER_KEY == None: if MIFARE_APP_MASTER_KEY == None:
logger.critical("NO MASTER KEY LOADED") logger.critical("NO MASTER KEY LOADED")
@@ -44,34 +44,80 @@ def checkForKey():
def getCardService(timeout: int = 10): def getCardService(timeout: int = 10):
cardtype = AnyCardType() cardtype = AnyCardType()
cardrequest = CardRequest(timeout=timeout, cardType=cardtype) cardrequest = CardRequest(timeout=timeout, cardType=cardtype)
print("Please present DESfire tag...")
try:
cardservice = cardrequest.waitforcard() cardservice = cardrequest.waitforcard()
except CardRequestTimeoutException: cardservice.connection.connect()
logger.error("No tag detected within the timeout.")
raise Exception
return cardservice return cardservice
def readFileOnCard(desfire: DESFire):
#create keys
#desfire = DESFire(PCSCDevice(cardservice.connection.component))
aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
keysettings = desfire.get_key_setting()
desKey = DESFireKey(keysettings, "00" * 8)
# Get real UID
desfire.authenticate(0x0, desKey)
#To get the uid you have to auth with an empty (default) key
uid = desfire.get_real_uid()
applications = desfire.get_application_ids()
try:
assert len(applications) == 1
assert applications[0] == get_list(MIFARE_APP_ID)
except AssertionError:
logger.error("No application found!")
time.sleep(4)
return
#Then use the key derivation with that uid, the appid, the sysid
diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
#Log in with derived read key
logger.info("Start auth")
aes_app_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
desfire.select_application(MIFARE_APP_ID)
desfire.authenticate(MIFARE_ACL_READ_BASE_KEY_ID, aes_app_read_key)
logger.info("Read data")
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
#convert list of int to str
rdata = to_hex_string(rdata).replace(" ", "").lower()
logger.info(f"Data on card: {rdata}")
return rdata
def DeleteCard(): def DeleteCard():
try: try:
checkForKey() checkForKey()
from app.main import scanner as scannerThread from app.main import scanner as scannerThread
scannerThread.stop() scannerThread.stop()
cardservice = getCardService(15) cardservice = getCardService(15)
cardservice.connection.connect()
# Create Desfire object # Create Desfire object
desfire = DESFire(PCSCDevice(cardservice.connection.component)) desfire = DESFire(PCSCDevice(cardservice.connection.component))
rdata = readFileOnCard(desfire=desfire)
# Create Key objects # Create Key objects
AES_NULL_KEY_DATA = "00" * 8 aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
aes_keysettings = KeySettings( des_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_2K3DES)
key_type=DESFireKeyType.DF_KEY_AES, desKey = DESFireKey(des_keysettings, "00" * 8)
)
key_settings = desfire.get_key_setting()
aes_null_key = DESFireKey(key_settings, AES_NULL_KEY_DATA)
aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY) aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
desfire.select_application(0x0)
try:
try:
logger.info("Auth1")
desfire.authenticate(0x0, desKey)
except:
logger.info("Auth2")
desfire.authenticate(0x0, aes_null_key) desfire.authenticate(0x0, aes_null_key)
except:
logger.info("Auth3")
desfire.authenticate(0x0, aes_master_key)
applications = desfire.get_application_ids() applications = desfire.get_application_ids()
logger.debug(f"Applications: {applications}") logger.debug(f"Applications: {applications}")
if len(applications) == 0: if len(applications) == 0:
@@ -79,13 +125,15 @@ def DeleteCard():
desfire.select_application(MIFARE_APP_ID) desfire.select_application(MIFARE_APP_ID)
desfire.authenticate(0x0, aes_master_key) desfire.authenticate(0x0, aes_master_key)
try: try:
desfire.delete_application(MIFARE_APP_ID) desfire.delete_application(MIFARE_APP_ID)
logger.info("App deleted!")
except Exception: except Exception:
pass pass
scannerThread.start() scannerThread.start()
return rdata
except Exception as e: except(Exception, AssertionError) as e:
logger.error(f"Error in deletion function: {e}", exc_info=True) logger.error(f"Error in deletion function: {e}", exc_info=True)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error: {e}")
@@ -95,43 +143,29 @@ def WriteNewCard():
from app.main import scanner as scannerThread from app.main import scanner as scannerThread
scannerThread.stop() scannerThread.stop()
cardtype = AnyCardType() cardservice = getCardService(20)
cardrequest = CardRequest(timeout=10, cardType=cardtype)
print("Please present DESfire tag...")
try:
cardservice = cardrequest.waitforcard()
except CardRequestTimeoutException:
logger.error("No tag detected within the timeout.")
return None
cardservice.connection.connect()
# Create Desfire object
desfire = DESFire(PCSCDevice(cardservice.connection.component)) desfire = DESFire(PCSCDevice(cardservice.connection.component))
# Create Key objects # Create Key objects
AES_NULL_KEY_DATA = "00" * 16 aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
aes_keysettings = KeySettings( aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
key_type=DESFireKeyType.DF_KEY_AES,
)
aes_null_key = DESFireKey(aes_keysettings, AES_NULL_KEY_DATA)
aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY) aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
keysetting = desfire.get_key_setting() desKey = DESFireKey(desfire.get_key_setting(), "00" * 8)
desKey = DESFireKey(keysetting, "00" * 8)
# Authenticate with default DES key # Authenticate with default DES key
print("Authenticating with default DES key...") logger.info("Authenticating with default DES key...")
desfire.authenticate(0x0, desKey) desfire.authenticate(0x0, desKey)
#get uid #get uid
uid = desfire.get_real_uid() uid = desfire.get_real_uid()
# Set default key # Set default key
print("Setting default key...") logger.info("Setting default key...")
desfire.change_default_key(aes_null_key, 0x0) desfire.change_default_key(aes_null_key, 0x0)
# Create application # Create application
print("Creating application...") logger.info("Creating application...")
app_settings = KeySettings( app_settings = KeySettings(
settings=[ settings=[
DESFireKeySettings.KS_ALLOW_CHANGE_MK, DESFireKeySettings.KS_ALLOW_CHANGE_MK,
@@ -147,34 +181,34 @@ def WriteNewCard():
applications = desfire.get_application_ids() applications = desfire.get_application_ids()
assert len(applications) == 1 assert len(applications) == 1
assert applications[0] == get_list(MIFARE_APP_ID) assert applications[0] == get_list(MIFARE_APP_ID)
print(" - Application created successfully.") logger.info(" - Application created successfully.")
# Select application # Select application
desfire.select_application(MIFARE_APP_ID) desfire.select_application(MIFARE_APP_ID)
#Auth again as 0key
aes_null_auth_key = DESFireKey(aes_keysettings, AES_NULL_KEY_DATA)
desfire.authenticate(0x0, aes_null_auth_key)
# Authenticate with AES key, as this has been set as the default key
aes_app_mk = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
desfire.change_key(0x0, aes_null_key, aes_app_mk, 0x1)
print("new key auth") #recreate key object
desfire.authenticate(0x0, aes_app_mk) desfire.authenticate(0x0, aes_null_key)
desfire.change_key(0x0, aes_null_key, aes_master_key, 0x1)
logger.info("new key auth")
desfire.authenticate(0x0, aes_master_key)
aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
#generate div data #generate div data
diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID) diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False) read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
write_div_key_bytes = diversify_key(get_list(MIFARE_ACL_WRITE_BASE_KEY), diversification_data, pad_to_32=False) write_div_key_bytes = diversify_key(get_list(MIFARE_ACL_WRITE_BASE_KEY), diversification_data, pad_to_32=False)
print("Changing file read key...") logger.info("Changing file read key...")
aes_file_read_key = DESFireKey(aes_keysettings, read_div_key_bytes) aes_file_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
desfire.change_key(MIFARE_ACL_READ_BASE_KEY_ID, aes_null_key, aes_file_read_key, 0x1) desfire.change_key(MIFARE_ACL_READ_BASE_KEY_ID, aes_null_key, aes_file_read_key, 0x1)
print("Changing file write key...") logger.info("Changing file write key...")
aes_file_write_key = DESFireKey(aes_keysettings, write_div_key_bytes) aes_file_write_key = DESFireKey(aes_keysettings, write_div_key_bytes)
desfire.change_key(MIFARE_ACL_WRITE_BASE_KEY_ID, aes_null_key, aes_file_write_key, 0x1) desfire.change_key(MIFARE_ACL_WRITE_BASE_KEY_ID, aes_null_key, aes_file_write_key, 0x1)
print("Create encrypted file containing UUID...") logger.info("Create encrypted file containing key...")
file_settings = FileSettings( file_settings = FileSettings(
file_size=16, file_size=16,
encryption=DESFireCommunicationMode.ENCRYPTED, encryption=DESFireCommunicationMode.ENCRYPTED,
@@ -185,25 +219,16 @@ def WriteNewCard():
file_type=DESFireFileType.MDFT_STANDARD_DATA_FILE, file_type=DESFireFileType.MDFT_STANDARD_DATA_FILE,
) )
desfire.create_standard_file(MIFARE_ENCRYPTED_FILE_ID, file_settings) desfire.create_standard_file(MIFARE_ENCRYPTED_FILE_ID, file_settings)
print("Read and verify file settings again...")
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID) file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
assert file_data.file_size == 16
assert file_data.encryption == DESFireCommunicationMode.ENCRYPTED
assert file_data.permissions is not None
assert file_data.permissions.read_access == MIFARE_ACL_READ_BASE_KEY_ID
assert file_data.permissions.write_access == MIFARE_ACL_WRITE_BASE_KEY_ID
assert file_data.file_type == DESFireFileType.MDFT_STANDARD_DATA_FILE
print(" - File created successfully.")
print("Writing UID to encrypted file...") logger.info("Writing UID to encrypted file...")
key = secrets.token_hex(16) key = secrets.token_hex(16)
desfire.write_file_data(MIFARE_ENCRYPTED_FILE_ID, 0x0, file_data.encryption, get_list(key)) desfire.write_file_data(MIFARE_ENCRYPTED_FILE_ID, 0x0, file_data.encryption, get_list(key))
print("Reading from encrypted file...") logger.info("Reading from encrypted file...")
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data) rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
assert rdata == get_list(key) assert rdata == get_list(key)
print(" - Data written successfully.") logger.info(" - Data written successfully.")
scannerThread.start() scannerThread.start()
return key return key
@@ -238,12 +263,11 @@ class BackgroundScanner:
try: try:
card_content = self._read_card() card_content = self._read_card()
if card_content: if card_content:
logger.info(to_hex_string(card_content)) self._check_db(card_content)
time.sleep(5) time.sleep(5)
logger.debug("READY after success") logger.debug("READY after success")
#self._check_db(card_content)
else: else:
time.sleep(0.5) time.sleep(0.1)
logger.debug("READY after timout") logger.debug("READY after timout")
except Exception as e: except Exception as e:
@@ -251,53 +275,24 @@ class BackgroundScanner:
time.sleep(6) time.sleep(6)
def _read_card(self): def _read_card(self):
cardtype = AnyCardType() time.sleep(0.5)
cardrequest = CardRequest(timeout=5, cardType=cardtype)
try: try:
cardservice = cardrequest.waitforcard() cardservice = getCardService(3)
except CardRequestTimeoutException: except CardRequestTimeoutException:
logger.debug("No tag detected within the timeout.") logger.debug("No tag detected within the timeout.")
return return
try:
cardservice.connection.connect()
# Create Desfire object # Create Desfire object
desfire = DESFire(PCSCDevice(cardservice.connection.component)) desfire = DESFire(PCSCDevice(cardservice.connection.component))
aes_keysettings = KeySettings(
key_type=DESFireKeyType.DF_KEY_AES,
)
# Get real UID
mk = DESFireKey(desfire.get_key_setting(), "00" * 8)
desfire.authenticate(0x0, mk)
#To get the uid you have to auth with an empty (default) key
uid = desfire.get_real_uid()
applications = desfire.get_application_ids()
try: try:
assert len(applications) == 1 rdata = readFileOnCard(desfire=desfire)
assert applications[0] == get_list(MIFARE_APP_ID)
except AssertionError:
logger.error("No application found!")
time.sleep(4)
return None
#Then use the key derivation with that uid, the appid, the sysid
diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
#Log in with derived read key
logger.info("Start auth")
aes_app_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
desfire.select_application(MIFARE_APP_ID)
desfire.authenticate(MIFARE_ACL_READ_BASE_KEY_ID, aes_app_read_key)
logger.info("Read data")
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
logger.info(f"File settings: {file_data}")
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
return rdata return rdata
except Exception as e: except Exception as e:
logger.error(f"something went wrong: {e}") logger.error(f"something went wrong: {e}")
time.sleep(5) time.sleep(5)
def _check_db(self, key):
check = checkAccess(key, self.db)
if check == True:
openDoor()
else:
logger.info("Access denied!")

View File

@@ -9,7 +9,6 @@ dependencies = [
"sqlmodel>=0.0.38", "sqlmodel>=0.0.38",
"poetry>=2.3.4", "poetry>=2.3.4",
"python-desfire", "python-desfire",
"paho-mqtt>=2.1.0",
"pyjwt[crypto]>=2.12.1", "pyjwt[crypto]>=2.12.1",
"pwdlib[argon2]>=0.3.0", "pwdlib[argon2]>=0.3.0",
"pytest>=9.0.3", "pytest>=9.0.3",

View File

@@ -4,13 +4,6 @@ def test_app_startup(client):
# Application should respond (even if it's a 404) # Application should respond (even if it's a 404)
assert response.status_code in [404, 200] assert response.status_code in [404, 200]
def test_health_check(client):
"""Test basic health check endpoint if it exists."""
# Note: This would require adding a health check endpoint
pass
def test_router_includes(): def test_router_includes():
"""Test that all routers are included in the app.""" """Test that all routers are included in the app."""
from app.main import app from app.main import app

View File

@@ -75,8 +75,9 @@ def test_assign_already_assigned_access_auth(client, auth_headers, test_group, t
f"/aa/assign/{test_group.id}/{test_aa.id}", f"/aa/assign/{test_group.id}/{test_aa.id}",
headers=auth_headers headers=auth_headers
) )
# According to the code, this returns 200 with "already assigned" message # According to the code, this returns 409 with "already assigned" message
assert response.status_code == 200 assert response.status_code == 409
assert "already assigned" in response.json()["detail"].lower()
def test_unassign_access_auth_from_group(client, auth_headers, test_group, test_aa): def test_unassign_access_auth_from_group(client, auth_headers, test_group, test_aa):
@@ -147,6 +148,11 @@ def test_update_access_auth_with_timetables(client, auth_headers, test_aa):
headers=auth_headers headers=auth_headers
) )
assert response.status_code == 200 assert response.status_code == 200
jresponse = response.json()
assert len(jresponse["timetables"]) == 1
assert jresponse["timetables"][0]["weekday"] == 5
assert jresponse["timetables"][0]["starttime"] == "10:00:00"
assert jresponse["timetables"][0]["duration"] == 120
def test_update_nonexistent_access_auth(client, auth_headers): def test_update_nonexistent_access_auth(client, auth_headers):

View File

@@ -131,7 +131,6 @@ def test_auth_is_admin(db_session, admin_user, regular_user):
def test_create_first_user(db_session): def test_create_first_user(db_session):
"""Test automatic creation of first admin user.""" """Test automatic creation of first admin user."""
#Currently broken because this uses the prod db because of how i wrote the create_first_user function
# Clear any existing users # Clear any existing users
from sqlmodel import select from sqlmodel import select
db_session.exec(select(UserDB)).all() db_session.exec(select(UserDB)).all()
@@ -140,7 +139,7 @@ def test_create_first_user(db_session):
db_session.commit() db_session.commit()
# Create first user # Create first user
result = create_first_user() result = create_first_user(db=db_session)
assert result is not None assert result is not None
assert result.name == "admin" assert result.name == "admin"
assert result.is_admin is True assert result.is_admin is True
@@ -151,7 +150,7 @@ def test_create_first_user(db_session):
assert user.is_admin is True assert user.is_admin is True
# Test that it doesn't create another admin if one exists # Test that it doesn't create another admin if one exists
second_result = create_first_user() second_result = create_first_user(db=db_session)
assert second_result is None # Should print "Admin user already exists" assert second_result is None # Should print "Admin user already exists"

View File

@@ -1,41 +1,6 @@
import pytest import pytest
from fastapi import status from fastapi import status
def test_add_card(client, auth_headers, test_group):
"""Test adding a card to a group."""
response = client.post(f"/cards/{test_group.id}", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "id" in data
assert "uuid" in data
assert data["group_id"] == test_group.id
assert len(data["uuid"]) > 0 # UUID should be generated
def test_add_card_to_nonexistent_group(client, auth_headers):
"""Test adding a card to a non-existent group."""
response = client.post("/cards/99999", headers=auth_headers)
# This might succeed and create a card with a non-existent group_id
# or fail depending on foreign key constraints
# For now, let's assume it might fail
# assert response.status_code == 404
def test_delete_card(client, auth_headers, test_card):
"""Test deleting a card."""
response = client.delete(f"/cards/{test_card.id}", headers=auth_headers)
assert response.status_code == 200
assert "deleted successfully" in response.json()["message"].lower()
def test_delete_nonexistent_card(client, auth_headers):
"""Test deleting a non-existent card."""
response = client.delete("/cards/99999", headers=auth_headers)
assert response.status_code == 404
def test_get_cards_for_group(client, auth_headers, test_group, test_card): def test_get_cards_for_group(client, auth_headers, test_group, test_card):
"""Test getting all cards for a group.""" """Test getting all cards for a group."""
response = client.get(f"/cards/{test_group.id}", headers=auth_headers) response = client.get(f"/cards/{test_group.id}", headers=auth_headers)

View File

@@ -0,0 +1,61 @@
import pytest
import datetime
from app.services.door import checkAccess
from app.model.models import Card, GroupDB, AccessAuthorizationDB, Timetable
def test_check_access_with_valid_timetable(db_session):
# Setup: create card with valid access
group = GroupDB(name="Test Group")
db_session.add(group)
db_session.commit()
card = Card(uuid="test-uuid-123", group_id=group.id)
db_session.add(card)
timetable = Timetable(
weekday=datetime.datetime.weekday(datetime.date.today()),
starttime=datetime.datetime.now().time(),
duration=120 # 2 hours
)
db_session.add(timetable)
aa = AccessAuthorizationDB(name="Test AA", is_active=True)
db_session.add(aa)
aa.timetables = [timetable]
group.accessauths = [aa]
db_session.commit()
# Test: access should be granted within time window
result = checkAccess("test-uuid-123", db_session)
assert result == True
def test_check_access_outside_hours(db_session):
# Test when current time is outside valid hours
group = GroupDB(name="Test Group")
db_session.add(group)
db_session.commit()
card = Card(uuid="test-uuid-123", group_id=group.id)
db_session.add(card)
timetable = Timetable(
weekday=datetime.datetime.weekday(datetime.date.today()),
starttime=datetime.time(1, 0),
duration=1 # 2 hours
)
db_session.add(timetable)
aa = AccessAuthorizationDB(name="Test AA", is_active=True)
db_session.add(aa)
aa.timetables = [timetable]
group.accessauths = [aa]
db_session.commit()
result = checkAccess("test-uuid-123", db_session)
assert result == False
def test_check_access_invalid_card(db_session):
# Should raise exception for non-existent card
with pytest.raises(Exception):
checkAccess("non-existent-uuid", db_session)

11
uv.lock generated
View File

@@ -609,7 +609,6 @@ version = "0.1.0"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "fastapi", extra = ["standard"] }, { name = "fastapi", extra = ["standard"] },
{ name = "paho-mqtt" },
{ name = "poetry" }, { name = "poetry" },
{ name = "pwdlib", extra = ["argon2"] }, { name = "pwdlib", extra = ["argon2"] },
{ name = "pyjwt", extra = ["crypto"] }, { name = "pyjwt", extra = ["crypto"] },
@@ -625,7 +624,6 @@ dependencies = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "fastapi", extras = ["standard"], specifier = ">=0.135.3" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.135.3" },
{ name = "paho-mqtt", specifier = ">=2.1.0" },
{ name = "poetry", specifier = ">=2.3.4" }, { name = "poetry", specifier = ">=2.3.4" },
{ name = "pwdlib", extras = ["argon2"], specifier = ">=0.3.0" }, { name = "pwdlib", extras = ["argon2"], specifier = ">=0.3.0" },
{ name = "pyjwt", extras = ["crypto"], specifier = ">=2.12.1" }, { name = "pyjwt", extras = ["crypto"], specifier = ">=2.12.1" },
@@ -952,15 +950,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/7a/c2/920ef838e2f0028c8262f16101ec09ebd5969864e5a64c4c05fad0617c56/packaging-26.1-py3-none-any.whl", hash = "sha256:5d9c0669c6285e491e0ced2eee587eaf67b670d94a19e94e3984a481aba6802f", size = 95831, upload-time = "2026-04-14T21:12:47.56Z" }, { url = "https://files.pythonhosted.org/packages/7a/c2/920ef838e2f0028c8262f16101ec09ebd5969864e5a64c4c05fad0617c56/packaging-26.1-py3-none-any.whl", hash = "sha256:5d9c0669c6285e491e0ced2eee587eaf67b670d94a19e94e3984a481aba6802f", size = 95831, upload-time = "2026-04-14T21:12:47.56Z" },
] ]
[[package]]
name = "paho-mqtt"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/39/15/0a6214e76d4d32e7f663b109cf71fb22561c2be0f701d67f93950cd40542/paho_mqtt-2.1.0.tar.gz", hash = "sha256:12d6e7511d4137555a3f6ea167ae846af2c7357b10bc6fa4f7c3968fc1723834", size = 148848, upload-time = "2024-04-29T19:52:55.591Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c4/cb/00451c3cf31790287768bb12c6bec834f5d292eaf3022afc88e14b8afc94/paho_mqtt-2.1.0-py3-none-any.whl", hash = "sha256:6db9ba9b34ed5bc6b6e3812718c7e06e2fd7444540df2455d2c51bd58808feee", size = 67219, upload-time = "2024-04-29T19:52:48.345Z" },
]
[[package]] [[package]]
name = "pbs-installer" name = "pbs-installer"
version = "2026.4.7" version = "2026.4.7"