Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/controllers/tenantController.py
T

81 lines
5.0 KiB
Python

"""租户主数据控制器。"""
from __future__ import annotations
from typing import Any
from fastapi import Depends, Query
from fastapi.responses import JSONResponse
from fastapi_common.fastapi_common_security.security import verify_access_token
from fastapi_common.fastapi_common_web.controller import BaseController
from fastapi_modules.fastapi_leaudit.domian.Dto.tenantDto import (
TenantCreateDTO,
TenantStatusUpdateDTO,
TenantUpdateDTO,
)
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.tenantServiceImpl import TenantServiceImpl
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService
from fastapi_modules.fastapi_leaudit.services.tenantService import ITenantService
class TenantController(BaseController):
    """Tenant master-data controller.

    Registers the ``/v3/tenants`` routes on ``self.router`` when constructed.
    Every route depends on ``verify_access_token``; the payload it yields is
    read for its ``user_id`` entry. All routes answer with a ``JSONResponse``
    whose body is an envelope of the form ``{"code", "msg", "data"}``.

    NOTE: the route handlers deliberately carry ``#`` comments rather than
    docstrings or return annotations, so that the generated OpenAPI
    description of each operation stays exactly as it was.
    """

    def __init__(self):
        super().__init__(prefix="/v3/tenants", tags=["租户主数据"])
        self.TenantService: ITenantService = TenantServiceImpl()
        self.PermissionService: IPermissionService = PermissionServiceImpl()

        # GET /v3/tenants — list tenants; requires "rbac:tenants:read".
        @self.router.get("")
        async def GetTenants(
            include_disabled: bool = Query(False, description="是否包含禁用租户"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            denied = await self._DenyUnlessPermitted(
                payload, "rbac:tenants:read", "当前用户没有查看租户主数据权限"
            )
            if denied is not None:
                return denied
            data = await self.TenantService.ListTenants(IncludeDisabled=include_disabled)
            return self._SuccessList(data)

        # GET /v3/tenants/options — tenant options, optionally filtered by a
        # feature key. Registered before "/{TenantCode}" so that the literal
        # "options" segment is not captured as a tenant code.
        # NOTE(review): no permission code is checked here, only the access
        # token dependency applies — presumably intentional so that any
        # authenticated user can load the options; TODO confirm.
        @self.router.get("/options")
        async def GetTenantOptions(
            feature_key: str | None = Query(None, description="按功能键过滤租户"),
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            data = await self.TenantService.ListTenantOptions(FeatureKey=feature_key)
            return self._SuccessList(data)

        # GET /v3/tenants/{TenantCode} — tenant detail merged with its feature
        # keys and alias values; requires "rbac:tenants:read". Answers 404
        # when the service returns a falsy tenant.
        @self.router.get("/{TenantCode}")
        async def GetTenant(TenantCode: str, payload: dict[str, Any] = Depends(verify_access_token)):
            denied = await self._DenyUnlessPermitted(
                payload, "rbac:tenants:read", "当前用户没有查看租户详情权限"
            )
            if denied is not None:
                return denied
            data = await self.TenantService.GetTenant(TenantCode)
            if not data:
                return self._Failure(404, "租户不存在")
            # Features and aliases are only fetched once the tenant is known to exist.
            features = await self.TenantService.GetTenantFeatures(TenantCode)
            aliases = await self.TenantService.GetTenantAliases(TenantCode)
            return self._Success({**data, "feature_keys": features, "alias_values": aliases})

        # POST /v3/tenants — create a tenant; requires "rbac:tenants:create".
        @self.router.post("")
        async def CreateTenant(Body: TenantCreateDTO, payload: dict[str, Any] = Depends(verify_access_token)):
            denied = await self._DenyUnlessPermitted(
                payload, "rbac:tenants:create", "当前用户没有创建租户权限"
            )
            if denied is not None:
                return denied
            data = await self.TenantService.CreateTenant(self._CurrentUserId(payload), Body)
            return self._Success(data)

        # PUT /v3/tenants/{TenantCode} — update a tenant; requires "rbac:tenants:update".
        @self.router.put("/{TenantCode}")
        async def UpdateTenant(TenantCode: str, Body: TenantUpdateDTO, payload: dict[str, Any] = Depends(verify_access_token)):
            denied = await self._DenyUnlessPermitted(
                payload, "rbac:tenants:update", "当前用户没有更新租户权限"
            )
            if denied is not None:
                return denied
            data = await self.TenantService.UpdateTenant(self._CurrentUserId(payload), TenantCode, Body)
            return self._Success(data)

        # PATCH /v3/tenants/{TenantCode}/status — update a tenant's status;
        # requires "rbac:tenants:status".
        @self.router.patch("/{TenantCode}/status")
        async def UpdateTenantStatus(TenantCode: str, Body: TenantStatusUpdateDTO, payload: dict[str, Any] = Depends(verify_access_token)):
            denied = await self._DenyUnlessPermitted(
                payload, "rbac:tenants:status", "当前用户没有更新租户状态权限"
            )
            if denied is not None:
                return denied
            data = await self.TenantService.UpdateTenantStatus(self._CurrentUserId(payload), TenantCode, Body)
            return self._Success(data)

    @staticmethod
    def _CurrentUserId(payload: dict[str, Any]) -> int:
        """Return the ``user_id`` entry of the token payload converted with ``int``.

        Raises:
            KeyError: if the payload has no ``user_id`` entry.
            ValueError, TypeError: if the value cannot be converted by ``int``.
        """
        return int(payload["user_id"])

    @staticmethod
    def _Respond(StatusCode: int, Code: int, Msg: str, Data: Any) -> JSONResponse:
        """Build the ``{"code", "msg", "data"}`` envelope with the given HTTP status."""
        return JSONResponse(status_code=StatusCode, content={"code": Code, "msg": Msg, "data": Data})

    @classmethod
    def _Success(cls, Data: Any) -> JSONResponse:
        """Build the HTTP 200 envelope (``code`` 0, ``msg`` "success") around ``Data``."""
        return cls._Respond(200, 0, "success", Data)

    @classmethod
    def _SuccessList(cls, Items: Any) -> JSONResponse:
        """Build the HTTP 200 envelope for a collection: ``items`` plus ``total = len(Items)``."""
        return cls._Success({"items": Items, "total": len(Items)})

    @classmethod
    def _Failure(cls, StatusCode: int, Msg: str) -> JSONResponse:
        """Build an error envelope whose ``code`` equals the HTTP status and whose ``data`` is null."""
        return cls._Respond(StatusCode, StatusCode, Msg, None)

    async def _DenyUnlessPermitted(
        self,
        payload: dict[str, Any],
        PermissionCode: str,
        DeniedMsg: str,
    ) -> JSONResponse | None:
        """Check one permission code for the user identified by the token payload.

        Args:
            payload: Token payload produced by the ``verify_access_token`` dependency.
            PermissionCode: Permission code passed to ``PermissionService.CheckPermission``.
            DeniedMsg: Message placed in the 403 envelope when the check fails.

        Returns:
            ``None`` when ``CheckPermission`` returns a truthy value, otherwise
            the 403 response the route should return as-is.
        """
        if await self.PermissionService.CheckPermission(self._CurrentUserId(payload), PermissionCode):
            return None
        return self._Failure(403, DeniedMsg)