Files
leaudit-platform-backend/tests/test_rule_write_scope.py
T

289 lines
11 KiB
Python

from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
import pytest
from fastapi_modules.fastapi_leaudit.services.impl.rbacAdminServiceImpl import RbacAdminServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ruleServiceImpl import RuleServiceImpl
def test_pick_writable_rule_set_prefers_exact_tenant():
service = RuleServiceImpl()
rows = [
{"id": 10, "tenant_code": "PROVINCIAL"},
{"id": 11, "tenant_code": "MZ"},
{"id": 12, "tenant_code": "PUBLIC"},
]
result = service._pick_writable_rule_set_row(rows, current_user={"tenant_code": "MZ", "is_global": False})
assert result is not None
assert int(result["id"]) == 11
def test_pick_writable_rule_set_prefers_user_tenant_even_for_global_role():
service = RuleServiceImpl()
rows = [
{"id": 10, "tenant_code": "PROVINCIAL"},
{"id": 11, "tenant_code": "MZ"},
{"id": 12, "tenant_code": "PUBLIC"},
]
result = service._pick_writable_rule_set_row(rows, current_user={"tenant_code": "MZ", "is_global": True})
assert result is not None
assert int(result["id"]) == 11
def test_pick_writable_rule_set_prefers_public_for_global_user_without_tenant():
service = RuleServiceImpl()
rows = [
{"id": 10, "tenant_code": "PROVINCIAL"},
{"id": 12, "tenant_code": "PUBLIC"},
]
result = service._pick_writable_rule_set_row(rows, current_user={"tenant_code": None, "is_global": True})
assert result is not None
assert int(result["id"]) == 12
def test_pick_writable_rule_set_rejects_public_for_tenant_user():
service = RuleServiceImpl()
rows = [
{"id": 12, "tenant_code": "PUBLIC"},
]
try:
service._pick_writable_rule_set_row(rows, current_user={"tenant_code": "MZ", "is_global": False})
assert False, "expected LeauditException"
except LeauditException as exc:
assert exc.status == StatusCodeEnum.HTTP_403_FORBIDDEN
def test_assert_version_belongs_to_writable_rule_set_rejects_cross_tenant_publish():
service = RuleServiceImpl()
writable_rule_set = {"id": 11, "tenant_code": "MZ"}
version_row = {"id": 1001, "rule_set_id": 10}
try:
service._assert_version_belongs_to_writable_rule_set(version_row, writable_rule_set)
assert False, "expected LeauditException"
except LeauditException as exc:
assert exc.status == StatusCodeEnum.HTTP_403_FORBIDDEN
def test_assert_rollback_target_rejects_current_version():
service = RuleServiceImpl()
rule_set = {"id": 11, "current_version_id": 1001}
version_row = {"id": 1001, "rule_set_id": 11, "status": "published"}
try:
service._assert_rollback_target(version_row, rule_set)
assert False, "expected LeauditException"
except LeauditException as exc:
assert exc.status == StatusCodeEnum.HTTP_400_BAD_REQUEST
def test_assert_rollback_target_rejects_draft_version():
service = RuleServiceImpl()
rule_set = {"id": 11, "current_version_id": 1001}
version_row = {"id": 1002, "rule_set_id": 11, "status": "draft"}
try:
service._assert_rollback_target(version_row, rule_set)
assert False, "expected LeauditException"
except LeauditException as exc:
assert exc.status == StatusCodeEnum.HTTP_400_BAD_REQUEST
def test_assert_rollback_target_allows_previous_version():
service = RuleServiceImpl()
rule_set = {"id": 11, "current_version_id": 1002}
version_row = {"id": 1001, "rule_set_id": 11, "status": "deprecated"}
service._assert_rollback_target(version_row, rule_set)
def test_rule_version_queries_exclude_soft_deleted_versions():
sql_text = str(RuleServiceImpl.GetVersions.__code__.co_consts)
assert "rv.deleted_at IS NULL" in sql_text
def test_tenant_user_requires_rule_tenant_schema_before_write():
service = RuleServiceImpl()
try:
service._assert_rule_tenant_schema_ready_for_write(
use_tenant_scope=False,
current_user={"tenant_code": "JY", "is_global": False},
)
assert False, "expected LeauditException"
except LeauditException as exc:
assert exc.status == StatusCodeEnum.HTTP_409_CONFLICT
def test_global_user_can_write_when_rule_tenant_schema_missing_for_legacy_compatibility():
service = RuleServiceImpl()
service._assert_rule_tenant_schema_ready_for_write(
use_tenant_scope=False,
current_user={"tenant_code": None, "is_global": True},
)
def test_build_tenant_binding_clone_payload_uses_tenant_scope():
service = RuleServiceImpl()
payload = service._build_tenant_binding_clone_payload(
current_user={"tenant_code": "JY", "tenant_name": "揭阳", "is_global": False},
source_binding={"group_id": 3, "priority": 100, "note": "省级绑定"},
tenant_rule_set_id=88,
)
assert payload == {
"group_id": 3,
"rule_set_id": 88,
"tenant_code": "JY",
"scope_type": "TENANT",
"tenant_name_snapshot": "揭阳",
"priority": 100,
"note": "由租户规则集派生自动补绑",
}
def test_legacy_region_for_tenant_scope_uses_tenant_code_to_avoid_old_unique_constraint():
service = RuleServiceImpl()
assert service._legacy_region_for_scope("JY", "TENANT") == "JY"
assert service._legacy_region_for_scope("PROVINCIAL", "PROVINCIAL") == "default"
assert service._legacy_region_for_scope("PUBLIC", "PUBLIC") == "PUBLIC"
def test_rbac_manageable_permissions_include_rule_version_lifecycle():
permission_keys = {
item["permission_key"]
for item in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS
if item["route_path"] == "/rules"
}
assert "evaluation_point:list:read" in permission_keys
assert "rules:list:read" in permission_keys
assert "rules:version_list:read" in permission_keys
assert "rules:content:read" in permission_keys
assert "rules:validate:execute" in permission_keys
assert "rules:version_create:write" in permission_keys
assert "rules:publish:write" in permission_keys
assert "rules:rollback:write" in permission_keys
assert "rules:binding_list:read" in permission_keys
assert "rules:binding_create:write" in permission_keys
assert "rules:binding_update:write" in permission_keys
assert "rules:binding_delete:delete" in permission_keys
def test_rbac_rule_groups_route_is_exposed_under_settings():
route_paths = {item["route_path"] for item in RbacAdminServiceImpl._MANAGEABLE_ROUTE_BLUEPRINTS}
group_permission_paths = {
item["route_path"]
for item in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS
if item["permission_key"].startswith("evaluation_group:")
}
assert "/rule-groups" in route_paths
assert group_permission_paths == {"/rules"}
def test_user_route_compat_menu_exposes_rule_groups_under_settings():
service = RbacServiceImpl()
routes = service._buildCompatibilityRoutes(["admin"], {"evaluation_group:list:read", "rules:list:read"})
paths = service._collectRoutePaths(routes)
rules_route = next(route for route in routes if route.route_path == "/rules")
settings_route = next(route for route in routes if route.route_path == "/settings")
rule_groups_route = next(route for route in (settings_route.children or []) if route.route_path == "/rule-groups")
assert "/rule-groups" in paths
assert rule_groups_route.parent_id == settings_route.id
assert "evaluation_group:list:read" in rules_route.permissions
def test_rbac_seed_cache_reuses_recent_route_map():
service = RbacAdminServiceImpl()
route_map = {
str(item["route_path"]): index
for index, item in enumerate(RbacAdminServiceImpl._MANAGEABLE_ROUTE_BLUEPRINTS, start=1)
}
service._remember_admin_seed_route_map(route_map)
assert service._get_cached_admin_seed_route_map() == route_map
@pytest.mark.asyncio
async def test_rbac_rejects_lower_role_editing_higher_role_permissions(monkeypatch):
service = RbacAdminServiceImpl()
async def fake_context(_current_user_id):
return {"is_super_admin": False, "max_role_priority": 50}
async def fake_role_row(_session, _role_id):
return {"id": 1, "priority": 90}
monkeypatch.setattr(service, "_getCurrentUserContext", fake_context)
monkeypatch.setattr(service, "_getRoleRow", fake_role_row)
with pytest.raises(LeauditException) as exc:
await service._assertCanManageTargetRole(None, 100, 1)
assert exc.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN
@pytest.mark.asyncio
async def test_rbac_allows_super_admin_editing_higher_role_permissions(monkeypatch):
service = RbacAdminServiceImpl()
async def fake_context(_current_user_id):
return {"is_super_admin": True, "max_role_priority": 100}
async def fail_if_called(_session, _role_id):
raise AssertionError("super_admin should bypass target role lookup")
monkeypatch.setattr(service, "_getCurrentUserContext", fake_context)
monkeypatch.setattr(service, "_getRoleRow", fail_if_called)
await service._assertCanManageTargetRole(None, 100, 1)
def test_permission_cache_is_shared_and_can_invalidate_user():
first = PermissionServiceImpl()
second = PermissionServiceImpl()
first._permission_cache[12345] = (0.0, ({"rules:list:read"}, set()))
assert 12345 in second._permission_cache
PermissionServiceImpl.InvalidateUser(12345)
assert 12345 not in first._permission_cache
assert 12345 not in second._permission_cache
@pytest.mark.asyncio
async def test_rbac_admin_permission_assertion_uses_permission_service(monkeypatch):
service = RbacAdminServiceImpl()
checked_permissions: list[tuple[int, str]] = []
async def fake_context(user_id: int):
return {"can_manage": True, "is_super_admin": False}
async def fake_check_permission(self, user_id: int, permission_key: str):
checked_permissions.append((user_id, permission_key))
return permission_key != "rbac:roles:update"
monkeypatch.setattr(service, "_getCurrentUserContext", fake_context)
monkeypatch.setattr(PermissionServiceImpl, "CheckPermission", fake_check_permission)
with pytest.raises(LeauditException) as exc_info:
await service._assertPermissions(99, ["rbac:roles:update"])
assert exc_info.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN
assert checked_permissions == [(99, "rbac:roles:update")]