Files
gatekeeper/app/services/scanner.py

292 lines
11 KiB
Python

import logging
import threading
import time
import os
import secrets
from typing import Optional
from sqlmodel import Session
from dotenv import load_dotenv
from fastapi import HTTPException, status
from smartcard.CardRequest import CardRequest
from smartcard.CardType import AnyCardType
from smartcard.Exceptions import CardRequestTimeoutException
from desfire import DESFire, DESFireKey, PCSCDevice, diversify_key, get_list, to_hex_string
from desfire.enums import DESFireCommunicationMode, DESFireFileType, DESFireKeySettings, DESFireKeyType
from desfire.schemas import FilePermissions, FileSettings, KeySettings
import desfire.exceptions as desExceptions
# Environment variables.
# load_dotenv() runs first so the os.getenv() calls below see whatever it loads.
# Each value is None when the variable is not set; checkForKey() guards the master key.
load_dotenv()
MIFARE_APP_MASTER_KEY = os.getenv('MIFARE_APP_MASTER_KEY')
MIFARE_ACL_READ_BASE_KEY = os.getenv('MIFARE_ACL_READ_BASE_KEY')
MIFARE_ACL_WRITE_BASE_KEY = os.getenv('MIFARE_ACL_WRITE_BASE_KEY')
# Constants
# Application ID as a hex string; it is converted with get_list() wherever it is used.
MIFARE_APP_ID = "DEAFFE" # 3 bytes (6 hex digits)
# Key numbers inside the application that hold the diversified read / write keys.
MIFARE_ACL_READ_BASE_KEY_ID = 0x1
MIFARE_ACL_WRITE_BASE_KEY_ID = 0x2
# System identifier mixed into the key-diversification input together with UID and app ID.
MIFARE_SYS_ID = "FF0000" # 3 bytes, can essentially be anything
# File number of the encrypted data file created by WriteNewCard() and read by readFileOnCard().
MIFARE_ENCRYPTED_FILE_ID = 0x1
# Logging is configured at INFO level as a side effect of importing this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def checkForKey():
    """Verify that the application master key was loaded from the environment.

    Raises:
        HTTPException: 500 when ``MIFARE_APP_MASTER_KEY`` is unset or empty.
    """
    # An empty value (e.g. ``MIFARE_APP_MASTER_KEY=`` in a .env file) is just as
    # unusable as a missing one, so both are rejected here.
    if not MIFARE_APP_MASTER_KEY:
        logger.critical("NO MASTER KEY LOADED")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="No key loaded! Check application.",
        )
def getCardService(timeout: int = 10):
    """Wait for any card on the reader and return its connected card service.

    Args:
        timeout: Passed to ``CardRequest`` as its ``timeout`` argument.

    Returns:
        The service object returned by ``waitforcard()``, with its connection
        already opened via ``connection.connect()``.

    Raises:
        Whatever ``waitforcard()`` / ``connect()`` raise. ``_read_card`` in this
        module expects ``CardRequestTimeoutException`` when no card shows up.
    """
    any_card = AnyCardType()
    request = CardRequest(timeout=timeout, cardType=any_card)
    service = request.waitforcard()
    service.connection.connect()
    return service
def readFileOnCard(desfire: DESFire) -> Optional[str]:
    """Read the encrypted data file from a provisioned card.

    Steps, in order:
      1. Authenticate at card level with the all-zero default key so the real
         UID can be read.
      2. Check that the card carries exactly one application and that it is
         ``MIFARE_APP_ID``.
      3. Derive the card-specific read key from UID + app ID + system ID.
      4. Select the application, authenticate with the derived key and read
         file ``MIFARE_ENCRYPTED_FILE_ID``.

    Args:
        desfire: DESFire session bound to the card currently on the reader.

    Returns:
        The file content as a lowercase hex string without separators, or
        ``None`` when the expected application is not on the card.

    Raises:
        Any exception raised by the desfire library (authentication or
        communication failures) is propagated to the caller.
    """
    aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
    keysettings = desfire.get_key_setting()
    default_key = DESFireKey(keysettings, "00" * 8)

    # To get the real UID you have to authenticate with the empty (default) key.
    desfire.authenticate(0x0, default_key)
    uid = desfire.get_real_uid()

    applications = desfire.get_application_ids()
    # Explicit check instead of ``assert``: assertions are removed under
    # ``python -O``, which would silently skip this validation.
    if len(applications) != 1 or applications[0] != get_list(MIFARE_APP_ID):
        logger.error("No application found!")
        # Pause before returning, as the original code did.
        time.sleep(4)
        return None

    # Key diversification input: constant prefix + UID + app ID + system ID.
    diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
    read_div_key_bytes = diversify_key(
        get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False
    )

    # Log in with the derived read key.
    logger.info("Start auth")
    aes_app_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
    desfire.select_application(MIFARE_APP_ID)
    desfire.authenticate(MIFARE_ACL_READ_BASE_KEY_ID, aes_app_read_key)

    logger.info("Read data")
    file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
    raw = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)

    # Convert the list of ints to a compact lowercase hex string.
    rdata = to_hex_string(raw).replace(" ", "").lower()
    # NOTE(review): this writes the card's stored value to the log at INFO
    # level; if that value is a credential, consider lowering or redacting it.
    logger.info(f"Data on card: {rdata}")
    return rdata
