Files
leaudit-platform-backend/tests/test_rule_write_scope.py
T

222 lines
7.9 KiB
Python

from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.services.impl.rbacAdminServiceImpl import RbacAdminServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ruleServiceImpl import RuleServiceImpl
def test_pick_writable_rule_set_prefers_exact_tenant():
    """A non-global user with tenant ``MZ`` is given the ``MZ`` rule set row."""
    candidate_rows = [
        {"id": 10, "tenant_code": "PROVINCIAL"},
        {"id": 11, "tenant_code": "MZ"},
        {"id": 12, "tenant_code": "PUBLIC"},
    ]
    user = {"tenant_code": "MZ", "is_global": False}

    picked = RuleServiceImpl()._pick_writable_rule_set_row(candidate_rows, current_user=user)

    # Row 11 is the only candidate whose tenant_code equals the user's.
    assert picked is not None
    assert int(picked["id"]) == 11
def test_pick_writable_rule_set_prefers_user_tenant_even_for_global_role():
    """A global user that still carries tenant ``MZ`` is given the ``MZ`` row."""
    candidate_rows = [
        {"id": 10, "tenant_code": "PROVINCIAL"},
        {"id": 11, "tenant_code": "MZ"},
        {"id": 12, "tenant_code": "PUBLIC"},
    ]
    user = {"tenant_code": "MZ", "is_global": True}

    picked = RuleServiceImpl()._pick_writable_rule_set_row(candidate_rows, current_user=user)

    # The is_global flag must not divert the pick away from the user's own tenant row.
    assert picked is not None
    assert int(picked["id"]) == 11
def test_pick_writable_rule_set_prefers_public_for_global_user_without_tenant():
    """A global user with no tenant code is given the ``PUBLIC`` row over ``PROVINCIAL``."""
    candidate_rows = [
        {"id": 10, "tenant_code": "PROVINCIAL"},
        {"id": 12, "tenant_code": "PUBLIC"},
    ]
    user = {"tenant_code": None, "is_global": True}

    picked = RuleServiceImpl()._pick_writable_rule_set_row(candidate_rows, current_user=user)

    assert picked is not None
    assert int(picked["id"]) == 12
def test_pick_writable_rule_set_rejects_public_for_tenant_user():
    """A non-global tenant user offered only a ``PUBLIC`` row must get a 403.

    The test fails if ``_pick_writable_rule_set_row`` returns normally instead
    of raising ``LeauditException``.
    """
    service = RuleServiceImpl()
    rows = [
        {"id": 12, "tenant_code": "PUBLIC"},
    ]
    try:
        service._pick_writable_rule_set_row(rows, current_user={"tenant_code": "MZ", "is_global": False})
    except LeauditException as exc:
        assert exc.status == StatusCodeEnum.HTTP_403_FORBIDDEN
    else:
        # Raise explicitly: ``assert False`` is removed under ``python -O``,
        # which would let a missing exception go unnoticed.
        raise AssertionError("expected LeauditException")
def test_assert_version_belongs_to_writable_rule_set_rejects_cross_tenant_publish():
    """A version whose ``rule_set_id`` differs from the writable rule set must get a 403.

    The version row points at rule set 10 while the writable rule set is 11.
    The test fails if the check returns normally instead of raising
    ``LeauditException``.
    """
    service = RuleServiceImpl()
    writable_rule_set = {"id": 11, "tenant_code": "MZ"}
    version_row = {"id": 1001, "rule_set_id": 10}
    try:
        service._assert_version_belongs_to_writable_rule_set(version_row, writable_rule_set)
    except LeauditException as exc:
        assert exc.status == StatusCodeEnum.HTTP_403_FORBIDDEN
    else:
        # Raise explicitly: ``assert False`` is removed under ``python -O``,
        # which would let a missing exception go unnoticed.
        raise AssertionError("expected LeauditException")
