Files
2026-04-28 16:53:16 +08:00

40 lines
1.2 KiB
Python

"""SQLAlchemy 异步引擎和 session 工厂。"""
from __future__ import annotations
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool
from fastapi_admin.config import ASYNCPG_DATABASE_URL
# Module-level async engine; stays None until _get_session_factory() creates it.
_engine: AsyncEngine | None = None
# Module-level session factory bound to _engine; None until first requested.
_AsyncSessionFactory: async_sessionmaker[AsyncSession] | None = None
def _get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Return the module-level session factory, creating it on first use.

    The engine and factory are built lazily rather than at import time; the
    stated intent is to avoid reusing a stale connection pool across reloads
    or multiple event loops.

    Returns:
        The cached ``async_sessionmaker``; the same object on every call
        after the first.
    """
    global _engine, _AsyncSessionFactory

    # Fast path: everything was already built by an earlier call.
    if _AsyncSessionFactory is not None:
        return _AsyncSessionFactory

    # NullPool: connections are not kept around between uses.
    _engine = create_async_engine(ASYNCPG_DATABASE_URL, echo=False, poolclass=NullPool)
    # expire_on_commit=False: loaded attributes are not expired by commit().
    _AsyncSessionFactory = async_sessionmaker(_engine, class_=AsyncSession, expire_on_commit=False)
    return _AsyncSessionFactory
@asynccontextmanager
async def GetAsyncSession():
    """Provide an async database session as an async context manager.

    Usage: ``async with GetAsyncSession() as session: ...``

    Yields:
        A session produced by the module's session factory.

    The session is committed after the caller's block finishes normally.
    If the caller's block raises, the exception surfaces at the ``yield``,
    the commit is skipped, and the exception propagates after the session's
    own context exit runs.
    """
    async with _get_session_factory()() as db_session:
        yield db_session
        # Only reached when the caller's block completed without raising.
        await db_session.commit()