def DeleteCard():
    """Delete the gatekeeper application from the card on the reader.

    The background scanner is stopped while the card is being handled and is
    always restarted afterwards, including when the operation fails.

    Returns:
        The value returned by ``readFileOnCard`` before deletion (hex string,
        or ``None`` when the expected application was not found).

    Raises:
        HTTPException: 410 when the card carries no applications; 500 when the
            master key is missing or any other error occurs.
    """
    scannerThread = None
    scanner_stopped = False
    try:
        checkForKey()
        # Imported inside the function as in the original code (presumably to
        # avoid a circular import with app.main).
        from app.main import scanner as scannerThread
        scannerThread.stop()
        scanner_stopped = True

        cardservice = getCardService(15)
        desfire = DESFire(PCSCDevice(cardservice.connection.component))
        rdata = readFileOnCard(desfire=desfire)

        # Key objects for the card-level authentication fallbacks below.
        aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
        des_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_2K3DES)
        desKey = DESFireKey(des_keysettings, "00" * 8)
        aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
        aes_null_key = DESFireKey(aes_keysettings, "00" * 16)

        desfire.select_application(0x0)
        # Try the default DES key, then the all-zero AES key; if both fail,
        # fall back to the master key and let that failure propagate.
        for label, candidate in (("Auth1", desKey), ("Auth2", aes_null_key)):
            logger.info(label)
            try:
                desfire.authenticate(0x0, candidate)
                break
            except Exception:
                continue
        else:
            logger.info("Auth3")
            desfire.authenticate(0x0, aes_master_key)

        applications = desfire.get_application_ids()
        logger.debug(f"Applications: {applications}")
        if not applications:
            raise HTTPException(status_code=status.HTTP_410_GONE, detail="No applications on card")

        desfire.select_application(MIFARE_APP_ID)
        desfire.authenticate(0x0, aes_master_key)
        try:
            desfire.delete_application(MIFARE_APP_ID)
            logger.info("App deleted!")
        except Exception as delete_error:
            # Deletion stays best-effort as before, but is no longer silent.
            logger.warning(f"Application could not be deleted: {delete_error}", exc_info=True)
        return rdata
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 410 above) intact instead of
        # re-wrapping them as a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error in deletion function: {e}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error: {e}") from e
    finally:
        # Restart the scanner on every exit path once it has been stopped.
        if scanner_stopped:
            scannerThread.start()
