diff --git a/fastapi_modules/fastapi_leaudit/controllers/usageStatsController.py b/fastapi_modules/fastapi_leaudit/controllers/usageStatsController.py new file mode 100644 index 0000000..0a832db --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/controllers/usageStatsController.py @@ -0,0 +1,192 @@ +"""系统使用统计控制器。""" + +from __future__ import annotations + +from typing import Any + +from fastapi import Depends, Query + +from fastapi_common.fastapi_common_security.security import verify_access_token +from fastapi_common.fastapi_common_web.controller import BaseController +from fastapi_common.fastapi_common_web.domain.responses import Result + +from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import ( + UsageStatsAreaPageVO, + UsageStatsDepartmentPageVO, + UsageStatsDetailPageVO, + UsageStatsOverviewVO, + UsageStatsTrendVO, + UsageStatsUserPageVO, +) +from fastapi_modules.fastapi_leaudit.services.impl.usageStatsServiceImpl import UsageStatsServiceImpl +from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService + + +class UsageStatsController(BaseController): + """系统使用统计控制器。""" + + def __init__(self): + super().__init__(prefix="/v3/usage-stats", tags=["系统使用统计"]) + self.UsageStatsService: IUsageStatsService = UsageStatsServiceImpl() + + @self.router.get("/overview", response_model=Result[UsageStatsOverviewVO]) + async def GetOverview( + dateFrom: str | None = None, + dateTo: str | None = None, + area: str | None = None, + areaScope: str | None = None, + entryModuleId: int | None = None, + documentTypeId: int | None = None, + payload: dict[str, Any] = Depends(verify_access_token), + ): + data = await self.UsageStatsService.GetOverview( + CurrentUserId=int(payload["user_id"]), + Filters={ + "dateFrom": dateFrom, + "dateTo": dateTo, + "area": area, + "areaScope": areaScope, + "entryModuleId": entryModuleId, + "documentTypeId": documentTypeId, + }, + ) + return Result.success(data=data) + + @self.router.get("/trends", response_model=Result[UsageStatsTrendVO]) + async def GetTrends( + dateFrom: str | None = None, + dateTo: str | None = None, + granularity: str = "day", + metric: str = "audit", + area: str | None = None, + areaScope: str | None = None, + entryModuleId: int | None = None, + documentTypeId: int | None = None, + payload: dict[str, Any] = Depends(verify_access_token), + ): + data = await self.UsageStatsService.GetTrends( + CurrentUserId=int(payload["user_id"]), + Filters={ + "dateFrom": dateFrom, + "dateTo": dateTo, + "granularity": granularity, + "metric": metric, + "area": area, + "areaScope": areaScope, + "entryModuleId": entryModuleId, + "documentTypeId": documentTypeId, + }, + ) + return Result.success(data=data) + + @self.router.get("/by-users", response_model=Result[UsageStatsUserPageVO]) + async def GetUsers( + page: int = 1, + pageSize: int = 20, + keyword: str | None = None, + userId: int | None = None, + departmentName: str | None = None, + area: str | None = None, + areaScope: str | None = None, + dateFrom: str | None = None, + dateTo: str | None = None, + payload: dict[str, Any] = Depends(verify_access_token), + ): + data = await self.UsageStatsService.GetUsers( + CurrentUserId=int(payload["user_id"]), + Filters={ + "page": page, + "pageSize": pageSize, + "keyword": keyword, + "userId": userId, + "departmentName": departmentName, + "area": area, + "areaScope": areaScope, + "dateFrom": dateFrom, + "dateTo": dateTo, + }, + ) + return Result.success(data=data) + + @self.router.get("/by-departments", response_model=Result[UsageStatsDepartmentPageVO]) + async def GetDepartments( + page: int = 1, + pageSize: int = 20, + departmentName: str | None = None, + area: str | None = None, + areaScope: str | None = None, + dateFrom: str | None = None, + dateTo: str | None = None, + payload: dict[str, Any] = Depends(verify_access_token), + ): + data = await self.UsageStatsService.GetDepartments( + CurrentUserId=int(payload["user_id"]), + Filters={ + "page": page, + "pageSize": pageSize, + "departmentName": departmentName, + "area": area, + "areaScope": areaScope, + "dateFrom": dateFrom, + "dateTo": dateTo, + }, + ) + return Result.success(data=data) + + @self.router.get("/by-areas", response_model=Result[UsageStatsAreaPageVO]) + async def GetAreas( + page: int = 1, + pageSize: int = 20, + areaScope: str = Query("user", description="地区口径:user/document"), + dateFrom: str | None = None, + dateTo: str | None = None, + entryModuleId: int | None = None, + documentTypeId: int | None = None, + payload: dict[str, Any] = Depends(verify_access_token), + ): + data = await self.UsageStatsService.GetAreas( + CurrentUserId=int(payload["user_id"]), + Filters={ + "page": page, + "pageSize": pageSize, + "areaScope": areaScope, + "dateFrom": dateFrom, + "dateTo": dateTo, + "entryModuleId": entryModuleId, + "documentTypeId": documentTypeId, + }, + ) + return Result.success(data=data) + + @self.router.get("/details", response_model=Result[UsageStatsDetailPageVO]) + async def GetDetails( + dataType: str = Query("audit", description="login/upload/audit"), + page: int = 1, + pageSize: int = 20, + userId: int | None = None, + departmentName: str | None = None, + area: str | None = None, + areaScope: str | None = None, + entryModuleId: int | None = None, + documentTypeId: int | None = None, + dateFrom: str | None = None, + dateTo: str | None = None, + payload: dict[str, Any] = Depends(verify_access_token), + ): + data = await self.UsageStatsService.GetDetails( + CurrentUserId=int(payload["user_id"]), + Filters={ + "dataType": dataType, + "page": page, + "pageSize": pageSize, + "userId": userId, + "departmentName": departmentName, + "area": area, + "areaScope": areaScope, + "entryModuleId": entryModuleId, + "documentTypeId": documentTypeId, + "dateFrom": dateFrom, + "dateTo": dateTo, + }, + ) + return Result.success(data=data) diff --git a/fastapi_modules/fastapi_leaudit/domian/vo/usageStatsVo.py b/fastapi_modules/fastapi_leaudit/domian/vo/usageStatsVo.py new file mode 100644 index 0000000..f8f0e97 --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/domian/vo/usageStatsVo.py @@ -0,0 +1,113 @@ +"""系统使用统计 VO。""" + +from __future__ import annotations + +from pydantic import BaseModel, Field + + +class UsageStatsOverviewVO(BaseModel): + loginUserCount: int = Field(0) + loginCount: int = Field(0) + activeUserCount: int = Field(0) + uploadDocumentCount: int = Field(0) + uploadAttachmentCount: int = Field(0) + auditRunCount: int = Field(0) + auditCompletedCount: int = Field(0) + auditFailedCount: int = Field(0) + lastUpdatedAt: str = Field("") + + +class UsageStatsTrendItemVO(BaseModel): + label: str = Field("") + value: int = Field(0) + + +class UsageStatsTrendVO(BaseModel): + granularity: str = Field("day") + metric: str = Field("audit") + items: list[UsageStatsTrendItemVO] = Field(default_factory=list) + + +class UsageStatsUserItemVO(BaseModel): + userId: int = Field(...) + username: str = Field("") + nickName: str = Field("") + departmentName: str | None = Field(None) + area: str | None = Field(None) + loginCount: int = Field(0) + uploadDocumentCount: int = Field(0) + uploadAttachmentCount: int = Field(0) + auditRunCount: int = Field(0) + auditCompletedCount: int = Field(0) + auditFailedCount: int = Field(0) + lastLoginTime: str | None = Field(None) + + +class UsageStatsUserPageVO(BaseModel): + total: int = Field(0) + page: int = Field(1) + pageSize: int = Field(20) + items: list[UsageStatsUserItemVO] = Field(default_factory=list) + + +class UsageStatsDepartmentItemVO(BaseModel): + departmentName: str = Field("") + userCount: int = Field(0) + loginUserCount: int = Field(0) + loginCount: int = Field(0) + uploadDocumentCount: int = Field(0) + uploadAttachmentCount: int = Field(0) + auditRunCount: int = Field(0) + auditCompletedCount: int = Field(0) + auditFailedCount: int = Field(0) + + +class UsageStatsDepartmentPageVO(BaseModel): + total: int = Field(0) + page: int = Field(1) + pageSize: int = Field(20) + items: list[UsageStatsDepartmentItemVO] = Field(default_factory=list) + + +class UsageStatsAreaItemVO(BaseModel): + area: str = Field("") + loginUserCount: int = Field(0) + loginCount: int = Field(0) + uploadDocumentCount: int = Field(0) + uploadAttachmentCount: int = Field(0) + auditRunCount: int = Field(0) + auditCompletedCount: int = Field(0) + auditFailedCount: int = Field(0) + + +class UsageStatsAreaPageVO(BaseModel): + total: int = Field(0) + page: int = Field(1) + pageSize: int = Field(20) + areaScope: str = Field("user") + items: list[UsageStatsAreaItemVO] = Field(default_factory=list) + + +class UsageStatsDetailItemVO(BaseModel): + time: str = Field("") + dataType: str = Field("") + userId: int | None = Field(None) + username: str = Field("") + nickName: str = Field("") + departmentName: str | None = Field(None) + area: str | None = Field(None) + documentId: int | None = Field(None) + documentName: str | None = Field(None) + documentTypeId: int | None = Field(None) + documentTypeName: str | None = Field(None) + entryModuleId: int | None = Field(None) + entryModuleName: str | None = Field(None) + status: str | None = Field(None) + extra: dict = Field(default_factory=dict) + + +class UsageStatsDetailPageVO(BaseModel): + total: int = Field(0) + page: int = Field(1) + pageSize: int = Field(20) + items: list[UsageStatsDetailItemVO] = Field(default_factory=list) diff --git a/fastapi_modules/fastapi_leaudit/models/__init__.py b/fastapi_modules/fastapi_leaudit/models/__init__.py index afb1cd5..f0ff2c7 100644 --- a/fastapi_modules/fastapi_leaudit/models/__init__.py +++ b/fastapi_modules/fastapi_leaudit/models/__init__.py @@ -13,6 +13,7 @@ from fastapi_modules.fastapi_leaudit.models.leauditRagDocument import LeauditRag from fastapi_modules.fastapi_leaudit.models.leauditRagChatApp import LeauditRagChatApp from fastapi_modules.fastapi_leaudit.models.leauditRagConversation import LeauditRagConversation from fastapi_modules.fastapi_leaudit.models.leauditRagMessage import LeauditRagMessage +from fastapi_modules.fastapi_leaudit.models.usageLoginEvent import UsageLoginEvent __all__ = [ "LeauditDocument", @@ -28,4 +29,5 @@ __all__ = [ "LeauditRagChatApp", "LeauditRagConversation", "LeauditRagMessage", + "UsageLoginEvent", ] diff --git a/fastapi_modules/fastapi_leaudit/models/usageLoginEvent.py b/fastapi_modules/fastapi_leaudit/models/usageLoginEvent.py new file mode 100644 index 0000000..235ea21 --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/models/usageLoginEvent.py @@ -0,0 +1,36 @@ +"""系统使用统计 - 登录事件模型。""" + +from __future__ import annotations + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, String +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column + +from fastapi_common.fastapi_common_web.models import BaseModel + + +class UsageLoginEvent(BaseModel): + """登录事件明细表。""" + + __tablename__ = "usage_login_events" + + Id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, autoincrement=True) + userId: Mapped[int | None] = mapped_column("user_id", BigInteger) + sub: Mapped[str | None] = mapped_column(String(128)) + usernameSnapshot: Mapped[str | None] = mapped_column("username_snapshot", String(128)) + nickNameSnapshot: Mapped[str | None] = mapped_column("nick_name_snapshot", String(128)) + departmentNameSnapshot: Mapped[str | None] = mapped_column("department_name_snapshot", String(255)) + ouIdSnapshot: Mapped[str | None] = mapped_column("ou_id_snapshot", String(128)) + ouNameSnapshot: Mapped[str | None] = mapped_column("ou_name_snapshot", String(255)) + areaSnapshot: Mapped[str | None] = mapped_column("area_snapshot", String(64)) + loginTime: Mapped[datetime] = mapped_column("login_time", DateTime(timezone=True)) + loginResult: Mapped[str] = mapped_column("login_result", String(16)) + loginType: Mapped[str] = mapped_column("login_type", String(32)) + ipAddress: Mapped[str | None] = mapped_column("ip_address", String(64)) + userAgent: Mapped[str | None] = mapped_column("user_agent", String(1024)) + clientType: Mapped[str | None] = mapped_column("client_type", String(32)) + tokenJti: Mapped[str | None] = mapped_column("token_jti", String(128)) + failureReason: Mapped[str | None] = mapped_column("failure_reason", String(255)) + extra: Mapped[dict | None] = mapped_column(JSONB) diff --git a/fastapi_modules/fastapi_leaudit/services/__init__.py b/fastapi_modules/fastapi_leaudit/services/__init__.py index 6289fe8..4b3e28c 100644 --- a/fastapi_modules/fastapi_leaudit/services/__init__.py +++ b/fastapi_modules/fastapi_leaudit/services/__init__.py @@ -16,6 +16,7 @@ from fastapi_modules.fastapi_leaudit.services.ruleConfigService import IRuleConf from fastapi_modules.fastapi_leaudit.services.ruleService import IRuleService from fastapi_modules.fastapi_leaudit.services.ragDatasetService import IRagDatasetService from fastapi_modules.fastapi_leaudit.services.ragChatService import IRagChatService +from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService __all__ = [ "IAuditService", @@ -34,4 +35,5 @@ __all__ = [ "IRuleService", "IRagDatasetService", "IRagChatService", + "IUsageStatsService", ] diff --git a/fastapi_modules/fastapi_leaudit/services/impl/usageStatsServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/usageStatsServiceImpl.py new file mode 100644 index 0000000..978a97d --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/services/impl/usageStatsServiceImpl.py @@ -0,0 +1,912 @@ +"""系统使用统计服务实现。""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from sqlalchemy import text + +from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession +from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum +from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException + +from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import ( + UsageStatsAreaItemVO, + UsageStatsAreaPageVO, + UsageStatsDepartmentItemVO, + UsageStatsDepartmentPageVO, + UsageStatsDetailItemVO, + UsageStatsDetailPageVO, + UsageStatsOverviewVO, + UsageStatsTrendItemVO, + UsageStatsTrendVO, + UsageStatsUserItemVO, + UsageStatsUserPageVO, +) +from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService + + +class UsageStatsServiceImpl(IUsageStatsService): + """系统使用统计服务实现。""" + + async def RecordLoginEvent( + self, + *, + UserInfo: dict[str, Any] | None, + Sub: str | None, + LoginResult: str, + LoginType: str, + IpAddress: str | None, + UserAgent: str | None, + FailureReason: str | None = None, + TokenJti: str | None = None, + ) -> None: + async with GetAsyncSession() as session: + await self._ensure_usage_stats_schema(session) + user_id = self._to_int(UserInfo.get("user_id")) if UserInfo else None + username = str((UserInfo or {}).get("username") or "") or None + nick_name = str((UserInfo or {}).get("nick_name") or "") or None + dep_name = str((UserInfo or {}).get("dep_name") or "") or None + ou_id = str((UserInfo or {}).get("ou_id") or "") or None + ou_name = str((UserInfo or {}).get("ou_name") or "") or None + area = str((UserInfo or {}).get("area") or "") or None + client_type = self._detect_client_type(UserAgent) + + await session.execute( + text( + """ + INSERT INTO usage_login_events ( + user_id, sub, username_snapshot, nick_name_snapshot, + department_name_snapshot, ou_id_snapshot, ou_name_snapshot, + area_snapshot, login_time, login_result, login_type, + ip_address, user_agent, client_type, token_jti, failure_reason, + extra, created_at, updated_at, deleted_at + ) VALUES ( + :user_id, :sub, :username_snapshot, :nick_name_snapshot, + :department_name_snapshot, :ou_id_snapshot, :ou_name_snapshot, + :area_snapshot, NOW(), :login_result, :login_type, + :ip_address, :user_agent, :client_type, :token_jti, :failure_reason, + CAST(:extra AS jsonb), NOW(), NOW(), NULL + ) + """ + ), + { + "user_id": user_id, + "sub": Sub, + "username_snapshot": username, + "nick_name_snapshot": nick_name, + "department_name_snapshot": dep_name, + "ou_id_snapshot": ou_id, + "ou_name_snapshot": ou_name, + "area_snapshot": area, + "login_result": LoginResult, + "login_type": LoginType, + "ip_address": IpAddress, + "user_agent": UserAgent, + "client_type": client_type, + "token_jti": TokenJti, + "failure_reason": FailureReason, + "extra": "{}", + }, + ) + + if LoginResult == "success" and user_id is not None: + await session.execute( + text("UPDATE sso_users SET last_login_at = NOW(), updated_at = NOW() WHERE id = :user_id"), + {"user_id": user_id}, + ) + await session.commit() + + async def GetOverview(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsOverviewVO: + context = await self._get_current_user_context(CurrentUserId) + self._assert_stats_access(context) + async with GetAsyncSession() as session: + await self._ensure_usage_stats_schema(session) + + login_where, login_params = self._build_login_filters(context, Filters) + doc_where, doc_params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u") + audit_where, audit_params = self._build_audit_filters(context, Filters) + + login_row = ( + await session.execute( + text( + f""" + SELECT + COUNT(*)::int AS login_count, + COUNT(DISTINCT user_id)::int AS login_user_count, + COALESCE(MAX(login_time), NOW()) AS last_updated_at + FROM usage_login_events e + WHERE {login_where} AND e.login_result = 'success' + """ + ), + login_params, + ) + ).mappings().first() + + upload_row = ( + await session.execute( + text( + f""" + SELECT + COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count, + COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count + FROM leaudit_document_files f + JOIN leaudit_documents d ON d.id = f.document_id + LEFT JOIN sso_users u ON u.id = f.created_by + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {doc_where} + """ + ), + doc_params, + ) + ).mappings().first() + + audit_row = ( + await session.execute( + text( + f""" + SELECT + COUNT(*)::int AS audit_run_count, + COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count, + COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count + FROM leaudit_audit_runs ar + JOIN leaudit_documents d ON d.id = ar.document_id + LEFT JOIN sso_users u ON u.id = ar.trigger_user_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {audit_where} + """ + ), + audit_params, + ) + ).mappings().first() + + active_users = ( + await session.execute( + text( + f""" + WITH active_users AS ( + SELECT DISTINCT e.user_id AS uid + FROM usage_login_events e + WHERE {login_where} AND e.login_result = 'success' AND e.user_id IS NOT NULL + UNION + SELECT DISTINCT f.created_by AS uid + FROM leaudit_document_files f + JOIN leaudit_documents d ON d.id = f.document_id + LEFT JOIN sso_users u ON u.id = f.created_by + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {doc_where} AND f.created_by IS NOT NULL + UNION + SELECT DISTINCT ar.trigger_user_id AS uid + FROM leaudit_audit_runs ar + JOIN leaudit_documents d ON d.id = ar.document_id + LEFT JOIN sso_users u ON u.id = ar.trigger_user_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {audit_where} AND ar.trigger_user_id IS NOT NULL + ) + SELECT COUNT(*)::int AS active_user_count FROM active_users + """ + ), + {**login_params, **doc_params, **audit_params}, + ) + ).mappings().first() + + return UsageStatsOverviewVO( + loginUserCount=int((login_row or {}).get("login_user_count") or 0), + loginCount=int((login_row or {}).get("login_count") or 0), + activeUserCount=int((active_users or {}).get("active_user_count") or 0), + uploadDocumentCount=int((upload_row or {}).get("upload_document_count") or 0), + uploadAttachmentCount=int((upload_row or {}).get("upload_attachment_count") or 0), + auditRunCount=int((audit_row or {}).get("audit_run_count") or 0), + auditCompletedCount=int((audit_row or {}).get("audit_completed_count") or 0), + auditFailedCount=int((audit_row or {}).get("audit_failed_count") or 0), + lastUpdatedAt=self._format_datetime((login_row or {}).get("last_updated_at")), + ) + + async def GetTrends(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsTrendVO: + context = await self._get_current_user_context(CurrentUserId) + self._assert_stats_access(context) + granularity = str(Filters.get("granularity") or "day").strip().lower() + if granularity not in {"day", "week", "month"}: + granularity = "day" + metric = str(Filters.get("metric") or "audit").strip().lower() + date_expr = {"day": "YYYY-MM-DD", "week": 'IYYY-"W"IW', "month": "YYYY-MM"}[granularity] + + async with GetAsyncSession() as session: + await self._ensure_usage_stats_schema(session) + items: list[UsageStatsTrendItemVO] = [] + if metric == "login": + where_clause, params = self._build_login_filters(context, Filters) + rows = ( + await session.execute( + text( + f""" + SELECT to_char(date_trunc('{granularity}', e.login_time), '{date_expr}') AS label, + COUNT(*)::int AS value + FROM usage_login_events e + WHERE {where_clause} AND e.login_result = 'success' + GROUP BY 1 + ORDER BY MIN(date_trunc('{granularity}', e.login_time)) ASC + """ + ), + params, + ) + ).mappings().all() + items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows] + elif metric == "upload": + where_clause, params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u") + rows = ( + await session.execute( + text( + f""" + SELECT to_char(date_trunc('{granularity}', f.created_at), '{date_expr}') AS label, + COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS value + FROM leaudit_document_files f + JOIN leaudit_documents d ON d.id = f.document_id + LEFT JOIN sso_users u ON u.id = f.created_by + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {where_clause} + GROUP BY 1 + ORDER BY MIN(date_trunc('{granularity}', f.created_at)) ASC + """ + ), + params, + ) + ).mappings().all() + items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows] + else: + where_clause, params = self._build_audit_filters(context, Filters) + rows = ( + await session.execute( + text( + f""" + SELECT to_char(date_trunc('{granularity}', COALESCE(ar.started_at, ar.created_at)), '{date_expr}') AS label, + COUNT(*)::int AS value + FROM leaudit_audit_runs ar + JOIN leaudit_documents d ON d.id = ar.document_id + LEFT JOIN sso_users u ON u.id = ar.trigger_user_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {where_clause} + GROUP BY 1 + ORDER BY MIN(date_trunc('{granularity}', COALESCE(ar.started_at, ar.created_at))) ASC + """ + ), + params, + ) + ).mappings().all() + items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows] + + return UsageStatsTrendVO(granularity=granularity, metric=metric, items=items) + + async def GetUsers(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsUserPageVO: + context = await self._get_current_user_context(CurrentUserId) + self._assert_stats_access(context) + page, page_size, offset = self._pagination(Filters) + area_condition, params = self._build_user_scope_condition(context, Filters, user_alias="u") + if Filters.get("keyword"): + params["keyword"] = f"%{str(Filters['keyword']).strip()}%" + area_condition += " AND (u.username ILIKE :keyword OR u.nick_name ILIKE :keyword)" + if Filters.get("departmentName"): + params["department_name"] = str(Filters["departmentName"]).strip() + area_condition += " AND COALESCE(u.dep_name, '') = :department_name" + if Filters.get("userId") is not None: + params["requested_user_id"] = int(Filters["userId"]) + area_condition += " AND u.id = :requested_user_id" + + login_date_clause, login_date_params = self._build_range_clause("e.login_time", Filters, prefix="login") + upload_date_clause, upload_date_params = self._build_range_clause("f.created_at", Filters, prefix="upload") + audit_date_clause, audit_date_params = self._build_range_clause("COALESCE(ar.started_at, ar.created_at)", Filters, prefix="audit") + + query = text( + f""" + WITH base_users AS ( + SELECT u.id, u.username, COALESCE(u.nick_name, '') AS nick_name, + COALESCE(u.dep_name, '') AS department_name, + COALESCE(u.area, '') AS area, + u.last_login_at + FROM sso_users u + WHERE u.deleted_at IS NULL AND u.status = 0 AND {area_condition} + ), + login_stats AS ( + SELECT e.user_id, + COUNT(*)::int AS login_count, + MAX(e.login_time) AS last_login_time + FROM usage_login_events e + WHERE e.login_result = 'success' AND e.user_id IS NOT NULL {login_date_clause} + GROUP BY e.user_id + ), + upload_stats AS ( + SELECT f.created_by AS user_id, + COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count, + COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count + FROM leaudit_document_files f + JOIN leaudit_documents d ON d.id = f.document_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE f.created_by IS NOT NULL AND f.deleted_at IS NULL {upload_date_clause} + GROUP BY f.created_by + ), + audit_stats AS ( + SELECT ar.trigger_user_id AS user_id, + COUNT(*)::int AS audit_run_count, + COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count, + COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count + FROM leaudit_audit_runs ar + JOIN leaudit_documents d ON d.id = ar.document_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE ar.trigger_user_id IS NOT NULL {audit_date_clause} + GROUP BY ar.trigger_user_id + ) + SELECT + b.id AS user_id, + b.username, + b.nick_name, + b.department_name, + b.area, + COALESCE(ls.login_count, 0) AS login_count, + COALESCE(us.upload_document_count, 0) AS upload_document_count, + COALESCE(us.upload_attachment_count, 0) AS upload_attachment_count, + COALESCE(aus.audit_run_count, 0) AS audit_run_count, + COALESCE(aus.audit_completed_count, 0) AS audit_completed_count, + COALESCE(aus.audit_failed_count, 0) AS audit_failed_count, + COALESCE(ls.last_login_time, b.last_login_at) AS last_login_time + FROM base_users b + LEFT JOIN login_stats ls ON ls.user_id = b.id + LEFT JOIN upload_stats us ON us.user_id = b.id + LEFT JOIN audit_stats aus ON aus.user_id = b.id + ORDER BY b.id DESC + LIMIT :limit OFFSET :offset + """ + ) + count_query = text(f"SELECT COUNT(*)::int AS total FROM sso_users u WHERE u.deleted_at IS NULL AND u.status = 0 AND {area_condition}") + + async with GetAsyncSession() as session: + await self._ensure_usage_stats_schema(session) + merged_params = {**params, **login_date_params, **upload_date_params, **audit_date_params, "limit": page_size, "offset": offset} + total = int((await session.execute(count_query, params)).scalar_one() or 0) + rows = (await session.execute(query, merged_params)).mappings().all() + items = [ + UsageStatsUserItemVO( + userId=int(row["user_id"]), + username=str(row["username"] or ""), + nickName=str(row["nick_name"] or ""), + departmentName=row["department_name"] or None, + area=row["area"] or None, + loginCount=int(row["login_count"] or 0), + uploadDocumentCount=int(row["upload_document_count"] or 0), + uploadAttachmentCount=int(row["upload_attachment_count"] or 0), + auditRunCount=int(row["audit_run_count"] or 0), + auditCompletedCount=int(row["audit_completed_count"] or 0), + auditFailedCount=int(row["audit_failed_count"] or 0), + lastLoginTime=self._format_datetime(row.get("last_login_time")), + ) + for row in rows + ] + return UsageStatsUserPageVO(total=total, page=page, pageSize=page_size, items=items) + + async def GetDepartments(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDepartmentPageVO: + context = await self._get_current_user_context(CurrentUserId) + self._assert_stats_access(context) + page, page_size, offset = self._pagination(Filters) + by_users = await self.GetUsers(CurrentUserId, {**Filters, "page": 1, "pageSize": 5000}) + grouped: dict[str, UsageStatsDepartmentItemVO] = {} + user_ids_by_dept: dict[str, set[int]] = {} + login_user_ids_by_dept: dict[str, set[int]] = {} + for item in by_users.items: + dept = item.departmentName or "未分配部门" + if dept not in grouped: + grouped[dept] = UsageStatsDepartmentItemVO(departmentName=dept) + user_ids_by_dept[dept] = set() + login_user_ids_by_dept[dept] = set() + row = grouped[dept] + user_ids_by_dept[dept].add(item.userId) + if item.loginCount > 0: + login_user_ids_by_dept[dept].add(item.userId) + row.loginCount += item.loginCount + row.uploadDocumentCount += item.uploadDocumentCount + row.uploadAttachmentCount += item.uploadAttachmentCount + row.auditRunCount += item.auditRunCount + row.auditCompletedCount += item.auditCompletedCount + row.auditFailedCount += item.auditFailedCount + items = [] + for dept, row in grouped.items(): + row.userCount = len(user_ids_by_dept[dept]) + row.loginUserCount = len(login_user_ids_by_dept[dept]) + items.append(row) + items.sort(key=lambda item: (item.auditRunCount, item.uploadDocumentCount, item.loginCount), reverse=True) + total = len(items) + sliced = items[offset: offset + page_size] + return UsageStatsDepartmentPageVO(total=total, page=page, pageSize=page_size, items=sliced) + + async def GetAreas(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsAreaPageVO: + context = await self._get_current_user_context(CurrentUserId) + self._assert_stats_access(context) + page, page_size, offset = self._pagination(Filters) + area_scope = str(Filters.get("areaScope") or "user").strip().lower() + if area_scope not in {"user", "document"}: + area_scope = "user" + + async with GetAsyncSession() as session: + await self._ensure_usage_stats_schema(session) + login_where, login_params = self._build_login_filters(context, Filters) + if area_scope == "document": + upload_area_expr = "COALESCE(d.region, '')" + audit_area_expr = "COALESCE(d.region, '')" + else: + upload_area_expr = "COALESCE(u.area, '')" + audit_area_expr = "COALESCE(u.area, '')" + doc_where, doc_params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u") + audit_where, audit_params = self._build_audit_filters(context, Filters) + + login_rows = ( + await session.execute( + text( + f""" + SELECT COALESCE(e.area_snapshot, '未分配地区') AS area, + COUNT(*)::int AS login_count, + COUNT(DISTINCT e.user_id)::int AS login_user_count + FROM usage_login_events e + WHERE {login_where} AND e.login_result = 'success' + GROUP BY 1 + """ + ), + login_params, + ) + ).mappings().all() + upload_rows = ( + await session.execute( + text( + f""" + SELECT {upload_area_expr} AS area, + COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count, + COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count + FROM leaudit_document_files f + JOIN leaudit_documents d ON d.id = f.document_id + LEFT JOIN sso_users u ON u.id = f.created_by + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {doc_where} + GROUP BY 1 + """ + ), + doc_params, + ) + ).mappings().all() + audit_rows = ( + await session.execute( + text( + f""" + SELECT {audit_area_expr} AS area, + COUNT(*)::int AS audit_run_count, + COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count, + COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count + FROM leaudit_audit_runs ar + JOIN leaudit_documents d ON d.id = ar.document_id + LEFT JOIN sso_users u ON u.id = ar.trigger_user_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {audit_where} + GROUP BY 1 + """ + ), + audit_params, + ) + ).mappings().all() + + grouped: dict[str, UsageStatsAreaItemVO] = {} + for row in login_rows: + area = str(row["area"] or "未分配地区") + grouped.setdefault(area, UsageStatsAreaItemVO(area=area)) + grouped[area].loginCount += int(row["login_count"] or 0) + grouped[area].loginUserCount += int(row["login_user_count"] or 0) + for row in upload_rows: + area = str(row["area"] or "未分配地区") + grouped.setdefault(area, UsageStatsAreaItemVO(area=area)) + grouped[area].uploadDocumentCount += int(row["upload_document_count"] or 0) + grouped[area].uploadAttachmentCount += int(row["upload_attachment_count"] or 0) + for row in audit_rows: + area = str(row["area"] or "未分配地区") + grouped.setdefault(area, UsageStatsAreaItemVO(area=area)) + grouped[area].auditRunCount += int(row["audit_run_count"] or 0) + grouped[area].auditCompletedCount += int(row["audit_completed_count"] or 0) + grouped[area].auditFailedCount += int(row["audit_failed_count"] or 0) + items = list(grouped.values()) + items.sort(key=lambda item: (item.auditRunCount, item.uploadDocumentCount, item.loginCount), reverse=True) + total = len(items) + return UsageStatsAreaPageVO(total=total, page=page, pageSize=page_size, areaScope=area_scope, items=items[offset: offset + page_size]) + + async def GetDetails(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDetailPageVO: + context = await self._get_current_user_context(CurrentUserId) + self._assert_stats_access(context) + page, page_size, offset = self._pagination(Filters) + data_type = str(Filters.get("dataType") or "audit").strip().lower() + if data_type not in {"login", "upload", "audit"}: + data_type = "audit" + + async with GetAsyncSession() as session: + await self._ensure_usage_stats_schema(session) + if data_type == "login": + where_clause, params = self._build_login_filters(context, Filters) + count_sql = text(f"SELECT COUNT(*)::int FROM usage_login_events e WHERE {where_clause}") + list_sql = text( + f""" + SELECT e.login_time, e.user_id, e.username_snapshot, e.nick_name_snapshot, + e.department_name_snapshot, e.area_snapshot, e.login_result, + e.failure_reason, e.login_type + FROM usage_login_events e + WHERE {where_clause} + ORDER BY e.login_time DESC + LIMIT :limit OFFSET :offset + """ + ) + total = int((await session.execute(count_sql, params)).scalar_one() or 0) + rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all() + items = [ + UsageStatsDetailItemVO( + time=self._format_datetime(row["login_time"]), + dataType="login", + userId=self._to_int(row.get("user_id")), + username=str(row.get("username_snapshot") or ""), + nickName=str(row.get("nick_name_snapshot") or ""), + departmentName=row.get("department_name_snapshot") or None, + area=row.get("area_snapshot") or None, + status=str(row.get("login_result") or ""), + extra={"failureReason": row.get("failure_reason"), "loginType": row.get("login_type")}, + ) + for row in rows + ] + elif data_type == "upload": + where_clause, params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u") + count_sql = text( + f""" + SELECT COUNT(*)::int + FROM leaudit_document_files f + JOIN leaudit_documents d ON d.id = f.document_id + LEFT JOIN sso_users u ON u.id = f.created_by + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {where_clause} + """ + ) + list_sql = text( + f""" + SELECT f.created_at, f.created_by, COALESCE(u.username, '') AS username, + COALESCE(u.nick_name, '') AS nick_name, COALESCE(u.dep_name, '') AS department_name, + COALESCE(u.area, '') AS area, d.id AS document_id, f.file_name, + dt.id AS document_type_id, dt.name AS document_type_name, + em.id AS entry_module_id, em.name AS entry_module_name, + f.file_role + FROM leaudit_document_files f + JOIN leaudit_documents d ON d.id = f.document_id + LEFT JOIN sso_users u ON u.id = f.created_by + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + WHERE {where_clause} + ORDER BY f.created_at DESC + LIMIT :limit OFFSET :offset + """ + ) + total = int((await session.execute(count_sql, params)).scalar_one() or 0) + rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all() + items = [ + UsageStatsDetailItemVO( + time=self._format_datetime(row["created_at"]), + dataType="upload", + userId=self._to_int(row.get("created_by")), + username=str(row.get("username") or ""), + nickName=str(row.get("nick_name") or ""), + departmentName=row.get("department_name") or None, + area=row.get("area") or None, + documentId=self._to_int(row.get("document_id")), + documentName=row.get("file_name"), + documentTypeId=self._to_int(row.get("document_type_id")), + documentTypeName=row.get("document_type_name"), + entryModuleId=self._to_int(row.get("entry_module_id")), + entryModuleName=row.get("entry_module_name"), + status=row.get("file_role"), + ) + for row in rows + ] + else: + where_clause, params = self._build_audit_filters(context, Filters) + count_sql = text( + f""" + SELECT COUNT(*)::int + FROM leaudit_audit_runs ar + JOIN leaudit_documents d ON d.id = ar.document_id + LEFT JOIN sso_users u ON u.id = ar.trigger_user_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + LEFT JOIN leaudit_document_files f ON f.document_id = d.id AND f.file_role = 'primary' AND f.is_active = true + WHERE {where_clause} + """ + ) + list_sql = text( + f""" + SELECT COALESCE(ar.started_at, ar.created_at) AS event_time, + ar.trigger_user_id, COALESCE(u.username, '') AS username, + COALESCE(u.nick_name, '') AS nick_name, COALESCE(u.dep_name, '') AS department_name, + COALESCE(u.area, '') AS area, d.id AS document_id, + COALESCE(f.file_name, d.normalized_name, '') AS document_name, + dt.id AS document_type_id, dt.name AS document_type_name, + em.id AS entry_module_id, em.name AS entry_module_name, + ar.status, ar.id AS run_id, ar.result_status + FROM leaudit_audit_runs ar + JOIN leaudit_documents d ON d.id = ar.document_id + LEFT JOIN sso_users u ON u.id = ar.trigger_user_id + LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id + LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id + LEFT JOIN leaudit_document_files f ON f.document_id = d.id AND f.file_role = 'primary' AND f.is_active = true + WHERE {where_clause} + ORDER BY COALESCE(ar.started_at, ar.created_at) DESC + LIMIT :limit OFFSET :offset + """ + ) + total = int((await session.execute(count_sql, params)).scalar_one() or 0) + rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all() + items = [ + UsageStatsDetailItemVO( + time=self._format_datetime(row["event_time"]), + dataType="audit", + userId=self._to_int(row.get("trigger_user_id")), + username=str(row.get("username") or ""), + nickName=str(row.get("nick_name") or ""), + departmentName=row.get("department_name") or None, + area=row.get("area") or None, + documentId=self._to_int(row.get("document_id")), + documentName=row.get("document_name"), + documentTypeId=self._to_int(row.get("document_type_id")), + documentTypeName=row.get("document_type_name"), + entryModuleId=self._to_int(row.get("entry_module_id")), + entryModuleName=row.get("entry_module_name"), + status=row.get("status"), + extra={"runId": self._to_int(row.get("run_id")), "resultStatus": row.get("result_status")}, + ) + for row in rows + ] + return UsageStatsDetailPageVO(total=total, page=page, pageSize=page_size, items=items) + + async def _ensure_usage_stats_schema(self, session) -> None: + await session.execute(text("ALTER TABLE sso_users ADD COLUMN IF NOT EXISTS last_login_at TIMESTAMPTZ NULL")) + await session.execute( + text( + """ + CREATE TABLE IF NOT EXISTS usage_login_events ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NULL, + sub VARCHAR(128) NULL, + username_snapshot VARCHAR(128) NULL, + nick_name_snapshot VARCHAR(128) NULL, + department_name_snapshot VARCHAR(255) NULL, + ou_id_snapshot VARCHAR(128) NULL, + ou_name_snapshot VARCHAR(255) NULL, + area_snapshot VARCHAR(64) NULL, + login_time TIMESTAMPTZ NOT NULL DEFAULT NOW(), + login_result VARCHAR(16) NOT NULL, + login_type VARCHAR(32) NOT NULL, + ip_address VARCHAR(64) NULL, + user_agent VARCHAR(1024) NULL, + client_type VARCHAR(32) NULL, + token_jti VARCHAR(128) NULL, + failure_reason VARCHAR(255) NULL, + extra JSONB NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + deleted_at TIMESTAMPTZ NULL + ) + """ + ) + ) + await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_login_time ON usage_login_events(login_time DESC)")) + await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_user_id ON usage_login_events(user_id, login_time DESC)")) + await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_department ON usage_login_events(department_name_snapshot, login_time DESC)")) + await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_area ON usage_login_events(area_snapshot, login_time DESC)")) + + async def _get_current_user_context(self, current_user_id: int) -> dict[str, Any]: + async with GetAsyncSession() as session: + row = ( + await session.execute( + text( + """ + SELECT + u.id, + COALESCE(u.area, '') AS area, + COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global, + COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage, + COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin + FROM sso_users u + LEFT JOIN user_role ur ON ur.user_id = u.id + LEFT JOIN roles r ON r.id = ur.role_id + WHERE u.id = :user_id + GROUP BY u.id, u.area + """ + ), + {"user_id": current_user_id}, + ) + ).mappings().first() + if not row: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在") + return {"area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"]), "is_super_admin": bool(row["is_super_admin"])} + + def _assert_stats_access(self, context: dict[str, Any]) -> None: + if context["is_global"] or context["can_manage"]: + return + raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "仅管理员可查看系统使用统计") + + def _build_user_scope_condition(self, context: dict[str, Any], filters: dict[str, Any], *, user_alias: str) -> tuple[str, dict[str, Any]]: + conditions = ["1 = 1"] + params: dict[str, Any] = {} + requested_area = str(filters.get("area") or "").strip() + if context["is_global"]: + if requested_area: + conditions.append(f"COALESCE({user_alias}.area, '') = :requested_area") + params["requested_area"] = requested_area + else: + if not context["area"]: + conditions.append("1 = 0") + elif requested_area and requested_area != context["area"]: + conditions.append("1 = 0") + else: + conditions.append(f"COALESCE({user_alias}.area, '') = :scope_area") + params["scope_area"] = context["area"] + return " AND ".join(conditions), params + + def _build_login_filters(self, context: dict[str, Any], filters: dict[str, Any]) -> tuple[str, dict[str, Any]]: + conditions = ["e.deleted_at IS NULL"] + scope_cond, params = self._build_user_scope_condition(context, filters, user_alias="e") + scope_cond = scope_cond.replace("e.area", "e.area_snapshot") + conditions.append(scope_cond) + if filters.get("userId") is not None: + conditions.append("e.user_id = :user_id") + params["user_id"] = int(filters["userId"]) + if filters.get("departmentName"): + conditions.append("COALESCE(e.department_name_snapshot, '') = :department_name") + params["department_name"] = str(filters["departmentName"]).strip() + if filters.get("dateFrom"): + conditions.append("e.login_time >= :date_from") + params["date_from"] = self._normalize_date(filters["dateFrom"]) + if filters.get("dateTo"): + conditions.append("e.login_time < (CAST(:date_to AS date) + INTERVAL '1 day')") + params["date_to"] = self._normalize_date(filters["dateTo"]) + return " AND ".join(conditions), params + + def _build_document_filters(self, context: dict[str, Any], filters: dict[str, Any], *, alias_prefix: str, file_alias: str, user_alias: str) -> tuple[str, dict[str, Any]]: + conditions = [f"{file_alias}.deleted_at IS NULL", f"{alias_prefix}.deleted_at IS NULL"] + params: dict[str, Any] = {} + area_scope = str(filters.get("areaScope") or "user").strip().lower() + requested_area = str(filters.get("area") or "").strip() + if area_scope == "document": + if context["is_global"]: + if requested_area: + conditions.append(f"COALESCE({alias_prefix}.region, '') = :requested_area") + params["requested_area"] = requested_area + else: + if not context["area"]: + conditions.append("1 = 0") + elif requested_area and requested_area != context["area"]: + conditions.append("1 = 0") + else: + conditions.append(f"COALESCE({alias_prefix}.region, '') = :scope_area") + params["scope_area"] = context["area"] + else: + user_scope_cond, user_scope_params = self._build_user_scope_condition(context, filters, user_alias=user_alias) + conditions.append(user_scope_cond) + params.update(user_scope_params) + + if filters.get("userId") is not None: + conditions.append(f"{file_alias}.created_by = :user_id") + params["user_id"] = int(filters["userId"]) + if filters.get("departmentName"): + conditions.append(f"COALESCE({user_alias}.dep_name, '') = :department_name") + params["department_name"] = str(filters["departmentName"]).strip() + if filters.get("entryModuleId") is not None: + conditions.append("em.id = :entry_module_id") + params["entry_module_id"] = int(filters["entryModuleId"]) + if filters.get("documentTypeId") is not None: + conditions.append("dt.id = :document_type_id") + params["document_type_id"] = int(filters["documentTypeId"]) + if filters.get("dateFrom"): + conditions.append(f"{file_alias}.created_at >= :date_from") + params["date_from"] = self._normalize_date(filters["dateFrom"]) + if filters.get("dateTo"): + conditions.append(f"{file_alias}.created_at < (CAST(:date_to AS date) + INTERVAL '1 day')") + params["date_to"] = self._normalize_date(filters["dateTo"]) + return " AND ".join(conditions), params + + def _build_audit_filters(self, context: dict[str, Any], filters: dict[str, Any]) -> tuple[str, dict[str, Any]]: + conditions = ["d.deleted_at IS NULL"] + params: dict[str, Any] = {} + area_scope = str(filters.get("areaScope") or "user").strip().lower() + requested_area = str(filters.get("area") or "").strip() + if area_scope == "document": + if context["is_global"]: + if requested_area: + conditions.append("COALESCE(d.region, '') = :requested_area") + params["requested_area"] = requested_area + else: + if not context["area"]: + conditions.append("1 = 0") + elif requested_area and requested_area != context["area"]: + conditions.append("1 = 0") + else: + conditions.append("COALESCE(d.region, '') = :scope_area") + params["scope_area"] = context["area"] + else: + user_scope_cond, user_scope_params = self._build_user_scope_condition(context, filters, user_alias="u") + conditions.append(user_scope_cond) + params.update(user_scope_params) + + if filters.get("userId") is not None: + conditions.append("ar.trigger_user_id = :user_id") + params["user_id"] = int(filters["userId"]) + if filters.get("departmentName"): + conditions.append("COALESCE(u.dep_name, '') = :department_name") + params["department_name"] = str(filters["departmentName"]).strip() + if filters.get("entryModuleId") is not None: + conditions.append("em.id = :entry_module_id") + params["entry_module_id"] = int(filters["entryModuleId"]) + if filters.get("documentTypeId") is not None: + conditions.append("dt.id = :document_type_id") + params["document_type_id"] = int(filters["documentTypeId"]) + if filters.get("dateFrom"): + conditions.append("COALESCE(ar.started_at, ar.created_at) >= :date_from") + params["date_from"] = self._normalize_date(filters["dateFrom"]) + if filters.get("dateTo"): + conditions.append("COALESCE(ar.started_at, ar.created_at) < (CAST(:date_to AS date) + INTERVAL '1 day')") + params["date_to"] = self._normalize_date(filters["dateTo"]) + return " AND ".join(conditions), params + + def _build_range_clause(self, column: str, filters: dict[str, Any], *, prefix: str) -> tuple[str, dict[str, Any]]: + clauses: list[str] = [] + params: dict[str, Any] = {} + if filters.get("dateFrom"): + clauses.append(f" AND {column} >= :{prefix}_date_from") + params[f"{prefix}_date_from"] = self._normalize_date(filters["dateFrom"]) + if filters.get("dateTo"): + clauses.append(f" AND {column} < (CAST(:{prefix}_date_to AS date) + INTERVAL '1 day')") + params[f"{prefix}_date_to"] = self._normalize_date(filters["dateTo"]) + return "".join(clauses), params + + @staticmethod + def _normalize_date(value: Any): + from datetime import date as date_type + if isinstance(value, date_type): + return value + return date_type.fromisoformat(str(value).strip()) + + @staticmethod + def _detect_client_type(user_agent: str | None) -> str: + ua = str(user_agent or "").lower() + if any(token in ua for token in ["iphone", "android", "mobile", "ipad"]): + return "mobile" + if ua: + return "pc" + return "other" + + @staticmethod + def _to_int(value: Any) -> int | None: + try: + return int(value) if value is not None and str(value) != "" else None + except (TypeError, ValueError): + return None + + @staticmethod + def _format_datetime(value: Any) -> str | None: + if not value: + return None + if isinstance(value, datetime): + return value.strftime("%Y-%m-%d %H:%M:%S") + return str(value) + + @staticmethod + def _pagination(filters: dict[str, Any]) -> tuple[int, int, int]: + page = max(int(filters.get("page") or 1), 1) + page_size = max(1, min(int(filters.get("pageSize") or 20), 200)) + return page, page_size, (page - 1) * page_size diff --git a/fastapi_modules/fastapi_leaudit/services/usageStatsService.py b/fastapi_modules/fastapi_leaudit/services/usageStatsService.py new file mode 100644 index 0000000..cd76052 --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/services/usageStatsService.py @@ -0,0 +1,58 @@ +"""系统使用统计服务接口。""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + +from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import ( + UsageStatsAreaPageVO, + UsageStatsDepartmentPageVO, + UsageStatsDetailPageVO, + UsageStatsOverviewVO, + UsageStatsTrendVO, + UsageStatsUserPageVO, +) + + +class IUsageStatsService(ABC): + """系统使用统计服务接口。""" + + @abstractmethod + async def RecordLoginEvent( + self, + *, + UserInfo: dict[str, Any] | None, + Sub: str | None, + LoginResult: str, + LoginType: str, + IpAddress: str | None, + UserAgent: str | None, + FailureReason: str | None = None, + TokenJti: str | None = None, + ) -> None: + ... + + @abstractmethod + async def GetOverview(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsOverviewVO: + ... + + @abstractmethod + async def GetTrends(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsTrendVO: + ... + + @abstractmethod + async def GetUsers(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsUserPageVO: + ... + + @abstractmethod + async def GetDepartments(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDepartmentPageVO: + ... + + @abstractmethod + async def GetAreas(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsAreaPageVO: + ... + + @abstractmethod + async def GetDetails(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDetailPageVO: + ...