Add ai generated tests
This commit is contained in:
126
test/conftest.py
Normal file
126
test/conftest.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from sqlmodel import Session, create_engine, SQLModel
|
||||||
|
from sqlalchemy.orm import sessionmaker
|
||||||
|
|
||||||
|
from app.main import app
|
||||||
|
from app.model.models import UserDB, Card, GroupDB, AccessAuthorizationDB, Timetable, AaGroupLink
|
||||||
|
from app.services.database import get_session
|
||||||
|
|
||||||
|
# Use in-memory SQLite for testing
|
||||||
|
TEST_SQLALCHEMY_DATABASE_URL = "sqlite:///:memory:"
|
||||||
|
|
||||||
|
engine = create_engine(TEST_SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
|
||||||
|
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def db_session():
|
||||||
|
"""Create a fresh database session for each test."""
|
||||||
|
SQLModel.metadata.create_all(engine)
|
||||||
|
session = TestingSessionLocal()
|
||||||
|
try:
|
||||||
|
yield session
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
SQLModel.metadata.drop_all(engine)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def client(db_session):
|
||||||
|
"""Create a test client with a database session override."""
|
||||||
|
def override_get_session():
|
||||||
|
try:
|
||||||
|
yield db_session
|
||||||
|
finally:
|
||||||
|
pass
|
||||||
|
|
||||||
|
app.dependency_overrides[get_session] = override_get_session
|
||||||
|
with TestClient(app) as test_client:
|
||||||
|
yield test_client
|
||||||
|
app.dependency_overrides.clear()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def admin_user(db_session):
|
||||||
|
"""Create an admin user for testing."""
|
||||||
|
from app.services.auth import get_password_hash
|
||||||
|
admin = UserDB(
|
||||||
|
name="admin",
|
||||||
|
passwordhash=get_password_hash("admin123"),
|
||||||
|
is_admin=True
|
||||||
|
)
|
||||||
|
db_session.add(admin)
|
||||||
|
db_session.commit()
|
||||||
|
db_session.refresh(admin)
|
||||||
|
return admin
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def regular_user(db_session):
|
||||||
|
"""Create a regular user for testing."""
|
||||||
|
from app.services.auth import get_password_hash
|
||||||
|
user = UserDB(
|
||||||
|
name="user",
|
||||||
|
passwordhash=get_password_hash("user123"),
|
||||||
|
is_admin=False
|
||||||
|
)
|
||||||
|
db_session.add(user)
|
||||||
|
db_session.commit()
|
||||||
|
db_session.refresh(user)
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def auth_headers(client, admin_user):
|
||||||
|
"""Get authentication headers for admin user."""
|
||||||
|
response = client.post(
|
||||||
|
"/token",
|
||||||
|
data={"username": admin_user.name, "password": "admin123"}
|
||||||
|
)
|
||||||
|
token = response.json()["access_token"]
|
||||||
|
return {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def user_auth_headers(client, regular_user):
|
||||||
|
"""Get authentication headers for regular user."""
|
||||||
|
response = client.post(
|
||||||
|
"/token",
|
||||||
|
data={"username": regular_user.name, "password": "user123"}
|
||||||
|
)
|
||||||
|
token = response.json()["access_token"]
|
||||||
|
return {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_group(db_session):
|
||||||
|
"""Create a test group."""
|
||||||
|
group = GroupDB(name="Test Group")
|
||||||
|
db_session.add(group)
|
||||||
|
db_session.commit()
|
||||||
|
db_session.refresh(group)
|
||||||
|
return group
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_card(db_session, test_group):
|
||||||
|
"""Create a test card."""
|
||||||
|
card = Card(uuid="test-uuid-123", group_id=test_group.id)
|
||||||
|
db_session.add(card)
|
||||||
|
db_session.commit()
|
||||||
|
db_session.refresh(card)
|
||||||
|
return card
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def test_aa(db_session):
|
||||||
|
"""Create a test access authorization."""
|
||||||
|
aa = AccessAuthorizationDB(
|
||||||
|
name="Test AA",
|
||||||
|
is_active=True
|
||||||
|
)
|
||||||
|
db_session.add(aa)
|
||||||
|
db_session.commit()
|
||||||
|
db_session.refresh(aa)
|
||||||
|
return aa
|
||||||
24
test/test_main.py
Normal file
24
test/test_main.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
def test_app_startup(client):
|
||||||
|
"""Test that the application starts up correctly."""
|
||||||
|
response = client.get("/")
|
||||||
|
# Application should respond (even if it's a 404)
|
||||||
|
assert response.status_code in [404, 200]
|
||||||
|
|
||||||
|
|
||||||
|
def test_health_check(client):
|
||||||
|
"""Test basic health check endpoint if it exists."""
|
||||||
|
# Note: This would require adding a health check endpoint
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_router_includes():
|
||||||
|
"""Test that all routers are included in the app."""
|
||||||
|
from app.main import app
|
||||||
|
routes = [route.path for route in app.routes]
|
||||||
|
|
||||||
|
# Check that router prefixes are present
|
||||||
|
assert any("/users" in route for route in routes)
|
||||||
|
assert any("/cards" in route for route in routes)
|
||||||
|
assert any("/groups" in route for route in routes)
|
||||||
|
assert any("/aa" in route for route in routes)
|
||||||
|
assert any("/token" in route for route in routes)
|
||||||
94
test/test_models.py
Normal file
94
test/test_models.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
import pytest
|
||||||
|
from app.model.models import (
|
||||||
|
UserBase, UserResponse, UserCreate, UserDB, UserUpdate,
|
||||||
|
GroupBase, GroupCreate, GroupDB, GroupResponse,
|
||||||
|
AccessAuthorizationBase, AccessAuthorizationCreate,
|
||||||
|
AccessAuthorizationDB, AccessAuthorizationResponse, AccessAuthorizationUpdate,
|
||||||
|
Card, Timetable, TimetableCreate, Token, TokenData, AaGroupLink
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_models():
|
||||||
|
"""Test user model creation and validation."""
|
||||||
|
# Test UserBase
|
||||||
|
user_base = UserBase(name="Test User", email="test@example.com", is_admin=False)
|
||||||
|
assert user_base.name == "Test User"
|
||||||
|
assert user_base.email == "test@example.com"
|
||||||
|
assert user_base.is_admin is False
|
||||||
|
|
||||||
|
# Test UserCreate
|
||||||
|
user_create = UserCreate(name="New User", email="new@example.com", password="secret123")
|
||||||
|
assert user_create.password == "secret123"
|
||||||
|
|
||||||
|
# Test UserUpdate
|
||||||
|
user_update = UserUpdate(name="Updated Name")
|
||||||
|
assert user_update.name == "Updated Name"
|
||||||
|
assert user_update.email is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_group_models():
|
||||||
|
"""Test group model creation and validation."""
|
||||||
|
# Test GroupBase
|
||||||
|
group_base = GroupBase(name="Test Group")
|
||||||
|
assert group_base.name == "Test Group"
|
||||||
|
|
||||||
|
# Test GroupCreate
|
||||||
|
group_create = GroupCreate(name="New Group")
|
||||||
|
assert group_create.name == "New Group"
|
||||||
|
|
||||||
|
|
||||||
|
def test_access_authorization_models():
|
||||||
|
"""Test access authorization model creation and validation."""
|
||||||
|
# Test AccessAuthorizationBase
|
||||||
|
aa_base = AccessAuthorizationBase(name="Test AA", is_active=True)
|
||||||
|
assert aa_base.name == "Test AA"
|
||||||
|
assert aa_base.is_active is True
|
||||||
|
|
||||||
|
# Test AccessAuthorizationCreate with timetables
|
||||||
|
timetable_create = TimetableCreate(weekday=1, starttime="08:00", duration=60)
|
||||||
|
aa_create = AccessAuthorizationCreate(
|
||||||
|
name="New AA",
|
||||||
|
is_active=False,
|
||||||
|
timetables=[timetable_create]
|
||||||
|
)
|
||||||
|
assert aa_create.name == "New AA"
|
||||||
|
assert aa_create.is_active is False
|
||||||
|
assert len(aa_create.timetables) == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_card_model():
|
||||||
|
"""Test card model creation and validation."""
|
||||||
|
card = Card(uuid="test-uuid", group_id=1)
|
||||||
|
assert card.uuid == "test-uuid"
|
||||||
|
assert card.group_id == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_timetable_models():
|
||||||
|
"""Test timetable model creation and validation."""
|
||||||
|
# Test TimetableBase with valid values
|
||||||
|
timetable = TimetableCreate(weekday=1, starttime="09:00", duration=120)
|
||||||
|
assert timetable.weekday == 1
|
||||||
|
assert timetable.starttime == "09:00"
|
||||||
|
assert timetable.duration == 120
|
||||||
|
|
||||||
|
# Test boundary values
|
||||||
|
max_duration = TimetableCreate(weekday=7, starttime="23:59", duration=1439)
|
||||||
|
assert max_duration.duration == 1439
|
||||||
|
assert max_duration.weekday == 7
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_models():
|
||||||
|
"""Test token model creation and validation."""
|
||||||
|
token = Token(access_token="test-token", token_type="bearer")
|
||||||
|
assert token.access_token == "test-token"
|
||||||
|
assert token.token_type == "bearer"
|
||||||
|
|
||||||
|
token_data = TokenData(username="testuser")
|
||||||
|
assert token_data.username == "testuser"
|
||||||
|
|
||||||
|
|
||||||
|
def test_aa_group_link_model():
|
||||||
|
"""Test many-to-many relationship link model."""
|
||||||
|
link = AaGroupLink(group_id=1, accessauth_id=2)
|
||||||
|
assert link.group_id == 1
|
||||||
|
assert link.accessauth_id == 2
|
||||||
192
test/test_services/test_aa_manager.py
Normal file
192
test/test_services/test_aa_manager.py
Normal file
@@ -0,0 +1,192 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi import status
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_access_auth(client, auth_headers):
|
||||||
|
"""Test creating a new access authorization."""
|
||||||
|
aa_data = {
|
||||||
|
"name": "New AA",
|
||||||
|
"is_active": True,
|
||||||
|
"timetables": [
|
||||||
|
{"weekday": 1, "starttime": "08:00", "duration": 60},
|
||||||
|
{"weekday": 2, "starttime": "09:00", "duration": 90}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/aa/", json=aa_data, headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["name"] == "New AA"
|
||||||
|
assert data["is_active"] is True
|
||||||
|
assert "id" in data
|
||||||
|
assert len(data["timetables"]) == 2
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_all_access_auths(client, auth_headers, test_aa):
|
||||||
|
"""Test retrieving all access authorizations."""
|
||||||
|
response = client.get("/aa/", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
aa_list = response.json()
|
||||||
|
assert len(aa_list) >= 1
|
||||||
|
|
||||||
|
aa_names = [aa["name"] for aa in aa_list]
|
||||||
|
assert test_aa.name in aa_names
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_access_auth_by_id(client, auth_headers, test_aa):
|
||||||
|
"""Test retrieving a specific access authorization by ID."""
|
||||||
|
response = client.get(f"/aa/{test_aa.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["id"] == test_aa.id
|
||||||
|
assert data["name"] == test_aa.name
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_nonexistent_access_auth(client, auth_headers):
|
||||||
|
"""Test retrieving a non-existent access authorization."""
|
||||||
|
response = client.get("/aa/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_assign_access_auth_to_group(client, auth_headers, test_group, test_aa):
|
||||||
|
"""Test assigning an access authorization to a group."""
|
||||||
|
response = client.put(
|
||||||
|
f"/aa/assign/{test_group.id}/{test_aa.id}",
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["id"] == test_group.id
|
||||||
|
# The AA should now be in the group's accessauths
|
||||||
|
# Note: The response model might not include the full relationship
|
||||||
|
|
||||||
|
|
||||||
|
def test_assign_already_assigned_access_auth(client, auth_headers, test_group, test_aa):
|
||||||
|
"""Test assigning an already assigned access authorization."""
|
||||||
|
# First assignment
|
||||||
|
client.put(f"/aa/assign/{test_group.id}/{test_aa.id}", headers=auth_headers)
|
||||||
|
|
||||||
|
# Second assignment should indicate it's already assigned
|
||||||
|
response = client.put(
|
||||||
|
f"/aa/assign/{test_group.id}/{test_aa.id}",
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
# According to the code, this returns 200 with "already assigned" message
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_unassign_access_auth_from_group(client, auth_headers, test_group, test_aa):
|
||||||
|
"""Test unassigning an access authorization from a group."""
|
||||||
|
# First assign
|
||||||
|
client.put(f"/aa/assign/{test_group.id}/{test_aa.id}", headers=auth_headers)
|
||||||
|
|
||||||
|
# Then unassign
|
||||||
|
response = client.put(
|
||||||
|
f"/aa/unassign/{test_group.id}/{test_aa.id}",
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_unassign_nonexistent_assignment(client, auth_headers, test_group, test_aa):
|
||||||
|
"""Test unassigning a non-existent assignment."""
|
||||||
|
response = client.put(
|
||||||
|
f"/aa/unassign/{test_group.id}/{test_aa.id}",
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
# According to the code, this returns 200 with "not assigned" message
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_assign_to_nonexistent_group(client, auth_headers, test_aa):
|
||||||
|
"""Test assigning an AA to a non-existent group."""
|
||||||
|
response = client.put(f"/aa/assign/99999/{test_aa.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_assign_nonexistent_aa(client, auth_headers, test_group):
|
||||||
|
"""Test assigning a non-existent AA to a group."""
|
||||||
|
response = client.put(f"/aa/assign/{test_group.id}/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_access_auth(client, auth_headers, test_aa):
|
||||||
|
"""Test updating an access authorization."""
|
||||||
|
update_data = {
|
||||||
|
"name": "Updated AA",
|
||||||
|
"is_active": False
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.patch(
|
||||||
|
f"/aa/{test_aa.id}",
|
||||||
|
json=update_data,
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["name"] == "Updated AA"
|
||||||
|
assert data["is_active"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_access_auth_with_timetables(client, auth_headers, test_aa):
|
||||||
|
"""Test updating an access authorization with new timetables."""
|
||||||
|
update_data = {
|
||||||
|
"timetables": [
|
||||||
|
{"weekday": 5, "starttime": "10:00", "duration": 120}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.patch(
|
||||||
|
f"/aa/{test_aa.id}",
|
||||||
|
json=update_data,
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_nonexistent_access_auth(client, auth_headers):
|
||||||
|
"""Test updating a non-existent access authorization."""
|
||||||
|
update_data = {"name": "Updated"}
|
||||||
|
response = client.patch("/aa/99999", json=update_data, headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_access_auth(client, auth_headers, test_aa):
|
||||||
|
"""Test deleting an access authorization."""
|
||||||
|
response = client.delete(f"/aa/{test_aa.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "deleted successfully" in response.json()["message"].lower()
|
||||||
|
|
||||||
|
# Verify AA is deleted
|
||||||
|
response = client.get(f"/aa/{test_aa.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_nonexistent_access_auth(client, auth_headers):
|
||||||
|
"""Test deleting a non-existent access authorization."""
|
||||||
|
response = client.delete("/aa/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_aa_operations_by_non_admin(client, test_aa, user_auth_headers):
|
||||||
|
"""Test that non-admin users cannot perform AA operations."""
|
||||||
|
# Try to create an AA
|
||||||
|
response = client.post(
|
||||||
|
"/aa/",
|
||||||
|
json={"name": "test", "is_active": True, "timetables": []},
|
||||||
|
headers=user_auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# Try to get all AAs
|
||||||
|
response = client.get("/aa/", headers=user_auth_headers)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# Try to assign AA
|
||||||
|
response = client.put(f"/aa/assign/1/{test_aa.id}", headers=user_auth_headers)
|
||||||
|
assert response.status_code == 403
|
||||||
195
test/test_services/test_auth.py
Normal file
195
test/test_services/test_auth.py
Normal file
@@ -0,0 +1,195 @@
|
|||||||
|
import pytest
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from fastapi import HTTPException, status
|
||||||
|
from app.services.auth import (
|
||||||
|
verify_password, get_password_hash, get_user, authenticate_user,
|
||||||
|
create_access_token, get_current_user, auth_is_admin, create_first_user
|
||||||
|
)
|
||||||
|
from app.model.models import UserDB
|
||||||
|
from jwt.exceptions import InvalidTokenError
|
||||||
|
|
||||||
|
|
||||||
|
def test_password_hashing():
|
||||||
|
"""Test password hashing and verification."""
|
||||||
|
password = "test_password_123"
|
||||||
|
|
||||||
|
# Hash password
|
||||||
|
hashed = get_password_hash(password)
|
||||||
|
assert hashed != password
|
||||||
|
assert len(hashed) > 0
|
||||||
|
|
||||||
|
# Verify correct password
|
||||||
|
assert verify_password(password, hashed) is True
|
||||||
|
|
||||||
|
# Verify incorrect password
|
||||||
|
assert verify_password("wrong_password", hashed) is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_user(db_session):
|
||||||
|
"""Test get_user function."""
|
||||||
|
from app.services.auth import get_password_hash
|
||||||
|
|
||||||
|
# Create a user
|
||||||
|
user = UserDB(name="testuser", passwordhash=get_password_hash("password"))
|
||||||
|
db_session.add(user)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Get existing user
|
||||||
|
retrieved_user = get_user(db_session, "testuser")
|
||||||
|
assert retrieved_user is not None
|
||||||
|
assert retrieved_user.name == "testuser"
|
||||||
|
|
||||||
|
# Try to get non-existent user
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
get_user(db_session, "nonexistent")
|
||||||
|
assert exc_info.value.status_code == status.HTTP_404_NOT_FOUND
|
||||||
|
|
||||||
|
|
||||||
|
def test_authenticate_user(db_session):
|
||||||
|
"""Test user authentication."""
|
||||||
|
from app.services.auth import get_password_hash
|
||||||
|
|
||||||
|
# Create a user
|
||||||
|
user = UserDB(name="authuser", passwordhash=get_password_hash("correctpass"))
|
||||||
|
db_session.add(user)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Authenticate with correct credentials
|
||||||
|
authenticated = authenticate_user(db_session, "authuser", "correctpass")
|
||||||
|
assert authenticated is not False
|
||||||
|
assert authenticated.name == "authuser"
|
||||||
|
|
||||||
|
# Authenticate with wrong password
|
||||||
|
authenticated = authenticate_user(db_session, "authuser", "wrongpass")
|
||||||
|
assert authenticated is False
|
||||||
|
|
||||||
|
# Authenticate non-existent user
|
||||||
|
authenticated = authenticate_user(db_session, "nonexistent", "password")
|
||||||
|
assert authenticated is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_access_token():
|
||||||
|
"""Test JWT token creation."""
|
||||||
|
data = {"sub": "testuser"}
|
||||||
|
|
||||||
|
# Create token with default expiration
|
||||||
|
token = create_access_token(data)
|
||||||
|
assert isinstance(token, str)
|
||||||
|
assert len(token) > 0
|
||||||
|
|
||||||
|
# Create token with custom expiration
|
||||||
|
custom_expire = timedelta(hours=1)
|
||||||
|
token = create_access_token(data, expires_delta=custom_expire)
|
||||||
|
assert isinstance(token, str)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_current_user(db_session, admin_user):
|
||||||
|
"""Test getting current user from token."""
|
||||||
|
from app.services.auth import create_access_token, get_current_user
|
||||||
|
|
||||||
|
# Create token for admin user
|
||||||
|
token = create_access_token(data={"sub": admin_user.name})
|
||||||
|
|
||||||
|
# Get user from token
|
||||||
|
user = get_current_user(token=token)
|
||||||
|
assert user is not None
|
||||||
|
assert user.name == admin_user.name
|
||||||
|
assert user.id == admin_user.id
|
||||||
|
|
||||||
|
# Test invalid token
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
get_current_user(token="invalid_token")
|
||||||
|
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
|
||||||
|
|
||||||
|
# Test expired token (create token with past expiration)
|
||||||
|
past_expire = timedelta(minutes=-100)
|
||||||
|
expired_token = create_access_token(data={"sub": admin_user.name}, expires_delta=past_expire)
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
get_current_user(token=expired_token)
|
||||||
|
assert exc_info.value.status_code == status.HTTP_401_UNAUTHORIZED
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_is_admin(db_session, admin_user, regular_user):
|
||||||
|
"""Test admin authorization check."""
|
||||||
|
from app.services.auth import create_access_token, auth_is_admin
|
||||||
|
|
||||||
|
# Create token for admin user
|
||||||
|
admin_token = create_access_token(data={"sub": admin_user.name})
|
||||||
|
|
||||||
|
# Admin should pass
|
||||||
|
result = auth_is_admin(token=admin_token)
|
||||||
|
assert result is True
|
||||||
|
|
||||||
|
# Create token for regular user
|
||||||
|
user_token = create_access_token(data={"sub": regular_user.name})
|
||||||
|
|
||||||
|
# Regular user should fail
|
||||||
|
with pytest.raises(HTTPException) as exc_info:
|
||||||
|
auth_is_admin(token=user_token)
|
||||||
|
assert exc_info.value.status_code == status.HTTP_403_FORBIDDEN
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_first_user(db_session):
|
||||||
|
"""Test automatic creation of first admin user."""
|
||||||
|
# Clear any existing users
|
||||||
|
db_session.exec(select(UserDB)).all()
|
||||||
|
for user in db_session.exec(select(UserDB)).all():
|
||||||
|
db_session.delete(user)
|
||||||
|
db_session.commit()
|
||||||
|
|
||||||
|
# Create first user
|
||||||
|
result = create_first_user()
|
||||||
|
assert result is not None
|
||||||
|
assert result.name == "admin"
|
||||||
|
assert result.is_admin is True
|
||||||
|
|
||||||
|
# Verify user exists in database
|
||||||
|
user = db_session.exec(select(UserDB).where(UserDB.name == "admin")).first()
|
||||||
|
assert user is not None
|
||||||
|
assert user.is_admin is True
|
||||||
|
|
||||||
|
# Test that it doesn't create another admin if one exists
|
||||||
|
second_result = create_first_user()
|
||||||
|
assert second_result is None # Should print "Admin user already exists"
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint(client, admin_user):
|
||||||
|
"""Test the token endpoint for login."""
|
||||||
|
# Test successful login
|
||||||
|
response = client.post(
|
||||||
|
"/token",
|
||||||
|
data={"username": admin_user.name, "password": "admin123"}
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert "access_token" in data
|
||||||
|
assert data["token_type"] == "bearer"
|
||||||
|
|
||||||
|
# Test failed login with wrong password
|
||||||
|
response = client.post(
|
||||||
|
"/token",
|
||||||
|
data={"username": admin_user.name, "password": "wrongpassword"}
|
||||||
|
)
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
# Test failed login with non-existent user
|
||||||
|
response = client.post(
|
||||||
|
"/token",
|
||||||
|
data={"username": "nonexistent", "password": "password"}
|
||||||
|
)
|
||||||
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
def test_test_login_endpoint(client, admin_user, auth_headers):
|
||||||
|
"""Test the test login endpoint."""
|
||||||
|
# Test with valid token
|
||||||
|
response = client.get("/test/login", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.json()
|
||||||
|
assert data["name"] == admin_user.name
|
||||||
|
assert data["is_admin"] is True
|
||||||
|
|
||||||
|
# Test without token
|
||||||
|
response = client.get("/test/login")
|
||||||
|
assert response.status_code == 401
|
||||||
66
test/test_services/test_card_manager.py
Normal file
66
test/test_services/test_card_manager.py
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi import status
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_card(client, auth_headers, test_group):
|
||||||
|
"""Test adding a card to a group."""
|
||||||
|
response = client.post(f"/cards/{test_group.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert "id" in data
|
||||||
|
assert "uuid" in data
|
||||||
|
assert data["group_id"] == test_group.id
|
||||||
|
assert len(data["uuid"]) > 0 # UUID should be generated
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_card_to_nonexistent_group(client, auth_headers):
|
||||||
|
"""Test adding a card to a non-existent group."""
|
||||||
|
response = client.post("/cards/99999", headers=auth_headers)
|
||||||
|
# This might succeed and create a card with a non-existent group_id
|
||||||
|
# or fail depending on foreign key constraints
|
||||||
|
# For now, let's assume it might fail
|
||||||
|
# assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_card(client, auth_headers, test_card):
|
||||||
|
"""Test deleting a card."""
|
||||||
|
response = client.delete(f"/cards/{test_card.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "deleted successfully" in response.json()["message"].lower()
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_nonexistent_card(client, auth_headers):
|
||||||
|
"""Test deleting a non-existent card."""
|
||||||
|
response = client.delete("/cards/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_cards_for_group(client, auth_headers, test_group, test_card):
|
||||||
|
"""Test getting all cards for a group."""
|
||||||
|
response = client.get(f"/cards/{test_group.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
cards = response.json()
|
||||||
|
assert len(cards) >= 1
|
||||||
|
assert any(card["id"] == test_card.id for card in cards)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_cards_for_nonexistent_group(client, auth_headers):
|
||||||
|
"""Test getting cards for a non-existent group."""
|
||||||
|
response = client.get("/cards/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
cards = response.json()
|
||||||
|
assert len(cards) == 0 # Empty list for non-existent group
|
||||||
|
|
||||||
|
|
||||||
|
def test_card_operations_by_non_admin(client, test_group, user_auth_headers):
|
||||||
|
"""Test that non-admin users cannot perform card operations."""
|
||||||
|
# Try to add a card
|
||||||
|
response = client.post(f"/cards/{test_group.id}", headers=user_auth_headers)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# Try to get cards
|
||||||
|
response = client.get(f"/cards/{test_group.id}", headers=user_auth_headers)
|
||||||
|
assert response.status_code == 403
|
||||||
64
test/test_services/test_database.py
Normal file
64
test/test_services/test_database.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
import pytest
|
||||||
|
from sqlmodel import Session, select
|
||||||
|
from app.services.database import create_db_and_tables, get_session, add_and_refresh
|
||||||
|
from app.model.models import UserDB, GroupDB, Card
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_db_and_tables():
|
||||||
|
"""Test database and tables creation."""
|
||||||
|
# This is primarily an integration test
|
||||||
|
from sqlalchemy import inspect
|
||||||
|
from app.services.database import engine
|
||||||
|
|
||||||
|
create_db_and_tables()
|
||||||
|
inspector = inspect(engine)
|
||||||
|
|
||||||
|
# Check that tables exist
|
||||||
|
tables = inspector.get_table_names()
|
||||||
|
assert "userdb" in tables
|
||||||
|
assert "groupdb" in tables
|
||||||
|
assert "card" in tables
|
||||||
|
assert "accessauthorizationdb" in tables
|
||||||
|
assert "timetable" in tables
|
||||||
|
assert "aagrouplink" in tables
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_session(db_session):
|
||||||
|
"""Test database session generator."""
|
||||||
|
# Test that we can get a session
|
||||||
|
session_gen = get_session()
|
||||||
|
session = next(session_gen)
|
||||||
|
|
||||||
|
assert isinstance(session, Session)
|
||||||
|
|
||||||
|
# Test that session works
|
||||||
|
user = UserDB(name="Test", passwordhash="hash")
|
||||||
|
session.add(user)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
retrieved_user = session.get(UserDB, user.id)
|
||||||
|
assert retrieved_user is not None
|
||||||
|
assert retrieved_user.name == "Test"
|
||||||
|
|
||||||
|
# Clean up generator
|
||||||
|
try:
|
||||||
|
next(session_gen)
|
||||||
|
except StopIteration:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_and_refresh(db_session):
|
||||||
|
"""Test add_and_refresh helper function."""
|
||||||
|
user = UserDB(name="Test User", passwordhash="hashed")
|
||||||
|
|
||||||
|
# Add user
|
||||||
|
result = add_and_refresh(db_session, user)
|
||||||
|
|
||||||
|
# Assert that user is now in database with ID
|
||||||
|
assert result.id is not None
|
||||||
|
assert result.name == "Test User"
|
||||||
|
|
||||||
|
# Verify in database
|
||||||
|
db_user = db_session.get(UserDB, result.id)
|
||||||
|
assert db_user is not None
|
||||||
|
assert db_user.name == "Test User"
|
||||||
68
test/test_services/test_group_manager.py
Normal file
68
test/test_services/test_group_manager.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi import status
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_group(client, auth_headers):
|
||||||
|
"""Test creating a new group."""
|
||||||
|
group_data = {"name": "New Test Group"}
|
||||||
|
|
||||||
|
response = client.post("/groups/", json=group_data, headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["name"] == "New Test Group"
|
||||||
|
assert "id" in data
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_duplicate_group(client, auth_headers, test_group):
|
||||||
|
"""Test creating a group with a duplicate name."""
|
||||||
|
group_data = {"name": test_group.name}
|
||||||
|
|
||||||
|
response = client.post("/groups/", json=group_data, headers=auth_headers)
|
||||||
|
# This should fail due to unique constraint
|
||||||
|
assert response.status_code == 422 # Validation error
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_groups(client, auth_headers, test_group):
|
||||||
|
"""Test retrieving all groups."""
|
||||||
|
response = client.get("/groups/", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
groups = response.json()
|
||||||
|
assert len(groups) >= 1
|
||||||
|
|
||||||
|
group_names = [group["name"] for group in groups]
|
||||||
|
assert test_group.name in group_names
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_group(client, auth_headers, test_group):
|
||||||
|
"""Test deleting a group."""
|
||||||
|
response = client.delete(f"/groups/{test_group.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "deleted successfully" in response.json()["message"].lower()
|
||||||
|
|
||||||
|
# Verify group is deleted
|
||||||
|
response = client.get("/groups/", headers=auth_headers)
|
||||||
|
groups = response.json()
|
||||||
|
assert not any(group["id"] == test_group.id for group in groups)
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_nonexistent_group(client, auth_headers):
|
||||||
|
"""Test deleting a non-existent group."""
|
||||||
|
response = client.delete("/groups/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_group_operations_by_non_admin(client, user_auth_headers):
|
||||||
|
"""Test that non-admin users cannot perform group operations."""
|
||||||
|
# Try to create a group
|
||||||
|
response = client.post(
|
||||||
|
"/groups/",
|
||||||
|
json={"name": "test"},
|
||||||
|
headers=user_auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# Try to get groups
|
||||||
|
response = client.get("/groups/", headers=user_auth_headers)
|
||||||
|
assert response.status_code == 403
|
||||||
150
test/test_services/test_user_manager.py
Normal file
150
test/test_services/test_user_manager.py
Normal file
@@ -0,0 +1,150 @@
|
|||||||
|
import pytest
|
||||||
|
from fastapi import status
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_user(client, auth_headers):
|
||||||
|
"""Test creating a new user."""
|
||||||
|
user_data = {
|
||||||
|
"name": "newuser",
|
||||||
|
"email": "newuser@example.com",
|
||||||
|
"is_admin": False,
|
||||||
|
"password": "newpassword123"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/users/", json=user_data, headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["name"] == "newuser"
|
||||||
|
assert data["email"] == "newuser@example.com"
|
||||||
|
assert data["is_admin"] is False
|
||||||
|
assert "id" in data
|
||||||
|
assert "passwordhash" not in data # Password hash should not be in response
|
||||||
|
|
||||||
|
|
||||||
|
def test_create_user_unauthorized(client):
|
||||||
|
"""Test creating a user without admin credentials."""
|
||||||
|
user_data = {
|
||||||
|
"name": "unauthorized_user",
|
||||||
|
"email": "unauthorized@example.com",
|
||||||
|
"password": "password123"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/users/", json=user_data)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_users(client, auth_headers, admin_user, regular_user):
|
||||||
|
"""Test retrieving all users."""
|
||||||
|
response = client.get("/users/", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
users = response.json()
|
||||||
|
assert len(users) >= 2 # At least admin_user and regular_user
|
||||||
|
|
||||||
|
user_names = [user["name"] for user in users]
|
||||||
|
assert admin_user.name in user_names
|
||||||
|
assert regular_user.name in user_names
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_user_by_id(client, auth_headers, regular_user):
|
||||||
|
"""Test retrieving a specific user by ID."""
|
||||||
|
response = client.get(f"/users/{regular_user.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["id"] == regular_user.id
|
||||||
|
assert data["name"] == regular_user.name
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_nonexistent_user(client, auth_headers):
|
||||||
|
"""Test retrieving a non-existent user."""
|
||||||
|
response = client.get("/users/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
assert "not found" in response.json()["detail"].lower()
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_user(client, auth_headers, regular_user):
|
||||||
|
"""Test updating a user."""
|
||||||
|
update_data = {
|
||||||
|
"name": "updated_name",
|
||||||
|
"email": "updated@example.com"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.patch(
|
||||||
|
f"/users/{regular_user.id}",
|
||||||
|
json=update_data,
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
data = response.json()
|
||||||
|
assert data["name"] == "updated_name"
|
||||||
|
assert data["email"] == "updated@example.com"
|
||||||
|
# Unchanged fields should remain the same
|
||||||
|
assert data["is_admin"] == regular_user.is_admin
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_user_password(client, auth_headers, regular_user):
|
||||||
|
"""Test updating a user's password."""
|
||||||
|
update_data = {
|
||||||
|
"password": "new_password_456"
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.patch(
|
||||||
|
f"/users/{regular_user.id}",
|
||||||
|
json=update_data,
|
||||||
|
headers=auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
|
||||||
|
# Verify password can be used for login
|
||||||
|
login_response = client.post(
|
||||||
|
"/token",
|
||||||
|
data={"username": regular_user.name, "password": "new_password_456"}
|
||||||
|
)
|
||||||
|
assert login_response.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
def test_update_nonexistent_user(client, auth_headers):
|
||||||
|
"""Test updating a non-existent user."""
|
||||||
|
update_data = {"name": "updated"}
|
||||||
|
response = client.patch("/users/99999", json=update_data, headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_user(client, auth_headers, regular_user):
|
||||||
|
"""Test deleting a user."""
|
||||||
|
response = client.delete(f"/users/{regular_user.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "deleted successfully" in response.json()["message"].lower()
|
||||||
|
|
||||||
|
# Verify user is deleted
|
||||||
|
response = client.get(f"/users/{regular_user.id}", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_delete_nonexistent_user(client, auth_headers):
|
||||||
|
"""Test deleting a non-existent user."""
|
||||||
|
response = client.delete("/users/99999", headers=auth_headers)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_user_operations_by_non_admin(client, user_auth_headers):
|
||||||
|
"""Test that non-admin users cannot perform admin operations."""
|
||||||
|
# Try to create a user
|
||||||
|
response = client.post(
|
||||||
|
"/users/",
|
||||||
|
json={"name": "test", "password": "pass"},
|
||||||
|
headers=user_auth_headers
|
||||||
|
)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# Try to get users
|
||||||
|
response = client.get("/users/", headers=user_auth_headers)
|
||||||
|
assert response.status_code == 403
|
||||||
|
|
||||||
|
# Try to delete the admin user (if ID is known)
|
||||||
|
# This would require knowing the admin user ID
|
||||||
|
# response = client.delete(f"/users/{admin_id}", headers=user_auth_headers)
|
||||||
|
# assert response.status_code == 403
|
||||||
Reference in New Issue
Block a user