def WriteNewCard():
    """Provision a blank card and return the random key stored on it.

    Sequence: authenticate with the default DES key, read the real UID, switch
    the default key to AES, create application ``MIFARE_APP_ID``, install the
    master key and the UID-diversified read/write keys, create the encrypted
    data file, write a fresh random value into it and read it back to verify.

    The background scanner is stopped while the card is being handled and is
    always restarted afterwards, including when the operation fails.

    Returns:
        The generated value as a 32-character hex string (16 random bytes).

    Raises:
        HTTPException: 500 when a required key is missing from the environment
            or when any step of the provisioning fails.
    """
    scannerThread = None
    scanner_stopped = False
    try:
        checkForKey()
        # Validate the base keys before touching the card: failing later at
        # the diversification step would leave a half-provisioned card behind.
        if not MIFARE_ACL_READ_BASE_KEY or not MIFARE_ACL_WRITE_BASE_KEY:
            logger.critical("NO ACL BASE KEYS LOADED")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="No ACL base keys loaded! Check application.",
            )

        # Imported inside the function as in the original code (presumably to
        # avoid a circular import with app.main).
        from app.main import scanner as scannerThread
        scannerThread.stop()
        scanner_stopped = True

        cardservice = getCardService(20)
        desfire = DESFire(PCSCDevice(cardservice.connection.component))

        # Key objects
        aes_keysettings = KeySettings(key_type=DESFireKeyType.DF_KEY_AES)
        aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
        aes_master_key = DESFireKey(aes_keysettings, MIFARE_APP_MASTER_KEY)
        desKey = DESFireKey(desfire.get_key_setting(), "00" * 8)

        logger.info("Authenticating with default DES key...")
        desfire.authenticate(0x0, desKey)
        uid = desfire.get_real_uid()

        logger.info("Setting default key...")
        desfire.change_default_key(aes_null_key, 0x0)

        logger.info("Creating application...")
        app_settings = KeySettings(
            settings=[
                DESFireKeySettings.KS_ALLOW_CHANGE_MK,
                DESFireKeySettings.KS_LISTING_WITHOUT_MK,
                DESFireKeySettings.KS_CREATE_DELETE_WITHOUT_MK,
                DESFireKeySettings.KS_CONFIGURATION_CHANGEABLE,
            ],
            key_type=DESFireKeyType.DF_KEY_AES,
        )
        # NOTE(review): the trailing 4 is presumably the number of keys in the
        # application - confirm against the desfire library.
        desfire.create_application(MIFARE_APP_ID, app_settings, 4)

        # Explicit verification instead of ``assert`` (asserts vanish under -O).
        applications = desfire.get_application_ids()
        if len(applications) != 1 or applications[0] != get_list(MIFARE_APP_ID):
            raise RuntimeError("Application creation could not be verified")
        logger.info(" - Application created successfully.")

        desfire.select_application(MIFARE_APP_ID)
        # Replace the application's all-zero key 0 with the real master key.
        desfire.authenticate(0x0, aes_null_key)
        desfire.change_key(0x0, aes_null_key, aes_master_key, 0x1)
        logger.info("new key auth")
        desfire.authenticate(0x0, aes_master_key)
        # Fresh null-key object, kept from the original code (presumably key
        # objects carry state from the earlier authentication).
        aes_null_key = DESFireKey(aes_keysettings, "00" * 16)

        # Key diversification input: constant prefix + UID + app ID + system ID.
        diversification_data = [0x01] + uid + get_list(MIFARE_APP_ID) + get_list(MIFARE_SYS_ID)
        read_div_key_bytes = diversify_key(
            get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False
        )
        write_div_key_bytes = diversify_key(
            get_list(MIFARE_ACL_WRITE_BASE_KEY), diversification_data, pad_to_32=False
        )

        logger.info("Changing file read key...")
        aes_file_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
        desfire.change_key(MIFARE_ACL_READ_BASE_KEY_ID, aes_null_key, aes_file_read_key, 0x1)

        logger.info("Changing file write key...")
        aes_file_write_key = DESFireKey(aes_keysettings, write_div_key_bytes)
        desfire.change_key(MIFARE_ACL_WRITE_BASE_KEY_ID, aes_null_key, aes_file_write_key, 0x1)

        logger.info("Create encrypted file containing key...")
        file_settings = FileSettings(
            file_size=16,
            encryption=DESFireCommunicationMode.ENCRYPTED,
            permissions=FilePermissions(
                read_key=MIFARE_ACL_READ_BASE_KEY_ID,
                write_key=MIFARE_ACL_WRITE_BASE_KEY_ID,
            ),
            file_type=DESFireFileType.MDFT_STANDARD_DATA_FILE,
        )
        desfire.create_standard_file(MIFARE_ENCRYPTED_FILE_ID, file_settings)
        file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)

        # The stored value is a fresh random token, not the card UID.
        logger.info("Writing key to encrypted file...")
        key = secrets.token_hex(16)
        desfire.write_file_data(MIFARE_ENCRYPTED_FILE_ID, 0x0, file_data.encryption, get_list(key))

        logger.info("Reading from encrypted file...")
        rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
        if rdata != get_list(key):
            raise RuntimeError("Read-back verification of written data failed")
        logger.info(" - Data written successfully.")
        return key
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of re-wrapping them.
        raise
    except Exception as e:
        logger.error(f"Error in write function: {e}", exc_info=True)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error: {e}") from e
    finally:
        # Restart the scanner on every exit path once it has been stopped.
        if scanner_stopped:
            scannerThread.start()
class BackgroundScanner:
    """Poll the card reader on a daemon thread and log what each card holds."""

    def __init__(self, db):
        """Store the database handle and start in the stopped state."""
        self.db = db
        self.is_running = False
        self.thread: Optional[threading.Thread] = None

    def start(self):
        """Launch the polling thread unless the scanner is already running."""
        if not self.is_running:
            self.is_running = True
            self.thread = threading.Thread(target=self._scan_loop, daemon=True)
            self.thread.start()
            logger.info("Scanner started")
        else:
            logger.info("Scanner already running")

    def stop(self):
        """Ask the polling loop to finish and wait for its thread to end."""
        self.is_running = False
        worker = self.thread
        if worker:
            worker.join()
        logger.info("Scanner stopped")

    def _scan_loop(self):
        """Thread body: read cards until ``is_running`` is cleared."""
        while self.is_running:
            try:
                payload = self._read_card()
                if not payload:
                    # Nothing read: short pause, then poll again.
                    time.sleep(0.5)
                    logger.debug("READY after timout")
                    continue
                logger.info(f"content: {payload}")
                # Pause after a successful read before polling again.
                time.sleep(5)
                logger.debug("READY after success")
                # TODO: the database lookup (self._check_db) is currently disabled.
            except Exception as e:
                logger.error(f"Error in scan function: {e}", exc_info=True)
                time.sleep(6)

    def _read_card(self):
        """Wait briefly for a card and return its file content.

        Returns ``None`` when no card appears within the timeout or when
        reading the card fails.
        """
        time.sleep(0.5)
        try:
            service = getCardService(3)
        except CardRequestTimeoutException:
            logger.debug("No tag detected within the timeout.")
            return None
        # Wrap the PC/SC connection in a DESFire session.
        reader = DESFire(PCSCDevice(service.connection.component))
        try:
            return readFileOnCard(desfire=reader)
        except Exception as err:
            logger.error(f"something went wrong: {err}")
            time.sleep(5)
        return None