from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException import pytest from fastapi_modules.fastapi_leaudit.services.impl.rbacAdminServiceImpl import RbacAdminServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.ruleServiceImpl import RuleServiceImpl def test_pick_writable_rule_set_prefers_exact_tenant(): service = RuleServiceImpl() rows = [ {"id": 10, "tenant_code": "PROVINCIAL"}, {"id": 11, "tenant_code": "MZ"}, {"id": 12, "tenant_code": "PUBLIC"}, ] result = service._pick_writable_rule_set_row(rows, current_user={"tenant_code": "MZ", "is_global": False}) assert result is not None assert int(result["id"]) == 11 def test_pick_writable_rule_set_prefers_user_tenant_even_for_global_role(): service = RuleServiceImpl() rows = [ {"id": 10, "tenant_code": "PROVINCIAL"}, {"id": 11, "tenant_code": "MZ"}, {"id": 12, "tenant_code": "PUBLIC"}, ] result = service._pick_writable_rule_set_row(rows, current_user={"tenant_code": "MZ", "is_global": True}) assert result is not None assert int(result["id"]) == 11 def test_pick_writable_rule_set_prefers_public_for_global_user_without_tenant(): service = RuleServiceImpl() rows = [ {"id": 10, "tenant_code": "PROVINCIAL"}, {"id": 12, "tenant_code": "PUBLIC"}, ] result = service._pick_writable_rule_set_row(rows, current_user={"tenant_code": None, "is_global": True}) assert result is not None assert int(result["id"]) == 12 def test_pick_writable_rule_set_rejects_public_for_tenant_user(): service = RuleServiceImpl() rows = [ {"id": 12, "tenant_code": "PUBLIC"}, ] try: service._pick_writable_rule_set_row(rows, current_user={"tenant_code": "MZ", "is_global": False}) assert False, "expected LeauditException" except LeauditException as exc: assert exc.status == StatusCodeEnum.HTTP_403_FORBIDDEN def test_assert_version_belongs_to_writable_rule_set_rejects_cross_tenant_publish(): service = RuleServiceImpl() writable_rule_set = {"id": 11, "tenant_code": "MZ"} version_row = {"id": 1001, "rule_set_id": 10} try: service._assert_version_belongs_to_writable_rule_set(version_row, writable_rule_set) assert False, "expected LeauditException" except LeauditException as exc: assert exc.status == StatusCodeEnum.HTTP_403_FORBIDDEN def test_assert_rollback_target_rejects_current_version(): service = RuleServiceImpl() rule_set = {"id": 11, "current_version_id": 1001} version_row = {"id": 1001, "rule_set_id": 11, "status": "published"} try: service._assert_rollback_target(version_row, rule_set) assert False, "expected LeauditException" except LeauditException as exc: assert exc.status == StatusCodeEnum.HTTP_400_BAD_REQUEST def test_assert_rollback_target_rejects_draft_version(): service = RuleServiceImpl() rule_set = {"id": 11, "current_version_id": 1001} version_row = {"id": 1002, "rule_set_id": 11, "status": "draft"} try: service._assert_rollback_target(version_row, rule_set) assert False, "expected LeauditException" except LeauditException as exc: assert exc.status == StatusCodeEnum.HTTP_400_BAD_REQUEST def test_assert_rollback_target_allows_previous_version(): service = RuleServiceImpl() rule_set = {"id": 11, "current_version_id": 1002} version_row = {"id": 1001, "rule_set_id": 11, "status": "deprecated"} service._assert_rollback_target(version_row, rule_set) def test_rule_version_queries_exclude_soft_deleted_versions(): sql_text = str(RuleServiceImpl.GetVersions.__code__.co_consts) assert "rv.deleted_at IS NULL" in sql_text def test_tenant_user_requires_rule_tenant_schema_before_write(): service = RuleServiceImpl() try: service._assert_rule_tenant_schema_ready_for_write( use_tenant_scope=False, current_user={"tenant_code": "JY", "is_global": False}, ) assert False, "expected LeauditException" except LeauditException as exc: assert exc.status == StatusCodeEnum.HTTP_409_CONFLICT def test_global_user_can_write_when_rule_tenant_schema_missing_for_legacy_compatibility(): service = RuleServiceImpl() service._assert_rule_tenant_schema_ready_for_write( use_tenant_scope=False, current_user={"tenant_code": None, "is_global": True}, ) def test_build_tenant_binding_clone_payload_uses_tenant_scope(): service = RuleServiceImpl() payload = service._build_tenant_binding_clone_payload( current_user={"tenant_code": "JY", "tenant_name": "揭阳", "is_global": False}, source_binding={"group_id": 3, "priority": 100, "note": "省级绑定"}, tenant_rule_set_id=88, ) assert payload == { "group_id": 3, "rule_set_id": 88, "tenant_code": "JY", "scope_type": "TENANT", "tenant_name_snapshot": "揭阳", "priority": 100, "note": "由租户规则集派生自动补绑", } def test_legacy_region_for_tenant_scope_uses_tenant_code_to_avoid_old_unique_constraint(): service = RuleServiceImpl() assert service._legacy_region_for_scope("JY", "TENANT") == "JY" assert service._legacy_region_for_scope("PROVINCIAL", "PROVINCIAL") == "default" assert service._legacy_region_for_scope("PUBLIC", "PUBLIC") == "PUBLIC" def test_rbac_manageable_permissions_include_rule_version_lifecycle(): permission_keys = { item["permission_key"] for item in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS if item["route_path"] == "/rules" } assert "rules:list:read" in permission_keys assert "rules:version_list:read" in permission_keys assert "rules:content:read" in permission_keys assert "rules:validate:execute" in permission_keys assert "rules:version_create:write" in permission_keys assert "rules:publish:write" in permission_keys assert "rules:rollback:write" in permission_keys assert "rules:binding_list:read" in permission_keys assert "rules:binding_create:write" in permission_keys assert "rules:binding_update:write" in permission_keys assert "rules:binding_delete:delete" in permission_keys def test_rbac_rule_group_permissions_are_folded_into_rules_menu(): route_paths = {item["route_path"] for item in RbacAdminServiceImpl._MANAGEABLE_ROUTE_BLUEPRINTS} group_permission_paths = { item["route_path"] for item in RbacAdminServiceImpl._MANAGEABLE_PERMISSION_BLUEPRINTS if item["permission_key"].startswith("evaluation_group:") } assert "/rule-groups" not in route_paths assert group_permission_paths == {"/rules"} def test_user_route_compat_menu_does_not_expose_rule_groups(): service = RbacServiceImpl() routes = service._buildCompatibilityRoutes(["admin"], {"evaluation_group:list:read", "rules:list:read"}) paths = service._collectRoutePaths(routes) rules_route = next(route for route in routes if route.route_path == "/rules") assert "/rule-groups" not in paths assert "evaluation_group:list:read" in rules_route.permissions def test_rbac_seed_cache_reuses_recent_route_map(): service = RbacAdminServiceImpl() route_map = { str(item["route_path"]): index for index, item in enumerate(RbacAdminServiceImpl._MANAGEABLE_ROUTE_BLUEPRINTS, start=1) } service._remember_admin_seed_route_map(route_map) assert service._get_cached_admin_seed_route_map() == route_map def test_permission_cache_is_shared_and_can_invalidate_user(): first = PermissionServiceImpl() second = PermissionServiceImpl() first._permission_cache[12345] = (0.0, ({"rules:list:read"}, set())) assert 12345 in second._permission_cache PermissionServiceImpl.InvalidateUser(12345) assert 12345 not in first._permission_cache assert 12345 not in second._permission_cache @pytest.mark.asyncio async def test_rbac_admin_permission_assertion_uses_permission_service(monkeypatch): service = RbacAdminServiceImpl() checked_permissions: list[tuple[int, str]] = [] async def fake_context(user_id: int): return {"can_manage": True, "is_super_admin": False} async def fake_check_permission(self, user_id: int, permission_key: str): checked_permissions.append((user_id, permission_key)) return permission_key != "rbac:roles:update" monkeypatch.setattr(service, "_getCurrentUserContext", fake_context) monkeypatch.setattr(PermissionServiceImpl, "CheckPermission", fake_check_permission) with pytest.raises(LeauditException) as exc_info: await service._assertPermissions(99, ["rbac:roles:update"]) assert exc_info.value.status == StatusCodeEnum.HTTP_403_FORBIDDEN assert checked_permissions == [(99, "rbac:roles:update")]