# Listing metadata (from the file viewer): 197 lines, 6.3 KiB, Python.
import pytest
|
|
from datetime import datetime, timedelta, timezone
|
|
from fastapi import HTTPException, status
|
|
from app.services.auth import (
|
|
verify_password, get_password_hash, get_user, authenticate_user,
|
|
create_access_token, get_current_user, auth_is_admin, create_first_user
|
|
)
|
|
from app.model.models import UserDB
|
|
from jwt.exceptions import InvalidTokenError
|
|
|
|
|
|
def test_password_hashing():
    """Check that a password hash differs from its input and verifies correctly."""
    plain_text = "test_password_123"
    wrong_guess = "wrong_password"

    digest = get_password_hash(plain_text)

    # The hash must not echo the plain text and must not be empty.
    assert digest != plain_text
    assert len(digest) > 0

    # The matching password verifies; a different one does not.
    assert verify_password(plain_text, digest) is True
    assert verify_password(wrong_guess, digest) is False
|
|
|
|
|
|
def test_get_user(db_session):
    """Test that get_user finds a stored user by name and returns None otherwise.

    Args:
        db_session: Database session fixture used to store and look up the user.
    """
    # get_password_hash comes from the module-level import; no local re-import needed.
    user = UserDB(name="testuser", passwordhash=get_password_hash("password"))
    db_session.add(user)
    db_session.commit()

    # Looking up the stored name returns that user.
    retrieved_user = get_user(db_session, "testuser")
    assert retrieved_user is not None
    assert retrieved_user.name == "testuser"

    # Looking up a name that was never stored returns None.
    retrieved_user = get_user(db_session, "nonexistent")
    assert retrieved_user is None
|
|
|
|
|
|
def test_authenticate_user(db_session):
    """Test authenticate_user with correct, wrong, and unknown credentials.

    Args:
        db_session: Database session fixture used to store the user under test.
    """
    # get_password_hash comes from the module-level import; no local re-import needed.
    user = UserDB(name="authuser", passwordhash=get_password_hash("correctpass"))
    db_session.add(user)
    db_session.commit()

    # Correct credentials yield the user object rather than False.
    authenticated = authenticate_user(db_session, "authuser", "correctpass")
    assert authenticated is not False
    assert authenticated.name == "authuser"

    # A wrong password for an existing user is rejected with False.
    authenticated = authenticate_user(db_session, "authuser", "wrongpass")
    assert authenticated is False

    # A user name that does not exist is rejected with False as well.
    authenticated = authenticate_user(db_session, "nonexistent", "password")
    assert authenticated is False
|
|
|
|
|
|
def test_create_access_token():
    """Check that create_access_token returns a string, with or without a lifetime."""
    claims = {"sub": "testuser"}

    # No expires_delta: the function's default expiration applies.
    default_token = create_access_token(claims)
    assert isinstance(default_token, str)
    assert len(default_token) > 0

    # Explicit one-hour lifetime.
    one_hour = timedelta(hours=1)
    timed_token = create_access_token(claims, expires_delta=one_hour)
    assert isinstance(timed_token, str)
|
|
|
|
|
|
def test_get_current_user(db_session, admin_user):
    """Test resolving the current user from a token, plus both rejection paths.

    Args:
        db_session: Database session fixture passed to get_current_user.
        admin_user: Fixture providing a stored user whose name goes into the token.
    """
    # create_access_token / get_current_user come from the module-level import.
    token = create_access_token(data={"sub": admin_user.name})

    # A valid token resolves to the same user the token was issued for.
    user = get_current_user(token=token, db=db_session)
    assert user is not None
    assert user.name == admin_user.name
    assert user.id == admin_user.id

    # A string that is not a token is rejected with 401. The session is passed
    # explicitly so the call does not depend on the parameter's default value.
    with pytest.raises(HTTPException) as exc_info:
        get_current_user(token="invalid_token", db=db_session)
    # Checked outside the `with` block: nothing after the raising call runs inside it.
    assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED

    # A negative lifetime produces a token that has already expired.
    past_expire = timedelta(minutes=-100)
    expired_token = create_access_token(
        data={"sub": admin_user.name}, expires_delta=past_expire
    )

    with pytest.raises(HTTPException) as exc_info:
        get_current_user(token=expired_token, db=db_session)
    assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
|
|
def test_auth_is_admin(db_session, admin_user, regular_user):
    """Test that auth_is_admin accepts an admin token and rejects a regular one.

    Args:
        db_session: Database session fixture passed to auth_is_admin.
        admin_user: Fixture providing the user expected to pass the check.
        regular_user: Fixture providing the user expected to be refused.
    """
    # create_access_token / auth_is_admin come from the module-level import.
    admin_token = create_access_token(data={"sub": admin_user.name})

    # The admin's token passes the check and returns True.
    result = auth_is_admin(token=admin_token, db=db_session)
    assert result is True

    user_token = create_access_token(data={"sub": regular_user.name})

    # The regular user's token is refused with 403 Forbidden.
    with pytest.raises(HTTPException) as exc_info:
        auth_is_admin(token=user_token, db=db_session)
    # Checked outside the `with` block: nothing after the raising call runs inside it.
    assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
|
|
def test_create_first_user(db_session):
    """Test automatic creation of the first admin user.

    NOTE: the original author flagged this test as currently broken:
    create_first_user() takes no session argument and, per that note, works
    against the production database rather than the one behind db_session.

    Args:
        db_session: Database session fixture that is cleared of users and later
            queried for the created admin.
    """
    # select is not imported at module level, so it is imported here.
    from sqlmodel import select

    # Clear any existing users. The query runs once; the original also ran an
    # identical query first and discarded its result.
    for user in db_session.exec(select(UserDB)).all():
        db_session.delete(user)
    db_session.commit()

    # The first call is expected to create and return the admin user.
    result = create_first_user()
    assert result is not None
    assert result.name == "admin"
    assert result.is_admin is True

    # Verify the admin is visible through the test session.
    user = db_session.exec(select(UserDB).where(UserDB.name == "admin")).first()
    assert user is not None
    assert user.is_admin is True

    # A second call must not create another admin when one already exists.
    second_result = create_first_user()
    assert second_result is None  # Should print "Admin user already exists"
|
|
|
|
|
|
def test_token_endpoint(client, admin_user):
    """Exercise POST /token with valid credentials and two rejected ones."""
    # Successful login: the response carries a bearer access token.
    login_ok = client.post(
        "/token",
        data={"username": admin_user.name, "password": "admin123"},
    )
    assert login_ok.status_code == 200
    body = login_ok.json()
    assert "access_token" in body
    assert body["token_type"] == "bearer"

    # Wrong password first, then an unknown user; both must return 401.
    rejected_forms = (
        {"username": admin_user.name, "password": "wrongpassword"},
        {"username": "nonexistent", "password": "password"},
    )
    for form in rejected_forms:
        refused = client.post("/token", data=form)
        assert refused.status_code == 401
|
|
|
|
|
|
def test_test_login_endpoint(client, admin_user, auth_headers):
    """Exercise GET /test/login with and without authentication headers."""
    # With the auth headers the endpoint reports the admin user.
    authed = client.get("/test/login", headers=auth_headers)
    assert authed.status_code == 200
    profile = authed.json()
    assert profile["name"] == admin_user.name
    assert profile["is_admin"] is True

    # Without any token the request is rejected.
    anonymous = client.get("/test/login")
    assert anonymous.status_code == 401
|