Files
2026-05-25 09:50:01 +08:00

1220 lines
65 KiB
Python

"""系统使用统计服务实现。"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.domian.vo.usageStatsVo import (
UsageStatsAreaItemVO,
UsageStatsAreaPageVO,
UsageStatsDepartmentItemVO,
UsageStatsDepartmentPageVO,
UsageStatsDetailItemVO,
UsageStatsDetailPageVO,
UsageStatsOverviewVO,
UsageStatsTrendItemVO,
UsageStatsTrendVO,
UsageStatsUserItemVO,
UsageStatsUserPageVO,
)
from fastapi_modules.fastapi_leaudit.services.impl.ssoUserCompat import SsoUserCompat
from fastapi_modules.fastapi_leaudit.services.impl.tenantResolver import TenantResolver
from fastapi_modules.fastapi_leaudit.services.usageStatsService import IUsageStatsService
class UsageStatsServiceImpl(IUsageStatsService):
"""系统使用统计服务实现。"""
def __init__(self, TenantResolverService: TenantResolver | None = None) -> None:
self.TenantResolver = TenantResolverService or TenantResolver()
async def RecordLoginEvent(
self,
*,
UserInfo: dict[str, Any] | None,
Sub: str | None,
LoginResult: str,
LoginType: str,
IpAddress: str | None,
UserAgent: str | None,
FailureReason: str | None = None,
TokenJti: str | None = None,
) -> None:
async with GetAsyncSession() as session:
await self._ensure_usage_stats_schema(session)
user_id = self._to_int(UserInfo.get("user_id")) if UserInfo else None
username = str((UserInfo or {}).get("username") or "") or None
nick_name = str((UserInfo or {}).get("nick_name") or "") or None
dep_name = str((UserInfo or {}).get("dep_name") or "") or None
ou_id = str((UserInfo or {}).get("ou_id") or "") or None
ou_name = str((UserInfo or {}).get("ou_name") or "") or None
area = str((UserInfo or {}).get("area") or "") or None
tenant_code = str((UserInfo or {}).get("tenant_code") or "") or None
tenant_name = str((UserInfo or {}).get("tenant_name") or "") or None
client_type = self._detect_client_type(UserAgent)
resolved_tenant = await self.TenantResolver.ResolveUserContext(
Area=area,
TenantCode=tenant_code,
TenantName=tenant_name,
Source="usage_login_event",
)
normalized_failure_reason = self._normalize_failure_reason(FailureReason)
await session.execute(
text(
"""
INSERT INTO usage_login_events (
user_id, sub, username_snapshot, nick_name_snapshot,
department_name_snapshot, ou_id_snapshot, ou_name_snapshot,
tenant_code_snapshot, tenant_name_snapshot,
area_snapshot, login_time, login_result, login_type,
ip_address, user_agent, client_type, token_jti, failure_reason,
extra, created_at, updated_at, deleted_at
) VALUES (
:user_id, :sub, :username_snapshot, :nick_name_snapshot,
:department_name_snapshot, :ou_id_snapshot, :ou_name_snapshot,
:tenant_code_snapshot, :tenant_name_snapshot,
:area_snapshot, NOW(), :login_result, :login_type,
:ip_address, :user_agent, :client_type, :token_jti, :failure_reason,
CAST(:extra AS jsonb), NOW(), NOW(), NULL
)
"""
),
{
"user_id": user_id,
"sub": Sub,
"username_snapshot": username,
"nick_name_snapshot": nick_name,
"department_name_snapshot": dep_name,
"ou_id_snapshot": ou_id,
"ou_name_snapshot": ou_name,
"tenant_code_snapshot": resolved_tenant.tenant_code or tenant_code,
"tenant_name_snapshot": resolved_tenant.tenant_name or tenant_name or area,
"area_snapshot": resolved_tenant.tenant_name or area,
"login_result": LoginResult,
"login_type": LoginType,
"ip_address": IpAddress,
"user_agent": UserAgent,
"client_type": client_type,
"token_jti": TokenJti,
"failure_reason": normalized_failure_reason,
"extra": "{}",
},
)
if LoginResult == "success" and user_id is not None:
await session.execute(
text("UPDATE sso_users SET last_login_at = NOW(), updated_at = NOW() WHERE id = :user_id"),
{"user_id": user_id},
)
await session.commit()
async def GetOverview(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsOverviewVO:
context = await self._get_current_user_context(CurrentUserId)
self._assert_stats_access(context)
async with GetAsyncSession() as session:
await self._ensure_usage_stats_schema(session)
login_where, login_params = self._build_login_filters(context, Filters)
doc_where, doc_params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
audit_where, audit_params = self._build_audit_filters(context, Filters)
login_row = (
await session.execute(
text(
f"""
SELECT
COUNT(*)::int AS login_count,
COUNT(DISTINCT user_id)::int AS login_user_count,
COALESCE(MAX(login_time), NOW()) AS last_updated_at
FROM usage_login_events e
WHERE {login_where} AND e.login_result = 'success'
"""
),
login_params,
)
).mappings().first()
upload_row = (
await session.execute(
text(
f"""
SELECT
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count,
COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count
FROM leaudit_document_files f
JOIN leaudit_documents d ON d.id = f.document_id
LEFT JOIN sso_users u ON u.id = f.created_by
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {doc_where}
"""
),
doc_params,
)
).mappings().first()
audit_row = (
await session.execute(
text(
f"""
SELECT
COUNT(*)::int AS audit_run_count,
COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count,
COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count
FROM leaudit_audit_runs ar
JOIN leaudit_documents d ON d.id = ar.document_id
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {audit_where}
"""
),
audit_params,
)
).mappings().first()
active_users = (
await session.execute(
text(
f"""
WITH active_users AS (
SELECT DISTINCT e.user_id AS uid
FROM usage_login_events e
WHERE {login_where} AND e.login_result = 'success' AND e.user_id IS NOT NULL
UNION
SELECT DISTINCT f.created_by AS uid
FROM leaudit_document_files f
JOIN leaudit_documents d ON d.id = f.document_id
LEFT JOIN sso_users u ON u.id = f.created_by
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {doc_where} AND f.created_by IS NOT NULL
UNION
SELECT DISTINCT ar.trigger_user_id AS uid
FROM leaudit_audit_runs ar
JOIN leaudit_documents d ON d.id = ar.document_id
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {audit_where} AND ar.trigger_user_id IS NOT NULL
)
SELECT COUNT(*)::int AS active_user_count FROM active_users
"""
),
{**login_params, **doc_params, **audit_params},
)
).mappings().first()
return UsageStatsOverviewVO(
loginUserCount=int((login_row or {}).get("login_user_count") or 0),
loginCount=int((login_row or {}).get("login_count") or 0),
activeUserCount=int((active_users or {}).get("active_user_count") or 0),
uploadDocumentCount=int((upload_row or {}).get("upload_document_count") or 0),
uploadAttachmentCount=int((upload_row or {}).get("upload_attachment_count") or 0),
auditRunCount=int((audit_row or {}).get("audit_run_count") or 0),
auditCompletedCount=int((audit_row or {}).get("audit_completed_count") or 0),
auditFailedCount=int((audit_row or {}).get("audit_failed_count") or 0),
lastUpdatedAt=self._format_datetime((login_row or {}).get("last_updated_at")),
)
async def GetTrends(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsTrendVO:
context = await self._get_current_user_context(CurrentUserId)
self._assert_stats_access(context)
granularity = str(Filters.get("granularity") or "day").strip().lower()
if granularity not in {"day", "week", "month"}:
granularity = "day"
metric = str(Filters.get("metric") or "audit").strip().lower()
date_expr = {"day": "YYYY-MM-DD", "week": 'IYYY-"W"IW', "month": "YYYY-MM"}[granularity]
async with GetAsyncSession() as session:
await self._ensure_usage_stats_schema(session)
items: list[UsageStatsTrendItemVO] = []
if metric == "login":
where_clause, params = self._build_login_filters(context, Filters)
rows = (
await session.execute(
text(
f"""
SELECT to_char(date_trunc('{granularity}', e.login_time), '{date_expr}') AS label,
COUNT(*)::int AS value
FROM usage_login_events e
WHERE {where_clause} AND e.login_result = 'success'
GROUP BY 1
ORDER BY MIN(date_trunc('{granularity}', e.login_time)) ASC
"""
),
params,
)
).mappings().all()
items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows]
elif metric == "upload":
where_clause, params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
rows = (
await session.execute(
text(
f"""
SELECT to_char(date_trunc('{granularity}', f.created_at), '{date_expr}') AS label,
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS value
FROM leaudit_document_files f
JOIN leaudit_documents d ON d.id = f.document_id
LEFT JOIN sso_users u ON u.id = f.created_by
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {where_clause}
GROUP BY 1
ORDER BY MIN(date_trunc('{granularity}', f.created_at)) ASC
"""
),
params,
)
).mappings().all()
items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows]
else:
where_clause, params = self._build_audit_filters(context, Filters)
rows = (
await session.execute(
text(
f"""
SELECT to_char(date_trunc('{granularity}', COALESCE(ar.started_at, ar.created_at)), '{date_expr}') AS label,
COUNT(*)::int AS value
FROM leaudit_audit_runs ar
JOIN leaudit_documents d ON d.id = ar.document_id
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {where_clause}
GROUP BY 1
ORDER BY MIN(date_trunc('{granularity}', COALESCE(ar.started_at, ar.created_at))) ASC
"""
),
params,
)
).mappings().all()
items = [UsageStatsTrendItemVO(label=str(row["label"]), value=int(row["value"] or 0)) for row in rows]
return UsageStatsTrendVO(granularity=granularity, metric=metric, items=items)
async def GetUsers(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsUserPageVO:
context = await self._get_current_user_context(CurrentUserId)
self._assert_stats_access(context)
page, page_size, offset = self._pagination(Filters)
async with GetAsyncSession() as session:
await self._ensure_usage_stats_schema(session)
sso_user_columns = await SsoUserCompat.get_columns(session)
tenant_code_select = SsoUserCompat.optional_coalesce_as(
sso_user_columns,
alias="u",
column="tenant_code",
fallback_sql="''",
)
tenant_name_select = SsoUserCompat.optional_coalesce_as(
sso_user_columns,
alias="u",
column="tenant_name",
fallback_sql="''",
)
area_condition, params = self._build_user_scope_condition(context, Filters, user_alias="u")
if Filters.get("keyword"):
params["keyword"] = f"%{str(Filters['keyword']).strip()}%"
area_condition += " AND (u.username ILIKE :keyword OR u.nick_name ILIKE :keyword)"
if Filters.get("departmentName"):
params["department_name"] = str(Filters["departmentName"]).strip()
area_condition += " AND COALESCE(u.dep_name, '') = :department_name"
if Filters.get("userId") is not None:
params["requested_user_id"] = int(Filters["userId"])
area_condition += " AND u.id = :requested_user_id"
login_where, login_params = self._build_login_filters(context, Filters)
upload_where, upload_params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
audit_where, audit_params = self._build_audit_filters(context, Filters)
query = text(
f"""
WITH base_users AS (
SELECT u.id, u.username, COALESCE(u.nick_name, '') AS nick_name,
COALESCE(u.dep_name, '') AS department_name,
COALESCE(u.area, '') AS area,
{tenant_code_select},
{tenant_name_select},
u.last_login_at
FROM sso_users u
WHERE u.deleted_at IS NULL AND u.status = 0 AND {area_condition}
),
login_stats AS (
SELECT e.user_id,
COUNT(*)::int AS login_count,
MAX(e.login_time) AS last_login_time
FROM usage_login_events e
WHERE {login_where} AND e.login_result = 'success' AND e.user_id IS NOT NULL
GROUP BY e.user_id
),
upload_stats AS (
SELECT f.created_by AS user_id,
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count,
COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count
FROM leaudit_document_files f
JOIN leaudit_documents d ON d.id = f.document_id
LEFT JOIN sso_users u ON u.id = f.created_by
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {upload_where} AND f.created_by IS NOT NULL
GROUP BY f.created_by
),
audit_stats AS (
SELECT ar.trigger_user_id AS user_id,
COUNT(*)::int AS audit_run_count,
COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count,
COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count
FROM leaudit_audit_runs ar
JOIN leaudit_documents d ON d.id = ar.document_id
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {audit_where} AND ar.trigger_user_id IS NOT NULL
GROUP BY ar.trigger_user_id
)
SELECT
b.id AS user_id,
b.username,
b.nick_name,
b.department_name,
b.area,
b.tenant_code,
b.tenant_name,
COALESCE(ls.login_count, 0) AS login_count,
COALESCE(us.upload_document_count, 0) AS upload_document_count,
COALESCE(us.upload_attachment_count, 0) AS upload_attachment_count,
COALESCE(aus.audit_run_count, 0) AS audit_run_count,
COALESCE(aus.audit_completed_count, 0) AS audit_completed_count,
COALESCE(aus.audit_failed_count, 0) AS audit_failed_count,
COALESCE(ls.last_login_time, b.last_login_at) AS last_login_time
FROM base_users b
LEFT JOIN login_stats ls ON ls.user_id = b.id
LEFT JOIN upload_stats us ON us.user_id = b.id
LEFT JOIN audit_stats aus ON aus.user_id = b.id
ORDER BY b.id DESC
LIMIT :limit OFFSET :offset
"""
)
count_query = text(
f"SELECT COUNT(*)::int AS total FROM sso_users u WHERE u.deleted_at IS NULL AND u.status = 0 AND {area_condition}"
)
merged_params = {
**params,
**login_params,
**upload_params,
**audit_params,
"limit": page_size,
"offset": offset,
}
total = int((await session.execute(count_query, params)).scalar_one() or 0)
rows = (await session.execute(query, merged_params)).mappings().all()
items = [
UsageStatsUserItemVO(
userId=int(row["user_id"]),
username=str(row["username"] or ""),
nickName=str(row["nick_name"] or ""),
departmentName=row["department_name"] or None,
area=row["area"] or None,
tenantCode=row.get("tenant_code") or None,
tenantName=row.get("tenant_name") or row.get("area") or None,
loginCount=int(row["login_count"] or 0),
uploadDocumentCount=int(row["upload_document_count"] or 0),
uploadAttachmentCount=int(row["upload_attachment_count"] or 0),
auditRunCount=int(row["audit_run_count"] or 0),
auditCompletedCount=int(row["audit_completed_count"] or 0),
auditFailedCount=int(row["audit_failed_count"] or 0),
lastLoginTime=self._format_datetime(row.get("last_login_time")),
)
for row in rows
]
return UsageStatsUserPageVO(total=total, page=page, pageSize=page_size, items=items)
async def GetDepartments(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDepartmentPageVO:
context = await self._get_current_user_context(CurrentUserId)
self._assert_stats_access(context)
page, page_size, offset = self._pagination(Filters)
by_users = await self.GetUsers(CurrentUserId, {**Filters, "page": 1, "pageSize": 5000})
grouped: dict[str, UsageStatsDepartmentItemVO] = {}
user_ids_by_dept: dict[str, set[int]] = {}
login_user_ids_by_dept: dict[str, set[int]] = {}
for item in by_users.items:
dept = item.departmentName or "未分配部门"
if dept not in grouped:
grouped[dept] = UsageStatsDepartmentItemVO(departmentName=dept)
user_ids_by_dept[dept] = set()
login_user_ids_by_dept[dept] = set()
row = grouped[dept]
user_ids_by_dept[dept].add(item.userId)
if item.loginCount > 0:
login_user_ids_by_dept[dept].add(item.userId)
row.loginCount += item.loginCount
row.uploadDocumentCount += item.uploadDocumentCount
row.uploadAttachmentCount += item.uploadAttachmentCount
row.auditRunCount += item.auditRunCount
row.auditCompletedCount += item.auditCompletedCount
row.auditFailedCount += item.auditFailedCount
items = []
for dept, row in grouped.items():
row.userCount = len(user_ids_by_dept[dept])
row.loginUserCount = len(login_user_ids_by_dept[dept])
items.append(row)
items.sort(key=lambda item: (item.auditRunCount, item.uploadDocumentCount, item.loginCount), reverse=True)
total = len(items)
sliced = items[offset: offset + page_size]
return UsageStatsDepartmentPageVO(total=total, page=page, pageSize=page_size, items=sliced)
async def GetAreas(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsAreaPageVO:
context = await self._get_current_user_context(CurrentUserId)
self._assert_stats_access(context)
page, page_size, offset = self._pagination(Filters)
area_scope = str(Filters.get("areaScope") or "user").strip().lower()
if area_scope not in {"user", "document"}:
area_scope = "user"
async with GetAsyncSession() as session:
await self._ensure_usage_stats_schema(session)
sso_user_columns = await SsoUserCompat.get_columns(session)
user_tenant_code_expr = SsoUserCompat.raw_optional_column(
sso_user_columns,
alias="u",
column="tenant_code",
)
user_tenant_name_expr = SsoUserCompat.raw_optional_column(
sso_user_columns,
alias="u",
column="tenant_name",
)
login_where, login_params = self._build_login_filters(context, Filters)
if area_scope == "document":
upload_area_expr = "COALESCE(d.region, '')"
audit_area_expr = "COALESCE(d.region, '')"
upload_tenant_code_expr = "COALESCE(NULLIF(BTRIM(d.tenant_code), ''), NULL)"
upload_tenant_name_expr = self._document_tenant_name_sql("d")
audit_tenant_code_expr = "COALESCE(NULLIF(BTRIM(d.tenant_code), ''), NULL)"
audit_tenant_name_expr = self._document_tenant_name_sql("d")
else:
upload_area_expr = "COALESCE(u.area, '')"
audit_area_expr = "COALESCE(u.area, '')"
upload_tenant_code_expr = f"COALESCE(NULLIF(BTRIM({user_tenant_code_expr}), ''), NULL)"
upload_tenant_name_expr = (
f"COALESCE(NULLIF(BTRIM({user_tenant_name_expr}), ''), NULLIF(BTRIM({upload_area_expr}), ''), '未分配地区')"
)
audit_tenant_code_expr = f"COALESCE(NULLIF(BTRIM({user_tenant_code_expr}), ''), NULL)"
audit_tenant_name_expr = (
f"COALESCE(NULLIF(BTRIM({user_tenant_name_expr}), ''), NULLIF(BTRIM({audit_area_expr}), ''), '未分配地区')"
)
doc_where, doc_params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
audit_where, audit_params = self._build_audit_filters(context, Filters)
login_rows = (
await session.execute(
text(
f"""
SELECT COALESCE(e.area_snapshot, '未分配地区') AS area,
COALESCE(NULLIF(BTRIM(e.tenant_code_snapshot), ''), NULL) AS tenant_code,
COALESCE(NULLIF(BTRIM(e.tenant_name_snapshot), ''), COALESCE(e.area_snapshot, '未分配地区')) AS tenant_name,
COUNT(*)::int AS login_count,
COUNT(DISTINCT e.user_id)::int AS login_user_count
FROM usage_login_events e
WHERE {login_where} AND e.login_result = 'success'
GROUP BY 1, 2, 3
"""
),
login_params,
)
).mappings().all()
upload_rows = (
await session.execute(
text(
f"""
SELECT {upload_area_expr} AS area,
{upload_tenant_code_expr} AS tenant_code,
{upload_tenant_name_expr} AS tenant_name,
COUNT(*) FILTER (WHERE f.file_role = 'primary')::int AS upload_document_count,
COUNT(*) FILTER (WHERE f.file_role = 'attachment')::int AS upload_attachment_count
FROM leaudit_document_files f
JOIN leaudit_documents d ON d.id = f.document_id
LEFT JOIN sso_users u ON u.id = f.created_by
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {doc_where}
GROUP BY 1, 2, 3
"""
),
doc_params,
)
).mappings().all()
audit_rows = (
await session.execute(
text(
f"""
SELECT {audit_area_expr} AS area,
{audit_tenant_code_expr} AS tenant_code,
{audit_tenant_name_expr} AS tenant_name,
COUNT(*)::int AS audit_run_count,
COUNT(*) FILTER (WHERE ar.status = 'completed')::int AS audit_completed_count,
COUNT(*) FILTER (WHERE ar.status = 'failed')::int AS audit_failed_count
FROM leaudit_audit_runs ar
JOIN leaudit_documents d ON d.id = ar.document_id
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {audit_where}
GROUP BY 1, 2, 3
"""
),
audit_params,
)
).mappings().all()
grouped: dict[tuple[str | None, str], UsageStatsAreaItemVO] = {}
for row in login_rows:
area = str(row["area"] or "未分配地区")
tenant_code = row.get("tenant_code") or None
group_key = (tenant_code, area)
grouped.setdefault(
group_key,
UsageStatsAreaItemVO(area=area, tenantCode=tenant_code, tenantName=row.get("tenant_name") or area),
)
grouped[group_key].loginCount += int(row["login_count"] or 0)
grouped[group_key].loginUserCount += int(row["login_user_count"] or 0)
for row in upload_rows:
area = str(row["area"] or "未分配地区")
tenant_code = row.get("tenant_code") or None
group_key = (tenant_code, area)
grouped.setdefault(
group_key,
UsageStatsAreaItemVO(area=area, tenantCode=tenant_code, tenantName=row.get("tenant_name") or area),
)
grouped[group_key].uploadDocumentCount += int(row["upload_document_count"] or 0)
grouped[group_key].uploadAttachmentCount += int(row["upload_attachment_count"] or 0)
for row in audit_rows:
area = str(row["area"] or "未分配地区")
tenant_code = row.get("tenant_code") or None
group_key = (tenant_code, area)
grouped.setdefault(
group_key,
UsageStatsAreaItemVO(area=area, tenantCode=tenant_code, tenantName=row.get("tenant_name") or area),
)
grouped[group_key].auditRunCount += int(row["audit_run_count"] or 0)
grouped[group_key].auditCompletedCount += int(row["audit_completed_count"] or 0)
grouped[group_key].auditFailedCount += int(row["audit_failed_count"] or 0)
items = list(grouped.values())
items.sort(key=lambda item: (item.auditRunCount, item.uploadDocumentCount, item.loginCount), reverse=True)
total = len(items)
return UsageStatsAreaPageVO(total=total, page=page, pageSize=page_size, areaScope=area_scope, items=items[offset: offset + page_size])
async def GetDetails(self, CurrentUserId: int, Filters: dict[str, Any]) -> UsageStatsDetailPageVO:
context = await self._get_current_user_context(CurrentUserId)
self._assert_stats_access(context)
page, page_size, offset = self._pagination(Filters)
data_type = str(Filters.get("dataType") or "audit").strip().lower()
if data_type not in {"login", "upload", "audit"}:
data_type = "audit"
area_scope = str(Filters.get("areaScope") or "user").strip().lower()
if area_scope not in {"user", "document"}:
area_scope = "user"
async with GetAsyncSession() as session:
await self._ensure_usage_stats_schema(session)
sso_user_columns = await SsoUserCompat.get_columns(session)
user_tenant_code_expr = SsoUserCompat.raw_optional_column(
sso_user_columns,
alias="u",
column="tenant_code",
)
user_tenant_name_expr = SsoUserCompat.raw_optional_column(
sso_user_columns,
alias="u",
column="tenant_name",
)
if area_scope == "document":
detail_area_expr = "COALESCE(d.region, '')"
detail_tenant_code_select = "COALESCE(NULLIF(BTRIM(d.tenant_code), ''), NULL) AS tenant_code"
detail_tenant_name_select = f"{self._document_tenant_name_sql('d')} AS tenant_name"
else:
detail_area_expr = "COALESCE(u.area, '')"
detail_tenant_code_select = f"COALESCE(NULLIF(BTRIM({user_tenant_code_expr}), ''), NULL) AS tenant_code"
detail_tenant_name_select = (
f"COALESCE(NULLIF(BTRIM({user_tenant_name_expr}), ''), NULLIF(BTRIM(COALESCE(u.area, '')), ''), NULL) AS tenant_name"
)
if data_type == "login":
where_clause, params = self._build_login_filters(context, Filters)
count_sql = text(f"SELECT COUNT(*)::int FROM usage_login_events e WHERE {where_clause}")
list_sql = text(
f"""
SELECT e.login_time, e.user_id, e.username_snapshot, e.nick_name_snapshot,
e.department_name_snapshot, e.area_snapshot, e.tenant_code_snapshot, e.tenant_name_snapshot, e.login_result,
e.failure_reason, e.login_type
FROM usage_login_events e
WHERE {where_clause}
ORDER BY e.login_time DESC
LIMIT :limit OFFSET :offset
"""
)
total = int((await session.execute(count_sql, params)).scalar_one() or 0)
rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all()
items = [
UsageStatsDetailItemVO(
time=self._format_datetime(row["login_time"]),
dataType="login",
userId=self._to_int(row.get("user_id")),
username=str(row.get("username_snapshot") or ""),
nickName=str(row.get("nick_name_snapshot") or ""),
departmentName=row.get("department_name_snapshot") or None,
area=row.get("area_snapshot") or None,
tenantCode=row.get("tenant_code_snapshot") or None,
tenantName=row.get("tenant_name_snapshot") or row.get("area_snapshot") or None,
status=str(row.get("login_result") or ""),
extra={"failureReason": row.get("failure_reason"), "loginType": row.get("login_type")},
)
for row in rows
]
elif data_type == "upload":
where_clause, params = self._build_document_filters(context, Filters, alias_prefix="d", file_alias="f", user_alias="u")
count_sql = text(
f"""
SELECT COUNT(*)::int
FROM leaudit_document_files f
JOIN leaudit_documents d ON d.id = f.document_id
LEFT JOIN sso_users u ON u.id = f.created_by
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {where_clause}
"""
)
list_sql = text(
f"""
SELECT f.created_at, f.created_by, COALESCE(u.username, '') AS username,
COALESCE(u.nick_name, '') AS nick_name, COALESCE(u.dep_name, '') AS department_name,
{detail_area_expr} AS area, {detail_tenant_code_select},
{detail_tenant_name_select}, d.id AS document_id, f.file_name,
dt.id AS document_type_id, dt.name AS document_type_name,
em.id AS entry_module_id, em.name AS entry_module_name,
f.file_role
FROM leaudit_document_files f
JOIN leaudit_documents d ON d.id = f.document_id
LEFT JOIN sso_users u ON u.id = f.created_by
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
WHERE {where_clause}
ORDER BY f.created_at DESC
LIMIT :limit OFFSET :offset
"""
)
total = int((await session.execute(count_sql, params)).scalar_one() or 0)
rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all()
items = [
UsageStatsDetailItemVO(
time=self._format_datetime(row["created_at"]),
dataType="upload",
userId=self._to_int(row.get("created_by")),
username=str(row.get("username") or ""),
nickName=str(row.get("nick_name") or ""),
departmentName=row.get("department_name") or None,
area=row.get("area") or None,
tenantCode=row.get("tenant_code") or None,
tenantName=row.get("tenant_name") or row.get("area") or None,
documentId=self._to_int(row.get("document_id")),
documentName=row.get("file_name"),
documentTypeId=self._to_int(row.get("document_type_id")),
documentTypeName=row.get("document_type_name"),
entryModuleId=self._to_int(row.get("entry_module_id")),
entryModuleName=row.get("entry_module_name"),
status=row.get("file_role"),
)
for row in rows
]
else:
where_clause, params = self._build_audit_filters(context, Filters)
count_sql = text(
f"""
SELECT COUNT(*)::int
FROM leaudit_audit_runs ar
JOIN leaudit_documents d ON d.id = ar.document_id
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
LEFT JOIN leaudit_document_files f ON f.document_id = d.id AND f.file_role = 'primary' AND f.is_active = true
WHERE {where_clause}
"""
)
list_sql = text(
f"""
SELECT COALESCE(ar.started_at, ar.created_at) AS event_time,
ar.trigger_user_id, COALESCE(u.username, '') AS username,
COALESCE(u.nick_name, '') AS nick_name, COALESCE(u.dep_name, '') AS department_name,
{detail_area_expr} AS area, {detail_tenant_code_select},
{detail_tenant_name_select}, d.id AS document_id,
COALESCE(f.file_name, d.normalized_name, '') AS document_name,
dt.id AS document_type_id, dt.name AS document_type_name,
em.id AS entry_module_id, em.name AS entry_module_name,
ar.status, ar.id AS run_id, ar.result_status
FROM leaudit_audit_runs ar
JOIN leaudit_documents d ON d.id = ar.document_id
LEFT JOIN sso_users u ON u.id = ar.trigger_user_id
LEFT JOIN leaudit_document_types dt ON dt.id = d.type_id
LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id
LEFT JOIN leaudit_document_files f ON f.document_id = d.id AND f.file_role = 'primary' AND f.is_active = true
WHERE {where_clause}
ORDER BY COALESCE(ar.started_at, ar.created_at) DESC
LIMIT :limit OFFSET :offset
"""
)
total = int((await session.execute(count_sql, params)).scalar_one() or 0)
rows = (await session.execute(list_sql, {**params, "limit": page_size, "offset": offset})).mappings().all()
items = [
UsageStatsDetailItemVO(
time=self._format_datetime(row["event_time"]),
dataType="audit",
userId=self._to_int(row.get("trigger_user_id")),
username=str(row.get("username") or ""),
nickName=str(row.get("nick_name") or ""),
departmentName=row.get("department_name") or None,
area=row.get("area") or None,
tenantCode=row.get("tenant_code") or None,
tenantName=row.get("tenant_name") or row.get("area") or None,
documentId=self._to_int(row.get("document_id")),
documentName=row.get("document_name"),
documentTypeId=self._to_int(row.get("document_type_id")),
documentTypeName=row.get("document_type_name"),
entryModuleId=self._to_int(row.get("entry_module_id")),
entryModuleName=row.get("entry_module_name"),
status=row.get("status"),
extra={"runId": self._to_int(row.get("run_id")), "resultStatus": row.get("result_status")},
)
for row in rows
]
return UsageStatsDetailPageVO(total=total, page=page, pageSize=page_size, items=items)
async def _ensure_usage_stats_schema(self, session) -> None:
await session.execute(text("ALTER TABLE sso_users ADD COLUMN IF NOT EXISTS last_login_at TIMESTAMPTZ NULL"))
await session.execute(
text(
"""
CREATE TABLE IF NOT EXISTS usage_login_events (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NULL,
sub VARCHAR(128) NULL,
username_snapshot VARCHAR(128) NULL,
nick_name_snapshot VARCHAR(128) NULL,
department_name_snapshot VARCHAR(255) NULL,
ou_id_snapshot VARCHAR(128) NULL,
ou_name_snapshot VARCHAR(255) NULL,
tenant_code_snapshot VARCHAR(64) NULL,
tenant_name_snapshot VARCHAR(128) NULL,
area_snapshot VARCHAR(64) NULL,
login_time TIMESTAMPTZ NOT NULL DEFAULT NOW(),
login_result VARCHAR(16) NOT NULL,
login_type VARCHAR(32) NOT NULL,
ip_address VARCHAR(64) NULL,
user_agent VARCHAR(1024) NULL,
client_type VARCHAR(32) NULL,
token_jti VARCHAR(128) NULL,
failure_reason VARCHAR(255) NULL,
extra JSONB NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ NULL
)
"""
)
)
await session.execute(text("ALTER TABLE usage_login_events ADD COLUMN IF NOT EXISTS tenant_code_snapshot VARCHAR(64) NULL"))
await session.execute(text("ALTER TABLE usage_login_events ADD COLUMN IF NOT EXISTS tenant_name_snapshot VARCHAR(128) NULL"))
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_login_time ON usage_login_events(login_time DESC)"))
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_user_id ON usage_login_events(user_id, login_time DESC)"))
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_department ON usage_login_events(department_name_snapshot, login_time DESC)"))
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_area ON usage_login_events(area_snapshot, login_time DESC)"))
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_usage_login_events_tenant_code ON usage_login_events(tenant_code_snapshot, login_time DESC)"))
async def _get_current_user_context(self, current_user_id: int) -> dict[str, Any]:
async with GetAsyncSession() as session:
sso_user_columns = await SsoUserCompat.get_columns(session)
tenant_code_select = SsoUserCompat.optional_coalesce_as(
sso_user_columns,
alias="u",
column="tenant_code",
fallback_sql="''",
)
tenant_name_select = SsoUserCompat.optional_coalesce_as(
sso_user_columns,
alias="u",
column="tenant_name",
fallback_sql="''",
)
row = (
await session.execute(
text(
f"""
SELECT
u.id,
COALESCE(u.area, '') AS area,
{tenant_code_select},
{tenant_name_select},
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global,
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin', 'admin')), FALSE) AS can_manage,
COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS is_super_admin
FROM sso_users u
LEFT JOIN user_role ur ON ur.user_id = u.id
LEFT JOIN roles r ON r.id = ur.role_id
WHERE u.id = :user_id
GROUP BY u.id, u.area
"""
),
{"user_id": current_user_id},
)
).mappings().first()
if not row:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在")
tenant = await self.TenantResolver.ResolveUserContext(
Area=str(row["area"] or ""),
TenantCode=str(row["tenant_code"] or "") or None,
TenantName=str(row["tenant_name"] or "") or None,
Source="usage_stats_user_context",
)
return {
"area": tenant.tenant_name or tenant.normalized_value or str(row["area"] or ""),
"tenant_code": tenant.tenant_code or str(row["tenant_code"] or "") or None,
"tenant_name": tenant.tenant_name or str(row["tenant_name"] or "") or str(row["area"] or "") or None,
"tenant_scope_value": tenant.tenant_name or tenant.normalized_value or str(row["area"] or ""),
"is_global": bool(row["is_global"]),
"can_manage": bool(row["can_manage"]),
"is_super_admin": bool(row["is_super_admin"]),
}
def _assert_stats_access(self, context: dict[str, Any]) -> None:
return
def _build_user_scope_condition(self, context: dict[str, Any], filters: dict[str, Any], *, user_alias: str) -> tuple[str, dict[str, Any]]:
conditions = ["1 = 1"]
params: dict[str, Any] = {}
requested_area = str(filters.get("area") or "").strip()
requested_tenant_code = str(filters.get("tenantCode") or filters.get("tenant_code") or "").strip()
requested_tenant_name = self._resolve_requested_tenant_name(requested_tenant_code, requested_area, context)
if context["is_global"]:
if requested_tenant_code:
conditions.extend(self._user_tenant_filter_sql(user_alias, params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name:
conditions.append(f"COALESCE({user_alias}.area, '') = :requested_area")
params["requested_area"] = requested_tenant_name
else:
scope_value = str(context.get("tenant_scope_value") or context.get("area") or "").strip()
scope_tenant_code = str(context.get("tenant_code") or "").strip()
if not scope_value and not scope_tenant_code:
conditions.append("1 = 0")
elif requested_tenant_code:
if scope_tenant_code and requested_tenant_code != scope_tenant_code:
conditions.append("1 = 0")
elif not scope_tenant_code and requested_tenant_name and requested_tenant_name != scope_value:
conditions.append("1 = 0")
else:
conditions.extend(self._user_tenant_filter_sql(user_alias, params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name and requested_tenant_name != scope_value:
conditions.append("1 = 0")
else:
conditions.extend(self._user_tenant_filter_sql(user_alias, params, scope_tenant_code or None, scope_value or None, "scope"))
return " AND ".join(conditions), params
def _build_login_filters(self, context: dict[str, Any], filters: dict[str, Any]) -> tuple[str, dict[str, Any]]:
conditions = ["e.deleted_at IS NULL"]
params: dict[str, Any] = {}
requested_area = str(filters.get("area") or "").strip()
requested_tenant_code = str(filters.get("tenantCode") or filters.get("tenant_code") or "").strip()
requested_tenant_name = self._resolve_requested_tenant_name(requested_tenant_code, requested_area, context)
scope_tenant_code = str(context.get("tenant_code") or "").strip()
scope_tenant_name = str(context.get("tenant_scope_value") or context.get("area") or "").strip()
if context["is_global"]:
if requested_tenant_code:
conditions.extend(self._login_tenant_filter_sql(params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name:
conditions.append("COALESCE(e.area_snapshot, '') = :requested_area")
params["requested_area"] = requested_tenant_name
else:
if not scope_tenant_code and not scope_tenant_name:
conditions.append("1 = 0")
elif requested_tenant_code:
if scope_tenant_code and requested_tenant_code != scope_tenant_code:
conditions.append("1 = 0")
elif not scope_tenant_code and requested_tenant_name and requested_tenant_name != scope_tenant_name:
conditions.append("1 = 0")
else:
conditions.extend(self._login_tenant_filter_sql(params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name and requested_tenant_name != scope_tenant_name:
conditions.append("1 = 0")
else:
conditions.extend(self._login_tenant_filter_sql(params, scope_tenant_code or None, scope_tenant_name or None, "scope"))
if filters.get("userId") is not None:
conditions.append("e.user_id = :user_id")
params["user_id"] = int(filters["userId"])
if filters.get("departmentName"):
conditions.append("COALESCE(e.department_name_snapshot, '') = :department_name")
params["department_name"] = str(filters["departmentName"]).strip()
if filters.get("dateFrom"):
conditions.append("e.login_time >= :date_from")
params["date_from"] = self._normalize_date(filters["dateFrom"])
if filters.get("dateTo"):
conditions.append("e.login_time < (CAST(:date_to AS date) + INTERVAL '1 day')")
params["date_to"] = self._normalize_date(filters["dateTo"])
return " AND ".join(conditions), params
def _build_document_filters(self, context: dict[str, Any], filters: dict[str, Any], *, alias_prefix: str, file_alias: str, user_alias: str) -> tuple[str, dict[str, Any]]:
conditions = [f"{file_alias}.deleted_at IS NULL", f"{alias_prefix}.deleted_at IS NULL"]
params: dict[str, Any] = {}
area_scope = str(filters.get("areaScope") or "user").strip().lower()
requested_area = str(filters.get("area") or "").strip()
requested_tenant_code = str(filters.get("tenantCode") or filters.get("tenant_code") or "").strip()
requested_tenant_name = self._resolve_requested_tenant_name(requested_tenant_code, requested_area, context)
if area_scope == "document":
if context["is_global"]:
if requested_tenant_code:
conditions.extend(self._document_tenant_filter_sql(alias_prefix, params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name:
conditions.append(f"COALESCE({alias_prefix}.region, '') = :requested_area")
params["requested_area"] = requested_tenant_name
else:
scope_value = str(context.get("tenant_scope_value") or context.get("area") or "").strip()
scope_tenant_code = str(context.get("tenant_code") or "").strip()
if not scope_value and not scope_tenant_code:
conditions.append("1 = 0")
elif requested_tenant_code:
if scope_tenant_code and requested_tenant_code != scope_tenant_code:
conditions.append("1 = 0")
elif not scope_tenant_code and requested_tenant_name and requested_tenant_name != scope_value:
conditions.append("1 = 0")
else:
conditions.extend(self._document_tenant_filter_sql(alias_prefix, params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name and requested_tenant_name != scope_value:
conditions.append("1 = 0")
else:
conditions.extend(self._document_tenant_filter_sql(alias_prefix, params, scope_tenant_code or None, scope_value or None, "scope"))
else:
user_scope_cond, user_scope_params = self._build_user_scope_condition(context, filters, user_alias=user_alias)
conditions.append(user_scope_cond)
params.update(user_scope_params)
if filters.get("userId") is not None:
conditions.append(f"{file_alias}.created_by = :user_id")
params["user_id"] = int(filters["userId"])
if filters.get("departmentName"):
conditions.append(f"COALESCE({user_alias}.dep_name, '') = :department_name")
params["department_name"] = str(filters["departmentName"]).strip()
if filters.get("entryModuleId") is not None:
conditions.append("em.id = :entry_module_id")
params["entry_module_id"] = int(filters["entryModuleId"])
if filters.get("documentTypeId") is not None:
conditions.append("dt.id = :document_type_id")
params["document_type_id"] = int(filters["documentTypeId"])
if filters.get("dateFrom"):
conditions.append(f"{file_alias}.created_at >= :date_from")
params["date_from"] = self._normalize_date(filters["dateFrom"])
if filters.get("dateTo"):
conditions.append(f"{file_alias}.created_at < (CAST(:date_to AS date) + INTERVAL '1 day')")
params["date_to"] = self._normalize_date(filters["dateTo"])
return " AND ".join(conditions), params
def _build_audit_filters(self, context: dict[str, Any], filters: dict[str, Any]) -> tuple[str, dict[str, Any]]:
conditions = ["d.deleted_at IS NULL"]
params: dict[str, Any] = {}
area_scope = str(filters.get("areaScope") or "user").strip().lower()
requested_area = str(filters.get("area") or "").strip()
requested_tenant_code = str(filters.get("tenantCode") or filters.get("tenant_code") or "").strip()
requested_tenant_name = self._resolve_requested_tenant_name(requested_tenant_code, requested_area, context)
if area_scope == "document":
if context["is_global"]:
if requested_tenant_code:
conditions.extend(self._document_tenant_filter_sql("d", params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name:
conditions.append("COALESCE(d.region, '') = :requested_area")
params["requested_area"] = requested_tenant_name
else:
scope_value = str(context.get("tenant_scope_value") or context.get("area") or "").strip()
scope_tenant_code = str(context.get("tenant_code") or "").strip()
if not scope_value and not scope_tenant_code:
conditions.append("1 = 0")
elif requested_tenant_code:
if scope_tenant_code and requested_tenant_code != scope_tenant_code:
conditions.append("1 = 0")
elif not scope_tenant_code and requested_tenant_name and requested_tenant_name != scope_value:
conditions.append("1 = 0")
else:
conditions.extend(self._document_tenant_filter_sql("d", params, requested_tenant_code, requested_tenant_name, "requested"))
elif requested_tenant_name and requested_tenant_name != scope_value:
conditions.append("1 = 0")
else:
conditions.extend(self._document_tenant_filter_sql("d", params, scope_tenant_code or None, scope_value or None, "scope"))
else:
user_scope_cond, user_scope_params = self._build_user_scope_condition(context, filters, user_alias="u")
conditions.append(user_scope_cond)
params.update(user_scope_params)
if filters.get("userId") is not None:
conditions.append("ar.trigger_user_id = :user_id")
params["user_id"] = int(filters["userId"])
if filters.get("departmentName"):
conditions.append("COALESCE(u.dep_name, '') = :department_name")
params["department_name"] = str(filters["departmentName"]).strip()
if filters.get("entryModuleId") is not None:
conditions.append("em.id = :entry_module_id")
params["entry_module_id"] = int(filters["entryModuleId"])
if filters.get("documentTypeId") is not None:
conditions.append("dt.id = :document_type_id")
params["document_type_id"] = int(filters["documentTypeId"])
if filters.get("dateFrom"):
conditions.append("COALESCE(ar.started_at, ar.created_at) >= :date_from")
params["date_from"] = self._normalize_date(filters["dateFrom"])
if filters.get("dateTo"):
conditions.append("COALESCE(ar.started_at, ar.created_at) < (CAST(:date_to AS date) + INTERVAL '1 day')")
params["date_to"] = self._normalize_date(filters["dateTo"])
return " AND ".join(conditions), params
def _build_range_clause(self, column: str, filters: dict[str, Any], *, prefix: str) -> tuple[str, dict[str, Any]]:
clauses: list[str] = []
params: dict[str, Any] = {}
if filters.get("dateFrom"):
clauses.append(f" AND {column} >= :{prefix}_date_from")
params[f"{prefix}_date_from"] = self._normalize_date(filters["dateFrom"])
if filters.get("dateTo"):
clauses.append(f" AND {column} < (CAST(:{prefix}_date_to AS date) + INTERVAL '1 day')")
params[f"{prefix}_date_to"] = self._normalize_date(filters["dateTo"])
return "".join(clauses), params
def _resolve_requested_tenant_name(
self,
requested_tenant_code: str | None,
requested_area: str | None,
context: dict[str, Any],
) -> str:
tenant_code = str(requested_tenant_code or "").strip()
if tenant_code:
if tenant_code == str(context.get("tenant_code") or "").strip():
return str(context.get("tenant_scope_value") or context.get("tenant_name") or context.get("area") or "").strip()
if tenant_code == "PUBLIC":
return "公共"
if tenant_code == "PROVINCIAL":
return "省级"
return str(requested_area or "").strip()
@staticmethod
def _document_tenant_name_sql(document_alias: str) -> str:
return (
"CASE "
f"WHEN NULLIF(BTRIM({document_alias}.tenant_code), '') = 'PUBLIC' THEN '公共' "
f"WHEN NULLIF(BTRIM({document_alias}.tenant_code), '') = 'PROVINCIAL' THEN '省级' "
f"ELSE COALESCE(NULLIF(BTRIM({document_alias}.region), ''), '未分配地区') "
"END"
)
def _user_tenant_filter_sql(
self,
user_alias: str,
params: dict[str, Any],
tenant_code: str | None,
tenant_name: str | None,
prefix: str,
) -> list[str]:
normalized_tenant_code = str(tenant_code or "").strip()
normalized_tenant_name = str(tenant_name or "").strip()
if normalized_tenant_code:
params[f"{prefix}_tenant_code"] = normalized_tenant_code
return [f"{user_alias}.tenant_code = :{prefix}_tenant_code"]
if normalized_tenant_name:
params[f"{prefix}_tenant_name"] = normalized_tenant_name
return [f"COALESCE({user_alias}.area, '') = :{prefix}_tenant_name"]
return ["1 = 0"]
def _login_tenant_filter_sql(
self,
params: dict[str, Any],
tenant_code: str | None,
tenant_name: str | None,
prefix: str,
) -> list[str]:
normalized_tenant_code = str(tenant_code or "").strip()
normalized_tenant_name = str(tenant_name or "").strip()
if normalized_tenant_code:
params[f"{prefix}_tenant_code"] = normalized_tenant_code
return [f"e.tenant_code_snapshot = :{prefix}_tenant_code"]
if normalized_tenant_name:
params[f"{prefix}_tenant_name"] = normalized_tenant_name
return [f"COALESCE(e.area_snapshot, '') = :{prefix}_tenant_name"]
return ["1 = 0"]
def _document_tenant_filter_sql(
self,
alias_prefix: str,
params: dict[str, Any],
tenant_code: str | None,
tenant_name: str | None,
prefix: str,
) -> list[str]:
normalized_tenant_code = str(tenant_code or "").strip()
normalized_tenant_name = str(tenant_name or "").strip()
if normalized_tenant_code:
params[f"{prefix}_tenant_code"] = normalized_tenant_code
return [f"{alias_prefix}.tenant_code = :{prefix}_tenant_code"]
if normalized_tenant_name:
params[f"{prefix}_tenant_name"] = normalized_tenant_name
return [f"COALESCE({alias_prefix}.region, '') = :{prefix}_tenant_name"]
return ["1 = 0"]
@staticmethod
def _normalize_date(value: Any):
from datetime import date as date_type
if isinstance(value, date_type):
return value
return date_type.fromisoformat(str(value).strip())
@staticmethod
def _detect_client_type(user_agent: str | None) -> str:
ua = str(user_agent or "").lower()
if any(token in ua for token in ["iphone", "android", "mobile", "ipad"]):
return "mobile"
if ua:
return "pc"
return "other"
@staticmethod
def _normalize_failure_reason(value: str | None, limit: int = 255) -> str | None:
"""裁剪登录失败原因,避免审计表二次写入失败。"""
if value is None:
return None
normalized = " ".join(str(value).split()).strip()
if not normalized:
return None
if len(normalized) <= limit:
return normalized
if limit <= 3:
return normalized[:limit]
return f"{normalized[: limit - 3]}..."
@staticmethod
def _to_int(value: Any) -> int | None:
try:
return int(value) if value is not None and str(value) != "" else None
except (TypeError, ValueError):
return None
@staticmethod
def _format_datetime(value: Any) -> str | None:
if not value:
return None
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d %H:%M:%S")
return str(value)
@staticmethod
def _pagination(filters: dict[str, Any]) -> tuple[int, int, int]:
page = max(int(filters.get("page") or 1), 1)
page_size = max(1, min(int(filters.get("pageSize") or 20), 200))
return page, page_size, (page - 1) * page_size