Change all instances of print() to logger.info()
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from sqlmodel import Session, select
|
||||
from sqlalchemy.orm import selectinload
|
||||
@@ -12,7 +14,7 @@ aa_router = APIRouter(prefix="/aa", tags=["AccessAuth"])
|
||||
|
||||
@aa_router.post("/", response_model=AccessAuthorizationResponse)
|
||||
def add_accessauth(*, db: Session = Depends(get_session), aa: AccessAuthorizationCreate, admin: bool = Depends(auth_is_admin)):
|
||||
print("Creating accessauth with data: ", aa)
|
||||
logger.info("Creating accessauth with data: ", aa)
|
||||
timetables = [Timetable.model_validate(t) for t in aa.timetables]
|
||||
db_aa = AccessAuthorizationDB(
|
||||
name=aa.name,
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from sqlmodel import Session, select
|
||||
from typing import List
|
||||
@@ -14,7 +16,7 @@ card_router = APIRouter(prefix="/cards", tags=["Card"])
|
||||
def register_card(group_id: int):
|
||||
key = WriteNewCard()
|
||||
if key == None:
|
||||
print("No card registered. Check logs!")
|
||||
logger.info("No card registered. Check logs!")
|
||||
raise HTTPException(status.HTTP_417_EXPECTATION_FAILED, detail="No card registered. Check logs!")
|
||||
card = Card(group_id=group_id, uuid=key)
|
||||
return card
|
||||
@@ -27,11 +29,11 @@ def add_card(*, db: Session = Depends(get_session), group_id: int, admin: bool =
|
||||
@card_router.get("/delete")
|
||||
def del_card(*, db: Session = Depends(get_session), admin: bool = Depends(auth_is_admin)):
|
||||
key = DeleteCard()
|
||||
print(key)
|
||||
logger.info(key)
|
||||
try:
|
||||
card = db.exec(select(Card).where(Card.uuid == key)).one()
|
||||
except NoResultFound:
|
||||
print(f"The key:'{key}' was not found in db!")
|
||||
logger.info(f"The key:'{key}' was not found in db!")
|
||||
raise HTTPException(status_code=500, detail="Key on card not found in DB. Please tell an admin about this. KEY={key}")
|
||||
db.delete(card)
|
||||
db.commit()
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
from fastapi import APIRouter, HTTPException, Depends
|
||||
from sqlmodel import Session, select
|
||||
from typing import List
|
||||
@@ -10,7 +12,7 @@ user_router = APIRouter(tags=["Users"])
|
||||
|
||||
@user_router.post("/users/", response_model=UserResponse)
|
||||
def create_user(*, db: Session = Depends(get_session), user: UserCreate, admin: bool = Depends(auth_is_admin)):
|
||||
print("creating user with data ", user)
|
||||
logger.info("creating user with data ", user)
|
||||
hashed_password = {"passwordhash": get_password_hash(user.password)}
|
||||
db_user = UserDB.model_validate(user, update=hashed_password)
|
||||
return add_and_refresh(db, db_user)
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
from fastapi import FastAPI
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from contextlib import asynccontextmanager
|
||||
@@ -10,13 +12,15 @@ from app.services.scanner import BackgroundScanner
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
scanner = BackgroundScanner(db=get_db_session())
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
load_dotenv()
|
||||
create_db_and_tables()
|
||||
create_first_user(db=get_db_session())
|
||||
print("Database created and tables initialized.")
|
||||
logger.info("Database created and tables initialized.")
|
||||
scanner.start()
|
||||
yield
|
||||
#scanner.stop()
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
from typing import Annotated
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from fastapi import APIRouter, HTTPException, Depends, status
|
||||
@@ -84,18 +86,18 @@ def auth_is_admin(
|
||||
return True
|
||||
|
||||
def create_first_user(db: Session):
|
||||
print("Checking for admin user")
|
||||
logger.info("Checking for admin user")
|
||||
admin_user = db.exec(select(UserDB)).first()
|
||||
if admin_user is None:
|
||||
password = ''.join(secrets.choice(string.digits) for i in range(8))
|
||||
print("Creating first admin user with password", password)
|
||||
logger.info("Creating first admin user with password", password)
|
||||
user = UserDB(
|
||||
name="admin",
|
||||
passwordhash=get_password_hash(password),
|
||||
is_admin=True
|
||||
)
|
||||
return add_and_refresh(db, user)
|
||||
print(f"Admin user already exists: {admin_user.name}")
|
||||
logger.info(f"Admin user already exists: {admin_user.name}")
|
||||
|
||||
|
||||
@token_router.post("/token")
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
from sqlmodel import select
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from sqlalchemy.orm import selectinload
|
||||
@@ -14,13 +16,13 @@ doorIsOpen = True
|
||||
def openDoor():
|
||||
global doorIsOpen
|
||||
doorIsOpen = True
|
||||
print("Still needs gpio out")
|
||||
logger.info("Still needs gpio out")
|
||||
pass
|
||||
|
||||
def closeDoor():
|
||||
global doorIsOpen
|
||||
doorIsOpen = False
|
||||
print("Still needs gpio out")
|
||||
logger.info("Still needs gpio out")
|
||||
pass
|
||||
|
||||
def isDoorOpen():
|
||||
@@ -32,18 +34,18 @@ def checkAccess(uuid: str, db: Session):
|
||||
current_time = datetime.datetime.now()
|
||||
card = db.exec(select(Card).where(Card.uuid == uuid)).one()
|
||||
for auth in card.group.accessauths:
|
||||
print(f"checking auth: {auth.name}")
|
||||
logger.info(f"checking auth: {auth.name}")
|
||||
for timetable in auth.timetables:
|
||||
print(f" checking timetable {timetable.id}")
|
||||
print(f" comparing weekday: CUR:{current_weekday} TT:{timetable.weekday}")
|
||||
logger.info(f" checking timetable {timetable.id}")
|
||||
logger.info(f" comparing weekday: CUR:{current_weekday} TT:{timetable.weekday}")
|
||||
if current_weekday == timetable.weekday:
|
||||
starttime = datetime.datetime.combine(datetime.date.today(), timetable.starttime)
|
||||
endtime = starttime + datetime.timedelta(minutes=timetable.duration)
|
||||
print(f" comparing time: Start:{starttime} Current:{current_time} End:{endtime}")
|
||||
logger.info(f" comparing time: Start:{starttime} Current:{current_time} End:{endtime}")
|
||||
if starttime < current_time < endtime:
|
||||
print("Access Valid!")
|
||||
logger.info("Access Valid!")
|
||||
return True
|
||||
print("No more auths found")
|
||||
logger.info("No more auths found")
|
||||
return False
|
||||
except exc.NoResultFound:
|
||||
raise Exception("No Access with that key found, this might be a db error")
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import logging
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
@@ -34,10 +36,6 @@ MIFARE_ACL_WRITE_BASE_KEY_ID = 0x2
|
||||
MIFARE_SYS_ID = "FF0000" # 3 bytes, can essentially be anything
|
||||
MIFARE_ENCRYPTED_FILE_ID = 0x1
|
||||
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def checkForKey():
|
||||
if MIFARE_APP_MASTER_KEY == None:
|
||||
logger.critical("NO MASTER KEY LOADED")
|
||||
@@ -156,18 +154,18 @@ def WriteNewCard():
|
||||
desKey = DESFireKey(desfire.get_key_setting(), "00" * 8)
|
||||
|
||||
# Authenticate with default DES key
|
||||
print("Authenticating with default DES key...")
|
||||
logger.info("Authenticating with default DES key...")
|
||||
desfire.authenticate(0x0, desKey)
|
||||
|
||||
#get uid
|
||||
uid = desfire.get_real_uid()
|
||||
|
||||
# Set default key
|
||||
print("Setting default key...")
|
||||
logger.info("Setting default key...")
|
||||
desfire.change_default_key(aes_null_key, 0x0)
|
||||
|
||||
# Create application
|
||||
print("Creating application...")
|
||||
logger.info("Creating application...")
|
||||
app_settings = KeySettings(
|
||||
settings=[
|
||||
DESFireKeySettings.KS_ALLOW_CHANGE_MK,
|
||||
@@ -183,7 +181,7 @@ def WriteNewCard():
|
||||
applications = desfire.get_application_ids()
|
||||
assert len(applications) == 1
|
||||
assert applications[0] == get_list(MIFARE_APP_ID)
|
||||
print(" - Application created successfully.")
|
||||
logger.info(" - Application created successfully.")
|
||||
|
||||
# Select application
|
||||
desfire.select_application(MIFARE_APP_ID)
|
||||
@@ -192,7 +190,7 @@ def WriteNewCard():
|
||||
desfire.authenticate(0x0, aes_null_key)
|
||||
desfire.change_key(0x0, aes_null_key, aes_master_key, 0x1)
|
||||
|
||||
print("new key auth")
|
||||
logger.info("new key auth")
|
||||
desfire.authenticate(0x0, aes_master_key)
|
||||
|
||||
aes_null_key = DESFireKey(aes_keysettings, "00" * 16)
|
||||
@@ -202,15 +200,15 @@ def WriteNewCard():
|
||||
read_div_key_bytes = diversify_key(get_list(MIFARE_ACL_READ_BASE_KEY), diversification_data, pad_to_32=False)
|
||||
write_div_key_bytes = diversify_key(get_list(MIFARE_ACL_WRITE_BASE_KEY), diversification_data, pad_to_32=False)
|
||||
|
||||
print("Changing file read key...")
|
||||
logger.info("Changing file read key...")
|
||||
aes_file_read_key = DESFireKey(aes_keysettings, read_div_key_bytes)
|
||||
desfire.change_key(MIFARE_ACL_READ_BASE_KEY_ID, aes_null_key, aes_file_read_key, 0x1)
|
||||
|
||||
print("Changing file write key...")
|
||||
logger.info("Changing file write key...")
|
||||
aes_file_write_key = DESFireKey(aes_keysettings, write_div_key_bytes)
|
||||
desfire.change_key(MIFARE_ACL_WRITE_BASE_KEY_ID, aes_null_key, aes_file_write_key, 0x1)
|
||||
|
||||
print("Create encrypted file containing key...")
|
||||
logger.info("Create encrypted file containing key...")
|
||||
file_settings = FileSettings(
|
||||
file_size=16,
|
||||
encryption=DESFireCommunicationMode.ENCRYPTED,
|
||||
@@ -223,14 +221,14 @@ def WriteNewCard():
|
||||
desfire.create_standard_file(MIFARE_ENCRYPTED_FILE_ID, file_settings)
|
||||
file_data = desfire.get_file_settings(MIFARE_ENCRYPTED_FILE_ID)
|
||||
|
||||
print("Writing UID to encrypted file...")
|
||||
logger.info("Writing UID to encrypted file...")
|
||||
key = secrets.token_hex(16)
|
||||
desfire.write_file_data(MIFARE_ENCRYPTED_FILE_ID, 0x0, file_data.encryption, get_list(key))
|
||||
|
||||
print("Reading from encrypted file...")
|
||||
logger.info("Reading from encrypted file...")
|
||||
rdata = desfire.read_file_data(MIFARE_ENCRYPTED_FILE_ID, file_data)
|
||||
assert rdata == get_list(key)
|
||||
print(" - Data written successfully.")
|
||||
logger.info(" - Data written successfully.")
|
||||
scannerThread.start()
|
||||
return key
|
||||
|
||||
@@ -297,4 +295,4 @@ class BackgroundScanner:
|
||||
if check == True:
|
||||
openDoor()
|
||||
else:
|
||||
print("Access denied!")
|
||||
logger.info("Access denied!")
|
||||
|
||||
Reference in New Issue
Block a user