Files

205 lines
7.3 KiB
Python

"""统一租户解析器与兼容层。"""
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
@dataclass(slots=True)
class TenantResolution:
"""租户解析结果。"""
tenant_code: str | None
tenant_name: str | None
tenant_type: str | None
raw_value: str | None
normalized_value: str | None
source: str
is_public: bool = False
class TenantResolver:
"""统一租户解析器。
当前阶段目标:
1. 兼容 area / region / tenant_name 等历史字段
2. 尽可能解析成稳定 tenant_code
3. 为旧业务继续保留 normalized 中文值,避免立即打崩 area 依赖代码
"""
_PUBLIC_SENTINELS = {"default", "公共"}
_PROVINCIAL_SENTINELS = {"省级", "省局"}
def __init__(self) -> None:
self._table_exists_cache: dict[str, bool] = {}
async def Resolve(
self,
RawValue: str | None,
Source: str = "generic",
FallbackTenantName: str | None = None,
PreferredTenantCode: str | None = None,
) -> TenantResolution:
normalized = self._normalize(RawValue)
if PreferredTenantCode and PreferredTenantCode.strip():
tenant = await self._loadTenantByCode(PreferredTenantCode.strip())
if tenant:
return self._to_resolution(tenant=tenant, raw_value=RawValue, normalized_value=normalized, source=Source)
if normalized in self._PUBLIC_SENTINELS:
tenant = await self._loadTenantByCode("PUBLIC")
if tenant:
return self._to_resolution(tenant=tenant, raw_value=RawValue, normalized_value=normalized, source=Source, force_public=True)
if normalized in self._PROVINCIAL_SENTINELS:
tenant = await self._loadTenantByCode("PROVINCIAL")
if tenant:
return self._to_resolution(tenant=tenant, raw_value=RawValue, normalized_value=normalized, source=Source)
if normalized:
tenant = await self._loadTenantByAlias(normalized)
if tenant:
return self._to_resolution(tenant=tenant, raw_value=RawValue, normalized_value=normalized, source=Source)
fallback_name = self._normalize(FallbackTenantName)
if fallback_name:
tenant = await self._loadTenantByAlias(fallback_name)
if tenant:
return self._to_resolution(tenant=tenant, raw_value=RawValue, normalized_value=normalized or fallback_name, source=Source)
return TenantResolution(
tenant_code=None,
tenant_name=fallback_name or normalized,
tenant_type=None,
raw_value=RawValue,
normalized_value=normalized or fallback_name,
source=Source,
is_public=False,
)
async def ResolveUserContext(
self,
*,
Area: str | None,
TenantCode: str | None,
TenantName: str | None,
Source: str = "user_context",
) -> TenantResolution:
"""按用户上下文解析租户。"""
return await self.Resolve(
RawValue=Area,
Source=Source,
FallbackTenantName=TenantName,
PreferredTenantCode=TenantCode,
)
@staticmethod
def _normalize(value: str | None) -> str | None:
if value is None:
return None
trimmed = str(value).strip()
return trimmed if trimmed else ""
async def _loadTenantByCode(self, tenant_code: str) -> dict | None:
if not await self._table_exists("sys_tenants"):
return None
async with GetAsyncSession() as session:
row = (
await session.execute(
text(
"""
SELECT tenant_code, tenant_name, tenant_type, COALESCE(is_public, FALSE) AS is_public
FROM sys_tenants
WHERE tenant_code = :tenant_code
AND deleted_at IS NULL
AND is_enabled = TRUE
LIMIT 1
"""
),
{"tenant_code": tenant_code},
)
).mappings().first()
return dict(row) if row else None
async def _loadTenantByAlias(self, alias_value: str) -> dict | None:
if not await self._table_exists("sys_tenants") or not await self._table_exists("sys_tenant_aliases"):
return None
async with GetAsyncSession() as session:
row = (
await session.execute(
text(
"""
SELECT t.tenant_code, t.tenant_name, t.tenant_type, COALESCE(t.is_public, FALSE) AS is_public
FROM sys_tenant_aliases a
JOIN sys_tenants t ON t.tenant_code = a.tenant_code
WHERE a.alias_value = :alias_value
AND a.deleted_at IS NULL
AND a.is_enabled = TRUE
AND t.deleted_at IS NULL
AND t.is_enabled = TRUE
ORDER BY
CASE a.alias_type
WHEN 'DISPLAY' THEN 1
WHEN 'LEGACY_AREA' THEN 2
WHEN 'LEGACY_REGION' THEN 3
ELSE 9
END ASC,
a.id ASC
LIMIT 1
"""
),
{"alias_value": alias_value},
)
).mappings().first()
return dict(row) if row else None
async def _table_exists(self, table_name: str) -> bool:
"""缓存表存在性,兼容旧环境未完成租户基础表建表的场景。"""
cached = self._table_exists_cache.get(table_name)
if cached is not None:
return cached
async with GetAsyncSession() as session:
row = (
await session.execute(
text(
"""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = current_schema()
AND table_name = :table_name
) AS table_exists
"""
),
{"table_name": table_name},
)
).scalar_one()
exists = bool(row)
self._table_exists_cache[table_name] = exists
return exists
@staticmethod
def _to_resolution(
*,
tenant: dict,
raw_value: str | None,
normalized_value: str | None,
source: str,
force_public: bool = False,
) -> TenantResolution:
return TenantResolution(
tenant_code=str(tenant.get("tenant_code") or ""),
tenant_name=tenant.get("tenant_name"),
tenant_type=tenant.get("tenant_type"),
raw_value=raw_value,
normalized_value=normalized_value,
source=source,
is_public=bool(tenant.get("is_public")) or force_public,
)