def test_assert_rollback_target_rejects_current_version():
    """Rolling back to the version that is already current must get a 400.

    The version row's id equals the rule set's ``current_version_id``. The
    test fails if the check returns normally instead of raising
    ``LeauditException``.
    """
    service = RuleServiceImpl()
    rule_set = {"id": 11, "current_version_id": 1001}
    version_row = {"id": 1001, "rule_set_id": 11, "status": "published"}
    try:
        service._assert_rollback_target(version_row, rule_set)
    except LeauditException as exc:
        assert exc.status == StatusCodeEnum.HTTP_400_BAD_REQUEST
    else:
        # Raise explicitly: ``assert False`` is removed under ``python -O``,
        # which would let a missing exception go unnoticed.
        raise AssertionError("expected LeauditException")
def test_assert_rollback_target_rejects_draft_version():
    """Rolling back to a version whose status is ``draft`` must get a 400.

    The version row is not the current version but has ``status == "draft"``.
    The test fails if the check returns normally instead of raising
    ``LeauditException``.
    """
    service = RuleServiceImpl()
    rule_set = {"id": 11, "current_version_id": 1001}
    version_row = {"id": 1002, "rule_set_id": 11, "status": "draft"}
    try:
        service._assert_rollback_target(version_row, rule_set)
    except LeauditException as exc:
        assert exc.status == StatusCodeEnum.HTTP_400_BAD_REQUEST
    else:
        # Raise explicitly: ``assert False`` is removed under ``python -O``,
        # which would let a missing exception go unnoticed.
        raise AssertionError("expected LeauditException")
def test_assert_rollback_target_allows_previous_version():
    """A non-current ``deprecated`` version of the same rule set is accepted.

    The test passes as long as ``_assert_rollback_target`` does not raise.
    """
    current_rule_set = {"id": 11, "current_version_id": 1002}
    previous_version = {"id": 1001, "rule_set_id": 11, "status": "deprecated"}

    RuleServiceImpl()._assert_rollback_target(previous_version, current_rule_set)
def test_tenant_user_requires_rule_tenant_schema_before_write():
    """A non-global tenant user with ``use_tenant_scope=False`` must get a 409.

    The test fails if the readiness check returns normally instead of raising
    ``LeauditException``.
    """
    service = RuleServiceImpl()
    try:
        service._assert_rule_tenant_schema_ready_for_write(
            use_tenant_scope=False,
            current_user={"tenant_code": "JY", "is_global": False},
        )
    except LeauditException as exc:
        assert exc.status == StatusCodeEnum.HTTP_409_CONFLICT
    else:
        # Raise explicitly: ``assert False`` is removed under ``python -O``,
        # which would let a missing exception go unnoticed.
        raise AssertionError("expected LeauditException")
def test_global_user_can_write_when_rule_tenant_schema_missing_for_legacy_compatibility():
    """A global user without a tenant code passes the check with ``use_tenant_scope=False``.

    The test passes as long as the readiness check does not raise.
    """
    global_user = {"tenant_code": None, "is_global": True}

    RuleServiceImpl()._assert_rule_tenant_schema_ready_for_write(
        use_tenant_scope=False,
        current_user=global_user,
    )
def test_build_tenant_binding_clone_payload_uses_tenant_scope():
    """The clone payload is tenant-scoped and takes group and priority from the source binding."""
    tenant_user = {"tenant_code": "JY", "tenant_name": "揭阳", "is_global": False}
    original_binding = {"group_id": 3, "priority": 100, "note": "省级绑定"}

    # group_id and priority come from the source binding; the note is
    # replaced rather than copied, and the tenant fields come from the user.
    expected_payload = {
        "group_id": 3,
        "rule_set_id": 88,
        "tenant_code": "JY",
        "scope_type": "TENANT",
        "tenant_name_snapshot": "揭阳",
        "priority": 100,
        "note": "由租户规则集派生自动补绑",
    }

    built = RuleServiceImpl()._build_tenant_binding_clone_payload(
        current_user=tenant_user,
        source_binding=original_binding,
        tenant_rule_set_id=88,
    )

    assert built == expected_payload
