"""SQLAlchemy 异步引擎和 session 工厂。""" from __future__ import annotations from contextlib import asynccontextmanager from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from fastapi_admin.config import ASYNCPG_DATABASE_URL _engine = create_async_engine(ASYNCPG_DATABASE_URL, echo=False, pool_size=20, max_overflow=10) _AsyncSessionFactory = async_sessionmaker(_engine, class_=AsyncSession, expire_on_commit=False) @asynccontextmanager async def GetAsyncSession(): """获取异步数据库 session(上下文管理器)。""" async with _AsyncSessionFactory() as session: yield session await session.commit()