Files
2026-05-25 09:50:01 +08:00

215 lines
9.6 KiB
Python

"""系统使用统计控制器。"""
from __future__ import annotations
from typing import Any
from fastapi import Depends, Query
from fastapi.responses import JSONResponse
from fastapi_common.fastapi_common_security.security import verify_access_token
from fastapi_common.fastapi_common_web.controller import BaseController
from fastapi_common.fastapi_common_web.domain.responses import Result
from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import (
UsageStatsAreaPageVO,
UsageStatsDepartmentPageVO,
UsageStatsDetailPageVO,
UsageStatsOverviewVO,
UsageStatsTrendVO,
UsageStatsUserPageVO,
)
from fastapi_modules.fastapi_leaudit.services.impl.permissionServiceImpl import PermissionServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.usageStatsServiceImpl import UsageStatsServiceImpl
from fastapi_modules.fastapi_leaudit.services.permissionService import IPermissionService
from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService
class UsageStatsController(BaseController):
    """Controller for the system usage statistics endpoints.

    Every route is a GET registered under the ``/v3/usage-stats`` prefix and
    follows the same flow:

    1. ``verify_access_token`` supplies the token payload; ``payload["user_id"]``
       is converted to ``int`` and used as the current user.
    2. One permission code is checked through ``PermissionService``. When the
       check fails, a 403 ``JSONResponse`` with a ``code``/``message``/``data``
       body is returned instead of the declared response model.
    3. Otherwise the query parameters are forwarded unchanged to
       ``UsageStatsService`` as a ``Filters`` dict and the result is wrapped
       with ``Result.success``.

    NOTE(review): date strings, ``page`` and ``pageSize`` are forwarded without
    validation here; presumably the service layer validates them -- confirm.
    """

    def __init__(self):
        """Create the service instances and register all routes on the router."""
        super().__init__(prefix="/v3/usage-stats", tags=["系统使用统计"])
        self.UsageStatsService: IUsageStatsService = UsageStatsServiceImpl()
        self.PermissionService: IPermissionService = PermissionServiceImpl()

        # Kept as a closure (not a method) so that no new attribute is added
        # to the controller class.
        async def _deny_unless_permitted(
            user_id: int, permission: str, message: str
        ) -> JSONResponse | None:
            """Return a 403 response if ``user_id`` lacks ``permission``, else ``None``."""
            if await self.PermissionService.CheckPermission(user_id, permission):
                return None
            return JSONResponse(
                status_code=403,
                content={"code": 403, "message": message, "data": None},
            )

        # FastAPI falls back to the handler docstring for the OpenAPI
        # description. The original description text is passed explicitly on
        # every route so the published schema stays unchanged while the
        # docstrings are written in English.

        @self.router.get(
            "/overview",
            response_model=Result[UsageStatsOverviewVO],
            description="查询系统使用统计总览。",
        )
        async def GetOverview(
            dateFrom: str | None = None,
            dateTo: str | None = None,
            area: str | None = None,
            areaScope: str | None = None,
            entryModuleId: int | None = None,
            documentTypeId: int | None = None,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return the usage statistics overview.

            Requires the ``usage_stats:overview:read`` permission.
            """
            current_user_id = int(payload["user_id"])
            denied = await _deny_unless_permitted(
                current_user_id,
                "usage_stats:overview:read",
                "当前用户没有查看统计总览权限",
            )
            if denied is not None:
                return denied
            data = await self.UsageStatsService.GetOverview(
                CurrentUserId=current_user_id,
                Filters={
                    "dateFrom": dateFrom,
                    "dateTo": dateTo,
                    "area": area,
                    "areaScope": areaScope,
                    "entryModuleId": entryModuleId,
                    "documentTypeId": documentTypeId,
                },
            )
            return Result.success(data=data)

        @self.router.get(
            "/trends",
            response_model=Result[UsageStatsTrendVO],
            description="查询系统使用统计趋势。",
        )
        async def GetTrends(
            dateFrom: str | None = None,
            dateTo: str | None = None,
            granularity: str = "day",
            metric: str = "audit",
            area: str | None = None,
            areaScope: str | None = None,
            entryModuleId: int | None = None,
            documentTypeId: int | None = None,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return the usage statistics trends.

            Requires the ``usage_stats:trends:read`` permission.
            """
            current_user_id = int(payload["user_id"])
            denied = await _deny_unless_permitted(
                current_user_id,
                "usage_stats:trends:read",
                "当前用户没有查看统计趋势权限",
            )
            if denied is not None:
                return denied
            data = await self.UsageStatsService.GetTrends(
                CurrentUserId=current_user_id,
                Filters={
                    "dateFrom": dateFrom,
                    "dateTo": dateTo,
                    "granularity": granularity,
                    "metric": metric,
                    "area": area,
                    "areaScope": areaScope,
                    "entryModuleId": entryModuleId,
                    "documentTypeId": documentTypeId,
                },
            )
            return Result.success(data=data)

        @self.router.get(
            "/by-users",
            response_model=Result[UsageStatsUserPageVO],
            description="按用户维度查询系统使用统计。",
        )
        async def GetUsers(
            page: int = 1,
            pageSize: int = 20,
            keyword: str | None = None,
            userId: int | None = None,
            departmentName: str | None = None,
            area: str | None = None,
            areaScope: str | None = None,
            dateFrom: str | None = None,
            dateTo: str | None = None,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return one page of usage statistics grouped by user.

            Requires the ``usage_stats:users:read`` permission.
            """
            current_user_id = int(payload["user_id"])
            denied = await _deny_unless_permitted(
                current_user_id,
                "usage_stats:users:read",
                "当前用户没有查看用户统计权限",
            )
            if denied is not None:
                return denied
            data = await self.UsageStatsService.GetUsers(
                CurrentUserId=current_user_id,
                Filters={
                    "page": page,
                    "pageSize": pageSize,
                    "keyword": keyword,
                    "userId": userId,
                    "departmentName": departmentName,
                    "area": area,
                    "areaScope": areaScope,
                    "dateFrom": dateFrom,
                    "dateTo": dateTo,
                },
            )
            return Result.success(data=data)

        @self.router.get(
            "/by-departments",
            response_model=Result[UsageStatsDepartmentPageVO],
            description="按部门维度查询系统使用统计。",
        )
        async def GetDepartments(
            page: int = 1,
            pageSize: int = 20,
            departmentName: str | None = None,
            area: str | None = None,
            areaScope: str | None = None,
            dateFrom: str | None = None,
            dateTo: str | None = None,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return one page of usage statistics grouped by department.

            Requires the ``usage_stats:departments:read`` permission.
            """
            current_user_id = int(payload["user_id"])
            denied = await _deny_unless_permitted(
                current_user_id,
                "usage_stats:departments:read",
                "当前用户没有查看部门统计权限",
            )
            if denied is not None:
                return denied
            data = await self.UsageStatsService.GetDepartments(
                CurrentUserId=current_user_id,
                Filters={
                    "page": page,
                    "pageSize": pageSize,
                    "departmentName": departmentName,
                    "area": area,
                    "areaScope": areaScope,
                    "dateFrom": dateFrom,
                    "dateTo": dateTo,
                },
            )
            return Result.success(data=data)

        @self.router.get(
            "/by-areas",
            response_model=Result[UsageStatsAreaPageVO],
            description="按地区维度查询系统使用统计。",
        )
        async def GetAreas(
            page: int = 1,
            pageSize: int = 20,
            areaScope: str = Query("user", description="地区口径:user/document"),
            dateFrom: str | None = None,
            dateTo: str | None = None,
            entryModuleId: int | None = None,
            documentTypeId: int | None = None,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return one page of usage statistics grouped by area.

            ``areaScope`` defaults to ``"user"``; its query description lists
            ``user`` and ``document`` as the area scopes.
            Requires the ``usage_stats:areas:read`` permission.
            """
            current_user_id = int(payload["user_id"])
            denied = await _deny_unless_permitted(
                current_user_id,
                "usage_stats:areas:read",
                "当前用户没有查看地区统计权限",
            )
            if denied is not None:
                return denied
            data = await self.UsageStatsService.GetAreas(
                CurrentUserId=current_user_id,
                Filters={
                    "page": page,
                    "pageSize": pageSize,
                    "areaScope": areaScope,
                    "dateFrom": dateFrom,
                    "dateTo": dateTo,
                    "entryModuleId": entryModuleId,
                    "documentTypeId": documentTypeId,
                },
            )
            return Result.success(data=data)

        @self.router.get(
            "/details",
            response_model=Result[UsageStatsDetailPageVO],
            description="查询系统使用统计明细。",
        )
        async def GetDetails(
            dataType: str = Query("audit", description="login/upload/audit"),
            page: int = 1,
            pageSize: int = 20,
            userId: int | None = None,
            departmentName: str | None = None,
            area: str | None = None,
            areaScope: str | None = None,
            entryModuleId: int | None = None,
            documentTypeId: int | None = None,
            dateFrom: str | None = None,
            dateTo: str | None = None,
            payload: dict[str, Any] = Depends(verify_access_token),
        ):
            """Return one page of usage statistics detail records.

            ``dataType`` defaults to ``"audit"``; its query description lists
            ``login``, ``upload`` and ``audit``.
            Requires the ``usage_stats:details:read`` permission.
            """
            current_user_id = int(payload["user_id"])
            denied = await _deny_unless_permitted(
                current_user_id,
                "usage_stats:details:read",
                "当前用户没有查看统计明细权限",
            )
            if denied is not None:
                return denied
            data = await self.UsageStatsService.GetDetails(
                CurrentUserId=current_user_id,
                Filters={
                    "dataType": dataType,
                    "page": page,
                    "pageSize": pageSize,
                    "userId": userId,
                    "departmentName": departmentName,
                    "area": area,
                    "areaScope": areaScope,
                    "entryModuleId": entryModuleId,
                    "documentTypeId": documentTypeId,
                    "dateFrom": dateFrom,
                    "dateTo": dateTo,
                },
            )
            return Result.success(data=data)