Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/rbacServiceImpl.py
T

851 lines
34 KiB
Python

"""RBAC 路由服务实现。"""
from __future__ import annotations
from copy import deepcopy
from typing import Any
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.domian.vo.rbacVo import RbacRouteVO, RbacUserRoutesVO
from fastapi_modules.fastapi_leaudit.services.rbacService import IRbacService
class RbacServiceImpl(IRbacService):
"""RBAC 路由服务实现。"""
_MINIMAL_VISIBLE_ROUTE_PREFIXES: tuple[str, ...] = (
"/home",
"/chat-with-llm",
"/contract-template",
"/cross-checking",
"/files",
"/documents",
"/rules",
"/rule-groups",
"/rules-files",
"/settings",
"/entry-modules",
"/role-permissions",
"/document-types",
"/usage-stats",
)
_COMPAT_ROUTE_BLUEPRINTS: dict[str, list[dict[str, Any]]] = {
"admin": [
{
"id": 1001,
"route_path": "/home",
"route_name": "home",
"component": "home",
"parent_id": None,
"route_title": "系统概览",
"icon": "ri-home-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "overview"},
"children": None,
},
{
"id": 1002,
"route_path": "/chat-with-llm",
"route_name": "chat-with-llm",
"component": "chat-with-llm",
"parent_id": None,
"route_title": "AI对话",
"icon": "ri-chat-smile-2-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "assistant"},
"children": None,
},
{
"id": 1003,
"route_path": "/files",
"route_name": "file-management",
"component": "files",
"parent_id": None,
"route_title": "文件管理",
"icon": "ri-folder-line",
"sort_order": 3,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "documents"},
"children": [
{
"id": 1004,
"route_path": "/files/upload",
"route_name": "file-upload",
"component": "files.upload",
"parent_id": 1003,
"route_title": "文件上传",
"icon": "ri-upload-cloud-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "documents"},
"children": None,
},
{
"id": 1005,
"route_path": "/documents",
"route_name": "documents",
"component": "documents",
"parent_id": 1003,
"route_title": "文档列表",
"icon": "ri-file-list-3-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "documents"},
"children": None,
},
],
},
{
"id": 1006,
"route_path": "/rules",
"route_name": "rule-management",
"component": "rules",
"parent_id": None,
"route_title": "规则管理",
"icon": "ri-book-3-line",
"sort_order": 4,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": [
{
"id": 1007,
"route_path": "/rule-groups",
"route_name": "rule-groups",
"component": "rule-groups",
"parent_id": 1006,
"route_title": "评查点分组",
"icon": "ri-folder-open-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": None,
},
{
"id": 1008,
"route_path": "/rules/list",
"route_name": "rules-list",
"component": "rules.list",
"parent_id": 1006,
"route_title": "评查点列表",
"icon": "ri-list-check-3",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": None,
},
{
"id": 1009,
"route_path": "/rules-files",
"route_name": "rules-file",
"component": "rules-files",
"parent_id": 1006,
"route_title": "评查文件列表",
"icon": "ri-list-check-2",
"sort_order": 3,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": None,
},
],
},
{
"id": 1010,
"route_path": "/contract-template",
"route_name": "contract-template",
"component": "contract-template",
"parent_id": None,
"route_title": "合同管理",
"icon": "ri-file-search-line",
"sort_order": 5,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "contract"},
"children": [
{
"id": 1011,
"route_path": "/contract-template/search",
"route_name": "contract-search-ai",
"component": "contract-template.search",
"parent_id": 1010,
"route_title": "模板搜索",
"icon": "ri-search-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "contract"},
"children": None,
},
{
"id": 1012,
"route_path": "/contract-template/list",
"route_name": "contract-list",
"component": "contract-template.list",
"parent_id": 1010,
"route_title": "模板列表",
"icon": "ri-folder-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "contract"},
"children": None,
},
],
},
{
"id": 1013,
"route_path": "/settings",
"route_name": "system-settings",
"component": "settings",
"parent_id": None,
"route_title": "系统设置",
"icon": "ri-settings-4-line",
"sort_order": 6,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": [
{
"id": 1014,
"route_path": "/entry-modules",
"route_name": "entry-modules",
"component": "entry-modules",
"parent_id": 1013,
"route_title": "入口模块管理",
"icon": "ri-apps-2-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": None,
},
{
"id": 1015,
"route_path": "/role-permissions",
"route_name": "role-permissions",
"component": "role-permissions",
"parent_id": 1013,
"route_title": "角色权限管理",
"icon": "ri-shield-user-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": None,
},
{
"id": 1016,
"route_path": "/document-types",
"route_name": "document-types",
"component": "document-types",
"parent_id": 1013,
"route_title": "文档类型管理",
"icon": "ri-file-list-3-line",
"sort_order": 3,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": None,
},
{
"id": 1017,
"route_path": "/usage-stats",
"route_name": "usage-stats",
"component": "usage-stats",
"parent_id": 1013,
"route_title": "系统使用统计",
"icon": "ri-bar-chart-box-line",
"sort_order": 4,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": None,
},
],
},
{
"id": 1018,
"route_path": "/cross-checking",
"route_name": "cross-checking",
"component": "cross-checking",
"parent_id": None,
"route_title": "交叉评查",
"icon": "ri-color-filter-line",
"sort_order": 7,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "cross-review"},
"children": [
{
"id": 1019,
"route_path": "/cross-checking/upload",
"route_name": "cross-checking-upload",
"component": "cross-checking.upload",
"parent_id": 1018,
"route_title": "创建任务",
"icon": "ri-upload-cloud-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "cross-review"},
"children": None,
},
{
"id": 1020,
"route_path": "/cross-checking/result",
"route_name": "cross-checking-result",
"component": "cross-checking.result",
"parent_id": 1018,
"route_title": "评查任务列表",
"icon": "ri-file-list-3-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "cross-review"},
"children": None,
},
],
},
],
"common": [
{
"id": 2001,
"route_path": "/home",
"route_name": "home",
"component": "home",
"parent_id": None,
"route_title": "系统概览",
"icon": "ri-home-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "overview"},
"children": None,
},
{
"id": 2002,
"route_path": "/files",
"route_name": "file-management",
"component": "files",
"parent_id": None,
"route_title": "文件管理",
"icon": "ri-folder-line",
"sort_order": 3,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "documents"},
"children": [
{
"id": 2003,
"route_path": "/files/upload",
"route_name": "file-upload",
"component": "files.upload",
"parent_id": 2002,
"route_title": "文件上传",
"icon": "ri-upload-cloud-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "documents"},
"children": None,
},
{
"id": 2004,
"route_path": "/documents",
"route_name": "documents",
"component": "documents",
"parent_id": 2002,
"route_title": "文档列表",
"icon": "ri-file-list-3-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "documents"},
"children": None,
},
],
},
{
"id": 2005,
"route_path": "/rules",
"route_name": "rule-management",
"component": "rules",
"parent_id": None,
"route_title": "规则管理",
"icon": "ri-book-3-line",
"sort_order": 4,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": [
{
"id": 2006,
"route_path": "/rule-groups",
"route_name": "rule-groups",
"component": "rule-groups",
"parent_id": 2005,
"route_title": "评查点分组",
"icon": "ri-folder-open-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": None,
},
{
"id": 2007,
"route_path": "/rules/list",
"route_name": "rules-list",
"component": "rules.list",
"parent_id": 2005,
"route_title": "评查点列表",
"icon": "ri-list-check-3",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": None,
},
{
"id": 2008,
"route_path": "/rules-files",
"route_name": "rules-file",
"component": "rules-files",
"parent_id": 2005,
"route_title": "评查文件列表",
"icon": "ri-list-check-2",
"sort_order": 3,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "rules"},
"children": None,
},
],
},
{
"id": 2009,
"route_path": "/contract-template",
"route_name": "contract-template",
"component": "contract-template",
"parent_id": None,
"route_title": "合同管理",
"icon": "ri-file-search-line",
"sort_order": 5,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "contract"},
"children": [
{
"id": 2010,
"route_path": "/contract-template/search",
"route_name": "contract-search-ai",
"component": "contract-template.search",
"parent_id": 2009,
"route_title": "模板搜索",
"icon": "ri-search-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "contract"},
"children": None,
},
{
"id": 2011,
"route_path": "/contract-template/list",
"route_name": "contract-list",
"component": "contract-template.list",
"parent_id": 2009,
"route_title": "模板列表",
"icon": "ri-folder-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "contract"},
"children": None,
},
],
},
{
"id": 2012,
"route_path": "/settings",
"route_name": "system-settings",
"component": "settings",
"parent_id": None,
"route_title": "系统设置",
"icon": "ri-settings-4-line",
"sort_order": 6,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": [
{
"id": 2013,
"route_path": "/entry-modules",
"route_name": "entry-modules",
"component": "entry-modules",
"parent_id": 2012,
"route_title": "入口模块管理",
"icon": "ri-apps-2-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": None,
},
{
"id": 2014,
"route_path": "/role-permissions",
"route_name": "role-permissions",
"component": "role-permissions",
"parent_id": 2012,
"route_title": "角色权限管理",
"icon": "ri-shield-user-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "settings"},
"children": None,
},
],
},
{
"id": 2015,
"route_path": "/cross-checking",
"route_name": "cross-checking",
"component": "cross-checking",
"parent_id": None,
"route_title": "交叉评查",
"icon": "ri-color-filter-line",
"sort_order": 7,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "cross-review"},
"children": [
{
"id": 2016,
"route_path": "/cross-checking/upload",
"route_name": "cross-checking-upload",
"component": "cross-checking.upload",
"parent_id": 2012,
"route_title": "创建任务",
"icon": "ri-upload-cloud-line",
"sort_order": 1,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "cross-review"},
"children": None,
},
{
"id": 2017,
"route_path": "/cross-checking/result",
"route_name": "cross-checking-result",
"component": "cross-checking.result",
"parent_id": 2012,
"route_title": "评查任务列表",
"icon": "ri-file-list-3-line",
"sort_order": 2,
"is_hidden": False,
"is_cache": True,
"meta": {"group": "cross-review"},
"children": None,
},
],
},
],
}
_PERMISSION_PREFIXES_BY_PATH: dict[str, list[str]] = {
"/files": ["documents:"],
"/files/upload": ["documents:upload:"],
"/documents": ["documents:"],
"/settings": ["entry_module:", "rbac:", "doc_type:"],
"/entry-modules": ["entry_module:"],
"/role-permissions": ["rbac:"],
"/document-types": ["doc_type:"],
"/usage-stats": ["usage_stats:"],
"/rules": ["rules:", "evaluation_point:"],
"/rule-groups": ["evaluation_group:", "rules:"],
"/rules/list": ["rules:", "evaluation_point:"],
"/rules-files": ["rules:"],
}
async def GetCurrentUserRoutes(self, UserId: int) -> RbacUserRoutesVO:
"""获取当前登录用户可访问的前端路由树。"""
async with GetAsyncSession() as Session:
userRow = (
await Session.execute(
text(
"""
SELECT id, username
FROM sso_users
WHERE id = :user_id
AND deleted_at IS NULL
AND status = 0
"""
),
{"user_id": UserId},
)
).mappings().first()
if not userRow:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用")
roleRows = (
await Session.execute(
text(
"""
SELECT r.id, r.role_key, COALESCE(r.priority, 0) AS priority
FROM user_role ur
JOIN roles r ON ur.role_id = r.id
WHERE ur.user_id = :user_id
ORDER BY COALESCE(r.priority, 0) DESC, r.id ASC
"""
),
{"user_id": UserId},
)
).mappings().all()
roleIds = [int(row["id"]) for row in roleRows]
roleKeys = [str(row["role_key"]) for row in roleRows] or ["common"]
grantedPermissions = await self._loadGrantedPermissionKeys(Session, roleIds)
databaseRoutes = await self._loadDatabaseRoutes(Session, roleIds, grantedPermissions)
if self._isFrontendRouteSetReady(databaseRoutes):
routes = self._filterRoutesByMinimalScope(databaseRoutes)
else:
routes = self._buildCompatibilityRoutes(roleKeys, grantedPermissions)
return RbacUserRoutesVO(
user_id=int(userRow["id"]),
username=str(userRow["username"] or ""),
roles=roleKeys,
routes=routes,
)
async def _loadGrantedPermissionKeys(self, Session, RoleIds: list[int]) -> set[str]:
"""加载当前角色集合最终授予的权限键。"""
if not RoleIds:
return set()
rows = (
await Session.execute(
text(
"""
SELECT p.permission_key, rp.grant_type
FROM role_permissions rp
JOIN permissions p ON p.id = rp.permission_id
WHERE rp.role_id = ANY(:role_ids)
"""
).bindparams(role_ids=RoleIds),
)
).fetchall()
grants: set[str] = set()
denies: set[str] = set()
for permissionKey, grantType in rows:
if not permissionKey:
continue
if grantType == "DENY":
denies.add(str(permissionKey))
else:
grants.add(str(permissionKey))
return grants - denies
async def _loadDatabaseRoutes(self, Session, RoleIds: list[int], GrantedPermissions: set[str]) -> list[RbacRouteVO]:
"""按当前 role_route + sys_routes 生成路由树。"""
if not RoleIds:
return []
routeRows = (
await Session.execute(
text(
"""
SELECT
sr.id,
sr.route_path,
sr.route_name,
sr.component,
sr.parent_id,
sr.route_title,
sr.icon,
sr.sort_order,
sr.is_hidden,
sr.is_cache,
sr.meta
FROM role_route rr
JOIN sys_routes sr ON sr.id = rr.route_id
WHERE rr.role_id = ANY(:role_ids)
AND rr.status = 1
AND sr.status = 0
AND sr.deleted_at IS NULL
ORDER BY sr.sort_order ASC, sr.id ASC
"""
).bindparams(role_ids=RoleIds),
)
).mappings().all()
routeMap: dict[int, dict[str, Any]] = {}
for row in routeRows:
routeId = int(row["id"])
routeMap[routeId] = {
"id": routeId,
"route_path": str(row["route_path"] or ""),
"route_name": str(row["route_name"] or f"route-{routeId}"),
"component": row["component"],
"parent_id": int(row["parent_id"]) if row["parent_id"] is not None else None,
"route_title": str(row["route_title"] or row["route_name"] or ""),
"icon": row["icon"],
"sort_order": int(row["sort_order"] or 0),
"is_hidden": bool(row["is_hidden"]),
"is_cache": bool(row["is_cache"]),
"meta": self._normalizeMeta(row["meta"]),
"permissions": self._resolvePermissionsForPath(str(row["route_path"] or ""), GrantedPermissions),
"children": [],
}
rootRoutes: list[dict[str, Any]] = []
for route in routeMap.values():
parentId = route["parent_id"]
if parentId is None or parentId not in routeMap:
rootRoutes.append(route)
else:
routeMap[parentId]["children"].append(route)
return self._dictRoutesToVo(rootRoutes)
def _isFrontendRouteSetReady(self, Routes: list[RbacRouteVO]) -> bool:
"""判断数据库路由是否已经切换到当前前端真实路径集合。"""
frontendPaths = self._collectRoutePaths(Routes)
expected = {
"/files",
"/settings",
"/chat-with-llm",
"/contract-template",
"/cross-checking",
}
return any(path in frontendPaths for path in expected)
def _buildCompatibilityRoutes(self, RoleKeys: list[str], GrantedPermissions: set[str]) -> list[RbacRouteVO]:
"""当数据库路由尚未迁移到新前端路径时,返回兼容菜单树。"""
roleBucket = "admin" if any(role in {"super_admin", "provincial_admin", "admin"} for role in RoleKeys) else "common"
blueprints = deepcopy(self._COMPAT_ROUTE_BLUEPRINTS[roleBucket])
self._attachPermissionsRecursively(blueprints, GrantedPermissions)
return self._dictRoutesToVo(self._filterBlueprintsByMinimalScope(blueprints))
def _filterRoutesByMinimalScope(self, Routes: list[RbacRouteVO]) -> list[RbacRouteVO]:
"""按当前最小可用范围裁剪路由树。"""
filtered: list[RbacRouteVO] = []
for route in Routes:
if not self._isRoutePathEnabled(route.route_path):
continue
routeCopy = route.model_copy(deep=True)
routeCopy.children = self._filterRoutesByMinimalScope(route.children or []) or None
filtered.append(routeCopy)
return filtered
def _filterBlueprintsByMinimalScope(self, Blueprints: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""按当前最小可用范围裁剪兼容蓝图。"""
filtered: list[dict[str, Any]] = []
for blueprint in Blueprints:
routePath = str(blueprint.get("route_path") or "")
if not self._isRoutePathEnabled(routePath):
continue
copied = deepcopy(blueprint)
children = copied.get("children")
if isinstance(children, list):
copied["children"] = self._filterBlueprintsByMinimalScope(children) or None
filtered.append(copied)
return filtered
def _isRoutePathEnabled(self, RoutePath: str | None) -> bool:
"""判断当前阶段是否允许暴露该前端路径。"""
if not RoutePath:
return False
return any(
RoutePath == prefix or RoutePath.startswith(f"{prefix}/")
for prefix in self._MINIMAL_VISIBLE_ROUTE_PREFIXES
)
def _attachPermissionsRecursively(self, Routes: list[dict[str, Any]], GrantedPermissions: set[str]) -> None:
"""递归挂接当前用户在各前端路径上的权限键。"""
for route in Routes:
routePath = str(route.get("route_path") or "")
route["permissions"] = self._resolvePermissionsForPath(routePath, GrantedPermissions)
children = route.get("children")
if isinstance(children, list) and children:
self._attachPermissionsRecursively(children, GrantedPermissions)
def _resolvePermissionsForPath(self, RoutePath: str, GrantedPermissions: set[str]) -> list[str]:
"""按当前前端路径聚合对应权限键。"""
prefixes = self._PERMISSION_PREFIXES_BY_PATH.get(RoutePath, [])
if not prefixes:
return []
matched = sorted(
permissionKey
for permissionKey in GrantedPermissions
if any(permissionKey.startswith(prefix) for prefix in prefixes)
)
return matched
def _dictRoutesToVo(self, Routes: list[dict[str, Any]]) -> list[RbacRouteVO]:
"""把字典路由树转换成 VO。"""
ordered = sorted(Routes, key=lambda item: (int(item.get("sort_order") or 0), int(item.get("id") or 0)))
output: list[RbacRouteVO] = []
for route in ordered:
children = route.get("children")
childVos = self._dictRoutesToVo(children) if isinstance(children, list) and children else None
output.append(
RbacRouteVO(
id=int(route["id"]),
route_path=str(route["route_path"]),
route_name=str(route["route_name"]),
component=route.get("component"),
parent_id=int(route["parent_id"]) if route.get("parent_id") is not None else None,
route_title=str(route["route_title"]),
icon=route.get("icon"),
sort_order=int(route.get("sort_order") or 0),
is_hidden=bool(route.get("is_hidden", False)),
is_cache=bool(route.get("is_cache", False)),
meta=self._normalizeMeta(route.get("meta")),
permissions=list(route.get("permissions") or []),
children=childVos,
)
)
return output
def _collectRoutePaths(self, Routes: list[RbacRouteVO]) -> set[str]:
"""递归收集路由树里的全部路径。"""
paths: set[str] = set()
for route in Routes:
paths.add(route.route_path)
if route.children:
paths.update(self._collectRoutePaths(route.children))
return paths
@staticmethod
def _normalizeMeta(Meta: Any) -> dict | None:
"""兼容 meta 为 JSON 字符串、字典或空值的情况。"""
if Meta is None:
return None
if isinstance(Meta, dict):
return Meta
return None