From a8b5bea8cb39664cdf65757dfc6104c74ad72b30 Mon Sep 17 00:00:00 2001 From: ahtlon Date: Thu, 21 May 2026 19:20:07 +0200 Subject: [PATCH] Implement background scanner for now just outputs uid as configured by test.py --- app/main.py | 6 ++- app/services/scanner.py | 107 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 app/services/scanner.py diff --git a/app/main.py b/app/main.py index 5af8547..9f6fde7 100644 --- a/app/main.py +++ b/app/main.py @@ -4,8 +4,9 @@ from contextlib import asynccontextmanager from dotenv import load_dotenv from .controllers import userManager, cardManager, groupManager, aaManager, doorManager -from .services.database import create_db_and_tables +from .services.database import create_db_and_tables, get_session from .services.auth import token_router, create_first_user +from app.services.scanner import BackgroundScanner oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") @@ -15,7 +16,10 @@ async def lifespan(app: FastAPI): create_db_and_tables() create_first_user() print("Database created and tables initialized.") + scanner = BackgroundScanner(db=get_session()) + scanner.start() yield + scanner.stop() app = FastAPI(lifespan=lifespan) diff --git a/app/services/scanner.py b/app/services/scanner.py new file mode 100644 index 0000000..3e20084 --- /dev/null +++ b/app/services/scanner.py @@ -0,0 +1,107 @@ +import logging +import threading +import time +import os + +from typing import Optional +from sqlmodel import Session +from dotenv import load_dotenv + +from smartcard.CardRequest import CardRequest +from smartcard.CardType import AnyCardType +from smartcard.Exceptions import CardRequestTimeoutException + +from desfire import DESFire, DESFireKey, PCSCDevice, diversify_key, get_list, to_hex_string +from desfire.enums import DESFireCommunicationMode, DESFireFileType, DESFireKeySettings, DESFireKeyType +from desfire.schemas import FilePermissions, FileSettings, KeySettings + +#ENV vars +load_dotenv() +MIFARE_APP_MASTER_KEY = os.getenv('MIFARE_APP_MASTER_KEY') +MIFARE_ACL_READ_BASE_KEY = os.getenv('MIFARE_ACL_READ_BASE_KEY') +MIFARE_ACL_WRITE_BASE_KEY = os.getenv('MIFARE_ACL_WRITE_BASE_KEY') + +# Constants +MIFARE_APP_ID = "DEAFFE" # 7 bytes +MIFARE_ACL_READ_BASE_KEY_ID = 0x1 +MIFARE_ACL_WRITE_BASE_KEY_ID = 0x2 +MIFARE_SYS_ID = "FF0000" # 3 bytes, can essentially be anything +MIFARE_ENCRYPTED_FILE_ID = 0x1 + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +if MIFARE_APP_MASTER_KEY == None: + logger.critical("NO MASTER KEY LOADED") + exit(1) + +class BackgroundScanner: + def __init__(self, db): + self.db = db + self.is_running = False + self.thread: Optional[threading.Thread] = None + + def start(self): + if self.is_running: + logger.info("Scanner already running") + return + self.is_running = True + self.thread = threading.Thread(target=self._scan_loop, daemon=True) + self.thread.start() + logger.info("Scanner started") + + def stop(self): + self.is_running = False + if self.thread: + self.thread.join() + logger.info("Scanner stopped") + + def _scan_loop(self): + while self.is_running: + try: + card_content = self._read_card() + if card_content: + logger.info(to_hex_string(card_content)) + time.sleep(5) + logger.debug("READY after success") + #self._check_db(card_content) + else: + time.sleep(0.5) + logger.debug("READY after timout") + + except Exception as e: + logger.error(f"Error in scan function: {e}", exc_info=True) + time.sleep(6) + + def _read_card(self): + cardtype = AnyCardType() + cardrequest = CardRequest(timeout=5, cardType=cardtype) + try: + cardservice = cardrequest.waitforcard() + except CardRequestTimeoutException: + logger.debug("No tag detected within the timeout.") + return + + cardservice.connection.connect() + # Create Desfire object + desfire = DESFire(PCSCDevice(cardservice.connection.component)) + aes_keysettings = KeySettings( + key_type=DESFireKeyType.DF_KEY_AES, + ) + + #Auth + logger.info("Start auth") + aes_app_mk = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY) + + apps = desfire.get_application_ids() + logger.info(f"app id's are: {apps}") + desfire.select_application(MIFARE_APP_ID) + + desfire.authenticate(0x0, aes_app_mk) + + logger.info("Read data") + file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID) + logger.info(f"File settings: {file_data}") + rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data) + return rdata \ No newline at end of file