"""Password hashing, JWT issuing/validation and the token endpoints."""
from typing import Annotated
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, HTTPException, Depends, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlmodel import Session, select
from pwdlib import PasswordHash
import jwt
from jwt.exceptions import InvalidTokenError

from ..model.models import UserDB, Token, TokenData, UserCreate
from ..services.database import *
import secrets
import string

# TODO: do not keep the signing key in source control; load it from the
# environment or a secrets store and rotate this value.
SECRET_KEY = "8b14d0b447bff7efa24d5019cc59a999786e31f6f865173bbd642bf18de5ad85"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
token_router = APIRouter(tags=["Token"])
password_hash = PasswordHash.recommended()


def verify_password(plain_password, hashed_password):
    """Return the result of checking ``plain_password`` against ``hashed_password``."""
    return password_hash.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Return the hash of ``password`` produced by the module's ``password_hash``."""
    return password_hash.hash(password)


def _find_user(db, username: str):
    """Return the first ``UserDB`` row whose ``name`` equals ``username``, or ``None``."""
    return db.exec(select(UserDB).where(UserDB.name == username)).first()


def get_user(db, username: str):
    """Fetch a user by name, failing loudly when the user does not exist.

    Args:
        db: Open database session used to run the query.
        username: Value matched against ``UserDB.name``.

    Returns:
        The matching ``UserDB`` row.

    Raises:
        HTTPException: 404 when no row matches ``username``.
    """
    user = _find_user(db, username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Username not found in get_user, this shouldn't happen",
        )
    return user


def authenticate_user(db, username: str, password: str):
    """Check a username/password pair.

    Args:
        db: Open database session used to look the user up.
        username: Login name supplied by the client.
        password: Plain-text password supplied by the client.

    Returns:
        The ``UserDB`` row on success, ``False`` when the user does not exist
        or the password does not verify.
    """
    # Use the non-raising lookup: an unknown username must produce the same
    # "failed" result as a wrong password instead of leaking a 404.
    user = _find_user(db, username)
    if user is None:
        return False
    if not verify_password(password, user.passwordhash):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Encode ``data`` plus an ``exp`` claim as a signed JWT.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Lifetime of the token. When omitted (or otherwise
            falsy) the token expires 15 minutes from now.

    Returns:
        The encoded token as returned by ``jwt.encode``.
    """
    to_encode = data.copy()
    lifetime = expires_delta if expires_delta else timedelta(minutes=15)
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """Resolve the bearer ``token`` to the user named in its ``sub`` claim.

    Returns:
        The ``UserDB`` row for the token's subject.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or names a user that is not in the database.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception from None

    # The session is only needed for the lookup itself. A missing user is a
    # credentials failure here (401), not a 404.
    with Session(engine) as db:
        user = _find_user(db, token_data.username)
    if user is None:
        raise credentials_exception
    return user


def auth_is_admin(token: str = Depends(oauth2_scheme)):
    """Dependency that only passes for tokens belonging to an admin user.

    Returns:
        ``True`` when the token's user has ``is_admin`` set.

    Raises:
        HTTPException: 403 when the user is not an admin; any error raised
            by ``get_current_user`` propagates unchanged.
    """
    user = get_current_user(token=token)
    if not user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to perform this action",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return True


def create_first_user():
    """Create an ``admin`` user with a random password if the user table is empty.

    The generated password is printed to stdout.

    Returns:
        The result of ``add_and_refresh`` for the new user, or ``None`` when a
        user already exists.
    """
    print("Checking for admin user")
    with Session(engine) as db:
        # NOTE(review): this matches *any* existing user, not specifically an
        # admin one; confirm that is the intended bootstrap condition.
        admin_user = db.exec(select(UserDB)).first()
        if admin_user is None:
            # NOTE(review): 8 decimal digits is a small keyspace for an admin
            # account; consider a longer alphanumeric password.
            password = ''.join(secrets.choice(string.digits) for _ in range(8))
            print("Creating first admin user with password", password)
            user = UserDB(
                name="admin",
                passwordhash=get_password_hash(password),
                is_admin=True,
            )
            return add_and_refresh(db, user)
        print(f"Admin user already exists: {admin_user.name}")


@token_router.post("/token")
def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Session = Depends(get_session),
) -> Token:
    """Exchange a username and password for a bearer access token."""
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or pw",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.name},
        expires_delta=access_token_expires,
    )
    return Token(access_token=access_token, token_type="bearer")


# NOTE(review): the response model is UserDB itself, which has a
# ``passwordhash`` attribute; confirm the hash is not serialized to clients.
@token_router.get("/test/login")
def test_login(
    current_user: Annotated[UserDB, Depends(get_current_user)],
) -> UserDB:
    """Return the user resolved from the request's bearer token."""
    return current_user