Files
leaudit-platform-backend/docs/权限与地区隔离/统一执行器落地代码骨架与接入示例.md

1151 lines
28 KiB
Markdown

# 统一执行器落地代码骨架与接入示例
> 适用范围:`leaudit-platform` 后端权限改造实施阶段
> 文档定位:把“统一数据范围执行器设计”进一步落到代码骨架、接口签名、目录建议和模块接入示例层面,供研发直接照着实施。
---
## 1. 文档目标
前面几份文档已经回答了:
- 为什么要做统一执行器
- 哪些接口会受影响
- 应该按什么阶段实施
- 字段和 SQL 应该如何规范化
这一份只解决一个问题:
后端代码到底怎么改,才能在不推翻现有 service 结构的前提下,把统一执行器落进去。
核心要求:
1. 尽量兼容现有 FastAPI + service 分层风格
2. 优先通过“新增能力 + 渐进替换”推进
3. 先不大改 controller 签名
4. 先把 service 内部角色判断替换掉
---
## 2. 当前代码风格约束
从现有代码看,后端基本遵循:
1. `Controller`
- 负责接 JWT payload
- 负责做基础 permission 判断
- 调用 `ServiceImpl`
2. `Service`
- 负责业务逻辑
- 直接写 SQL / 组装查询
3. `PermissionServiceImpl`
- 当前仅提供布尔权限判断
因此统一执行器落地时,不建议第一步就做成特别重的 AOP 或 decorator 体系。
更适合的落地方式是:
1. 先保留 controller 现状
2. 在 service 内引入统一 `PermissionDecisionService`
3. 逐步把旧的 `_getCurrentUserContext``_build...Filters` 替换掉
---
## 3. 推荐目录结构
建议新增:
```text
fastapi_modules/fastapi_leaudit/services/permission_scope/
__init__.py
models.py
enums.py
exceptions.py
repositories/
__init__.py
permissionGrantRepository.py
resourceScopeRepository.py
resolvers/
__init__.py
dataScopeResolver.py
permissionDecisionService.py
queryScopeBuilder.py
policies/
__init__.py
base.py
documentPolicy.py
govdocPolicy.py
usageStatsPolicy.py
ragPolicy.py
crossReviewPolicy.py
rbacAdminPolicy.py
contractTemplatePolicy.py
facade/
__init__.py
scopeAwarePermissionFacade.py
```
说明:
- `repositories` 负责查授权和资源归属
- `resolvers` 负责算决策
- `policies` 负责模块特例
- `facade` 负责给业务 service 暴露统一入口
---
## 4. 建议新增核心对象
## 4.1 `models.py`
建议定义:
```python
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class ScopeContext:
user_id: int
permission_key: str
module: str
action: str
user_area: str | None
request_area: str | None = None
target_user_id: int | None = None
resource_id: int | None = None
route_path: str | None = None
extra: dict[str, Any] = field(default_factory=dict)
@dataclass(slots=True)
class PermissionGrant:
role_id: int
role_key: str
permission_key: str
grant_type: str
role_scope: str | None
permission_scope: str | None
priority: int
is_system_role: bool
condition_filter: dict[str, Any] | None = None
@dataclass(slots=True)
class PermissionDecision:
allowed: bool
deny_reason: str | None
effective_scope: str | None
scope_mode: str
allowed_areas: list[str] = field(default_factory=list)
allowed_user_ids: list[int] = field(default_factory=list)
allow_public: bool = False
matched_roles: list[str] = field(default_factory=list)
matched_permissions: list[str] = field(default_factory=list)
conditions: dict[str, Any] = field(default_factory=dict)
@dataclass(slots=True)
class ScopeFieldMapping:
area_field: str | None = None
creator_field: str | None = None
owner_field: str | None = None
user_field: str | None = None
public_field: str | None = None
@dataclass(slots=True)
class ScopeClause:
sql: str
params: dict[str, Any]
scope_mode: str
description: str
```
## 4.2 `enums.py`
建议定义:
```python
class ScopeMode:
NONE = "NONE"
ALL = "ALL"
DEPT = "DEPT"
SELF = "SELF"
RELATION = "RELATION"
PUBLIC_MIXED = "PUBLIC_MIXED"
CUSTOM = "CUSTOM"
```
---
## 5. 推荐新增权限服务能力
## 5.1 不直接推翻 `IPermissionService`
当前接口是:
- `CheckPermission`
- `HasAnyPermission`
- `HasAllPermissions`
不建议直接改掉这 3 个方法,否则牵涉面太广。
建议做法:
1. 保留原接口
2. 额外新增一个 scope 感知服务接口
## 5.2 推荐新增接口
建议新增文件:
`fastapi_modules/fastapi_leaudit/services/permissionScopeService.py`
```python
from abc import ABC, abstractmethod
from fastapi_modules.fastapi_leaudit.services.permission_scope.models import PermissionDecision, ScopeContext
class IPermissionScopeService(ABC):
@abstractmethod
async def Decide(self, Context: ScopeContext) -> PermissionDecision:
...
@abstractmethod
async def Require(self, Context: ScopeContext) -> PermissionDecision:
...
```
说明:
- `Decide()` 返回决策,不抛错
- `Require()` 返回决策,若拒绝直接抛 `403`
## 5.3 为什么不直接塞进 `IPermissionService`
因为当前 `IPermissionService` 的语义非常清晰:
- 它是“权限布尔检查器”
如果直接把 scope 决策一起塞进去:
- 现有 controller 依赖会变混乱
- service 层也很难渐进迁移
更合理的做法是:
- `PermissionServiceImpl` 继续做功能权限布尔校验
- `PermissionScopeServiceImpl` 负责完整决策
---
## 6. Repository 骨架
## 6.1 `permissionGrantRepository.py`
职责:
- 加载用户所有 role / permission grant 明细
建议接口:
```python
class PermissionGrantRepository:
async def ListUserPermissionGrants(self, UserId: int, PermissionKey: str) -> list[PermissionGrant]:
...
async def GetUserBaseIdentity(self, UserId: int) -> dict[str, Any]:
...
```
推荐 SQL 思路:
```sql
SELECT
r.id AS role_id,
r.role_key,
p.permission_key,
rp.grant_type,
r.data_scope AS role_scope,
rp.data_scope AS permission_scope,
COALESCE(r.priority, 0) AS priority,
COALESCE(r.is_system_role, FALSE) AS is_system_role,
rp.condition_filter
FROM user_role ur
JOIN roles r ON r.id = ur.role_id
JOIN role_permissions rp ON rp.role_id = r.id
JOIN permissions p ON p.id = rp.permission_id
WHERE ur.user_id = :user_id
```
然后在 Python 层执行:
- 精确匹配
- wildcard 匹配
- `DENY/GRANT` 分类
## 6.2 `resourceScopeRepository.py`
职责:
- 为详情、下载、删除、导出类接口提供主资源回溯
建议接口:
```python
class ResourceScopeRepository:
async def GetDocumentScopeSnapshot(self, DocumentId: int) -> dict[str, Any] | None: ...
async def GetGovdocRunScopeSnapshot(self, RunId: int) -> dict[str, Any] | None: ...
async def GetRagDatasetScopeSnapshot(self, DatasetId: int) -> dict[str, Any] | None: ...
async def GetCrossReviewProposalScopeSnapshot(self, ProposalId: int) -> dict[str, Any] | None: ...
```
这些方法不是做最终权限判断,而是提供:
- 主资源地区
- 主资源创建者
- 关联任务/数据集/文档 ID
供 policy 判断使用。
---
## 7. Resolver 骨架
## 7.1 `dataScopeResolver.py`
职责:
-`PermissionGrant` 做范围合并
建议骨架:
```python
class DataScopeResolver:
_SCOPE_RANK = {
"SELF": 1,
"DEPT": 2,
"ALL": 3,
}
def resolve(self, grants: list[PermissionGrant]) -> tuple[bool, str | None, list[str]]:
exact_denies = [item for item in grants if item.grant_type == "DENY"]
if exact_denies:
return False, None, []
effective_scope = None
matched_roles: list[str] = []
for item in grants:
if item.grant_type != "GRANT":
continue
scope = item.permission_scope or item.role_scope or "SELF"
if effective_scope is None or self._SCOPE_RANK.get(scope, 0) > self._SCOPE_RANK.get(effective_scope, 0):
effective_scope = scope
matched_roles.append(item.role_key)
return True, effective_scope, matched_roles
```
说明:
- 第一版先解决 `ALL/DEPT/SELF`
- `PUBLIC_MIXED``RELATION` 不从库里直接出,而由 `ModulePolicy` 生成
## 7.2 `permissionDecisionService.py`
职责:
- 串联 grant repository、scope resolver、module policy
建议骨架:
```python
class PermissionDecisionService:
def __init__(
self,
grantRepository: PermissionGrantRepository,
scopeResolver: DataScopeResolver,
policyRegistry: ModulePolicyRegistry,
) -> None:
self._grantRepository = grantRepository
self._scopeResolver = scopeResolver
self._policyRegistry = policyRegistry
async def Decide(self, Context: ScopeContext) -> PermissionDecision:
grants = await self._grantRepository.ListUserPermissionGrants(Context.user_id, Context.permission_key)
allowed, effective_scope, matched_roles = self._scopeResolver.resolve(grants)
if not allowed:
return PermissionDecision(
allowed=False,
deny_reason="permission_denied",
effective_scope=None,
scope_mode="NONE",
)
base_decision = PermissionDecision(
allowed=True,
deny_reason=None,
effective_scope=effective_scope,
scope_mode=effective_scope or "SELF",
matched_roles=matched_roles,
matched_permissions=[Context.permission_key],
)
policy = self._policyRegistry.Get(Context.module)
return await policy.Decorate(Context, base_decision)
```
---
## 8. QueryScopeBuilder 骨架
## 8.1 核心职责
`PermissionDecision` 转成可直接拼接到 SQL 的子句。
## 8.2 建议骨架
```python
class QueryScopeBuilder:
def BuildByMapping(
self,
Decision: PermissionDecision,
Mapping: ScopeFieldMapping,
ScopeUserId: int,
ScopeArea: str | None,
RequestedArea: str | None = None,
RequestedUserId: int | None = None,
) -> ScopeClause:
if not Decision.allowed:
return ScopeClause(sql="1 = 0", params={}, scope_mode="NONE", description="permission denied")
if Decision.scope_mode == "ALL":
if RequestedArea and Mapping.area_field:
return ScopeClause(
sql=f"COALESCE({Mapping.area_field}, '') = :requested_area",
params={"requested_area": RequestedArea},
scope_mode="ALL",
description="global scope with requested area",
)
return ScopeClause(sql="1 = 1", params={}, scope_mode="ALL", description="global scope")
if Decision.scope_mode == "DEPT":
if not ScopeArea:
return ScopeClause(sql="1 = 0", params={}, scope_mode="DEPT", description="missing user area")
if RequestedArea and RequestedArea != ScopeArea:
return ScopeClause(sql="1 = 0", params={}, scope_mode="DEPT", description="requested area out of scope")
return ScopeClause(
sql=f"COALESCE({Mapping.area_field}, '') = :scope_area",
params={"scope_area": ScopeArea},
scope_mode="DEPT",
description="same area scope",
)
if Decision.scope_mode == "SELF":
user_field = Mapping.creator_field or Mapping.owner_field or Mapping.user_field
if not user_field:
return ScopeClause(sql="1 = 0", params={}, scope_mode="SELF", description="missing self mapping")
if RequestedUserId is not None and RequestedUserId != ScopeUserId:
return ScopeClause(sql="1 = 0", params={}, scope_mode="SELF", description="requested user out of scope")
return ScopeClause(
sql=f"{user_field} = :scope_user_id",
params={"scope_user_id": ScopeUserId},
scope_mode="SELF",
description="self scope",
)
return ScopeClause(sql="1 = 0", params={}, scope_mode="CUSTOM", description="unsupported generic scope")
```
---
## 9. Policy 骨架
## 9.1 基础接口
建议文件:`policies/base.py`
```python
from abc import ABC, abstractmethod
from fastapi_modules.fastapi_leaudit.services.permission_scope.models import PermissionDecision, ScopeClause, ScopeContext
class BaseModulePolicy(ABC):
@abstractmethod
async def Decorate(self, Context: ScopeContext, Decision: PermissionDecision) -> PermissionDecision:
...
@abstractmethod
async def BuildClause(self, Context: ScopeContext, Decision: PermissionDecision) -> ScopeClause | None:
...
```
## 9.2 `DocumentPolicy`
建议职责:
1. 保持 `ALL/DEPT/SELF`
2. 固定文档模块字段映射为:
- `d.region`
- `f.created_by`
3. 对详情、状态、附件、确认等接口复用同一资源规则
建议骨架:
```python
class DocumentPolicy(BaseModulePolicy):
def __init__(self, Builder: QueryScopeBuilder) -> None:
self._builder = Builder
async def Decorate(self, Context: ScopeContext, Decision: PermissionDecision) -> PermissionDecision:
return Decision
async def BuildClause(self, Context: ScopeContext, Decision: PermissionDecision) -> ScopeClause:
return self._builder.BuildByMapping(
Decision=Decision,
Mapping=ScopeFieldMapping(area_field="d.region", creator_field="f.created_by"),
ScopeUserId=Context.user_id,
ScopeArea=Context.user_area,
RequestedArea=Context.request_area,
RequestedUserId=Context.target_user_id,
)
```
## 9.3 `RagPolicy`
建议职责:
1. 读接口转为 `PUBLIC_MIXED`
2. 管理接口仍回到 `ALL/DEPT/SELF`
建议骨架:
```python
class RagPolicy(BaseModulePolicy):
def __init__(self, Builder: QueryScopeBuilder) -> None:
self._builder = Builder
async def Decorate(self, Context: ScopeContext, Decision: PermissionDecision) -> PermissionDecision:
if Context.permission_key in {"rag:app:read", "rag:chat:use", "rag:dataset:read"}:
Decision.scope_mode = "PUBLIC_MIXED"
Decision.allow_public = True
Decision.allowed_areas = [area for area in [Context.user_area, "省级", ""] if area is not None]
return Decision
async def BuildClause(self, Context: ScopeContext, Decision: PermissionDecision) -> ScopeClause:
if Decision.scope_mode == "PUBLIC_MIXED":
return ScopeClause(
sql="(COALESCE(dataset.area, '') IN :visible_areas OR dataset.is_public = TRUE)",
params={"visible_areas": Decision.allowed_areas},
scope_mode="PUBLIC_MIXED",
description="rag public mixed scope",
)
return self._builder.BuildByMapping(
Decision=Decision,
Mapping=ScopeFieldMapping(area_field="dataset.area", creator_field="dataset.created_by", public_field="dataset.is_public"),
ScopeUserId=Context.user_id,
ScopeArea=Context.user_area,
RequestedArea=Context.request_area,
)
```
## 9.4 `CrossReviewPolicy`
建议职责:
- 把普通 `SELF/DEPT` 转成关系型 `RELATION`
示意:
```python
class CrossReviewPolicy(BaseModulePolicy):
async def Decorate(self, Context: ScopeContext, Decision: PermissionDecision) -> PermissionDecision:
if Decision.allowed:
Decision.scope_mode = "RELATION"
return Decision
async def BuildClause(self, Context: ScopeContext, Decision: PermissionDecision) -> ScopeClause:
return ScopeClause(
sql=(
"EXISTS ("
"SELECT 1 FROM leaudit_cross_review_task_member tm "
"WHERE tm.task_id = task.id "
"AND tm.user_id = :scope_user_id "
"AND tm.deleted_at IS NULL"
")"
),
params={"scope_user_id": Context.user_id},
scope_mode="RELATION",
description="cross review relation scope",
)
```
---
## 10. Facade 骨架
建议文件:`facade/scopeAwarePermissionFacade.py`
职责:
给各个业务 service 一个最直接的调用入口。
```python
class ScopeAwarePermissionFacade:
def __init__(
self,
DecisionService: PermissionDecisionService,
PolicyRegistry: ModulePolicyRegistry,
) -> None:
self._decisionService = DecisionService
self._policyRegistry = PolicyRegistry
async def Require(self, Context: ScopeContext) -> tuple[PermissionDecision, ScopeClause | None]:
decision = await self._decisionService.Decide(Context)
if not decision.allowed:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前用户没有访问权限")
policy = self._policyRegistry.Get(Context.module)
clause = await policy.BuildClause(Context, decision)
return decision, clause
```
这样业务 service 的接入就会很轻:
```python
decision, scope_clause = await self.ScopeFacade.Require(...)
```
---
## 11. Controller 接入策略
## 11.1 第一阶段不强制改 controller
当前 controller 大多已经有:
- `verify_access_token`
- `PermissionService.CheckPermission()`
第一阶段不建议全部改掉。
建议策略:
1. controller 继续做功能权限预筛
2. service 内再做统一 scope 决策
原因:
- 改动更可控
- 避免一次性动太多 controller
## 11.2 第二阶段再统一 controller
等统一执行器稳定后,可以进一步做:
- controller 不再自行 `CheckPermission`
- 改为 service 内一站式处理
但这不是第一批落地的必要条件。
---
## 12. `documentServiceImpl` 接入示例
## 12.1 当前问题
当前文档模块有:
- `_getCurrentUserContext`
- `_buildDocumentScopeFilters`
它们本质上就是统一执行器的前身,但还停留在角色硬编码层。
## 12.2 推荐替换方式
### 原有思路
```python
currentUser = await self._getCurrentUserContext(CurrentUserId)
filters = self._buildDocumentScopeFilters(...)
```
### 改造后思路
```python
context = ScopeContext(
user_id=CurrentUserId,
permission_key="documents:list:read",
module="documents",
action="read",
user_area=payload_area,
request_area=Region,
target_user_id=UserId,
)
decision, scope_clause = await self.ScopeFacade.Require(context)
```
然后在 SQL 中:
```python
filters = [
"d.deleted_at IS NULL",
"f.deleted_at IS NULL",
scope_clause.sql,
]
params = {
**scope_clause.params,
}
```
## 12.3 示例骨架
```python
async def ListDocuments(...):
identity = await self.PermissionGrantRepository.GetUserBaseIdentity(CurrentUserId)
context = ScopeContext(
user_id=CurrentUserId,
permission_key="documents:list:read",
module="documents",
action="read",
user_area=identity.get("area"),
request_area=Region,
target_user_id=UserId,
)
_, scope_clause = await self.ScopeFacade.Require(context)
where_clauses = [
"d.deleted_at IS NULL",
"f.deleted_at IS NULL",
scope_clause.sql,
]
params: dict[str, Any] = {
**scope_clause.params,
}
if Keyword:
where_clauses.append("d.document_name ILIKE :keyword")
params["keyword"] = f"%{Keyword}%"
sql = f"""
SELECT ...
FROM leaudit_documents d
JOIN leaudit_document_files f ON f.document_id = d.id
WHERE {' AND '.join(where_clauses)}
"""
```
## 12.4 详情接口示例
```python
async def GetDocument(self, CurrentUserId: int, Id: int) -> DocumentDetailVO:
identity = await self.PermissionGrantRepository.GetUserBaseIdentity(CurrentUserId)
context = ScopeContext(
user_id=CurrentUserId,
permission_key="documents:detail:read",
module="documents",
action="read",
user_area=identity.get("area"),
resource_id=Id,
)
_, scope_clause = await self.ScopeFacade.Require(context)
row = (
await session.execute(
text(
f"""
SELECT ...
FROM leaudit_documents d
JOIN leaudit_document_files f ON f.document_id = d.id
WHERE d.id = :document_id
AND {scope_clause.sql}
"""
),
{"document_id": Id, **scope_clause.params},
)
).mappings().first()
```
重点:
- 把 scope 直接写进详情查询
- 不再“先查后判”
---
## 13. `govdocServiceImpl` 接入示例
## 13.1 当前问题
公文模块里最大风险不是列表,而是:
- `runId`
- `report/docx`
- `original`
这些派生接口。
## 13.2 推荐接法
先统一一个内部方法:
```python
async def _getScopedGovdocDocument(
self,
CurrentUserId: int,
PermissionKey: str,
DocumentId: int,
) -> dict[str, Any]:
...
```
所有:
- `GetDocumentDetail`
- `CreateRun`
- `DownloadOriginal`
都先用它。
### 对 `runId` 类接口
新增一个仓储回溯:
```python
snapshot = await self.ResourceScopeRepository.GetGovdocRunScopeSnapshot(runId)
document_id = snapshot["document_id"]
document = await self._getScopedGovdocDocument(CurrentUserId, "govdoc:run:read", document_id)
```
这样所有 run 结果、报告下载都回到 document scope。
---
## 14. `ragDatasetServiceImpl` 接入示例
## 14.1 当前问题
当前签名大量透传:
- `UserArea`
- `UserRole`
这就是旧架构的直接暴露。
## 14.2 第一阶段兼容策略
第一阶段不需要立刻改所有接口签名。
可以先保留:
```python
async def GetAdminDatasets(self, CurrentUserId, UserArea, UserRole, Area, OnlyEnabled, Page, PageSize)
```
但内部不再使用 `UserRole`
### 示例
```python
async def GetAdminDatasets(...):
context = ScopeContext(
user_id=CurrentUserId,
permission_key="rag:dataset:manage",
module="rag",
action="read",
user_area=UserArea,
request_area=Area,
)
_, scope_clause = await self.ScopeFacade.Require(context)
```
然后把旧逻辑:
```python
if UserRole not in ("provincial_admin", "admin", "super_admin"):
raise ...
```
全部删掉。
## 14.3 第二阶段签名优化
稳定后再把 service 接口签名收敛为:
```python
async def GetAdminDatasets(self, CurrentUserId: int, Area: str | None, OnlyEnabled: bool | None, Page: int, PageSize: int)
```
即:
- `UserArea`
- `UserRole`
不再从 controller 显式透传。
而是在 service 内统一从 identity/repository 获取。
## 14.4 数据集子资源示例
```python
async def GetDatasetDocumentDetail(...):
identity = await self.PermissionGrantRepository.GetUserBaseIdentity(CurrentUserId)
context = ScopeContext(
user_id=CurrentUserId,
permission_key="rag:dataset:read",
module="rag",
action="read",
user_area=identity.get("area"),
resource_id=DatasetId,
)
_, dataset_scope_clause = await self.ScopeFacade.Require(context)
sql = f"""
SELECT ...
FROM rag_datasets dataset
JOIN rag_documents doc ON doc.dataset_id = dataset.id
WHERE dataset.id = :dataset_id
AND doc.id = :document_id
AND {dataset_scope_clause.sql}
"""
```
重点:
- 文档详情跟随 dataset scope
- 不再自己判断角色
---
## 15. `rbacAdminServiceImpl` 接入示例
## 15.1 当前问题
当前 `_assertManagePermission``_assertPermission` 双轨并存。
还存在:
```sql
bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')) AS can_manage
```
## 15.2 推荐做法
角色管理对象本身和用户管理对象分开处理。
### 角色元数据接口
例如:
- `ListRoles`
- `CreateRole`
- `SaveRolePermissions`
主要按功能权限控制,不做 area scope。
### 用户相关接口
例如:
- `ListUsers`
- `AssignUserRoles`
- `GetUserRoles`
要做用户 area scope。
## 15.3 `ListUsers` 示例
```python
async def ListUsers(...):
identity = await self.PermissionGrantRepository.GetUserBaseIdentity(CurrentUserId)
context = ScopeContext(
user_id=CurrentUserId,
permission_key="rbac:users:read",
module="rbac_admin",
action="read",
user_area=identity.get("area"),
request_area=Area,
)
_, scope_clause = await self.ScopeFacade.Require(context)
sql = f"""
SELECT ...
FROM sso_users u
WHERE u.deleted_at IS NULL
AND {scope_clause.sql}
"""
```
`RbacAdminPolicy` 内固定映射:
- `area_field = u.area`
## 15.4 `AssignUserRoles` 示例
建议先做用户合法性校验:
```python
target_context = ScopeContext(
user_id=CurrentUserId,
permission_key="rbac:user_roles:write",
module="rbac_admin",
action="write",
user_area=identity.get("area"),
resource_id=UserId,
)
_, scope_clause = await self.ScopeFacade.Require(target_context)
```
然后先查:
```sql
SELECT u.id
FROM sso_users u
WHERE u.id = :target_user_id
AND {scope_clause.sql}
```
只有通过后才执行角色绑定。
---
## 16. 接口签名迁移建议
## 16.1 第一阶段
原则:
- 不强行修改所有抽象接口签名
- 优先兼容现有 controller
例如 RAG service 可以暂时保留:
- `UserArea`
- `UserRole`
但内部只真正使用:
- `CurrentUserId`
- `UserArea`
并逐步废弃 `UserRole`
## 16.2 第二阶段
当统一执行器稳定后,建议收敛为:
1. controller 只传 `CurrentUserId`
2. service 自己取基础身份
3. `ScopeContext` 统一构造
这一步是“收口优化”,不是第一批落地阻塞项。
---
## 17. 日志与异常建议
## 17.1 统一异常
建议新增:
```python
class PermissionScopeDeniedException(LeauditException):
...
```
用于区分:
- 功能权限拒绝
- 数据范围拒绝
- 资源不存在
## 17.2 日志建议
每次 scope 拒绝建议记录:
```python
logger.warning(
"scope denied",
extra={
"user_id": Context.user_id,
"permission_key": Context.permission_key,
"module": Context.module,
"resource_id": Context.resource_id,
"request_area": Context.request_area,
"scope_mode": decision.scope_mode,
"deny_reason": decision.deny_reason,
},
)
```
这对后续排查误拒绝和越权都很重要。
---
## 18. 单元测试骨架建议
建议至少补以下测试:
### `DataScopeResolver`
- `GRANT SELF + GRANT DEPT -> DEPT`
- `GRANT ALL + DENY exact -> denied`
- `permission_scope 覆盖 role_scope`
### `QueryScopeBuilder`
- `ALL`
- `DEPT`
- `SELF`
- `SELF + requested_user_id != scope_user_id`
### `RagPolicy`
- `rag:dataset:read -> PUBLIC_MIXED`
- `rag:dataset:update -> 保持 ALL/DEPT/SELF`
### `CrossReviewPolicy`
- `allowed -> RELATION`
---
## 19. 推荐实施顺序
代码落地建议按下面顺序推进:
1.`models / resolver / builder / policy base`
2.`DocumentPolicy`
3. 用文档模块先跑通一条完整链路
4.`RagPolicy`,清理 RAG 角色白名单
5.`GovdocPolicy`,收口 run/report/download
6.`UsageStatsPolicy`
7.`RbacAdminPolicy`
8. 最后收敛 service 签名
这样做的好处是:
- 先拿最典型模块验证设计
- 再扩到复杂特例
- 避免一开始就改满全仓库
---
## 20. 最终判断标准
这份代码骨架真正落地完成,不是看文件建没建,而是看下面几件事是否成立:
1. 文档模块不再依赖 `_getCurrentUserContext + role_key IN (...)`
2. RAG 模块不再依赖 `UserRole` 白名单
3. 公文下载/报告接口已通过主资源回溯做 scope 判断
4. RBAC 用户管理域不再通过角色名派生 `can_manage`
5. 新增一个接口时,研发可以直接按 `ScopeContext -> Require -> ScopeClause` 方式接入
如果还做不到第 5 条,说明统一执行器还没有真正成为平台能力。