"""入口模块管理服务实现。""" from __future__ import annotations from datetime import datetime from pathlib import Path from sqlalchemy import text from fastapi_admin.config import OSS_BASE_URL, OSS_BUCKET from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_modules.fastapi_leaudit.domian.Dto.entryModuleDto import EntryModuleCreateDTO, EntryModuleUpdateDTO from fastapi_modules.fastapi_leaudit.domian.vo.entryModuleAdminVo import ( EntryModuleAreaVO, EntryModuleImageUploadVO, EntryModuleListVO, EntryModuleVO, ) from fastapi_modules.fastapi_leaudit.services.entryModuleAdminService import IEntryModuleAdminService from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl class EntryModuleAdminServiceImpl(IEntryModuleAdminService): """入口模块管理服务实现。""" def __init__(self) -> None: self.OssService = OssServiceImpl() async def ListModules(self, Name: str | None, Area: str | None, Page: int, PageSize: int) -> EntryModuleListVO: """分页查询入口模块。""" offset = max(Page - 1, 0) * PageSize filters = ["deleted_at IS NULL"] params: dict[str, object] = {"limit": PageSize, "offset": offset} if Name: filters.append("name ILIKE :name") params["name"] = f"%{Name.strip()}%" if Area: filters.append( "EXISTS (SELECT 1 FROM jsonb_array_elements(COALESCE(areas, '[]'::jsonb)) AS area_item WHERE area_item->>'area' = :area)" ) params["area"] = Area.strip() whereClause = " AND ".join(filters) async with GetAsyncSession() as Session: total = int( ( await Session.execute( text(f"SELECT COUNT(*) FROM leaudit_entry_modules WHERE {whereClause}"), params, ) ).scalar_one() ) rows = ( await Session.execute( text( f""" SELECT id, name, description, path, icon_path, areas, sort_order, is_enabled, created_at, updated_at FROM leaudit_entry_modules WHERE {whereClause} ORDER BY sort_order ASC, id ASC LIMIT :limit OFFSET :offset """ ), params, ) ).mappings().all() return EntryModuleListVO( total=total, page=Page, page_size=PageSize, items=[self._toModuleVo(row) for row in rows], ) async def GetModule(self, ModuleId: int) -> EntryModuleVO: """获取入口模块详情。""" row = await self._getModuleRow(ModuleId) return self._toModuleVo(row) async def CreateModule(self, Body: EntryModuleCreateDTO) -> EntryModuleVO: """创建入口模块。""" async with GetAsyncSession() as Session: try: row = ( await Session.execute( text( """ INSERT INTO leaudit_entry_modules ( name, description, path, icon_path, areas, sort_order, is_enabled, created_at, updated_at, deleted_at ) VALUES ( :name, :description, :route_path, :icon_path, CAST(:areas AS jsonb), :sort_order, TRUE, NOW(), NOW(), NULL ) RETURNING id, name, description, path, icon_path, areas, sort_order, is_enabled, created_at, updated_at """ ), { "name": Body.name.strip(), "description": (Body.description or "").strip() or None, "route_path": (Body.path or "").strip() or None, "icon_path": None, "areas": self._areasJson(Body.areas), "sort_order": await self._nextSortOrder(Session), }, ) ).mappings().one() await Session.commit() except Exception as exc: await Session.rollback() raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, f"创建入口模块失败: {exc}") from exc return self._toModuleVo(row) async def UpdateModule(self, ModuleId: int, Body: EntryModuleUpdateDTO) -> EntryModuleVO: """更新入口模块。""" current = await self._getModuleRow(ModuleId) async with GetAsyncSession() as Session: row = ( await Session.execute( text( """ UPDATE leaudit_entry_modules SET name = :name, description = :description, path = :route_path, areas = CAST(:areas AS jsonb), updated_at = NOW() WHERE id = :module_id AND deleted_at IS NULL RETURNING id, name, description, path, icon_path, areas, sort_order, is_enabled, created_at, updated_at """ ), { "module_id": ModuleId, "name": Body.name.strip() if Body.name is not None else current["name"], "description": (Body.description.strip() if Body.description is not None else current["description"]), "route_path": (Body.path.strip() if Body.path is not None else current["path"]), "areas": self._areasJson(Body.areas) if Body.areas is not None else self._areasJson(current["areas"]), }, ) ).mappings().first() if not row: await Session.rollback() raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "入口模块不存在") await Session.commit() return self._toModuleVo(row) async def DeleteModule(self, ModuleId: int) -> None: """删除入口模块。""" async with GetAsyncSession() as Session: await Session.execute( text( """ UPDATE leaudit_entry_modules SET deleted_at = NOW(), updated_at = NOW() WHERE id = :module_id AND deleted_at IS NULL """ ), {"module_id": ModuleId}, ) await Session.commit() async def UploadModuleImage(self, ModuleId: int, FileName: str, ContentType: str, Content: bytes) -> EntryModuleImageUploadVO: """上传入口模块图标。""" module = await self._getModuleRow(ModuleId) suffix = Path(FileName).suffix.lower() or ".png" objectKey = f"documents/mz/static/img/entry_module_{ModuleId}{suffix}" await self.OssService.UploadBytes(ObjectKey=objectKey, Content=Content, ContentType=ContentType or "application/octet-stream") async with GetAsyncSession() as Session: await Session.execute( text( """ UPDATE leaudit_entry_modules SET icon_path = :icon_path, updated_at = NOW() WHERE id = :module_id AND deleted_at IS NULL """ ), {"module_id": ModuleId, "icon_path": objectKey}, ) await Session.commit() return EntryModuleImageUploadVO( module_id=ModuleId, path=objectKey, url=f"{OSS_BASE_URL.rstrip('/')}/{OSS_BUCKET}/{objectKey}", message=f"入口模块 {module['name']} 图标上传成功", ) async def _getModuleRow(self, ModuleId: int): """查询入口模块原始记录。""" async with GetAsyncSession() as Session: row = ( await Session.execute( text( """ SELECT id, name, description, path, icon_path, areas, sort_order, is_enabled, created_at, updated_at FROM leaudit_entry_modules WHERE id = :module_id AND deleted_at IS NULL """ ), {"module_id": ModuleId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "入口模块不存在") return row async def _nextSortOrder(self, Session) -> int: """获取下一个排序号。""" maxSort = ( await Session.execute(text("SELECT COALESCE(MAX(sort_order), 0) FROM leaudit_entry_modules WHERE deleted_at IS NULL")) ).scalar_one() return int(maxSort or 0) + 10 def _toModuleVo(self, Row) -> EntryModuleVO: """把数据库记录转换为 VO。""" rawAreas = Row.get("areas") or [] areas = [ EntryModuleAreaVO( area=str(item.get("area") or ""), enabled=bool(item.get("enabled", False)), sort_order=int(item.get("sort_order", 0)), ) for item in rawAreas if isinstance(item, dict) and item.get("area") ] areas.sort(key=lambda item: (item.sort_order, item.area)) return EntryModuleVO( id=int(Row["id"]), name=str(Row["name"] or ""), description=Row.get("description"), path=Row.get("icon_path"), route_path=Row.get("path"), sort_order=int(Row.get("sort_order") or 0), is_enabled=bool(Row.get("is_enabled", True)), areas=areas, created_at=self._toIso(Row.get("created_at")), updated_at=self._toIso(Row.get("updated_at")), ) def _areasJson(self, Areas) -> str: """序列化地区配置。""" import json if not Areas: return "[]" normalized: list[dict[str, object]] = [] for index, item in enumerate(Areas, start=1): if hasattr(item, "model_dump"): payload = item.model_dump() elif isinstance(item, dict): payload = item else: continue if not payload.get("area"): continue normalized.append( { "area": str(payload["area"]), "enabled": bool(payload.get("enabled", True)), "sort_order": int(payload.get("sort_order", index)), } ) return json.dumps(normalized, ensure_ascii=False) def _toIso(self, Value) -> str | None: """时间转 ISO 字符串。""" if Value is None: return None if isinstance(Value, datetime): return Value.isoformat() return str(Value)