examples.asyncio.async_orm_writeonly 源代码

"""Illustrates using **write only relationships** for simpler handling
of ORM collections under asyncio.

"""

from __future__ import annotations

import asyncio
import datetime
from typing import Optional

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import WriteOnlyMapped


class Base(AsyncAttrs, DeclarativeBase):
    pass


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[Optional[str]]
    create_date: Mapped[datetime.datetime] = mapped_column(
        server_default=func.now()
    )

    # collection relationships are declared with WriteOnlyMapped.  There
    # is no separate collection type
    bs: WriteOnlyMapped[B] = relationship()


class B(Base):
    __tablename__ = "b"
    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    data: Mapped[Optional[str]]


async def async_main():
    """Main program function."""

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async_session = async_sessionmaker(engine, expire_on_commit=False)

    async with async_session() as session:
        async with session.begin():
            # WriteOnlyMapped may be populated using any iterable,
            # e.g. lists, sets, etc.
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        stmt = select(A)

        result = await session.scalars(stmt)

        for a1 in result:
            print(a1)
            print(f"created at: {a1.create_date}")

            # to iterate a collection, emit a SELECT statement
            for b1 in await session.scalars(a1.bs.select()):
                print(b1)

        result = await session.stream(stmt)

        async for a1 in result.scalars():
            print(a1)

            # similar using "streaming" (server side cursors)
            async for b1 in (await session.stream(a1.bs.select())).scalars():
                print(b1)

        await session.commit()
        result = await session.scalars(select(A).order_by(A.id))

        a1 = result.first()

        a1.data = "new data"


asyncio.run(async_main())