"""RBAC 路由服务实现。""" from __future__ import annotations from copy import deepcopy from typing import Any from sqlalchemy import text from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_modules.fastapi_leaudit.domian.vo.rbacVo import RbacRouteVO, RbacUserRoutesVO from fastapi_modules.fastapi_leaudit.services.rbacService import IRbacService class RbacServiceImpl(IRbacService): """RBAC 路由服务实现。""" _MINIMAL_VISIBLE_ROUTE_PREFIXES: tuple[str, ...] = ( "/home", "/chat-with-llm", "/files", "/documents", "/settings", "/entry-modules", "/role-permissions", "/document-types", ) _COMPAT_ROUTE_BLUEPRINTS: dict[str, list[dict[str, Any]]] = { "admin": [ { "id": 1001, "route_path": "/home", "route_name": "home", "component": "home", "parent_id": None, "route_title": "系统概览", "icon": "ri-home-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "overview"}, "children": None, }, { "id": 1002, "route_path": "/chat-with-llm", "route_name": "chat-with-llm", "component": "chat-with-llm", "parent_id": None, "route_title": "AI对话", "icon": "ri-chat-smile-2-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "assistant"}, "children": None, }, { "id": 1003, "route_path": "/files", "route_name": "file-management", "component": "files", "parent_id": None, "route_title": "文件管理", "icon": "ri-folder-line", "sort_order": 3, "is_hidden": False, "is_cache": True, "meta": {"group": "documents"}, "children": [ { "id": 1004, "route_path": "/files/upload", "route_name": "file-upload", "component": "files.upload", "parent_id": 1003, "route_title": "文件上传", "icon": "ri-upload-cloud-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "documents"}, "children": None, }, { "id": 1005, "route_path": "/documents", "route_name": "documents", "component": "documents", "parent_id": 1003, "route_title": "文档列表", "icon": "ri-file-list-3-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "documents"}, "children": None, }, ], }, { "id": 1006, "route_path": "/rules", "route_name": "rule-management", "component": "rules", "parent_id": None, "route_title": "评查规则库", "icon": "ri-book-3-line", "sort_order": 4, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": [ { "id": 1007, "route_path": "/rule-groups", "route_name": "rule-groups", "component": "rule-groups", "parent_id": 1006, "route_title": "评查点分组", "icon": "ri-folder-open-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": None, }, { "id": 1008, "route_path": "/rules/list", "route_name": "rules-list", "component": "rules.list", "parent_id": 1006, "route_title": "评查点列表", "icon": "ri-list-check-3", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": None, }, { "id": 1009, "route_path": "/rules-files", "route_name": "rules-file", "component": "rules-files", "parent_id": 1006, "route_title": "评查文件列表", "icon": "ri-list-check-2", "sort_order": 3, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": None, }, ], }, { "id": 1010, "route_path": "/contract-template", "route_name": "contract-template", "component": "contract-template", "parent_id": None, "route_title": "合同模板", "icon": "ri-file-search-line", "sort_order": 5, "is_hidden": False, "is_cache": True, "meta": {"group": "contract"}, "children": [ { "id": 1011, "route_path": "/contract-template/search", "route_name": "contract-search-ai", "component": "contract-template.search", "parent_id": 1010, "route_title": "智能搜索", "icon": "ri-search-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "contract"}, "children": None, }, { "id": 1012, "route_path": "/contract-template/list", "route_name": "contract-list", "component": "contract-template.list", "parent_id": 1010, "route_title": "合同列表", "icon": "ri-folder-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "contract"}, "children": None, }, ], }, { "id": 1013, "route_path": "/settings", "route_name": "system-settings", "component": "settings", "parent_id": None, "route_title": "系统设置", "icon": "ri-settings-4-line", "sort_order": 6, "is_hidden": False, "is_cache": True, "meta": {"group": "settings"}, "children": [ { "id": 1014, "route_path": "/entry-modules", "route_name": "entry-modules", "component": "entry-modules", "parent_id": 1013, "route_title": "入口模块管理", "icon": "ri-apps-2-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "settings"}, "children": None, }, { "id": 1015, "route_path": "/role-permissions", "route_name": "role-permissions", "component": "role-permissions", "parent_id": 1013, "route_title": "角色权限管理", "icon": "ri-shield-user-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "settings"}, "children": None, }, ], }, { "id": 1018, "route_path": "/cross-checking", "route_name": "cross-checking", "component": "cross-checking", "parent_id": None, "route_title": "交叉评查", "icon": "ri-color-filter-line", "sort_order": 7, "is_hidden": False, "is_cache": True, "meta": {"group": "cross-review"}, "children": [ { "id": 1019, "route_path": "/cross-checking/upload", "route_name": "cross-checking-upload", "component": "cross-checking.upload", "parent_id": 1018, "route_title": "创建任务", "icon": "ri-upload-cloud-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "cross-review"}, "children": None, }, { "id": 1020, "route_path": "/cross-checking/result", "route_name": "cross-checking-result", "component": "cross-checking.result", "parent_id": 1018, "route_title": "评查结果", "icon": "ri-file-list-3-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "cross-review"}, "children": None, }, ], }, ], "common": [ { "id": 2001, "route_path": "/home", "route_name": "home", "component": "home", "parent_id": None, "route_title": "系统概览", "icon": "ri-home-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "overview"}, "children": None, }, { "id": 2002, "route_path": "/files", "route_name": "file-management", "component": "files", "parent_id": None, "route_title": "文件管理", "icon": "ri-folder-line", "sort_order": 3, "is_hidden": False, "is_cache": True, "meta": {"group": "documents"}, "children": [ { "id": 2003, "route_path": "/files/upload", "route_name": "file-upload", "component": "files.upload", "parent_id": 2002, "route_title": "文件上传", "icon": "ri-upload-cloud-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "documents"}, "children": None, }, { "id": 2004, "route_path": "/documents", "route_name": "documents", "component": "documents", "parent_id": 2002, "route_title": "文档列表", "icon": "ri-file-list-3-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "documents"}, "children": None, }, ], }, { "id": 2005, "route_path": "/rules", "route_name": "rule-management", "component": "rules", "parent_id": None, "route_title": "评查规则库", "icon": "ri-book-3-line", "sort_order": 4, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": [ { "id": 2006, "route_path": "/rule-groups", "route_name": "rule-groups", "component": "rule-groups", "parent_id": 2005, "route_title": "评查点分组", "icon": "ri-folder-open-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": None, }, { "id": 2007, "route_path": "/rules/list", "route_name": "rules-list", "component": "rules.list", "parent_id": 2005, "route_title": "评查点列表", "icon": "ri-list-check-3", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": None, }, { "id": 2008, "route_path": "/rules-files", "route_name": "rules-file", "component": "rules-files", "parent_id": 2005, "route_title": "评查文件列表", "icon": "ri-list-check-2", "sort_order": 3, "is_hidden": False, "is_cache": True, "meta": {"group": "rules"}, "children": None, }, ], }, { "id": 2009, "route_path": "/contract-template", "route_name": "contract-template", "component": "contract-template", "parent_id": None, "route_title": "合同模板", "icon": "ri-file-search-line", "sort_order": 5, "is_hidden": False, "is_cache": True, "meta": {"group": "contract"}, "children": [ { "id": 2010, "route_path": "/contract-template/search", "route_name": "contract-search-ai", "component": "contract-template.search", "parent_id": 2009, "route_title": "智能搜索", "icon": "ri-search-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "contract"}, "children": None, }, { "id": 2011, "route_path": "/contract-template/list", "route_name": "contract-list", "component": "contract-template.list", "parent_id": 2009, "route_title": "合同列表", "icon": "ri-folder-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "contract"}, "children": None, }, ], }, { "id": 2012, "route_path": "/settings", "route_name": "system-settings", "component": "settings", "parent_id": None, "route_title": "系统设置", "icon": "ri-settings-4-line", "sort_order": 6, "is_hidden": False, "is_cache": True, "meta": {"group": "settings"}, "children": [ { "id": 2013, "route_path": "/entry-modules", "route_name": "entry-modules", "component": "entry-modules", "parent_id": 2012, "route_title": "入口模块管理", "icon": "ri-apps-2-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "settings"}, "children": None, }, { "id": 2014, "route_path": "/role-permissions", "route_name": "role-permissions", "component": "role-permissions", "parent_id": 2012, "route_title": "角色权限管理", "icon": "ri-shield-user-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "settings"}, "children": None, }, ], }, { "id": 2015, "route_path": "/cross-checking", "route_name": "cross-checking", "component": "cross-checking", "parent_id": None, "route_title": "交叉评查", "icon": "ri-color-filter-line", "sort_order": 7, "is_hidden": False, "is_cache": True, "meta": {"group": "cross-review"}, "children": [ { "id": 2016, "route_path": "/cross-checking/upload", "route_name": "cross-checking-upload", "component": "cross-checking.upload", "parent_id": 2012, "route_title": "创建任务", "icon": "ri-upload-cloud-line", "sort_order": 1, "is_hidden": False, "is_cache": True, "meta": {"group": "cross-review"}, "children": None, }, { "id": 2017, "route_path": "/cross-checking/result", "route_name": "cross-checking-result", "component": "cross-checking.result", "parent_id": 2012, "route_title": "评查结果", "icon": "ri-file-list-3-line", "sort_order": 2, "is_hidden": False, "is_cache": True, "meta": {"group": "cross-review"}, "children": None, }, ], }, ], } _PERMISSION_PREFIXES_BY_PATH: dict[str, list[str]] = { "/files": ["documents:"], "/files/upload": ["documents:upload:"], "/documents": ["documents:"], "/settings": ["entry_module:", "rbac:", "doc_type:"], "/entry-modules": ["entry_module:"], "/role-permissions": ["rbac:"], "/document-types": ["doc_type:"], "/rules": ["rules:"], "/rule-groups": ["rules:"], "/rules/list": ["rules:"], "/rules-files": ["rules:"], } async def GetCurrentUserRoutes(self, UserId: int) -> RbacUserRoutesVO: """获取当前登录用户可访问的前端路由树。""" async with GetAsyncSession() as Session: userRow = ( await Session.execute( text( """ SELECT id, username FROM sso_users WHERE id = :user_id AND deleted_at IS NULL AND status = 0 """ ), {"user_id": UserId}, ) ).mappings().first() if not userRow: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用") roleRows = ( await Session.execute( text( """ SELECT r.id, r.role_key, COALESCE(r.priority, 0) AS priority FROM user_role ur JOIN roles r ON ur.role_id = r.id WHERE ur.user_id = :user_id ORDER BY COALESCE(r.priority, 0) DESC, r.id ASC """ ), {"user_id": UserId}, ) ).mappings().all() roleIds = [int(row["id"]) for row in roleRows] roleKeys = [str(row["role_key"]) for row in roleRows] or ["common"] grantedPermissions = await self._loadGrantedPermissionKeys(Session, roleIds) databaseRoutes = await self._loadDatabaseRoutes(Session, roleIds, grantedPermissions) if self._isFrontendRouteSetReady(databaseRoutes): routes = self._filterRoutesByMinimalScope(databaseRoutes) else: routes = self._buildCompatibilityRoutes(roleKeys, grantedPermissions) return RbacUserRoutesVO( user_id=int(userRow["id"]), username=str(userRow["username"] or ""), roles=roleKeys, routes=routes, ) async def _loadGrantedPermissionKeys(self, Session, RoleIds: list[int]) -> set[str]: """加载当前角色集合最终授予的权限键。""" if not RoleIds: return set() rows = ( await Session.execute( text( """ SELECT p.permission_key, rp.grant_type FROM role_permissions rp JOIN permissions p ON p.id = rp.permission_id WHERE rp.role_id = ANY(:role_ids) """ ).bindparams(role_ids=RoleIds), ) ).fetchall() grants: set[str] = set() denies: set[str] = set() for permissionKey, grantType in rows: if not permissionKey: continue if grantType == "DENY": denies.add(str(permissionKey)) else: grants.add(str(permissionKey)) return grants - denies async def _loadDatabaseRoutes(self, Session, RoleIds: list[int], GrantedPermissions: set[str]) -> list[RbacRouteVO]: """按当前 role_route + sys_routes 生成路由树。""" if not RoleIds: return [] routeRows = ( await Session.execute( text( """ SELECT sr.id, sr.route_path, sr.route_name, sr.component, sr.parent_id, sr.route_title, sr.icon, sr.sort_order, sr.is_hidden, sr.is_cache, sr.meta FROM role_route rr JOIN sys_routes sr ON sr.id = rr.route_id WHERE rr.role_id = ANY(:role_ids) AND rr.status = 1 AND sr.status = 0 AND sr.deleted_at IS NULL ORDER BY sr.sort_order ASC, sr.id ASC """ ).bindparams(role_ids=RoleIds), ) ).mappings().all() routeMap: dict[int, dict[str, Any]] = {} for row in routeRows: routeId = int(row["id"]) routeMap[routeId] = { "id": routeId, "route_path": str(row["route_path"] or ""), "route_name": str(row["route_name"] or f"route-{routeId}"), "component": row["component"], "parent_id": int(row["parent_id"]) if row["parent_id"] is not None else None, "route_title": str(row["route_title"] or row["route_name"] or ""), "icon": row["icon"], "sort_order": int(row["sort_order"] or 0), "is_hidden": bool(row["is_hidden"]), "is_cache": bool(row["is_cache"]), "meta": self._normalizeMeta(row["meta"]), "permissions": self._resolvePermissionsForPath(str(row["route_path"] or ""), GrantedPermissions), "children": [], } rootRoutes: list[dict[str, Any]] = [] for route in routeMap.values(): parentId = route["parent_id"] if parentId is None or parentId not in routeMap: rootRoutes.append(route) else: routeMap[parentId]["children"].append(route) return self._dictRoutesToVo(rootRoutes) def _isFrontendRouteSetReady(self, Routes: list[RbacRouteVO]) -> bool: """判断数据库路由是否已经切换到当前前端真实路径集合。""" frontendPaths = self._collectRoutePaths(Routes) expected = { "/files", "/settings", "/chat-with-llm", "/contract-template", "/cross-checking", } return any(path in frontendPaths for path in expected) def _buildCompatibilityRoutes(self, RoleKeys: list[str], GrantedPermissions: set[str]) -> list[RbacRouteVO]: """当数据库路由尚未迁移到新前端路径时,返回兼容菜单树。""" roleBucket = "admin" if any(role in {"super_admin", "provincial_admin", "admin"} for role in RoleKeys) else "common" blueprints = deepcopy(self._COMPAT_ROUTE_BLUEPRINTS[roleBucket]) self._attachPermissionsRecursively(blueprints, GrantedPermissions) return self._dictRoutesToVo(self._filterBlueprintsByMinimalScope(blueprints)) def _filterRoutesByMinimalScope(self, Routes: list[RbacRouteVO]) -> list[RbacRouteVO]: """按当前最小可用范围裁剪路由树。""" filtered: list[RbacRouteVO] = [] for route in Routes: if not self._isRoutePathEnabled(route.route_path): continue routeCopy = route.model_copy(deep=True) routeCopy.children = self._filterRoutesByMinimalScope(route.children or []) or None filtered.append(routeCopy) return filtered def _filterBlueprintsByMinimalScope(self, Blueprints: list[dict[str, Any]]) -> list[dict[str, Any]]: """按当前最小可用范围裁剪兼容蓝图。""" filtered: list[dict[str, Any]] = [] for blueprint in Blueprints: routePath = str(blueprint.get("route_path") or "") if not self._isRoutePathEnabled(routePath): continue copied = deepcopy(blueprint) children = copied.get("children") if isinstance(children, list): copied["children"] = self._filterBlueprintsByMinimalScope(children) or None filtered.append(copied) return filtered def _isRoutePathEnabled(self, RoutePath: str | None) -> bool: """判断当前阶段是否允许暴露该前端路径。""" if not RoutePath: return False return any( RoutePath == prefix or RoutePath.startswith(f"{prefix}/") for prefix in self._MINIMAL_VISIBLE_ROUTE_PREFIXES ) def _attachPermissionsRecursively(self, Routes: list[dict[str, Any]], GrantedPermissions: set[str]) -> None: """递归挂接当前用户在各前端路径上的权限键。""" for route in Routes: routePath = str(route.get("route_path") or "") route["permissions"] = self._resolvePermissionsForPath(routePath, GrantedPermissions) children = route.get("children") if isinstance(children, list) and children: self._attachPermissionsRecursively(children, GrantedPermissions) def _resolvePermissionsForPath(self, RoutePath: str, GrantedPermissions: set[str]) -> list[str]: """按当前前端路径聚合对应权限键。""" prefixes = self._PERMISSION_PREFIXES_BY_PATH.get(RoutePath, []) if not prefixes: return [] matched = sorted( permissionKey for permissionKey in GrantedPermissions if any(permissionKey.startswith(prefix) for prefix in prefixes) ) return matched def _dictRoutesToVo(self, Routes: list[dict[str, Any]]) -> list[RbacRouteVO]: """把字典路由树转换成 VO。""" ordered = sorted(Routes, key=lambda item: (int(item.get("sort_order") or 0), int(item.get("id") or 0))) output: list[RbacRouteVO] = [] for route in ordered: children = route.get("children") childVos = self._dictRoutesToVo(children) if isinstance(children, list) and children else None output.append( RbacRouteVO( id=int(route["id"]), route_path=str(route["route_path"]), route_name=str(route["route_name"]), component=route.get("component"), parent_id=int(route["parent_id"]) if route.get("parent_id") is not None else None, route_title=str(route["route_title"]), icon=route.get("icon"), sort_order=int(route.get("sort_order") or 0), is_hidden=bool(route.get("is_hidden", False)), is_cache=bool(route.get("is_cache", False)), meta=self._normalizeMeta(route.get("meta")), permissions=list(route.get("permissions") or []), children=childVos, ) ) return output def _collectRoutePaths(self, Routes: list[RbacRouteVO]) -> set[str]: """递归收集路由树里的全部路径。""" paths: set[str] = set() for route in Routes: paths.add(route.route_path) if route.children: paths.update(self._collectRoutePaths(route.children)) return paths @staticmethod def _normalizeMeta(Meta: Any) -> dict | None: """兼容 meta 为 JSON 字符串、字典或空值的情况。""" if Meta is None: return None if isinstance(Meta, dict): return Meta return None