Files

93 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""入口模块管理控制器。"""
from fastapi import Depends, File, Query, UploadFile
from fastapi.responses import JSONResponse
from fastapi_common.fastapi_common_security.security import verify_access_token
from fastapi_common.fastapi_common_web.controller import BaseController
from fastapi_modules.fastapi_leaudit.domian.Dto.entryModuleDto import EntryModuleCreateDTO, EntryModuleUpdateDTO
from fastapi_modules.fastapi_leaudit.services.entryModuleAdminService import IEntryModuleAdminService
from fastapi_modules.fastapi_leaudit.services.impl.entryModuleAdminServiceImpl import EntryModuleAdminServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService
class EntryModuleController(BaseController):
"""入口模块管理控制器。"""
def __init__(self):
super().__init__(prefix="/v3/entry-modules", tags=["入口模块管理"])
self.EntryModuleService: IEntryModuleAdminService = EntryModuleAdminServiceImpl()
self.PermissionService: IPermissionService = PermissionServiceImpl()
@self.router.get("")
async def GetEntryModules(
name: str | None = Query(None, description="模块名称模糊搜索"),
area: str | None = Query(None, description="历史地区筛选(兼容参数,建议改用 tenant_code"),
tenant_code: str | None = Query(None, description="租户编码筛选"),
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(10, ge=1, le=200, description="每页数量"),
payload: dict = Depends(verify_access_token),
):
"""查询入口模块列表。"""
if not await self.PermissionService.CheckPermission(int(payload["user_id"]), "entry_module:list:read"):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有入口模块列表权限", "data": None})
data = await self.EntryModuleService.ListModules(
Name=name,
Area=area,
TenantCode=tenant_code,
Page=page,
PageSize=page_size,
)
return JSONResponse(status_code=200, content={"code": 0, "msg": "success", "data": data.model_dump()})
@self.router.get("/{ModuleId}")
async def GetEntryModule(ModuleId: int, payload: dict = Depends(verify_access_token)):
"""查询入口模块详情。"""
if not await self.PermissionService.CheckPermission(int(payload["user_id"]), "entry_module:detail:read"):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有入口模块详情权限", "data": None})
data = await self.EntryModuleService.GetModule(ModuleId)
return JSONResponse(status_code=200, content={"code": 0, "msg": "success", "data": data.model_dump()})
@self.router.post("")
async def CreateEntryModule(Body: EntryModuleCreateDTO, payload: dict = Depends(verify_access_token)):
"""创建入口模块。"""
if not await self.PermissionService.CheckPermission(int(payload["user_id"]), "entry_module:create:write"):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有创建入口模块权限", "data": None})
data = await self.EntryModuleService.CreateModule(Body)
return JSONResponse(status_code=200, content={"code": 0, "msg": "success", "data": data.model_dump()})
@self.router.put("/{ModuleId}")
async def UpdateEntryModule(ModuleId: int, Body: EntryModuleUpdateDTO, payload: dict = Depends(verify_access_token)):
"""更新入口模块。"""
if not await self.PermissionService.CheckPermission(int(payload["user_id"]), "entry_module:update:write"):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有更新入口模块权限", "data": None})
data = await self.EntryModuleService.UpdateModule(ModuleId, Body)
return JSONResponse(status_code=200, content={"code": 0, "msg": "success", "data": data.model_dump()})
@self.router.delete("/{ModuleId}")
async def DeleteEntryModule(ModuleId: int, payload: dict = Depends(verify_access_token)):
"""删除入口模块。"""
if not await self.PermissionService.CheckPermission(int(payload["user_id"]), "entry_module:delete:delete"):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有删除入口模块权限", "data": None})
await self.EntryModuleService.DeleteModule(ModuleId)
return JSONResponse(status_code=200, content={"code": 0, "msg": "success", "data": {"message": "删除成功"}})
@self.router.post("/{ModuleId}/image")
async def UploadEntryModuleImage(
ModuleId: int,
file: UploadFile = File(..., description="入口模块图标"),
payload: dict = Depends(verify_access_token),
):
"""上传入口模块图标。"""
if not await self.PermissionService.CheckPermission(int(payload["user_id"]), "entry_module:image:write"):
return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有上传入口模块图标权限", "data": None})
content = await file.read()
data = await self.EntryModuleService.UploadModuleImage(
ModuleId=ModuleId,
FileName=file.filename or f"entry_module_{ModuleId}.png",
ContentType=file.content_type or "application/octet-stream",
Content=content,
)
return JSONResponse(status_code=200, content={"code": 0, "msg": "success", "data": data.model_dump()})