def test_legacy_region_for_tenant_scope_uses_tenant_code_to_avoid_old_unique_constraint():
    """Check the legacy region value returned for each (code, scope) pair."""
    service = RuleServiceImpl()

    # (first argument, second argument, expected legacy region)
    cases = (
        ("JY", "TENANT", "JY"),
        ("PROVINCIAL", "PROVINCIAL", "default"),
        ("PUBLIC", "PUBLIC", "PUBLIC"),
    )
    for code, scope, expected_region in cases:
        assert service._legacy_region_for_scope(code, scope) == expected_region
def test_rbac_manageable_permissions_include_rule_version_lifecycle():
    """Every rule read/write/lifecycle permission key is registered under ``/rules``."""
    required_keys = (
        "rules:list:read",
        "rules:version_list:read",
        "rules:content:read",
        "rules:validate:execute",
        "rules:version_create:write",
        "rules:publish:write",
        "rules:rollback:write",
        "rules:binding_list:read",
        "rules:binding_create:write",
        "rules:binding_update:write",
        "rules:binding_delete:delete",
    )

    # Collect the permission keys of all blueprints attached to the /rules route.
    rules_permission_keys = set()
    for blueprint in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS:
        if blueprint["route_path"] == "/rules":
            rules_permission_keys.add(blueprint["permission_key"])

    for key in required_keys:
        assert key in rules_permission_keys
def test_rbac_rule_group_permissions_are_folded_into_rules_menu():
    """No ``/rule-groups`` route exists; ``evaluation_group:*`` permissions sit under ``/rules``."""
    registered_routes = set()
    for route_blueprint in RbacAdminServiceImpl._MANAGEABLE_ROUTE_BLUEPRINTS:
        registered_routes.add(route_blueprint["route_path"])

    # Route paths that carry at least one evaluation_group permission.
    evaluation_group_routes = set()
    for permission_blueprint in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS:
        if permission_blueprint["permission_key"].startswith("evaluation_group:"):
            evaluation_group_routes.add(permission_blueprint["route_path"])

    assert "/rule-groups" not in registered_routes
    assert evaluation_group_routes == {"/rules"}
def test_user_route_compat_menu_does_not_expose_rule_groups():
    """Compatibility routes omit ``/rule-groups`` and put the group permission on ``/rules``."""
    service = RbacServiceImpl()
    granted_permissions = {"evaluation_group:list:read", "rules:list:read"}

    compat_routes = service._buildCompatibilityRoutes(["admin"], granted_permissions)
    all_paths = service._collectRoutePaths(compat_routes)
    # next() raises StopIteration if no /rules route was built, failing the test.
    rules_entry = next(entry for entry in compat_routes if entry.route_path == "/rules")

    assert "/rule-groups" not in all_paths
    assert "evaluation_group:list:read" in rules_entry.permissions
def test_rbac_seed_cache_reuses_recent_route_map():
    """A route map stored via ``_remember_admin_seed_route_map`` is returned by the cache getter."""
    service = RbacAdminServiceImpl()

    # Map each blueprint's route path to its 1-based position in the blueprint list.
    expected_map = {}
    for position, blueprint in enumerate(RbacAdminServiceImpl._MANAGEABLE_ROUTE_BLUEPRINTS, start=1):
        expected_map[str(blueprint["route_path"])] = position

    service._remember_admin_seed_route_map(expected_map)

    assert service._get_cached_admin_seed_route_map() == expected_map
def test_permission_cache_is_shared_and_can_invalidate_user():
    """An entry written through one instance is visible on another and removed by ``InvalidateUser``."""
    user_id = 12345
    writer = PermissionServiceImpl()
    reader = PermissionServiceImpl()

    # NOTE(review): the entry shape looks like (timestamp, (permission set, other set)) — confirm.
    writer._permission_cache[user_id] = (0.0, ({"rules:list:read"}, set()))
    assert user_id in reader._permission_cache

    PermissionServiceImpl.InvalidateUser(user_id)

    for instance in (writer, reader):
        assert user_id not in instance._permission_cache