"""系统使用统计控制器。""" from __future__ import annotations from typing import Any from fastapi import Depends, Query from fastapi_common.fastapi_common_security.security import verify_access_token from fastapi_common.fastapi_common_web.controller import BaseController from fastapi_common.fastapi_common_web.domain.responses import Result from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import ( UsageStatsAreaPageVO, UsageStatsDepartmentPageVO, UsageStatsDetailPageVO, UsageStatsOverviewVO, UsageStatsTrendVO, UsageStatsUserPageVO, ) from fastapi_modules.fastapi_leaudit.services.impl.usageStatsServiceImpl import UsageStatsServiceImpl from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService class UsageStatsController(BaseController): """系统使用统计控制器。""" def __init__(self): super().__init__(prefix="/v3/usage-stats", tags=["系统使用统计"]) self.UsageStatsService: IUsageStatsService = UsageStatsServiceImpl() @self.router.get("/overview", response_model=Result[UsageStatsOverviewVO]) async def GetOverview( dateFrom: str | None = None, dateTo: str | None = None, area: str | None = None, areaScope: str | None = None, entryModuleId: int | None = None, documentTypeId: int | None = None, payload: dict[str, Any] = Depends(verify_access_token), ): data = await self.UsageStatsService.GetOverview( CurrentUserId=int(payload["user_id"]), Filters={ "dateFrom": dateFrom, "dateTo": dateTo, "area": area, "areaScope": areaScope, "entryModuleId": entryModuleId, "documentTypeId": documentTypeId, }, ) return Result.success(data=data) @self.router.get("/trends", response_model=Result[UsageStatsTrendVO]) async def GetTrends( dateFrom: str | None = None, dateTo: str | None = None, granularity: str = "day", metric: str = "audit", area: str | None = None, areaScope: str | None = None, entryModuleId: int | None = None, documentTypeId: int | None = None, payload: dict[str, Any] = Depends(verify_access_token), ): data = await self.UsageStatsService.GetTrends( CurrentUserId=int(payload["user_id"]), Filters={ "dateFrom": dateFrom, "dateTo": dateTo, "granularity": granularity, "metric": metric, "area": area, "areaScope": areaScope, "entryModuleId": entryModuleId, "documentTypeId": documentTypeId, }, ) return Result.success(data=data) @self.router.get("/by-users", response_model=Result[UsageStatsUserPageVO]) async def GetUsers( page: int = 1, pageSize: int = 20, keyword: str | None = None, userId: int | None = None, departmentName: str | None = None, area: str | None = None, areaScope: str | None = None, dateFrom: str | None = None, dateTo: str | None = None, payload: dict[str, Any] = Depends(verify_access_token), ): data = await self.UsageStatsService.GetUsers( CurrentUserId=int(payload["user_id"]), Filters={ "page": page, "pageSize": pageSize, "keyword": keyword, "userId": userId, "departmentName": departmentName, "area": area, "areaScope": areaScope, "dateFrom": dateFrom, "dateTo": dateTo, }, ) return Result.success(data=data) @self.router.get("/by-departments", response_model=Result[UsageStatsDepartmentPageVO]) async def GetDepartments( page: int = 1, pageSize: int = 20, departmentName: str | None = None, area: str | None = None, areaScope: str | None = None, dateFrom: str | None = None, dateTo: str | None = None, payload: dict[str, Any] = Depends(verify_access_token), ): data = await self.UsageStatsService.GetDepartments( CurrentUserId=int(payload["user_id"]), Filters={ "page": page, "pageSize": pageSize, "departmentName": departmentName, "area": area, "areaScope": areaScope, "dateFrom": dateFrom, "dateTo": dateTo, }, ) return Result.success(data=data) @self.router.get("/by-areas", response_model=Result[UsageStatsAreaPageVO]) async def GetAreas( page: int = 1, pageSize: int = 20, areaScope: str = Query("user", description="地区口径:user/document"), dateFrom: str | None = None, dateTo: str | None = None, entryModuleId: int | None = None, documentTypeId: int | None = None, payload: dict[str, Any] = Depends(verify_access_token), ): data = await self.UsageStatsService.GetAreas( CurrentUserId=int(payload["user_id"]), Filters={ "page": page, "pageSize": pageSize, "areaScope": areaScope, "dateFrom": dateFrom, "dateTo": dateTo, "entryModuleId": entryModuleId, "documentTypeId": documentTypeId, }, ) return Result.success(data=data) @self.router.get("/details", response_model=Result[UsageStatsDetailPageVO]) async def GetDetails( dataType: str = Query("audit", description="login/upload/audit"), page: int = 1, pageSize: int = 20, userId: int | None = None, departmentName: str | None = None, area: str | None = None, areaScope: str | None = None, entryModuleId: int | None = None, documentTypeId: int | None = None, dateFrom: str | None = None, dateTo: str | None = None, payload: dict[str, Any] = Depends(verify_access_token), ): data = await self.UsageStatsService.GetDetails( CurrentUserId=int(payload["user_id"]), Filters={ "dataType": dataType, "page": page, "pageSize": pageSize, "userId": userId, "departmentName": departmentName, "area": area, "areaScope": areaScope, "entryModuleId": entryModuleId, "documentTypeId": documentTypeId, "dateFrom": dateFrom, "dateTo": dateTo, }, ) return Result.success(data=data)