Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/controllers/entryModuleController.py
T
2026-04-29 22:25:06 +08:00

86 lines
5.6 KiB
Python

"""入口模块管理控制器。"""
from fastapi import Depends, File, Query, UploadFile
from fastapi.responses import JSONResponse
from fastapi_common.fastapi_common_security.security import verify_access_token
from fastapi_common.fastapi_common_web.controller import BaseController
from fastapi_modules.fastapi_leaudit.domian.Dto.entryModuleDto import EntryModuleCreateDTO, EntryModuleUpdateDTO
from fastapi_modules.fastapi_leaudit.services.entryModuleAdminService import IEntryModuleAdminService
from fastapi_modules.fastapi_leaudit.services.impl.entryModuleAdminServiceImpl import EntryModuleAdminServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService
class EntryModuleController(BaseController):
    """Controller exposing the entry-module administration endpoints.

    All routes are registered under the ``/v3/entry-modules`` prefix while the
    controller is being constructed (``self.router`` is expected to be
    provided by ``BaseController``). Every route depends on
    ``verify_access_token`` and checks a route-specific permission code via
    the permission service before delegating to the entry-module admin
    service. Responses use a ``{"code", "msg", "data"}`` JSON envelope.
    """

    def __init__(self):
        """Create the backing services and register all routes."""
        super().__init__(prefix="/v3/entry-modules", tags=["入口模块管理"])
        self.EntryModuleService: IEntryModuleAdminService = EntryModuleAdminServiceImpl()
        self.PermissionService: IPermissionService = PermissionServiceImpl()

        async def _deny_unless_permitted(payload: dict, permission: str, message: str) -> JSONResponse | None:
            """Return a 403 envelope when the caller lacks ``permission``.

            Args:
                payload: Value produced by ``verify_access_token``; its
                    ``"user_id"`` entry is converted with ``int``.
                permission: Permission code passed to ``CheckPermission``.
                message: Text placed in the ``msg`` field of the 403 body.

            Returns:
                ``None`` when the permission check passes, otherwise the
                ready-to-return 403 ``JSONResponse``.
            """
            user_id = int(payload["user_id"])
            if await self.PermissionService.CheckPermission(user_id, permission):
                return None
            return JSONResponse(status_code=403, content={"code": 403, "msg": message, "data": None})

        def _success(data: object) -> JSONResponse:
            """Wrap ``data`` in the HTTP 200 success envelope."""
            return JSONResponse(status_code=200, content={"code": 0, "msg": "success", "data": data})

        # NOTE: endpoint docstrings below are kept verbatim because FastAPI
        # publishes them as the OpenAPI operation description.
        #
        # NOTE(review): service results are dumped with ``mode="json"`` so that
        # values such as datetimes are converted to JSON-compatible types
        # before ``JSONResponse`` serialises them. This assumes the services
        # return Pydantic v2 models - confirm against the service layer.

        @self.router.get("")
        async def GetEntryModules(
            name: str | None = Query(None, description="模块名称模糊搜索"),
            area: str | None = Query(None, description="地区筛选"),
            page: int = Query(1, ge=1, description="页码"),
            page_size: int = Query(10, ge=1, le=200, description="每页数量"),
            payload: dict = Depends(verify_access_token),
        ):
            """查询入口模块列表。"""
            # List entry modules, optionally filtered by name/area, with paging.
            denied = await _deny_unless_permitted(
                payload, "entry_module:list:read", "当前用户没有入口模块列表权限"
            )
            if denied is not None:
                return denied
            data = await self.EntryModuleService.ListModules(Name=name, Area=area, Page=page, PageSize=page_size)
            return _success(data.model_dump(mode="json"))

        @self.router.get("/{ModuleId}")
        async def GetEntryModule(ModuleId: int, payload: dict = Depends(verify_access_token)):
            """查询入口模块详情。"""
            # Fetch a single entry module by its id.
            denied = await _deny_unless_permitted(
                payload, "entry_module:detail:read", "当前用户没有入口模块详情权限"
            )
            if denied is not None:
                return denied
            data = await self.EntryModuleService.GetModule(ModuleId)
            return _success(data.model_dump(mode="json"))

        @self.router.post("")
        async def CreateEntryModule(Body: EntryModuleCreateDTO, payload: dict = Depends(verify_access_token)):
            """创建入口模块。"""
            # Create an entry module from the request body.
            denied = await _deny_unless_permitted(
                payload, "entry_module:create:write", "当前用户没有创建入口模块权限"
            )
            if denied is not None:
                return denied
            data = await self.EntryModuleService.CreateModule(Body)
            return _success(data.model_dump(mode="json"))

        @self.router.put("/{ModuleId}")
        async def UpdateEntryModule(
            ModuleId: int,
            Body: EntryModuleUpdateDTO,
            payload: dict = Depends(verify_access_token),
        ):
            """更新入口模块。"""
            # Update the entry module identified by ModuleId.
            denied = await _deny_unless_permitted(
                payload, "entry_module:update:write", "当前用户没有更新入口模块权限"
            )
            if denied is not None:
                return denied
            data = await self.EntryModuleService.UpdateModule(ModuleId, Body)
            return _success(data.model_dump(mode="json"))

        @self.router.delete("/{ModuleId}")
        async def DeleteEntryModule(ModuleId: int, payload: dict = Depends(verify_access_token)):
            """删除入口模块。"""
            # Delete the entry module; the response carries a fixed message.
            denied = await _deny_unless_permitted(
                payload, "entry_module:delete:delete", "当前用户没有删除入口模块权限"
            )
            if denied is not None:
                return denied
            await self.EntryModuleService.DeleteModule(ModuleId)
            return _success({"message": "删除成功"})

        @self.router.post("/{ModuleId}/image")
        async def UploadEntryModuleImage(
            ModuleId: int,
            file: UploadFile = File(..., description="入口模块图标"),
            payload: dict = Depends(verify_access_token),
        ):
            """上传入口模块图标。"""
            # Upload an icon for the entry module. The permission check runs
            # before the upload body is read.
            denied = await _deny_unless_permitted(
                payload, "entry_module:image:write", "当前用户没有上传入口模块图标权限"
            )
            if denied is not None:
                return denied
            content = await file.read()
            data = await self.EntryModuleService.UploadModuleImage(
                ModuleId=ModuleId,
                # Fall back to generic values when the client omits them.
                FileName=file.filename or f"entry_module_{ModuleId}.png",
                ContentType=file.content_type or "application/octet-stream",
                Content=content,
            )
            return _success(data.model_dump(mode="json"))