diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 0000000..f572437 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,126 @@ +import pytest +from fastapi.testclient import TestClient +from sqlmodel import Session, create_engine, SQLModel +from sqlalchemy.orm import sessionmaker + +from app.main import app +from app.model.models import UserDB, Card, GroupDB, AccessAuthorizationDB, Timetable, AaGroupLink +from app.services.database import get_session + +# Use in-memory SQLite for testing +TEST_SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:" + +engine = create_engine(TEST_SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) +TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + +@pytest.fixture(scope="function") +def db_session(): + """Create a fresh database session for each test.""" + SQLModel.metadata.create_all(engine) + session = TestingSessionLocal() + try: + yield session + finally: + session.close() + SQLModel.metadata.drop_all(engine) + + +@pytest.fixture(scope="function") +def client(db_session): + """Create a test client with a database session override.""" + def override_get_session(): + try: + yield db_session + finally: + pass + + app.dependency_overrides[get_session] = override_get_session + with TestClient(app) as test_client: + yield test_client + app.dependency_overrides.clear() + + +@pytest.fixture +def admin_user(db_session): + """Create an admin user for testing.""" + from app.services.auth import get_password_hash + admin = UserDB( + name="admin", + passwordhash=get_password_hash("admin123"), + is_admin=True + ) + db_session.add(admin) + db_session.commit() + db_session.refresh(admin) + return admin + + +@pytest.fixture +def regular_user(db_session): + """Create a regular user for testing.""" + from app.services.auth import get_password_hash + user = UserDB( + name="user", + passwordhash=get_password_hash("user123"), + is_admin=False + ) + db_session.add(user) + db_session.commit() + db_session.refresh(user) + return user + + +@pytest.fixture +def auth_headers(client, admin_user): + """Get authentication headers for admin user.""" + response = client.post( + "/token", + data={"username": admin_user.name, "password": "admin123"} + ) + token = response.json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def user_auth_headers(client, regular_user): + """Get authentication headers for regular user.""" + response = client.post( + "/token", + data={"username": regular_user.name, "password": "user123"} + ) + token = response.json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def test_group(db_session): + """Create a test group.""" + group = GroupDB(name="Test Group") + db_session.add(group) + db_session.commit() + db_session.refresh(group) + return group + + +@pytest.fixture +def test_card(db_session, test_group): + """Create a test card.""" + card = Card(uuid="test-uuid-123", group_id=test_group.id) + db_session.add(card) + db_session.commit() + db_session.refresh(card) + return card + + +@pytest.fixture +def test_aa(db_session): + """Create a test access authorization.""" + aa = AccessAuthorizationDB( + name="Test AA", + is_active=True + ) + db_session.add(aa) + db_session.commit() + db_session.refresh(aa) + return aa diff --git a/test/test_main.py b/test/test_main.py new file mode 100644 index 0000000..877df3e --- /dev/null +++ b/test/test_main.py @@ -0,0 +1,24 @@ +def test_app_startup(client): + """Test that the application starts up correctly.""" + response = client.get("/") + # Application should respond (even if it's a 404) + assert response.status_code in [404, 200] + + +def test_health_check(client): + """Test basic health check endpoint if it exists.""" + # Note: This would require adding a health check endpoint + pass + + +def test_router_includes(): + """Test that all routers are included in the app.""" + from app.main import app + routes = [route.path for route in app.routes] + + # Check that router prefixes are present + assert any("/users" in route for route in routes) + assert any("/cards" in route for route in routes) + assert any("/groups" in route for route in routes) + assert any("/aa" in route for route in routes) + assert any("/token" in route for route in routes) diff --git a/test/test_models.py b/test/test_models.py new file mode 100644 index 0000000..42f805f --- /dev/null +++ b/test/test_models.py @@ -0,0 +1,94 @@ +import pytest +from app.model.models import ( + UserBase, UserResponse, UserCreate, UserDB, UserUpdate, + GroupBase, GroupCreate, GroupDB, GroupResponse, + AccessAuthorizationBase, AccessAuthorizationCreate, + AccessAuthorizationDB, AccessAuthorizationResponse, AccessAuthorizationUpdate, + Card, Timetable, TimetableCreate, Token, TokenData, AaGroupLink +) + + +def test_user_models(): + """Test user model creation and validation.""" + # Test UserBase + user_base = UserBase(name="Test User", email="test@example.com", is_admin=False) + assert user_base.name == "Test User" + assert user_base.email == "test@example.com" + assert user_base.is_admin is False + + # Test UserCreate + user_create = UserCreate(name="New User", email="new@example.com", password="secret123") + assert user_create.password == "secret123" + + # Test UserUpdate + user_update = UserUpdate(name="Updated Name") + assert user_update.name == "Updated Name" + assert user_update.email is None + + +def test_group_models(): + """Test group model creation and validation.""" + # Test GroupBase + group_base = GroupBase(name="Test Group") + assert group_base.name == "Test Group" + + # Test GroupCreate + group_create = GroupCreate(name="New Group") + assert group_create.name == "New Group" + + +def test_access_authorization_models(): + """Test access authorization model creation and validation.""" + # Test AccessAuthorizationBase + aa_base = AccessAuthorizationBase(name="Test AA", is_active=True) + assert aa_base.name == "Test AA" + assert aa_base.is_active is True + + # Test AccessAuthorizationCreate with timetables + timetable_create = TimetableCreate(weekday=1, starttime="08:00", duration=60) + aa_create = AccessAuthorizationCreate( + name="New AA", + is_active=False, + timetables=[timetable_create] + ) + assert aa_create.name == "New AA" + assert aa_create.is_active is False + assert len(aa_create.timetables) == 1 + + +def test_card_model(): + """Test card model creation and validation.""" + card = Card(uuid="test-uuid", group_id=1) + assert card.uuid == "test-uuid" + assert card.group_id == 1 + + +def test_timetable_models(): + """Test timetable model creation and validation.""" + # Test TimetableBase with valid values + timetable = TimetableCreate(weekday=1, starttime="09:00", duration=120) + assert timetable.weekday == 1 + assert timetable.starttime == "09:00" + assert timetable.duration == 120 + + # Test boundary values + max_duration = TimetableCreate(weekday=7, starttime="23:59", duration=1439) + assert max_duration.duration == 1439 + assert max_duration.weekday == 7 + + +def test_token_models(): + """Test token model creation and validation.""" + token = Token(access_token="test-token", token_type="bearer") + assert token.access_token == "test-token" + assert token.token_type == "bearer" + + token_data = TokenData(username="testuser") + assert token_data.username == "testuser" + + +def test_aa_group_link_model(): + """Test many-to-many relationship link model.""" + link = AaGroupLink(group_id=1, accessauth_id=2) + assert link.group_id == 1 + assert link.accessauth_id == 2 diff --git a/test/test_services/test_aa_manager.py b/test/test_services/test_aa_manager.py new file mode 100644 index 0000000..3d655b9 --- /dev/null +++ b/test/test_services/test_aa_manager.py @@ -0,0 +1,192 @@ +import pytest +from fastapi import status + + +def test_create_access_auth(client, auth_headers): + """Test creating a new access authorization.""" + aa_data = { + "name": "New AA", + "is_active": True, + "timetables": [ + {"weekday": 1, "starttime": "08:00", "duration": 60}, + {"weekday": 2, "starttime": "09:00", "duration": 90} + ] + } + + response = client.post("/aa/", json=aa_data, headers=auth_headers) + assert response.status_code == 200 + + data = response.json() + assert data["name"] == "New AA" + assert data["is_active"] is True + assert "id" in data + assert len(data["timetables"]) == 2 + + +def test_get_all_access_auths(client, auth_headers, test_aa): + """Test retrieving all access authorizations.""" + response = client.get("/aa/", headers=auth_headers) + assert response.status_code == 200 + + aa_list = response.json() + assert len(aa_list) >= 1 + + aa_names = [aa["name"] for aa in aa_list] + assert test_aa.name in aa_names + + +def test_get_access_auth_by_id(client, auth_headers, test_aa): + """Test retrieving a specific access authorization by ID.""" + response = client.get(f"/aa/{test_aa.id}", headers=auth_headers) + assert response.status_code == 200 + + data = response.json() + assert data["id"] == test_aa.id + assert data["name"] == test_aa.name + + +def test_get_nonexistent_access_auth(client, auth_headers): + """Test retrieving a non-existent access authorization.""" + response = client.get("/aa/99999", headers=auth_headers) + assert response.status_code == 404 + + +def test_assign_access_auth_to_group(client, auth_headers, test_group, test_aa): + """Test assigning an access authorization to a group.""" + response = client.put( + f"/aa/assign/{test_group.id}/{test_aa.id}", + headers=auth_headers + ) + assert response.status_code == 200 + + data = response.json() + assert data["id"] == test_group.id + # The AA should now be in the group's accessauths + # Note: The response model might not include the full relationship + + +def test_assign_already_assigned_access_auth(client, auth_headers, test_group, test_aa): + """Test assigning an already assigned access authorization.""" + # First assignment + client.put(f"/aa/assign/{test_group.id}/{test_aa.id}", headers=auth_headers) + + # Second assignment should indicate it's already assigned + response = client.put( + f"/aa/assign/{test_group.id}/{test_aa.id}", + headers=auth_headers + ) + # According to the code, this returns 200 with "already assigned" message + assert response.status_code == 200 + + +def test_unassign_access_auth_from_group(client, auth_headers, test_group, test_aa): + """Test unassigning an access authorization from a group.""" + # First assign + client.put(f"/aa/assign/{test_group.id}/{test_aa.id}", headers=auth_headers) + + # Then unassign + response = client.put( + f"/aa/unassign/{test_group.id}/{test_aa.id}", + headers=auth_headers + ) + assert response.status_code == 200 + + +def test_unassign_nonexistent_assignment(client, auth_headers, test_group, test_aa): + """Test unassigning a non-existent assignment.""" + response = client.put( + f"/aa/unassign/{test_group.id}/{test_aa.id}", + headers=auth_headers + ) + # According to the code, this returns 200 with "not assigned" message + assert response.status_code == 200 + + +def test_assign_to_nonexistent_group(client, auth_headers, test_aa): + """Test assigning an AA to a non-existent group.""" + response = client.put(f"/aa/assign/99999/{test_aa.id}", headers=auth_headers) + assert response.status_code == 404 + + +def test_assign_nonexistent_aa(client, auth_headers, test_group): + """Test assigning a non-existent AA to a group.""" + response = client.put(f"/aa/assign/{test_group.id}/99999", headers=auth_headers) + assert response.status_code == 404 + + +def test_update_access_auth(client, auth_headers, test_aa): + """Test updating an access authorization.""" + update_data = { + "name": "Updated AA", + "is_active": False + } + + response = client.patch( + f"/aa/{test_aa.id}", + json=update_data, + headers=auth_headers + ) + assert response.status_code == 200 + + data = response.json() + assert data["name"] == "Updated AA" + assert data["is_active"] is False + + +def test_update_access_auth_with_timetables(client, auth_headers, test_aa): + """Test updating an access authorization with new timetables.""" + update_data = { + "timetables": [ + {"weekday": 5, "starttime": "10:00", "duration": 120} + ] + } + + response = client.patch( + f"/aa/{test_aa.id}", + json=update_data, + headers=auth_headers + ) + assert response.status_code == 200 + + +def test_update_nonexistent_access_auth(client, auth_headers): + """Test updating a non-existent access authorization.""" + update_data = {"name": "Updated"} + response = client.patch("/aa/99999", json=update_data, headers=auth_headers) + assert response.status_code == 404 + + +def test_delete_access_auth(client, auth_headers, test_aa): + """Test deleting an access authorization.""" + response = client.delete(f"/aa/{test_aa.id}", headers=auth_headers) + assert response.status_code == 200 + assert "deleted successfully" in response.json()["message"].lower() + + # Verify AA is deleted + response = client.get(f"/aa/{test_aa.id}", headers=auth_headers) + assert response.status_code == 404 + + +def test_delete_nonexistent_access_auth(client, auth_headers): + """Test deleting a non-existent access authorization.""" + response = client.delete("/aa/99999", headers=auth_headers) + assert response.status_code == 404 + + +def test_aa_operations_by_non_admin(client, test_aa, user_auth_headers): + """Test that non-admin users cannot perform AA operations.""" + # Try to create an AA + response = client.post( + "/aa/", + json={"name": "test", "is_active": True, "timetables": []}, + headers=user_auth_headers + ) + assert response.status_code == 403 + + # Try to get all AAs + response = client.get("/aa/", headers=user_auth_headers) + assert response.status_code == 403 + + # Try to assign AA + response = client.put(f"/aa/assign/1/{test_aa.id}", headers=user_auth_headers) + assert response.status_code == 403 diff --git a/test/test_services/test_auth.py b/test/test_services/test_auth.py new file mode 100644 index 0000000..353b753 --- /dev/null +++ b/test/test_services/test_auth.py @@ -0,0 +1,195 @@ +import pytest +from datetime import datetime, timedelta, timezone +from fastapi import HTTPException, status +from app.services.auth import ( + verify_password, get_password_hash, get_user, authenticate_user, + create_access_token, get_current_user, auth_is_admin, create_first_user +) +from app.model.models import UserDB +from jwt.exceptions import InvalidTokenError + + +def test_password_hashing(): + """Test password hashing and verification.""" + password = "test_password_123" + + # Hash password + hashed = get_password_hash(password) + assert hashed != password + assert len(hashed) > 0 + + # Verify correct password + assert verify_password(password, hashed) is True + + # Verify incorrect password + assert verify_password("wrong_password", hashed) is False + + +def test_get_user(db_session): + """Test get_user function.""" + from app.services.auth import get_password_hash + + # Create a user + user = UserDB(name="testuser", passwordhash=get_password_hash("password")) + db_session.add(user) + db_session.commit() + + # Get existing user + retrieved_user = get_user(db_session, "testuser") + assert retrieved_user is not None + assert retrieved_user.name == "testuser" + + # Try to get non-existent user + with pytest.raises(HTTPException) as exc_info: + get_user(db_session, "nonexistent") + assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND + + +def test_authenticate_user(db_session): + """Test user authentication.""" + from app.services.auth import get_password_hash + + # Create a user + user = UserDB(name="authuser", passwordhash=get_password_hash("correctpass")) + db_session.add(user) + db_session.commit() + + # Authenticate with correct credentials + authenticated = authenticate_user(db_session, "authuser", "correctpass") + assert authenticated is not False + assert authenticated.name == "authuser" + + # Authenticate with wrong password + authenticated = authenticate_user(db_session, "authuser", "wrongpass") + assert authenticated is False + + # Authenticate non-existent user + authenticated = authenticate_user(db_session, "nonexistent", "password") + assert authenticated is False + + +def test_create_access_token(): + """Test JWT token creation.""" + data = {"sub": "testuser"} + + # Create token with default expiration + token = create_access_token(data) + assert isinstance(token, str) + assert len(token) > 0 + + # Create token with custom expiration + custom_expire = timedelta(hours=1) + token = create_access_token(data, expires_delta=custom_expire) + assert isinstance(token, str) + + +def test_get_current_user(db_session, admin_user): + """Test getting current user from token.""" + from app.services.auth import create_access_token, get_current_user + + # Create token for admin user + token = create_access_token(data={"sub": admin_user.name}) + + # Get user from token + user = get_current_user(token=token) + assert user is not None + assert user.name == admin_user.name + assert user.id == admin_user.id + + # Test invalid token + with pytest.raises(HTTPException) as exc_info: + get_current_user(token="invalid_token") + assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED + + # Test expired token (create token with past expiration) + past_expire = timedelta(minutes=-100) + expired_token = create_access_token(data={"sub": admin_user.name}, expires_delta=past_expire) + + with pytest.raises(HTTPException) as exc_info: + get_current_user(token=expired_token) + assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED + + +def test_auth_is_admin(db_session, admin_user, regular_user): + """Test admin authorization check.""" + from app.services.auth import create_access_token, auth_is_admin + + # Create token for admin user + admin_token = create_access_token(data={"sub": admin_user.name}) + + # Admin should pass + result = auth_is_admin(token=admin_token) + assert result is True + + # Create token for regular user + user_token = create_access_token(data={"sub": regular_user.name}) + + # Regular user should fail + with pytest.raises(HTTPException) as exc_info: + auth_is_admin(token=user_token) + assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN + + +def test_create_first_user(db_session): + """Test automatic creation of first admin user.""" + # Clear any existing users + db_session.exec(select(UserDB)).all() + for user in db_session.exec(select(UserDB)).all(): + db_session.delete(user) + db_session.commit() + + # Create first user + result = create_first_user() + assert result is not None + assert result.name == "admin" + assert result.is_admin is True + + # Verify user exists in database + user = db_session.exec(select(UserDB).where(UserDB.name == "admin")).first() + assert user is not None + assert user.is_admin is True + + # Test that it doesn't create another admin if one exists + second_result = create_first_user() + assert second_result is None # Should print "Admin user already exists" + + +def test_token_endpoint(client, admin_user): + """Test the token endpoint for login.""" + # Test successful login + response = client.post( + "/token", + data={"username": admin_user.name, "password": "admin123"} + ) + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + assert data["token_type"] == "bearer" + + # Test failed login with wrong password + response = client.post( + "/token", + data={"username": admin_user.name, "password": "wrongpassword"} + ) + assert response.status_code == 401 + + # Test failed login with non-existent user + response = client.post( + "/token", + data={"username": "nonexistent", "password": "password"} + ) + assert response.status_code == 401 + + +def test_test_login_endpoint(client, admin_user, auth_headers): + """Test the test login endpoint.""" + # Test with valid token + response = client.get("/test/login", headers=auth_headers) + assert response.status_code == 200 + data = response.json() + assert data["name"] == admin_user.name + assert data["is_admin"] is True + + # Test without token + response = client.get("/test/login") + assert response.status_code == 401 diff --git a/test/test_services/test_card_manager.py b/test/test_services/test_card_manager.py new file mode 100644 index 0000000..e39db7a --- /dev/null +++ b/test/test_services/test_card_manager.py @@ -0,0 +1,66 @@ +import pytest +from fastapi import status + + +def test_add_card(client, auth_headers, test_group): + """Test adding a card to a group.""" + response = client.post(f"/cards/{test_group.id}", headers=auth_headers) + assert response.status_code == 200 + + data = response.json() + assert "id" in data + assert "uuid" in data + assert data["group_id"] == test_group.id + assert len(data["uuid"]) > 0 # UUID should be generated + + +def test_add_card_to_nonexistent_group(client, auth_headers): + """Test adding a card to a non-existent group.""" + response = client.post("/cards/99999", headers=auth_headers) + # This might succeed and create a card with a non-existent group_id + # or fail depending on foreign key constraints + # For now, let's assume it might fail + # assert response.status_code == 404 + + +def test_delete_card(client, auth_headers, test_card): + """Test deleting a card.""" + response = client.delete(f"/cards/{test_card.id}", headers=auth_headers) + assert response.status_code == 200 + assert "deleted successfully" in response.json()["message"].lower() + + +def test_delete_nonexistent_card(client, auth_headers): + """Test deleting a non-existent card.""" + response = client.delete("/cards/99999", headers=auth_headers) + assert response.status_code == 404 + + +def test_get_cards_for_group(client, auth_headers, test_group, test_card): + """Test getting all cards for a group.""" + response = client.get(f"/cards/{test_group.id}", headers=auth_headers) + assert response.status_code == 200 + + cards = response.json() + assert len(cards) >= 1 + assert any(card["id"] == test_card.id for card in cards) + + +def test_get_cards_for_nonexistent_group(client, auth_headers): + """Test getting cards for a non-existent group.""" + response = client.get("/cards/99999", headers=auth_headers) + assert response.status_code == 200 + + cards = response.json() + assert len(cards) == 0 # Empty list for non-existent group + + +def test_card_operations_by_non_admin(client, test_group, user_auth_headers): + """Test that non-admin users cannot perform card operations.""" + # Try to add a card + response = client.post(f"/cards/{test_group.id}", headers=user_auth_headers) + assert response.status_code == 403 + + # Try to get cards + response = client.get(f"/cards/{test_group.id}", headers=user_auth_headers) + assert response.status_code == 403 diff --git a/test/test_services/test_database.py b/test/test_services/test_database.py new file mode 100644 index 0000000..8496032 --- /dev/null +++ b/test/test_services/test_database.py @@ -0,0 +1,64 @@ +import pytest +from sqlmodel import Session, select +from app.services.database import create_db_and_tables, get_session, add_and_refresh +from app.model.models import UserDB, GroupDB, Card + + +def test_create_db_and_tables(): + """Test database and tables creation.""" + # This is primarily an integration test + from sqlalchemy import inspect + from app.services.database import engine + + create_db_and_tables() + inspector = inspect(engine) + + # Check that tables exist + tables = inspector.get_table_names() + assert "userdb" in tables + assert "groupdb" in tables + assert "card" in tables + assert "accessauthorizationdb" in tables + assert "timetable" in tables + assert "aagrouplink" in tables + + +def test_get_session(db_session): + """Test database session generator.""" + # Test that we can get a session + session_gen = get_session() + session = next(session_gen) + + assert isinstance(session, Session) + + # Test that session works + user = UserDB(name="Test", passwordhash="hash") + session.add(user) + session.commit() + + retrieved_user = session.get(UserDB, user.id) + assert retrieved_user is not None + assert retrieved_user.name == "Test" + + # Clean up generator + try: + next(session_gen) + except StopIteration: + pass + + +def test_add_and_refresh(db_session): + """Test add_and_refresh helper function.""" + user = UserDB(name="Test User", passwordhash="hashed") + + # Add user + result = add_and_refresh(db_session, user) + + # Assert that user is now in database with ID + assert result.id is not None + assert result.name == "Test User" + + # Verify in database + db_user = db_session.get(UserDB, result.id) + assert db_user is not None + assert db_user.name == "Test User" diff --git a/test/test_services/test_group_manager.py b/test/test_services/test_group_manager.py new file mode 100644 index 0000000..f4460b0 --- /dev/null +++ b/test/test_services/test_group_manager.py @@ -0,0 +1,68 @@ +import pytest +from fastapi import status + + +def test_create_group(client, auth_headers): + """Test creating a new group.""" + group_data = {"name": "New Test Group"} + + response = client.post("/groups/", json=group_data, headers=auth_headers) + assert response.status_code == 200 + + data = response.json() + assert data["name"] == "New Test Group" + assert "id" in data + + +def test_create_duplicate_group(client, auth_headers, test_group): + """Test creating a group with a duplicate name.""" + group_data = {"name": test_group.name} + + response = client.post("/groups/", json=group_data, headers=auth_headers) + # This should fail due to unique constraint + assert response.status_code == 422 # Validation error + + +def test_get_groups(client, auth_headers, test_group): + """Test retrieving all groups.""" + response = client.get("/groups/", headers=auth_headers) + assert response.status_code == 200 + + groups = response.json() + assert len(groups) >= 1 + + group_names = [group["name"] for group in groups] + assert test_group.name in group_names + + +def test_delete_group(client, auth_headers, test_group): + """Test deleting a group.""" + response = client.delete(f"/groups/{test_group.id}", headers=auth_headers) + assert response.status_code == 200 + assert "deleted successfully" in response.json()["message"].lower() + + # Verify group is deleted + response = client.get("/groups/", headers=auth_headers) + groups = response.json() + assert not any(group["id"] == test_group.id for group in groups) + + +def test_delete_nonexistent_group(client, auth_headers): + """Test deleting a non-existent group.""" + response = client.delete("/groups/99999", headers=auth_headers) + assert response.status_code == 404 + + +def test_group_operations_by_non_admin(client, user_auth_headers): + """Test that non-admin users cannot perform group operations.""" + # Try to create a group + response = client.post( + "/groups/", + json={"name": "test"}, + headers=user_auth_headers + ) + assert response.status_code == 403 + + # Try to get groups + response = client.get("/groups/", headers=user_auth_headers) + assert response.status_code == 403 diff --git a/test/test_services/test_user_manager.py b/test/test_services/test_user_manager.py new file mode 100644 index 0000000..c62e366 --- /dev/null +++ b/test/test_services/test_user_manager.py @@ -0,0 +1,150 @@ +import pytest +from fastapi import status + + +def test_create_user(client, auth_headers): + """Test creating a new user.""" + user_data = { + "name": "newuser", + "email": "newuser@example.com", + "is_admin": False, + "password": "newpassword123" + } + + response = client.post("/users/", json=user_data, headers=auth_headers) + assert response.status_code == 200 + + data = response.json() + assert data["name"] == "newuser" + assert data["email"] == "newuser@example.com" + assert data["is_admin"] is False + assert "id" in data + assert "passwordhash" not in data # Password hash should not be in response + + +def test_create_user_unauthorized(client): + """Test creating a user without admin credentials.""" + user_data = { + "name": "unauthorized_user", + "email": "unauthorized@example.com", + "password": "password123" + } + + response = client.post("/users/", json=user_data) + assert response.status_code == 403 + + +def test_get_users(client, auth_headers, admin_user, regular_user): + """Test retrieving all users.""" + response = client.get("/users/", headers=auth_headers) + assert response.status_code == 200 + + users = response.json() + assert len(users) >= 2 # At least admin_user and regular_user + + user_names = [user["name"] for user in users] + assert admin_user.name in user_names + assert regular_user.name in user_names + + +def test_get_user_by_id(client, auth_headers, regular_user): + """Test retrieving a specific user by ID.""" + response = client.get(f"/users/{regular_user.id}", headers=auth_headers) + assert response.status_code == 200 + + data = response.json() + assert data["id"] == regular_user.id + assert data["name"] == regular_user.name + + +def test_get_nonexistent_user(client, auth_headers): + """Test retrieving a non-existent user.""" + response = client.get("/users/99999", headers=auth_headers) + assert response.status_code == 404 + assert "not found" in response.json()["detail"].lower() + + +def test_update_user(client, auth_headers, regular_user): + """Test updating a user.""" + update_data = { + "name": "updated_name", + "email": "updated@example.com" + } + + response = client.patch( + f"/users/{regular_user.id}", + json=update_data, + headers=auth_headers + ) + assert response.status_code == 200 + + data = response.json() + assert data["name"] == "updated_name" + assert data["email"] == "updated@example.com" + # Unchanged fields should remain the same + assert data["is_admin"] == regular_user.is_admin + + +def test_update_user_password(client, auth_headers, regular_user): + """Test updating a user's password.""" + update_data = { + "password": "new_password_456" + } + + response = client.patch( + f"/users/{regular_user.id}", + json=update_data, + headers=auth_headers + ) + assert response.status_code == 200 + + # Verify password can be used for login + login_response = client.post( + "/token", + data={"username": regular_user.name, "password": "new_password_456"} + ) + assert login_response.status_code == 200 + + +def test_update_nonexistent_user(client, auth_headers): + """Test updating a non-existent user.""" + update_data = {"name": "updated"} + response = client.patch("/users/99999", json=update_data, headers=auth_headers) + assert response.status_code == 404 + + +def test_delete_user(client, auth_headers, regular_user): + """Test deleting a user.""" + response = client.delete(f"/users/{regular_user.id}", headers=auth_headers) + assert response.status_code == 200 + assert "deleted successfully" in response.json()["message"].lower() + + # Verify user is deleted + response = client.get(f"/users/{regular_user.id}", headers=auth_headers) + assert response.status_code == 404 + + +def test_delete_nonexistent_user(client, auth_headers): + """Test deleting a non-existent user.""" + response = client.delete("/users/99999", headers=auth_headers) + assert response.status_code == 404 + + +def test_user_operations_by_non_admin(client, user_auth_headers): + """Test that non-admin users cannot perform admin operations.""" + # Try to create a user + response = client.post( + "/users/", + json={"name": "test", "password": "pass"}, + headers=user_auth_headers + ) + assert response.status_code == 403 + + # Try to get users + response = client.get("/users/", headers=user_auth_headers) + assert response.status_code == 403 + + # Try to delete the admin user (if ID is known) + # This would require knowing the admin user ID + # response = client.delete(f"/users/{admin_id}", headers=user_auth_headers) + # assert response.status_code == 403