Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/models/jwtToken.py
T
wren 535d97a70c chore: initial commit — leaudit-platform project skeleton
17-table PostgreSQL schema with full Chinese column comments,
FastAPI project structure (admin/common/modules),
DSL rule files, and schema migration scripts.
2026-04-27 16:48:22 +08:00

78 lines
3.3 KiB
Python

"""JWT Token 模型 —— jwt_tokens 表。
记录每次签发的 Token 生命周期:签发、刷新、撤销。
"""
from __future__ import annotations

from datetime import datetime, timezone

from sqlalchemy import BigInteger, Boolean, DateTime, String, delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column

from fastapi_common.fastapi_common_web.models import BaseModel
class JwtToken(BaseModel):
    """ORM model for the ``jwt_tokens`` table (one row per issued token).

    Besides the column mapping, the class offers async helpers to look up,
    revoke and purge token records. None of the helpers commit; the caller
    owns the transaction on the ``AsyncSession`` it passes in.
    """

    __tablename__ = "jwt_tokens"

    # Auto-incrementing primary key.
    Id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    # User ID.
    userId: Mapped[int] = mapped_column(BigInteger, comment="用户ID")
    # JWT ID; the lookup/revocation helpers below match on this column.
    tokenJti: Mapped[str] = mapped_column(String(128), comment="Token JTI")
    # Token hashes (SHA256 per the column comments); the refresh hash is optional.
    tokenHash: Mapped[str] = mapped_column(String(128), comment="Access Token SHA256")
    refreshTokenHash: Mapped[str | None] = mapped_column(String(128), comment="Refresh Token SHA256")
    tokenType: Mapped[str] = mapped_column(String(32), default="ACCESS", comment="ACCESS/REFRESH")
    # Optional client metadata.
    deviceId: Mapped[str | None] = mapped_column(String(128))
    deviceName: Mapped[str | None] = mapped_column(String(256))
    userAgent: Mapped[str | None] = mapped_column(String(512))
    ipAddress: Mapped[str | None] = mapped_column(String(64))
    # Every timestamp column is timezone-aware.
    issuedAt: Mapped[datetime] = mapped_column(DateTime(timezone=True))
    expiresAt: Mapped[datetime] = mapped_column(DateTime(timezone=True))
    refreshExpiresAt: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    lastUsedAt: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    # Revocation state.
    isRevoked: Mapped[bool] = mapped_column(Boolean, default=False)
    revokedAt: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
    revokeReason: Mapped[str | None] = mapped_column(String(256))

    @classmethod
    async def get_by_jti(cls, session: AsyncSession, jti: str) -> "JwtToken | None":
        """Return the token record whose ``tokenJti`` equals *jti*.

        Args:
            session: Async session used to run the query.
            jti: JWT ID to look up.

        Returns:
            The first matching ``JwtToken``, or ``None`` when there is no match.
        """
        return await session.scalar(select(cls).where(cls.tokenJti == jti))

    @classmethod
    async def revoke_by_jti(cls, session: AsyncSession, jti: str, reason: str = "") -> None:
        """Mark the not-yet-revoked token with the given JTI as revoked.

        Args:
            session: Async session used to run the UPDATE (not committed here).
            jti: JWT ID of the token to revoke.
            reason: Text stored in ``revokeReason``.
        """
        await session.execute(
            update(cls)
            # Skip rows that are already revoked so the original revokedAt and
            # revokeReason are not overwritten by a repeated call.
            .where(cls.tokenJti == jti, cls.isRevoked.is_(False))
            .values(
                isRevoked=True,
                # The column is timezone-aware, so store an aware UTC instant.
                revokedAt=datetime.now(timezone.utc),
                revokeReason=reason,
            )
        )

    @classmethod
    async def revoke_all_user_tokens(cls, session: AsyncSession, userId: int, reason: str = "") -> list[str]:
        """Revoke every active token of a user and return the revoked JTIs.

        Args:
            session: Async session used to run the UPDATE (not committed here).
            userId: ID of the user whose tokens are revoked.
            reason: Text stored in ``revokeReason`` of each revoked row.

        Returns:
            The ``tokenJti`` values of the rows this call revoked; empty when
            the user had no active tokens.
        """
        # A single UPDATE ... RETURNING keeps the returned list identical to
        # the set of rows actually changed; a separate SELECT could drift from
        # the UPDATE under concurrent writes.
        # NOTE(review): relies on the backend supporting UPDATE ... RETURNING
        # (the project targets PostgreSQL) -- confirm for any other dialect.
        result = await session.execute(
            update(cls)
            .where(cls.userId == userId, cls.isRevoked.is_(False))
            .values(
                isRevoked=True,
                revokedAt=datetime.now(timezone.utc),
                revokeReason=reason,
            )
            .returning(cls.tokenJti)
        )
        return list(result.scalars().all())

    @classmethod
    async def cleanup_expired(cls, session: AsyncSession, before: datetime) -> int:
        """Delete token records whose ``expiresAt`` is earlier than *before*.

        Args:
            session: Async session used to run the DELETE (not committed here).
            before: Cut-off instant; rows with ``expiresAt < before`` are removed.

        Returns:
            The number of deleted rows as reported by the database.
        """
        # NOTE(review): only expiresAt is checked, so a row whose
        # refreshExpiresAt is still in the future is deleted too -- confirm
        # that this is the intended retention rule.
        # One bulk DELETE instead of loading each row and deleting it via the ORM.
        result = await session.execute(delete(cls).where(cls.expiresAt < before))
        return result.rowcount