Files
gatekeeper/app/services/scanner.py
ahtlon a8b5bea8cb Implement background scanner
for now just outputs uid as configured by test.py
2026-05-21 19:20:07 +02:00

107 lines
3.4 KiB
Python

import logging
import threading
import time
import os
from typing import Optional
from sqlmodel import Session
from dotenv import load_dotenv
from smartcard.CardRequest import CardRequest
from smartcard.CardType import AnyCardType
from smartcard.Exceptions import CardRequestTimeoutException
from desfire import DESFire, DESFireKey, PCSCDevice, diversify_key, get_list, to_hex_string
from desfire.enums import DESFireCommunicationMode, DESFireFileType, DESFireKeySettings, DESFireKeyType
from desfire.schemas import FilePermissions, FileSettings, KeySettings
# Environment variables
# load_dotenv() runs first so that values from a .env file, if one exists,
# are visible to the os.getenv() calls below.
load_dotenv()
MIFARE_APP_MASTER_KEY = os.getenv('MIFARE_APP_MASTER_KEY')
MIFARE_ACL_READ_BASE_KEY = os.getenv('MIFARE_ACL_READ_BASE_KEY')
MIFARE_ACL_WRITE_BASE_KEY = os.getenv('MIFARE_ACL_WRITE_BASE_KEY')

# Constants
MIFARE_APP_ID = "DEAFFE"  # 3 bytes (6 hex digits)
MIFARE_ACL_READ_BASE_KEY_ID = 0x1
MIFARE_ACL_WRITE_BASE_KEY_ID = 0x2
MIFARE_SYS_ID = "FF0000"  # 3 bytes, can essentially be anything
MIFARE_ENCRYPTED_FILE_ID = 0x1

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Refuse to start without a master key. An unset variable (None) and an
# empty value are both treated as "missing", since neither is a usable key.
if not MIFARE_APP_MASTER_KEY:
    logger.critical("NO MASTER KEY LOADED")
    # SystemExit is raised directly: the exit() builtin is injected by the
    # `site` module and is not guaranteed to exist (e.g. `python -S`).
    raise SystemExit(1)
class BackgroundScanner:
    """Poll the smart-card reader on a background thread and log what is read.

    ``start()`` spawns a daemon thread running ``_scan_loop()``; ``stop()``
    asks that loop to finish and joins the thread. Each loop iteration waits
    for a card, authenticates against the application ``MIFARE_APP_ID`` and
    reads file ``MIFARE_ENCRYPTED_FILE_ID``. For now the content is only
    logged.
    """

    # Seconds to wait for a card to show up during one poll.
    _CARD_TIMEOUT = 5
    # Seconds to pause after a successful read, after an empty poll and
    # after an error, respectively.
    _SUCCESS_PAUSE = 5
    _IDLE_PAUSE = 0.5
    _ERROR_PAUSE = 6

    def __init__(self, db):
        """Create a scanner in the stopped state.

        Args:
            db: Database handle, stored on the instance. It is not used by
                the scanner yet; presumably a sqlmodel ``Session`` -- confirm
                against the caller.
        """
        self.db = db
        self.is_running = False
        self.thread: Optional[threading.Thread] = None
        # Set by stop() so the pauses inside the scan loop end immediately
        # instead of delaying shutdown by several seconds.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background scan thread; do nothing if already running."""
        if self.is_running:
            logger.info("Scanner already running")
            return
        self._stop_event.clear()
        self.is_running = True
        self.thread = threading.Thread(target=self._scan_loop, daemon=True)
        self.thread.start()
        logger.info("Scanner started")

    def stop(self):
        """Ask the scan loop to finish and wait for its thread to exit.

        Pauses between polls are interrupted right away. A poll that is
        currently waiting for a card still has to run into its timeout
        (``_CARD_TIMEOUT`` seconds) before the thread can exit.
        """
        self.is_running = False
        self._stop_event.set()
        if self.thread:
            self.thread.join()
        logger.info("Scanner stopped")

    def _pause(self, seconds):
        """Wait up to *seconds*, returning early once a stop was requested.

        Returns:
            True if a stop was requested, False if the full time elapsed.
        """
        return self._stop_event.wait(seconds)

    def _scan_loop(self):
        """Read cards until ``is_running`` becomes false.

        Any exception raised while reading is logged with its traceback and
        the loop carries on after a pause, so a single bad card or reader
        hiccup does not kill the scanner thread.
        """
        while self.is_running:
            try:
                card_content = self._read_card()
                if card_content:
                    logger.info(to_hex_string(card_content))
                    # TODO: look the card content up in the database
                    # (self._check_db(card_content)).
                    self._pause(self._SUCCESS_PAUSE)
                    logger.debug("READY after success")
                else:
                    self._pause(self._IDLE_PAUSE)
                    logger.debug("READY after timout")
            except Exception as e:
                logger.exception("Error in scan function: %s", e)
                self._pause(self._ERROR_PAUSE)

    def _read_card(self):
        """Wait for a card and read the configured file from it.

        Returns:
            The data returned by ``read_file_data`` for file
            ``MIFARE_ENCRYPTED_FILE_ID``, or ``None`` when no card was
            presented within ``_CARD_TIMEOUT`` seconds.

        Raises:
            Whatever the smartcard / desfire libraries raise when connecting,
            authenticating or reading fails; the caller logs these.
        """
        card_request = CardRequest(timeout=self._CARD_TIMEOUT, cardType=AnyCardType())
        try:
            card_service = card_request.waitforcard()
        except CardRequestTimeoutException:
            logger.debug("No tag detected within the timeout.")
            return None

        connection = card_service.connection
        connection.connect()
        try:
            # Create Desfire object
            desfire = DESFire(PCSCDevice(connection.component))
            aes_keysettings = KeySettings(
                key_type=DESFireKeyType.DF_KEY_AES,
            )

            # Auth: select the application, then authenticate with key
            # number 0x0 using the application master key.
            logger.info("Start auth")
            aes_app_mk = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
            apps = desfire.get_application_ids()
            logger.info(f"app id's are: {apps}")
            desfire.select_application(MIFARE_APP_ID)
            desfire.authenticate(0x0, aes_app_mk)

            logger.info("Read data")
            file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
            logger.info(f"File settings: {file_data}")
            return desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
        finally:
            # Always release the card handle, also when auth or the read
            # fails. Cleanup is best effort: a failing disconnect (e.g. the
            # card was already removed) must not hide the read result or the
            # original error.
            try:
                connection.disconnect()
            except Exception:
                logger.warning("Failed to disconnect from card", exc_info=True)