feat(usage-stats): add usage stats backend apis
This commit is contained in:
@@ -0,0 +1,192 @@
|
||||
"""系统使用统计控制器。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Depends, Query
|
||||
|
||||
from fastapi_common.fastapi_common_security.security import verify_access_token
|
||||
from fastapi_common.fastapi_common_web.controller import BaseController
|
||||
from fastapi_common.fastapi_common_web.domain.responses import Result
|
||||
|
||||
from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import (
|
||||
UsageStatsAreaPageVO,
|
||||
UsageStatsDepartmentPageVO,
|
||||
UsageStatsDetailPageVO,
|
||||
UsageStatsOverviewVO,
|
||||
UsageStatsTrendVO,
|
||||
UsageStatsUserPageVO,
|
||||
)
|
||||
from fastapi_modules.fastapi_leaudit.services.impl.usageStatsServiceImpl import UsageStatsServiceImpl
|
||||
from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService
|
||||
|
||||
|
||||
class UsageStatsController(BaseController):
|
||||
"""系统使用统计控制器。"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(prefix="/v3/usage-stats", tags=["系统使用统计"])
|
||||
self.UsageStatsService: IUsageStatsService = UsageStatsServiceImpl()
|
||||
|
||||
@self.router.get("/overview", response_model=Result[UsageStatsOverviewVO])
|
||||
async def GetOverview(
|
||||
dateFrom: str | None = None,
|
||||
dateTo: str | None = None,
|
||||
area: str | None = None,
|
||||
areaScope: str | None = None,
|
||||
entryModuleId: int | None = None,
|
||||
documentTypeId: int | None = None,
|
||||
payload: dict[str, Any] = Depends(verify_access_token),
|
||||
):
|
||||
data = await self.UsageStatsService.GetOverview(
|
||||
CurrentUserId=int(payload["user_id"]),
|
||||
Filters={
|
||||
"dateFrom": dateFrom,
|
||||
"dateTo": dateTo,
|
||||
"area": area,
|
||||
"areaScope": areaScope,
|
||||
"entryModuleId": entryModuleId,
|
||||
"documentTypeId": documentTypeId,
|
||||
},
|
||||
)
|
||||
return Result.success(data=data)
|
||||
|
||||
@self.router.get("/trends", response_model=Result[UsageStatsTrendVO])
|
||||
async def GetTrends(
|
||||
dateFrom: str | None = None,
|
||||
dateTo: str | None = None,
|
||||
granularity: str = "day",
|
||||
metric: str = "audit",
|
||||
area: str | None = None,
|
||||
areaScope: str | None = None,
|
||||
entryModuleId: int | None = None,
|
||||
documentTypeId: int | None = None,
|
||||
payload: dict[str, Any] = Depends(verify_access_token),
|
||||
):
|
||||
data = await self.UsageStatsService.GetTrends(
|
||||
CurrentUserId=int(payload["user_id"]),
|
||||
Filters={
|
||||
"dateFrom": dateFrom,
|
||||
"dateTo": dateTo,
|
||||
"granularity": granularity,
|
||||
"metric": metric,
|
||||
"area": area,
|
||||
"areaScope": areaScope,
|
||||
"entryModuleId": entryModuleId,
|
||||
"documentTypeId": documentTypeId,
|
||||
},
|
||||
)
|
||||
return Result.success(data=data)
|
||||
|
||||
@self.router.get("/by-users", response_model=Result[UsageStatsUserPageVO])
|
||||
async def GetUsers(
|
||||
page: int = 1,
|
||||
pageSize: int = 20,
|
||||
keyword: str | None = None,
|
||||
userId: int | None = None,
|
||||
departmentName: str | None = None,
|
||||
area: str | None = None,
|
||||
areaScope: str | None = None,
|
||||
dateFrom: str | None = None,
|
||||
dateTo: str | None = None,
|
||||
payload: dict[str, Any] = Depends(verify_access_token),
|
||||
):
|
||||
data = await self.UsageStatsService.GetUsers(
|
||||
CurrentUserId=int(payload["user_id"]),
|
||||
Filters={
|
||||
"page": page,
|
||||
"pageSize": pageSize,
|
||||
"keyword": keyword,
|
||||
"userId": userId,
|
||||
"departmentName": departmentName,
|
||||
"area": area,
|
||||
"areaScope": areaScope,
|
||||
"dateFrom": dateFrom,
|
||||
"dateTo": dateTo,
|
||||
},
|
||||
)
|
||||
return Result.success(data=data)
|
||||
|
||||
@self.router.get("/by-departments", response_model=Result[UsageStatsDepartmentPageVO])
|
||||
async def GetDepartments(
|
||||
page: int = 1,
|
||||
pageSize: int = 20,
|
||||
departmentName: str | None = None,
|
||||
area: str | None = None,
|
||||
areaScope: str | None = None,
|
||||
dateFrom: str | None = None,
|
||||
dateTo: str | None = None,
|
||||
payload: dict[str, Any] = Depends(verify_access_token),
|
||||
):
|
||||
data = await self.UsageStatsService.GetDepartments(
|
||||
CurrentUserId=int(payload["user_id"]),
|
||||
Filters={
|
||||
"page": page,
|
||||
"pageSize": pageSize,
|
||||
"departmentName": departmentName,
|
||||
"area": area,
|
||||
"areaScope": areaScope,
|
||||
"dateFrom": dateFrom,
|
||||
"dateTo": dateTo,
|
||||
},
|
||||
)
|
||||
return Result.success(data=data)
|
||||
|
||||
@self.router.get("/by-areas", response_model=Result[UsageStatsAreaPageVO])
|
||||
async def GetAreas(
|
||||
page: int = 1,
|
||||
pageSize: int = 20,
|
||||
areaScope: str = Query("user", description="地区口径:user/document"),
|
||||
dateFrom: str | None = None,
|
||||
dateTo: str | None = None,
|
||||
entryModuleId: int | None = None,
|
||||
documentTypeId: int | None = None,
|
||||
payload: dict[str, Any] = Depends(verify_access_token),
|
||||
):
|
||||
data = await self.UsageStatsService.GetAreas(
|
||||
CurrentUserId=int(payload["user_id"]),
|
||||
Filters={
|
||||
"page": page,
|
||||
"pageSize": pageSize,
|
||||
"areaScope": areaScope,
|
||||
"dateFrom": dateFrom,
|
||||
"dateTo": dateTo,
|
||||
"entryModuleId": entryModuleId,
|
||||
"documentTypeId": documentTypeId,
|
||||
},
|
||||
)
|
||||
return Result.success(data=data)
|
||||
|
||||
@self.router.get("/details", response_model=Result[UsageStatsDetailPageVO])
|
||||
async def GetDetails(
|
||||
dataType: str = Query("audit", description="login/upload/audit"),
|
||||
page: int = 1,
|
||||
pageSize: int = 20,
|
||||
userId: int | None = None,
|
||||
departmentName: str | None = None,
|
||||
area: str | None = None,
|
||||
areaScope: str | None = None,
|
||||
entryModuleId: int | None = None,
|
||||
documentTypeId: int | None = None,
|
||||
dateFrom: str | None = None,
|
||||
dateTo: str | None = None,
|
||||
payload: dict[str, Any] = Depends(verify_access_token),
|
||||
):
|
||||
data = await self.UsageStatsService.GetDetails(
|
||||
CurrentUserId=int(payload["user_id"]),
|
||||
Filters={
|
||||
"dataType": dataType,
|
||||
"page": page,
|
||||
"pageSize": pageSize,
|
||||
"userId": userId,
|
||||
"departmentName": departmentName,
|
||||
"area": area,
|
||||
"areaScope": areaScope,
|
||||
"entryModuleId": entryModuleId,
|
||||
"documentTypeId": documentTypeId,
|
||||
"dateFrom": dateFrom,
|
||||
"dateTo": dateTo,
|
||||
},
|
||||
)
|
||||
return Result.success(data=data)
|
||||
@@ -0,0 +1,113 @@
|
||||
"""系统使用统计 VO。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class UsageStatsOverviewVO(BaseModel):
|
||||
loginUserCount: int = Field(0)
|
||||
loginCount: int = Field(0)
|
||||
activeUserCount: int = Field(0)
|
||||
uploadDocumentCount: int = Field(0)
|
||||
uploadAttachmentCount: int = Field(0)
|
||||
auditRunCount: int = Field(0)
|
||||
auditCompletedCount: int = Field(0)
|
||||
auditFailedCount: int = Field(0)
|
||||
lastUpdatedAt: str = Field("")
|
||||
|
||||
|
||||
class UsageStatsTrendItemVO(BaseModel):
|
||||
label: str = Field("")
|
||||
value: int = Field(0)
|
||||
|
||||
|
||||
class UsageStatsTrendVO(BaseModel):
|
||||
granularity: str = Field("day")
|
||||
metric: str = Field("audit")
|
||||
items: list[UsageStatsTrendItemVO] = Field(default_factory=list)
|
||||
|
||||
|
||||
class UsageStatsUserItemVO(BaseModel):
|
||||
userId: int = Field(...)
|
||||
username: str = Field("")
|
||||
nickName: str = Field("")
|
||||
departmentName: str | None = Field(None)
|
||||
area: str | None = Field(None)
|
||||
loginCount: int = Field(0)
|
||||
uploadDocumentCount: int = Field(0)
|
||||
uploadAttachmentCount: int = Field(0)
|
||||
auditRunCount: int = Field(0)
|
||||
auditCompletedCount: int = Field(0)
|
||||
auditFailedCount: int = Field(0)
|
||||
lastLoginTime: str | None = Field(None)
|
||||
|
||||
|
||||
class UsageStatsUserPageVO(BaseModel):
|
||||
total: int = Field(0)
|
||||
page: int = Field(1)
|
||||
pageSize: int = Field(20)
|
||||
items: list[UsageStatsUserItemVO] = Field(default_factory=list)
|
||||
|
||||
|
||||
class UsageStatsDepartmentItemVO(BaseModel):
|
||||
departmentName: str = Field("")
|
||||
userCount: int = Field(0)
|
||||
loginUserCount: int = Field(0)
|
||||
loginCount: int = Field(0)
|
||||
uploadDocumentCount: int = Field(0)
|
||||
uploadAttachmentCount: int = Field(0)
|
||||
auditRunCount: int = Field(0)
|
||||
auditCompletedCount: int = Field(0)
|
||||
auditFailedCount: int = Field(0)
|
||||
|
||||
|
||||
class UsageStatsDepartmentPageVO(BaseModel):
|
||||
total: int = Field(0)
|
||||
page: int = Field(1)
|
||||
pageSize: int = Field(20)
|
||||
items: list[UsageStatsDepartmentItemVO] = Field(default_factory=list)
|
||||
|
||||
|
||||
class UsageStatsAreaItemVO(BaseModel):
|
||||
area: str = Field("")
|
||||
loginUserCount: int = Field(0)
|
||||
loginCount: int = Field(0)
|
||||
uploadDocumentCount: int = Field(0)
|
||||
uploadAttachmentCount: int = Field(0)
|
||||
auditRunCount: int = Field(0)
|
||||
auditCompletedCount: int = Field(0)
|
||||
auditFailedCount: int = Field(0)
|
||||
|
||||
|
||||
class UsageStatsAreaPageVO(BaseModel):
|
||||
total: int = Field(0)
|
||||
page: int = Field(1)
|
||||
pageSize: int = Field(20)
|
||||
areaScope: str = Field("user")
|
||||
items: list[UsageStatsAreaItemVO] = Field(default_factory=list)
|
||||
|
||||
|
||||
class UsageStatsDetailItemVO(BaseModel):
|
||||
time: str = Field("")
|
||||
dataType: str = Field("")
|
||||
userId: int | None = Field(None)
|
||||
username: str = Field("")
|
||||
nickName: str = Field("")
|
||||
departmentName: str | None = Field(None)
|
||||
area: str | None = Field(None)
|
||||
documentId: int | None = Field(None)
|
||||
documentName: str | None = Field(None)
|
||||
documentTypeId: int | None = Field(None)
|
||||
documentTypeName: str | None = Field(None)
|
||||
entryModuleId: int | None = Field(None)
|
||||
entryModuleName: str | None = Field(None)
|
||||
status: str | None = Field(None)
|
||||
extra: dict = Field(default_factory=dict)
|
||||
|
||||
|
||||
class UsageStatsDetailPageVO(BaseModel):
|
||||
total: int = Field(0)
|
||||
page: int = Field(1)
|
||||
pageSize: int = Field(20)
|
||||
items: list[UsageStatsDetailItemVO] = Field(default_factory=list)
|
||||
@@ -13,6 +13,7 @@ from fastapi_modules.fastapi_leaudit.models.leauditRagDocument import LeauditRag
|
||||
from fastapi_modules.fastapi_leaudit.models.leauditRagChatApp import LeauditRagChatApp
|
||||
from fastapi_modules.fastapi_leaudit.models.leauditRagConversation import LeauditRagConversation
|
||||
from fastapi_modules.fastapi_leaudit.models.leauditRagMessage import LeauditRagMessage
|
||||
from fastapi_modules.fastapi_leaudit.models.usageLoginEvent import UsageLoginEvent
|
||||
|
||||
__all__ = [
|
||||
"LeauditDocument",
|
||||
@@ -28,4 +29,5 @@ __all__ = [
|
||||
"LeauditRagChatApp",
|
||||
"LeauditRagConversation",
|
||||
"LeauditRagMessage",
|
||||
"UsageLoginEvent",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
"""系统使用统计 - 登录事件模型。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, DateTime, String
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from fastapi_common.fastapi_common_web.models import BaseModel
|
||||
|
||||
|
||||
class UsageLoginEvent(BaseModel):
|
||||
"""登录事件明细表。"""
|
||||
|
||||
__tablename__ = "usage_login_events"
|
||||
|
||||
Id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, autoincrement=True)
|
||||
userId: Mapped[int | None] = mapped_column("user_id", BigInteger)
|
||||
sub: Mapped[str | None] = mapped_column(String(128))
|
||||
usernameSnapshot: Mapped[str | None] = mapped_column("username_snapshot", String(128))
|
||||
nickNameSnapshot: Mapped[str | None] = mapped_column("nick_name_snapshot", String(128))
|
||||
departmentNameSnapshot: Mapped[str | None] = mapped_column("department_name_snapshot", String(255))
|
||||
ouIdSnapshot: Mapped[str | None] = mapped_column("ou_id_snapshot", String(128))
|
||||
ouNameSnapshot: Mapped[str | None] = mapped_column("ou_name_snapshot", String(255))
|
||||
areaSnapshot: Mapped[str | None] = mapped_column("area_snapshot", String(64))
|
||||
loginTime: Mapped[datetime] = mapped_column("login_time", DateTime(timezone=True))
|
||||
loginResult: Mapped[str] = mapped_column("login_result", String(16))
|
||||
loginType: Mapped[str] = mapped_column("login_type", String(32))
|
||||
ipAddress: Mapped[str | None] = mapped_column("ip_address", String(64))
|
||||
userAgent: Mapped[str | None] = mapped_column("user_agent", String(1024))
|
||||
clientType: Mapped[str | None] = mapped_column("client_type", String(32))
|
||||
tokenJti: Mapped[str | None] = mapped_column("token_jti", String(128))
|
||||
failureReason: Mapped[str | None] = mapped_column("failure_reason", String(255))
|
||||
extra: Mapped[dict | None] = mapped_column(JSONB)
|
||||
@@ -16,6 +16,7 @@ from fastapi_modules.fastapi_leaudit.services.ruleConfigService import IRuleConf
|
||||
from fastapi_modules.fastapi_leaudit.services.ruleService import IRuleService
|
||||
from fastapi_modules.fastapi_leaudit.services.ragDatasetService import IRagDatasetService
|
||||
from fastapi_modules.fastapi_leaudit.services.ragChatService import IRagChatService
|
||||
from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService
|
||||
|
||||
__all__ = [
|
||||
"IAuditService",
|
||||
@@ -34,4 +35,5 @@ __all__ = [
|
||||
"IRuleService",
|
||||
"IRagDatasetService",
|
||||
"IRagChatService",
|
||||
"IUsageStatsService",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,912 @@
|
||||
"""系统使用统计服务实现。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
||||
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
||||
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
|
||||
|
||||
from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import (
|
||||
UsageStatsAreaItemVO,
|
||||
UsageStatsAreaPageVO,
|
||||
UsageStatsDepartmentItemVO,
|
||||
UsageStatsDepartmentPageVO,
|
||||
UsageStatsDetailItemVO,
|
||||
UsageStatsDetailPageVO,
|
||||
UsageStatsOverviewVO,
|
||||
UsageStatsTrendItemVO,
|
||||
UsageStatsTrendVO,
|
||||
UsageStatsUserItemVO,
|
||||
UsageStatsUserPageVO,
|
||||
)
|
||||
from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService
|
||||
|
||||
|
||||
class UsageStatsServiceImpl(IUsageStatsService):
|
||||
"""系统使用统计服务实现。"""
|
||||
|
||||
async def RecordLoginEvent(
|
||||
self,
|
||||
*,
|
||||
UserInfo: dict[str, Any] | None,
|
||||
Sub: str | None,
|
||||
LoginResult: str,
|
||||
LoginType: str,
|
||||
IpAddress: str | None,
|
||||
UserAgent: str | None,
|
||||
FailureReason: str | None = None,
|
||||
TokenJti: str | None = None,
|
||||
) -> None:
|
||||
async with GetAsyncSession() as session:
|
||||
await self._ensure_usage_stats_schema(session)
|
||||
user_id = self._to_int(UserInfo.get("user_id")) if UserInfo else None
|
||||
username = str((UserInfo or {}).get("username") or "") or None
|
||||
nick_name = str((UserInfo or {}).get("nick_name") or "") or None
|
||||
dep_name = str((UserInfo or {}).get("dep_name") or "") or None
|
||||
ou_id = str((UserInfo or {}).get("ou_id") or "") or None
|
||||
ou_name = str((UserInfo or {}).get("ou_name") or "") or None
|
||||
area = str((UserInfo or {}).get("area") or "") or None
|
||||
client_type = self._detect_client_type(UserAgent)
|
||||
|
||||
await session.execute(
|
||||
text(
|
||||
"""
|
||||
INSERT INTO usage_login_events (
|
||||
user_id, sub, username_snapshot, nick_name_snapshot,
|
||||
department_name_snapshot, ou_id_snapshot, ou_name_snapshot,
|
||||
area_snapshot, login_time, login_result, login_type,
|
||||
ip_address, user_agent, client_type, token_jti, failure_reason,
|
||||
extra, created_at, updated_at, deleted_at
|
||||
) VALUES (
|
||||
:user_id, :sub, :username_snapshot, :nick_name_snapshot,
|
||||
:department_name_snapshot, :ou_id_snapshot, :ou_name_snapshot,
|
||||
:area_snapshot, NOW(), :login_result, :login_type,
|
||||
:ip_address, :user_agent, :client_type, :token_jti, :failure_reason,
|
||||
CAST(:extra AS jsonb), NOW(), NOW(), NULL
|
||||
)
|
||||
"""
|
||||
),
|
||||
{
|
||||
"user_id": user_id,
|
||||
"sub": Sub,
|
||||
"username_snapshot": username,
|
||||
"nick_name_snapshot": nick_name,
|
||||
"department_name_snapshot": dep_name,
|
||||
"ou_id_snapshot": ou_id,
|
||||
"ou_name_snapshot": ou_name,
|
||||
"area_snapshot": area,
|
||||
"login_result": LoginResult,
|
||||
"login_type": LoginType,
|
||||
"ip_address": IpAddress,
|
||||
"user_agent": UserAgent,
|
||||
"client_type": client_type,
|
||||
"token_jti": TokenJti,
|
||||
"failure_reason": FailureReason,
|
||||
"extra": "{}",
|
||||
},
|
||||
)
|
||||
|
||||
if LoginResult == "success" and user_id is not None:
|
||||
await session.execute(
|
||||
text("UPDATE sso_users SET last_login_at = NOW(), updated_at = NOW() WHERE id = :user_id"),
|
||||
{"user_id": user_id},
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
async def GetOverview(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsOverviewVO:
|
||||
context = await self._get_current_user_context(CurrentUserId)
|
||||
self._assert_stats_access(context)
|
||||
async with GetAsyncSession() as session:
|
||||
await self._ensure_usage_stats_schema(session)
|
||||
|
||||
login_where, login_params = self._build_login_filters(context, Filters)
|
||||
doc_where, doc_params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
|
||||
audit_where, audit_params = self._build_audit_filters(context, Filters)
|
||||
|
||||
login_row = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT
|
||||
COUNT(*)::int AS login_count,
|
||||
COUNT(DISTINCT user_id)::int AS login_user_count,
|
||||
COALESCE(MAX(login_time), NOW()) AS last_updated_at
|
||||
FROM usage_login_events e
|
||||
WHERE {login_where} AND e.login_result = 'success'
|
||||
"""
|
||||
),
|
||||
login_params,
|
||||
)
|
||||
).mappings().first()
|
||||
|
||||
upload_row = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count,
|
||||
COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count
|
||||
FROM leaudit_document_files f
|
||||
JOIN leaudit_documents d ON d.id = f.document_id
|
||||
LEFT JOIN sso_users u ON u.id = f.created_by
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {doc_where}
|
||||
"""
|
||||
),
|
||||
doc_params,
|
||||
)
|
||||
).mappings().first()
|
||||
|
||||
audit_row = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT
|
||||
COUNT(*)::int AS audit_run_count,
|
||||
COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count,
|
||||
COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count
|
||||
FROM leaudit_audit_runs ar
|
||||
JOIN leaudit_documents d ON d.id = ar.document_id
|
||||
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {audit_where}
|
||||
"""
|
||||
),
|
||||
audit_params,
|
||||
)
|
||||
).mappings().first()
|
||||
|
||||
active_users = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
WITH active_users AS (
|
||||
SELECT DISTINCT e.user_id AS uid
|
||||
FROM usage_login_events e
|
||||
WHERE {login_where} AND e.login_result = 'success' AND e.user_id IS NOT NULL
|
||||
UNION
|
||||
SELECT DISTINCT f.created_by AS uid
|
||||
FROM leaudit_document_files f
|
||||
JOIN leaudit_documents d ON d.id = f.document_id
|
||||
LEFT JOIN sso_users u ON u.id = f.created_by
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {doc_where} AND f.created_by IS NOT NULL
|
||||
UNION
|
||||
SELECT DISTINCT ar.trigger_user_id AS uid
|
||||
FROM leaudit_audit_runs ar
|
||||
JOIN leaudit_documents d ON d.id = ar.document_id
|
||||
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {audit_where} AND ar.trigger_user_id IS NOT NULL
|
||||
)
|
||||
SELECT COUNT(*)::int AS active_user_count FROM active_users
|
||||
"""
|
||||
),
|
||||
{**login_params, **doc_params, **audit_params},
|
||||
)
|
||||
).mappings().first()
|
||||
|
||||
return UsageStatsOverviewVO(
|
||||
loginUserCount=int((login_row or {}).get("login_user_count") or 0),
|
||||
loginCount=int((login_row or {}).get("login_count") or 0),
|
||||
activeUserCount=int((active_users or {}).get("active_user_count") or 0),
|
||||
uploadDocumentCount=int((upload_row or {}).get("upload_document_count") or 0),
|
||||
uploadAttachmentCount=int((upload_row or {}).get("upload_attachment_count") or 0),
|
||||
auditRunCount=int((audit_row or {}).get("audit_run_count") or 0),
|
||||
auditCompletedCount=int((audit_row or {}).get("audit_completed_count") or 0),
|
||||
auditFailedCount=int((audit_row or {}).get("audit_failed_count") or 0),
|
||||
lastUpdatedAt=self._format_datetime((login_row or {}).get("last_updated_at")),
|
||||
)
|
||||
|
||||
async def GetTrends(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsTrendVO:
|
||||
context = await self._get_current_user_context(CurrentUserId)
|
||||
self._assert_stats_access(context)
|
||||
granularity = str(Filters.get("granularity") or "day").strip().lower()
|
||||
if granularity not in {"day", "week", "month"}:
|
||||
granularity = "day"
|
||||
metric = str(Filters.get("metric") or "audit").strip().lower()
|
||||
date_expr = {"day": "YYYY-MM-DD", "week": 'IYYY-"W"IW', "month": "YYYY-MM"}[granularity]
|
||||
|
||||
async with GetAsyncSession() as session:
|
||||
await self._ensure_usage_stats_schema(session)
|
||||
items: list[UsageStatsTrendItemVO] = []
|
||||
if metric == "login":
|
||||
where_clause, params = self._build_login_filters(context, Filters)
|
||||
rows = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT to_char(date_trunc('{granularity}', e.login_time), '{date_expr}') AS label,
|
||||
COUNT(*)::int AS value
|
||||
FROM usage_login_events e
|
||||
WHERE {where_clause} AND e.login_result = 'success'
|
||||
GROUP BY 1
|
||||
ORDER BY MIN(date_trunc('{granularity}', e.login_time)) ASC
|
||||
"""
|
||||
),
|
||||
params,
|
||||
)
|
||||
).mappings().all()
|
||||
items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows]
|
||||
elif metric == "upload":
|
||||
where_clause, params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
|
||||
rows = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT to_char(date_trunc('{granularity}', f.created_at), '{date_expr}') AS label,
|
||||
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS value
|
||||
FROM leaudit_document_files f
|
||||
JOIN leaudit_documents d ON d.id = f.document_id
|
||||
LEFT JOIN sso_users u ON u.id = f.created_by
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {where_clause}
|
||||
GROUP BY 1
|
||||
ORDER BY MIN(date_trunc('{granularity}', f.created_at)) ASC
|
||||
"""
|
||||
),
|
||||
params,
|
||||
)
|
||||
).mappings().all()
|
||||
items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows]
|
||||
else:
|
||||
where_clause, params = self._build_audit_filters(context, Filters)
|
||||
rows = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT to_char(date_trunc('{granularity}', COALESCE(ar.started_at, ar.created_at)), '{date_expr}') AS label,
|
||||
COUNT(*)::int AS value
|
||||
FROM leaudit_audit_runs ar
|
||||
JOIN leaudit_documents d ON d.id = ar.document_id
|
||||
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {where_clause}
|
||||
GROUP BY 1
|
||||
ORDER BY MIN(date_trunc('{granularity}', COALESCE(ar.started_at, ar.created_at))) ASC
|
||||
"""
|
||||
),
|
||||
params,
|
||||
)
|
||||
).mappings().all()
|
||||
items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows]
|
||||
|
||||
return UsageStatsTrendVO(granularity=granularity, metric=metric, items=items)
|
||||
|
||||
async def GetUsers(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsUserPageVO:
|
||||
context = await self._get_current_user_context(CurrentUserId)
|
||||
self._assert_stats_access(context)
|
||||
page, page_size, offset = self._pagination(Filters)
|
||||
area_condition, params = self._build_user_scope_condition(context, Filters, user_alias="u")
|
||||
if Filters.get("keyword"):
|
||||
params["keyword"] = f"%{str(Filters['keyword']).strip()}%"
|
||||
area_condition += " AND (u.username ILIKE :keyword OR u.nick_name ILIKE :keyword)"
|
||||
if Filters.get("departmentName"):
|
||||
params["department_name"] = str(Filters["departmentName"]).strip()
|
||||
area_condition += " AND COALESCE(u.dep_name, '') = :department_name"
|
||||
if Filters.get("userId") is not None:
|
||||
params["requested_user_id"] = int(Filters["userId"])
|
||||
area_condition += " AND u.id = :requested_user_id"
|
||||
|
||||
login_date_clause, login_date_params = self._build_range_clause("e.login_time", Filters, prefix="login")
|
||||
upload_date_clause, upload_date_params = self._build_range_clause("f.created_at", Filters, prefix="upload")
|
||||
audit_date_clause, audit_date_params = self._build_range_clause("COALESCE(ar.started_at, ar.created_at)", Filters, prefix="audit")
|
||||
|
||||
query = text(
|
||||
f"""
|
||||
WITH base_users AS (
|
||||
SELECT u.id, u.username, COALESCE(u.nick_name, '') AS nick_name,
|
||||
COALESCE(u.dep_name, '') AS department_name,
|
||||
COALESCE(u.area, '') AS area,
|
||||
u.last_login_at
|
||||
FROM sso_users u
|
||||
WHERE u.deleted_at IS NULL AND u.status = 0 AND {area_condition}
|
||||
),
|
||||
login_stats AS (
|
||||
SELECT e.user_id,
|
||||
COUNT(*)::int AS login_count,
|
||||
MAX(e.login_time) AS last_login_time
|
||||
FROM usage_login_events e
|
||||
WHERE e.login_result = 'success' AND e.user_id IS NOT NULL {login_date_clause}
|
||||
GROUP BY e.user_id
|
||||
),
|
||||
upload_stats AS (
|
||||
SELECT f.created_by AS user_id,
|
||||
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count,
|
||||
COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count
|
||||
FROM leaudit_document_files f
|
||||
JOIN leaudit_documents d ON d.id = f.document_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE f.created_by IS NOT NULL AND f.deleted_at IS NULL {upload_date_clause}
|
||||
GROUP BY f.created_by
|
||||
),
|
||||
audit_stats AS (
|
||||
SELECT ar.trigger_user_id AS user_id,
|
||||
COUNT(*)::int AS audit_run_count,
|
||||
COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count,
|
||||
COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count
|
||||
FROM leaudit_audit_runs ar
|
||||
JOIN leaudit_documents d ON d.id = ar.document_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE ar.trigger_user_id IS NOT NULL {audit_date_clause}
|
||||
GROUP BY ar.trigger_user_id
|
||||
)
|
||||
SELECT
|
||||
b.id AS user_id,
|
||||
b.username,
|
||||
b.nick_name,
|
||||
b.department_name,
|
||||
b.area,
|
||||
COALESCE(ls.login_count, 0) AS login_count,
|
||||
COALESCE(us.upload_document_count, 0) AS upload_document_count,
|
||||
COALESCE(us.upload_attachment_count, 0) AS upload_attachment_count,
|
||||
COALESCE(aus.audit_run_count, 0) AS audit_run_count,
|
||||
COALESCE(aus.audit_completed_count, 0) AS audit_completed_count,
|
||||
COALESCE(aus.audit_failed_count, 0) AS audit_failed_count,
|
||||
COALESCE(ls.last_login_time, b.last_login_at) AS last_login_time
|
||||
FROM base_users b
|
||||
LEFT JOIN login_stats ls ON ls.user_id = b.id
|
||||
LEFT JOIN upload_stats us ON us.user_id = b.id
|
||||
LEFT JOIN audit_stats aus ON aus.user_id = b.id
|
||||
ORDER BY b.id DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""
|
||||
)
|
||||
count_query = text(f"SELECT COUNT(*)::int AS total FROM sso_users u WHERE u.deleted_at IS NULL AND u.status = 0 AND {area_condition}")
|
||||
|
||||
async with GetAsyncSession() as session:
|
||||
await self._ensure_usage_stats_schema(session)
|
||||
merged_params = {**params, **login_date_params, **upload_date_params, **audit_date_params, "limit": page_size, "offset": offset}
|
||||
total = int((await session.execute(count_query, params)).scalar_one() or 0)
|
||||
rows = (await session.execute(query, merged_params)).mappings().all()
|
||||
items = [
|
||||
UsageStatsUserItemVO(
|
||||
userId=int(row["user_id"]),
|
||||
username=str(row["username"] or ""),
|
||||
nickName=str(row["nick_name"] or ""),
|
||||
departmentName=row["department_name"] or None,
|
||||
area=row["area"] or None,
|
||||
loginCount=int(row["login_count"] or 0),
|
||||
uploadDocumentCount=int(row["upload_document_count"] or 0),
|
||||
uploadAttachmentCount=int(row["upload_attachment_count"] or 0),
|
||||
auditRunCount=int(row["audit_run_count"] or 0),
|
||||
auditCompletedCount=int(row["audit_completed_count"] or 0),
|
||||
auditFailedCount=int(row["audit_failed_count"] or 0),
|
||||
lastLoginTime=self._format_datetime(row.get("last_login_time")),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
return UsageStatsUserPageVO(total=total, page=page, pageSize=page_size, items=items)
|
||||
|
||||
async def GetDepartments(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDepartmentPageVO:
|
||||
context = await self._get_current_user_context(CurrentUserId)
|
||||
self._assert_stats_access(context)
|
||||
page, page_size, offset = self._pagination(Filters)
|
||||
by_users = await self.GetUsers(CurrentUserId, {**Filters, "page": 1, "pageSize": 5000})
|
||||
grouped: dict[str, UsageStatsDepartmentItemVO] = {}
|
||||
user_ids_by_dept: dict[str, set[int]] = {}
|
||||
login_user_ids_by_dept: dict[str, set[int]] = {}
|
||||
for item in by_users.items:
|
||||
dept = item.departmentName or "未分配部门"
|
||||
if dept not in grouped:
|
||||
grouped[dept] = UsageStatsDepartmentItemVO(departmentName=dept)
|
||||
user_ids_by_dept[dept] = set()
|
||||
login_user_ids_by_dept[dept] = set()
|
||||
row = grouped[dept]
|
||||
user_ids_by_dept[dept].add(item.userId)
|
||||
if item.loginCount > 0:
|
||||
login_user_ids_by_dept[dept].add(item.userId)
|
||||
row.loginCount += item.loginCount
|
||||
row.uploadDocumentCount += item.uploadDocumentCount
|
||||
row.uploadAttachmentCount += item.uploadAttachmentCount
|
||||
row.auditRunCount += item.auditRunCount
|
||||
row.auditCompletedCount += item.auditCompletedCount
|
||||
row.auditFailedCount += item.auditFailedCount
|
||||
items = []
|
||||
for dept, row in grouped.items():
|
||||
row.userCount = len(user_ids_by_dept[dept])
|
||||
row.loginUserCount = len(login_user_ids_by_dept[dept])
|
||||
items.append(row)
|
||||
items.sort(key=lambda item: (item.auditRunCount, item.uploadDocumentCount, item.loginCount), reverse=True)
|
||||
total = len(items)
|
||||
sliced = items[offset: offset + page_size]
|
||||
return UsageStatsDepartmentPageVO(total=total, page=page, pageSize=page_size, items=sliced)
|
||||
|
||||
async def GetAreas(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsAreaPageVO:
|
||||
context = await self._get_current_user_context(CurrentUserId)
|
||||
self._assert_stats_access(context)
|
||||
page, page_size, offset = self._pagination(Filters)
|
||||
area_scope = str(Filters.get("areaScope") or "user").strip().lower()
|
||||
if area_scope not in {"user", "document"}:
|
||||
area_scope = "user"
|
||||
|
||||
async with GetAsyncSession() as session:
|
||||
await self._ensure_usage_stats_schema(session)
|
||||
login_where, login_params = self._build_login_filters(context, Filters)
|
||||
if area_scope == "document":
|
||||
upload_area_expr = "COALESCE(d.region, '')"
|
||||
audit_area_expr = "COALESCE(d.region, '')"
|
||||
else:
|
||||
upload_area_expr = "COALESCE(u.area, '')"
|
||||
audit_area_expr = "COALESCE(u.area, '')"
|
||||
doc_where, doc_params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
|
||||
audit_where, audit_params = self._build_audit_filters(context, Filters)
|
||||
|
||||
login_rows = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT COALESCE(e.area_snapshot, '未分配地区') AS area,
|
||||
COUNT(*)::int AS login_count,
|
||||
COUNT(DISTINCT e.user_id)::int AS login_user_count
|
||||
FROM usage_login_events e
|
||||
WHERE {login_where} AND e.login_result = 'success'
|
||||
GROUP BY 1
|
||||
"""
|
||||
),
|
||||
login_params,
|
||||
)
|
||||
).mappings().all()
|
||||
upload_rows = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT {upload_area_expr} AS area,
|
||||
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count,
|
||||
COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count
|
||||
FROM leaudit_document_files f
|
||||
JOIN leaudit_documents d ON d.id = f.document_id
|
||||
LEFT JOIN sso_users u ON u.id = f.created_by
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {doc_where}
|
||||
GROUP BY 1
|
||||
"""
|
||||
),
|
||||
doc_params,
|
||||
)
|
||||
).mappings().all()
|
||||
audit_rows = (
|
||||
await session.execute(
|
||||
text(
|
||||
f"""
|
||||
SELECT {audit_area_expr} AS area,
|
||||
COUNT(*)::int AS audit_run_count,
|
||||
COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count,
|
||||
COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count
|
||||
FROM leaudit_audit_runs ar
|
||||
JOIN leaudit_documents d ON d.id = ar.document_id
|
||||
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {audit_where}
|
||||
GROUP BY 1
|
||||
"""
|
||||
),
|
||||
audit_params,
|
||||
)
|
||||
).mappings().all()
|
||||
|
||||
grouped: dict[str, UsageStatsAreaItemVO] = {}
|
||||
for row in login_rows:
|
||||
area = str(row["area"] or "未分配地区")
|
||||
grouped.setdefault(area, UsageStatsAreaItemVO(area=area))
|
||||
grouped[area].loginCount += int(row["login_count"] or 0)
|
||||
grouped[area].loginUserCount += int(row["login_user_count"] or 0)
|
||||
for row in upload_rows:
|
||||
area = str(row["area"] or "未分配地区")
|
||||
grouped.setdefault(area, UsageStatsAreaItemVO(area=area))
|
||||
grouped[area].uploadDocumentCount += int(row["upload_document_count"] or 0)
|
||||
grouped[area].uploadAttachmentCount += int(row["upload_attachment_count"] or 0)
|
||||
for row in audit_rows:
|
||||
area = str(row["area"] or "未分配地区")
|
||||
grouped.setdefault(area, UsageStatsAreaItemVO(area=area))
|
||||
grouped[area].auditRunCount += int(row["audit_run_count"] or 0)
|
||||
grouped[area].auditCompletedCount += int(row["audit_completed_count"] or 0)
|
||||
grouped[area].auditFailedCount += int(row["audit_failed_count"] or 0)
|
||||
items = list(grouped.values())
|
||||
items.sort(key=lambda item: (item.auditRunCount, item.uploadDocumentCount, item.loginCount), reverse=True)
|
||||
total = len(items)
|
||||
return UsageStatsAreaPageVO(total=total, page=page, pageSize=page_size, areaScope=area_scope, items=items[offset: offset + page_size])
|
||||
|
||||
async def GetDetails(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDetailPageVO:
|
||||
context = await self._get_current_user_context(CurrentUserId)
|
||||
self._assert_stats_access(context)
|
||||
page, page_size, offset = self._pagination(Filters)
|
||||
data_type = str(Filters.get("dataType") or "audit").strip().lower()
|
||||
if data_type not in {"login", "upload", "audit"}:
|
||||
data_type = "audit"
|
||||
|
||||
async with GetAsyncSession() as session:
|
||||
await self._ensure_usage_stats_schema(session)
|
||||
if data_type == "login":
|
||||
where_clause, params = self._build_login_filters(context, Filters)
|
||||
count_sql = text(f"SELECT COUNT(*)::int FROM usage_login_events e WHERE {where_clause}")
|
||||
list_sql = text(
|
||||
f"""
|
||||
SELECT e.login_time, e.user_id, e.username_snapshot, e.nick_name_snapshot,
|
||||
e.department_name_snapshot, e.area_snapshot, e.login_result,
|
||||
e.failure_reason, e.login_type
|
||||
FROM usage_login_events e
|
||||
WHERE {where_clause}
|
||||
ORDER BY e.login_time DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""
|
||||
)
|
||||
total = int((await session.execute(count_sql, params)).scalar_one() or 0)
|
||||
rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all()
|
||||
items = [
|
||||
UsageStatsDetailItemVO(
|
||||
time=self._format_datetime(row["login_time"]),
|
||||
dataType="login",
|
||||
userId=self._to_int(row.get("user_id")),
|
||||
username=str(row.get("username_snapshot") or ""),
|
||||
nickName=str(row.get("nick_name_snapshot") or ""),
|
||||
departmentName=row.get("department_name_snapshot") or None,
|
||||
area=row.get("area_snapshot") or None,
|
||||
status=str(row.get("login_result") or ""),
|
||||
extra={"failureReason": row.get("failure_reason"), "loginType": row.get("login_type")},
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
elif data_type == "upload":
|
||||
where_clause, params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
|
||||
count_sql = text(
|
||||
f"""
|
||||
SELECT COUNT(*)::int
|
||||
FROM leaudit_document_files f
|
||||
JOIN leaudit_documents d ON d.id = f.document_id
|
||||
LEFT JOIN sso_users u ON u.id = f.created_by
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {where_clause}
|
||||
"""
|
||||
)
|
||||
list_sql = text(
|
||||
f"""
|
||||
SELECT f.created_at, f.created_by, COALESCE(u.username, '') AS username,
|
||||
COALESCE(u.nick_name, '') AS nick_name, COALESCE(u.dep_name, '') AS department_name,
|
||||
COALESCE(u.area, '') AS area, d.id AS document_id, f.file_name,
|
||||
dt.id AS document_type_id, dt.name AS document_type_name,
|
||||
em.id AS entry_module_id, em.name AS entry_module_name,
|
||||
f.file_role
|
||||
FROM leaudit_document_files f
|
||||
JOIN leaudit_documents d ON d.id = f.document_id
|
||||
LEFT JOIN sso_users u ON u.id = f.created_by
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
WHERE {where_clause}
|
||||
ORDER BY f.created_at DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""
|
||||
)
|
||||
total = int((await session.execute(count_sql, params)).scalar_one() or 0)
|
||||
rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all()
|
||||
items = [
|
||||
UsageStatsDetailItemVO(
|
||||
time=self._format_datetime(row["created_at"]),
|
||||
dataType="upload",
|
||||
userId=self._to_int(row.get("created_by")),
|
||||
username=str(row.get("username") or ""),
|
||||
nickName=str(row.get("nick_name") or ""),
|
||||
departmentName=row.get("department_name") or None,
|
||||
area=row.get("area") or None,
|
||||
documentId=self._to_int(row.get("document_id")),
|
||||
documentName=row.get("file_name"),
|
||||
documentTypeId=self._to_int(row.get("document_type_id")),
|
||||
documentTypeName=row.get("document_type_name"),
|
||||
entryModuleId=self._to_int(row.get("entry_module_id")),
|
||||
entryModuleName=row.get("entry_module_name"),
|
||||
status=row.get("file_role"),
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
else:
|
||||
where_clause, params = self._build_audit_filters(context, Filters)
|
||||
count_sql = text(
|
||||
f"""
|
||||
SELECT COUNT(*)::int
|
||||
FROM leaudit_audit_runs ar
|
||||
JOIN leaudit_documents d ON d.id = ar.document_id
|
||||
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
LEFT JOIN leaudit_document_files f ON f.document_id = d.id AND f.file_role = 'primary' AND f.is_active = true
|
||||
WHERE {where_clause}
|
||||
"""
|
||||
)
|
||||
list_sql = text(
|
||||
f"""
|
||||
SELECT COALESCE(ar.started_at, ar.created_at) AS event_time,
|
||||
ar.trigger_user_id, COALESCE(u.username, '') AS username,
|
||||
COALESCE(u.nick_name, '') AS nick_name, COALESCE(u.dep_name, '') AS department_name,
|
||||
COALESCE(u.area, '') AS area, d.id AS document_id,
|
||||
COALESCE(f.file_name, d.normalized_name, '') AS document_name,
|
||||
dt.id AS document_type_id, dt.name AS document_type_name,
|
||||
em.id AS entry_module_id, em.name AS entry_module_name,
|
||||
ar.status, ar.id AS run_id, ar.result_status
|
||||
FROM leaudit_audit_runs ar
|
||||
JOIN leaudit_documents d ON d.id = ar.document_id
|
||||
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
|
||||
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
|
||||
LEFT JOIN leaudit_document_files f ON f.document_id = d.id AND f.file_role = 'primary' AND f.is_active = true
|
||||
WHERE {where_clause}
|
||||
ORDER BY COALESCE(ar.started_at, ar.created_at) DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""
|
||||
)
|
||||
total = int((await session.execute(count_sql, params)).scalar_one() or 0)
|
||||
rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all()
|
||||
items = [
|
||||
UsageStatsDetailItemVO(
|
||||
time=self._format_datetime(row["event_time"]),
|
||||
dataType="audit",
|
||||
userId=self._to_int(row.get("trigger_user_id")),
|
||||
username=str(row.get("username") or ""),
|
||||
nickName=str(row.get("nick_name") or ""),
|
||||
departmentName=row.get("department_name") or None,
|
||||
area=row.get("area") or None,
|
||||
documentId=self._to_int(row.get("document_id")),
|
||||
documentName=row.get("document_name"),
|
||||
documentTypeId=self._to_int(row.get("document_type_id")),
|
||||
documentTypeName=row.get("document_type_name"),
|
||||
entryModuleId=self._to_int(row.get("entry_module_id")),
|
||||
entryModuleName=row.get("entry_module_name"),
|
||||
status=row.get("status"),
|
||||
extra={"runId": self._to_int(row.get("run_id")), "resultStatus": row.get("result_status")},
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
return UsageStatsDetailPageVO(total=total, page=page, pageSize=page_size, items=items)
|
||||
|
||||
async def _ensure_usage_stats_schema(self, session) -> None:
|
||||
await session.execute(text("ALTER TABLE sso_users ADD COLUMN IF NOT EXISTS last_login_at TIMESTAMPTZ NULL"))
|
||||
await session.execute(
|
||||
text(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS usage_login_events (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
user_id BIGINT NULL,
|
||||
sub VARCHAR(128) NULL,
|
||||
username_snapshot VARCHAR(128) NULL,
|
||||
nick_name_snapshot VARCHAR(128) NULL,
|
||||
department_name_snapshot VARCHAR(255) NULL,
|
||||
ou_id_snapshot VARCHAR(128) NULL,
|
||||
ou_name_snapshot VARCHAR(255) NULL,
|
||||
area_snapshot VARCHAR(64) NULL,
|
||||
login_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
login_result VARCHAR(16) NOT NULL,
|
||||
login_type VARCHAR(32) NOT NULL,
|
||||
ip_address VARCHAR(64) NULL,
|
||||
user_agent VARCHAR(1024) NULL,
|
||||
client_type VARCHAR(32) NULL,
|
||||
token_jti VARCHAR(128) NULL,
|
||||
failure_reason VARCHAR(255) NULL,
|
||||
extra JSONB NULL,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||
deleted_at TIMESTAMPTZ NULL
|
||||
)
|
||||
"""
|
||||
)
|
||||
)
|
||||
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_login_time ON usage_login_events(login_time DESC)"))
|
||||
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_user_id ON usage_login_events(user_id, login_time DESC)"))
|
||||
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_department ON usage_login_events(department_name_snapshot, login_time DESC)"))
|
||||
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_area ON usage_login_events(area_snapshot, login_time DESC)"))
|
||||
|
||||
async def _get_current_user_context(self, current_user_id: int) -> dict[str, Any]:
|
||||
async with GetAsyncSession() as session:
|
||||
row = (
|
||||
await session.execute(
|
||||
text(
|
||||
"""
|
||||
SELECT
|
||||
u.id,
|
||||
COALESCE(u.area, '') AS area,
|
||||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
|
||||
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
|
||||
COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin
|
||||
FROM sso_users u
|
||||
LEFT JOIN user_role ur ON ur.user_id = u.id
|
||||
LEFT JOIN roles r ON r.id = ur.role_id
|
||||
WHERE u.id = :user_id
|
||||
GROUP BY u.id, u.area
|
||||
"""
|
||||
),
|
||||
{"user_id": current_user_id},
|
||||
)
|
||||
).mappings().first()
|
||||
if not row:
|
||||
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在")
|
||||
return {"area": str(row["area"] or ""), "is_global": bool(row["is_global"]), "can_manage": bool(row["can_manage"]), "is_super_admin": bool(row["is_super_admin"])}
|
||||
|
||||
def _assert_stats_access(self, context: dict[str, Any]) -> None:
|
||||
if context["is_global"] or context["can_manage"]:
|
||||
return
|
||||
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "仅管理员可查看系统使用统计")
|
||||
|
||||
def _build_user_scope_condition(self, context: dict[str, Any], filters: dict[str, Any], *, user_alias: str) -> tuple[str, dict[str, Any]]:
|
||||
conditions = ["1 = 1"]
|
||||
params: dict[str, Any] = {}
|
||||
requested_area = str(filters.get("area") or "").strip()
|
||||
if context["is_global"]:
|
||||
if requested_area:
|
||||
conditions.append(f"COALESCE({user_alias}.area, '') = :requested_area")
|
||||
params["requested_area"] = requested_area
|
||||
else:
|
||||
if not context["area"]:
|
||||
conditions.append("1 = 0")
|
||||
elif requested_area and requested_area != context["area"]:
|
||||
conditions.append("1 = 0")
|
||||
else:
|
||||
conditions.append(f"COALESCE({user_alias}.area, '') = :scope_area")
|
||||
params["scope_area"] = context["area"]
|
||||
return " AND ".join(conditions), params
|
||||
|
||||
def _build_login_filters(self, context: dict[str, Any], filters: dict[str, Any]) -> tuple[str, dict[str, Any]]:
|
||||
conditions = ["e.deleted_at IS NULL"]
|
||||
scope_cond, params = self._build_user_scope_condition(context, filters, user_alias="e")
|
||||
scope_cond = scope_cond.replace("e.area", "e.area_snapshot")
|
||||
conditions.append(scope_cond)
|
||||
if filters.get("userId") is not None:
|
||||
conditions.append("e.user_id = :user_id")
|
||||
params["user_id"] = int(filters["userId"])
|
||||
if filters.get("departmentName"):
|
||||
conditions.append("COALESCE(e.department_name_snapshot, '') = :department_name")
|
||||
params["department_name"] = str(filters["departmentName"]).strip()
|
||||
if filters.get("dateFrom"):
|
||||
conditions.append("e.login_time >= :date_from")
|
||||
params["date_from"] = self._normalize_date(filters["dateFrom"])
|
||||
if filters.get("dateTo"):
|
||||
conditions.append("e.login_time < (CAST(:date_to AS date) + INTERVAL '1 day')")
|
||||
params["date_to"] = self._normalize_date(filters["dateTo"])
|
||||
return " AND ".join(conditions), params
|
||||
|
||||
def _build_document_filters(self, context: dict[str, Any], filters: dict[str, Any], *, alias_prefix: str, file_alias: str, user_alias: str) -> tuple[str, dict[str, Any]]:
|
||||
conditions = [f"{file_alias}.deleted_at IS NULL", f"{alias_prefix}.deleted_at IS NULL"]
|
||||
params: dict[str, Any] = {}
|
||||
area_scope = str(filters.get("areaScope") or "user").strip().lower()
|
||||
requested_area = str(filters.get("area") or "").strip()
|
||||
if area_scope == "document":
|
||||
if context["is_global"]:
|
||||
if requested_area:
|
||||
conditions.append(f"COALESCE({alias_prefix}.region, '') = :requested_area")
|
||||
params["requested_area"] = requested_area
|
||||
else:
|
||||
if not context["area"]:
|
||||
conditions.append("1 = 0")
|
||||
elif requested_area and requested_area != context["area"]:
|
||||
conditions.append("1 = 0")
|
||||
else:
|
||||
conditions.append(f"COALESCE({alias_prefix}.region, '') = :scope_area")
|
||||
params["scope_area"] = context["area"]
|
||||
else:
|
||||
user_scope_cond, user_scope_params = self._build_user_scope_condition(context, filters, user_alias=user_alias)
|
||||
conditions.append(user_scope_cond)
|
||||
params.update(user_scope_params)
|
||||
|
||||
if filters.get("userId") is not None:
|
||||
conditions.append(f"{file_alias}.created_by = :user_id")
|
||||
params["user_id"] = int(filters["userId"])
|
||||
if filters.get("departmentName"):
|
||||
conditions.append(f"COALESCE({user_alias}.dep_name, '') = :department_name")
|
||||
params["department_name"] = str(filters["departmentName"]).strip()
|
||||
if filters.get("entryModuleId") is not None:
|
||||
conditions.append("em.id = :entry_module_id")
|
||||
params["entry_module_id"] = int(filters["entryModuleId"])
|
||||
if filters.get("documentTypeId") is not None:
|
||||
conditions.append("dt.id = :document_type_id")
|
||||
params["document_type_id"] = int(filters["documentTypeId"])
|
||||
if filters.get("dateFrom"):
|
||||
conditions.append(f"{file_alias}.created_at >= :date_from")
|
||||
params["date_from"] = self._normalize_date(filters["dateFrom"])
|
||||
if filters.get("dateTo"):
|
||||
conditions.append(f"{file_alias}.created_at < (CAST(:date_to AS date) + INTERVAL '1 day')")
|
||||
params["date_to"] = self._normalize_date(filters["dateTo"])
|
||||
return " AND ".join(conditions), params
|
||||
|
||||
def _build_audit_filters(self, context: dict[str, Any], filters: dict[str, Any]) -> tuple[str, dict[str, Any]]:
|
||||
conditions = ["d.deleted_at IS NULL"]
|
||||
params: dict[str, Any] = {}
|
||||
area_scope = str(filters.get("areaScope") or "user").strip().lower()
|
||||
requested_area = str(filters.get("area") or "").strip()
|
||||
if area_scope == "document":
|
||||
if context["is_global"]:
|
||||
if requested_area:
|
||||
conditions.append("COALESCE(d.region, '') = :requested_area")
|
||||
params["requested_area"] = requested_area
|
||||
else:
|
||||
if not context["area"]:
|
||||
conditions.append("1 = 0")
|
||||
elif requested_area and requested_area != context["area"]:
|
||||
conditions.append("1 = 0")
|
||||
else:
|
||||
conditions.append("COALESCE(d.region, '') = :scope_area")
|
||||
params["scope_area"] = context["area"]
|
||||
else:
|
||||
user_scope_cond, user_scope_params = self._build_user_scope_condition(context, filters, user_alias="u")
|
||||
conditions.append(user_scope_cond)
|
||||
params.update(user_scope_params)
|
||||
|
||||
if filters.get("userId") is not None:
|
||||
conditions.append("ar.trigger_user_id = :user_id")
|
||||
params["user_id"] = int(filters["userId"])
|
||||
if filters.get("departmentName"):
|
||||
conditions.append("COALESCE(u.dep_name, '') = :department_name")
|
||||
params["department_name"] = str(filters["departmentName"]).strip()
|
||||
if filters.get("entryModuleId") is not None:
|
||||
conditions.append("em.id = :entry_module_id")
|
||||
params["entry_module_id"] = int(filters["entryModuleId"])
|
||||
if filters.get("documentTypeId") is not None:
|
||||
conditions.append("dt.id = :document_type_id")
|
||||
params["document_type_id"] = int(filters["documentTypeId"])
|
||||
if filters.get("dateFrom"):
|
||||
conditions.append("COALESCE(ar.started_at, ar.created_at) >= :date_from")
|
||||
params["date_from"] = self._normalize_date(filters["dateFrom"])
|
||||
if filters.get("dateTo"):
|
||||
conditions.append("COALESCE(ar.started_at, ar.created_at) < (CAST(:date_to AS date) + INTERVAL '1 day')")
|
||||
params["date_to"] = self._normalize_date(filters["dateTo"])
|
||||
return " AND ".join(conditions), params
|
||||
|
||||
def _build_range_clause(self, column: str, filters: dict[str, Any], *, prefix: str) -> tuple[str, dict[str, Any]]:
|
||||
clauses: list[str] = []
|
||||
params: dict[str, Any] = {}
|
||||
if filters.get("dateFrom"):
|
||||
clauses.append(f" AND {column} >= :{prefix}_date_from")
|
||||
params[f"{prefix}_date_from"] = self._normalize_date(filters["dateFrom"])
|
||||
if filters.get("dateTo"):
|
||||
clauses.append(f" AND {column} < (CAST(:{prefix}_date_to AS date) + INTERVAL '1 day')")
|
||||
params[f"{prefix}_date_to"] = self._normalize_date(filters["dateTo"])
|
||||
return "".join(clauses), params
|
||||
|
||||
@staticmethod
|
||||
def _normalize_date(value: Any):
|
||||
from datetime import date as date_type
|
||||
if isinstance(value, date_type):
|
||||
return value
|
||||
return date_type.fromisoformat(str(value).strip())
|
||||
|
||||
@staticmethod
|
||||
def _detect_client_type(user_agent: str | None) -> str:
|
||||
ua = str(user_agent or "").lower()
|
||||
if any(token in ua for token in ["iphone", "android", "mobile", "ipad"]):
|
||||
return "mobile"
|
||||
if ua:
|
||||
return "pc"
|
||||
return "other"
|
||||
|
||||
@staticmethod
|
||||
def _to_int(value: Any) -> int | None:
|
||||
try:
|
||||
return int(value) if value is not None and str(value) != "" else None
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _format_datetime(value: Any) -> str | None:
|
||||
if not value:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
return value.strftime("%Y-%m-%d %H:%M:%S")
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
def _pagination(filters: dict[str, Any]) -> tuple[int, int, int]:
|
||||
page = max(int(filters.get("page") or 1), 1)
|
||||
page_size = max(1, min(int(filters.get("pageSize") or 20), 200))
|
||||
return page, page_size, (page - 1) * page_size
|
||||
@@ -0,0 +1,58 @@
|
||||
"""系统使用统计服务接口。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any
|
||||
|
||||
from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import (
|
||||
UsageStatsAreaPageVO,
|
||||
UsageStatsDepartmentPageVO,
|
||||
UsageStatsDetailPageVO,
|
||||
UsageStatsOverviewVO,
|
||||
UsageStatsTrendVO,
|
||||
UsageStatsUserPageVO,
|
||||
)
|
||||
|
||||
|
||||
class IUsageStatsService(ABC):
|
||||
"""系统使用统计服务接口。"""
|
||||
|
||||
@abstractmethod
|
||||
async def RecordLoginEvent(
|
||||
self,
|
||||
*,
|
||||
UserInfo: dict[str, Any] | None,
|
||||
Sub: str | None,
|
||||
LoginResult: str,
|
||||
LoginType: str,
|
||||
IpAddress: str | None,
|
||||
UserAgent: str | None,
|
||||
FailureReason: str | None = None,
|
||||
TokenJti: str | None = None,
|
||||
) -> None:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def GetOverview(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsOverviewVO:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def GetTrends(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsTrendVO:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def GetUsers(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsUserPageVO:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def GetDepartments(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDepartmentPageVO:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def GetAreas(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsAreaPageVO:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
async def GetDetails(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDetailPageVO:
|
||||
...
|
||||
Reference in New Issue
Block a user