"""Tests for the helpers in ``app.services.auth`` and the login endpoints."""

from datetime import datetime, timedelta, timezone

import pytest
from fastapi import HTTPException, status
from jwt.exceptions import InvalidTokenError
from sqlmodel import select

from app.model.models import UserDB
from app.services.auth import (
    verify_password,
    get_password_hash,
    get_user,
    authenticate_user,
    create_access_token,
    get_current_user,
    auth_is_admin,
    create_first_user,
)


def test_password_hashing():
    """Hashing must not return the plain text, and verification must match."""
    password = "test_password_123"

    # Hash password
    hashed = get_password_hash(password)
    assert hashed != password
    assert len(hashed) > 0

    # Verify correct password
    assert verify_password(password, hashed) is True

    # Verify incorrect password
    assert verify_password("wrong_password", hashed) is False


def test_get_user(db_session):
    """``get_user`` returns a stored user and raises HTTP 404 for an unknown name."""
    # Create a user
    user = UserDB(name="testuser", passwordhash=get_password_hash("password"))
    db_session.add(user)
    db_session.commit()

    # Get existing user
    retrieved_user = get_user(db_session, "testuser")
    assert retrieved_user is not None
    assert retrieved_user.name == "testuser"

    # Try to get non-existent user
    with pytest.raises(HTTPException) as exc_info:
        get_user(db_session, "nonexistent")
    # Checked after the block: code following the raising call inside it never runs.
    assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND


def test_authenticate_user(db_session):
    """``authenticate_user`` returns the user on success and ``False`` otherwise."""
    # Create a user
    user = UserDB(name="authuser", passwordhash=get_password_hash("correctpass"))
    db_session.add(user)
    db_session.commit()

    # Authenticate with correct credentials
    authenticated = authenticate_user(db_session, "authuser", "correctpass")
    assert authenticated is not False
    assert authenticated.name == "authuser"

    # Authenticate with wrong password
    authenticated = authenticate_user(db_session, "authuser", "wrongpass")
    assert authenticated is False

    # Authenticate non-existent user
    authenticated = authenticate_user(db_session, "nonexistent", "password")
    assert authenticated is False


def test_create_access_token():
    """``create_access_token`` returns a non-empty string, with or without a custom expiry."""
    data = {"sub": "testuser"}

    # Create token with default expiration
    token = create_access_token(data)
    assert isinstance(token, str)
    assert len(token) > 0

    # Create token with custom expiration
    custom_expire = timedelta(hours=1)
    token = create_access_token(data, expires_delta=custom_expire)
    assert isinstance(token, str)


def test_get_current_user(db_session, admin_user):
    """``get_current_user`` resolves a valid token and rejects invalid or expired ones."""
    # Create token for admin user
    token = create_access_token(data={"sub": admin_user.name})

    # Get user from token
    user = get_current_user(token=token)
    assert user is not None
    assert user.name == admin_user.name
    assert user.id == admin_user.id

    # Test invalid token
    with pytest.raises(HTTPException) as exc_info:
        get_current_user(token="invalid_token")
    assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED

    # Test expired token (create token with past expiration)
    past_expire = timedelta(minutes=-100)
    expired_token = create_access_token(
        data={"sub": admin_user.name}, expires_delta=past_expire
    )
    with pytest.raises(HTTPException) as exc_info:
        get_current_user(token=expired_token)
    assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED


def test_auth_is_admin(db_session, admin_user, regular_user):
    """``auth_is_admin`` returns ``True`` for an admin token and raises HTTP 403 otherwise."""
    # Create token for admin user
    admin_token = create_access_token(data={"sub": admin_user.name})

    # Admin should pass
    result = auth_is_admin(token=admin_token)
    assert result is True

    # Create token for regular user
    user_token = create_access_token(data={"sub": regular_user.name})

    # Regular user should fail
    with pytest.raises(HTTPException) as exc_info:
        auth_is_admin(token=user_token)
    assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN


def test_create_first_user(db_session):
    """``create_first_user`` creates an ``admin`` user once and returns ``None`` afterwards."""
    # Clear any existing users so the "first user" path is exercised
    for user in db_session.exec(select(UserDB)).all():
        db_session.delete(user)
    db_session.commit()

    # Create first user
    result = create_first_user()
    assert result is not None
    assert result.name == "admin"
    assert result.is_admin is True

    # Verify user exists in database
    user = db_session.exec(select(UserDB).where(UserDB.name == "admin")).first()
    assert user is not None
    assert user.is_admin is True

    # Test that it doesn't create another admin if one exists
    second_result = create_first_user()
    # Original author's note: this call should print "Admin user already exists"
    assert second_result is None


def test_token_endpoint(client, admin_user):
    """``POST /token`` issues a bearer token for valid credentials and 401 otherwise."""
    # Test successful login
    # NOTE(review): assumes the admin_user fixture was created with the
    # password "admin123" -- the fixture is defined elsewhere; confirm.
    response = client.post(
        "/token",
        data={"username": admin_user.name, "password": "admin123"},
    )
    assert response.status_code == 200
    data = response.json()
    assert "access_token" in data
    assert data["token_type"] == "bearer"

    # Test failed login with wrong password
    response = client.post(
        "/token",
        data={"username": admin_user.name, "password": "wrongpassword"},
    )
    assert response.status_code == 401

    # Test failed login with non-existent user
    response = client.post(
        "/token",
        data={"username": "nonexistent", "password": "password"},
    )
    assert response.status_code == 401


def test_test_login_endpoint(client, admin_user, auth_headers):
    """``GET /test/login`` returns the current user with a token and 401 without one."""
    # Test with valid token
    response = client.get("/test/login", headers=auth_headers)
    assert response.status_code == 200
    data = response.json()
    assert data["name"] == admin_user.name
    assert data["is_admin"] is True

    # Test without token
    response = client.get("/test/login")
    assert response.status_code == 401