From 5e682762efd10d7bf7a880f849a820582b6a955d Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 23 Dec 2023 03:31:07 +0100 Subject: [PATCH] Python/SQLAlchemy: Demonstrate support for `asyncpg` and `psycopg` The `sqlalchemy-cratedb` package supports the vanilla HTTP-based transport using urllib3, and the standard PostgreSQL drivers `asyncpg` and `psycopg`. --- by-language/python-sqlalchemy/README.rst | 23 ++- .../python-sqlalchemy/async_streaming.py | 168 +++++++++++++++ by-language/python-sqlalchemy/async_table.py | 194 ++++++++++++++++++ .../python-sqlalchemy/requirements.txt | 2 +- by-language/python-sqlalchemy/sync_table.py | 167 +++++++++++++++ by-language/python-sqlalchemy/test.py | 16 +- 6 files changed, 563 insertions(+), 7 deletions(-) create mode 100644 by-language/python-sqlalchemy/async_streaming.py create mode 100644 by-language/python-sqlalchemy/async_table.py create mode 100644 by-language/python-sqlalchemy/sync_table.py diff --git a/by-language/python-sqlalchemy/README.rst b/by-language/python-sqlalchemy/README.rst index 3397d1e0..9a31f2f4 100644 --- a/by-language/python-sqlalchemy/README.rst +++ b/by-language/python-sqlalchemy/README.rst @@ -79,9 +79,7 @@ Navigate to example program directory, and install prerequisites:: Examples ******** -Run example programs:: - - # Connect to CrateDB on localhost. +The ``insert`` example programs are about efficient data loading:: time python insert_efficient.py cratedb multirow time python insert_efficient.py cratedb batched @@ -92,7 +90,21 @@ Run example programs:: time python insert_dask.py -Use ``insert_pandas.py`` to connect to any other database instance:: +The ``sync`` and ``async`` example programs demonstrate SQLAlchemy's +low-level table/core API using both the HTTP-based transport driver +using ``urllib3``, as well as the canonical ``asyncpg`` and ``psycopg3`` +drivers using the PostgreSQL wire protocol:: + + time python sync_table.py urllib3 psycopg + time python async_table.py asyncpg psycopg + time python async_streaming.py asyncpg psycopg + +Connect to CrateDB Cloud +======================== + +By default, the example programs will connect to CrateDB on ``localhost``. +In order to connect to any other database instance, for example on CrateDB +Cloud:: export DBURI="crate://crate@localhost:4200/" export DBURI="crate://admin:@example.aks1.westeurope.azure.cratedb.net:4200?ssl=true" @@ -100,7 +112,8 @@ Use ``insert_pandas.py`` to connect to any other database instance:: .. TIP:: - For more information, please refer to the header sections of each of the provided example programs. + For more information, please refer to the header sections of each of the + provided example programs. ***** diff --git a/by-language/python-sqlalchemy/async_streaming.py b/by-language/python-sqlalchemy/async_streaming.py new file mode 100644 index 00000000..ee972eba --- /dev/null +++ b/by-language/python-sqlalchemy/async_streaming.py @@ -0,0 +1,168 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API, in +asynchronous mode. + +Specific to the asynchronous mode of SQLAlchemy is the streaming of results: + +> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()` +> method that returns an `AsyncResult` object. This result object uses a server-side cursor +> and provides an async/await API, such as an async iterator. +> +> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core + +Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used. +The corresponding SQLAlchemy dialect identifiers are:: + + # PostgreSQL protocol on port 5432, using `asyncpg` + crate+asyncpg://crate@localhost:5432/doc + + # PostgreSQL protocol on port 5432, using `psycopg` + crate+psycopg://crate@localhost:5432/doc + + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use PostgreSQL protocol, with `asyncpg` + python async_streaming.py asyncpg + + # Use PostgreSQL protocol, with asynchronous support of `psycopg` + python async_streaming.py psycopg + + # Use with both variants + python async_streaming.py asyncpg psycopg + +""" +import asyncio +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import create_async_engine + +metadata = sa.MetaData() +table = sa.Table( + "t1", + metadata, + sa.Column("id", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("name", sa.String), +) + + +class AsynchronousTableStreamingExample: + """ + Demonstrate reading streamed results when using the CrateDB SQLAlchemy + dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers. + + - https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core + - https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return create_async_engine(self.dsn, echo=True) + + async def run(self): + """ + Run the whole recipe. + """ + await self.create_and_insert() + await self.read_buffered() + await self.read_streaming() + + async def create_and_insert(self): + """ + Create table schema, completely dropping it upfront, and insert a few records. + """ + # conn is an instance of AsyncConnection + async with self.engine.begin() as conn: + # to support SQLAlchemy DDL methods as well as legacy functions, the + # AsyncConnection.run_sync() awaitable method will pass a "sync" + # version of the AsyncConnection object to any synchronous method, + # where synchronous IO calls will be transparently translated for + # await. + await conn.run_sync(metadata.drop_all, checkfirst=True) + await conn.run_sync(metadata.create_all) + + # for normal statement execution, a traditional "await execute()" + # pattern is used. + await conn.execute( + table.insert(), + [{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}], + ) + + # CrateDB specifics to flush/synchronize the write operation. + await conn.execute(sa.text("REFRESH TABLE t1;")) + + async def read_buffered(self): + """ + Read data from the database, in buffered mode. + """ + async with self.engine.connect() as conn: + # the default result object is the + # sqlalchemy.engine.Result object + result = await conn.execute(table.select()) + + # the results are buffered so no await call is necessary + # for this case. + print(result.fetchall()) + + async def read_streaming(self): + """ + Read data from the database, in streaming mode. + """ + async with self.engine.connect() as conn: + + # for a streaming result that buffers only segments of the + # result at time, the AsyncConnection.stream() method is used. + # this returns a sqlalchemy.ext.asyncio.AsyncResult object. + async_result = await conn.stream(table.select()) + + # this object supports async iteration and awaitable + # versions of methods like .all(), fetchmany(), etc. + async for row in async_result: + print(row) + + +async def run_example(dsn: str): + example = AsynchronousTableStreamingExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + await example.run() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "asyncpg": + dsn = "crate+asyncpg://crate@localhost:5432/doc" + elif driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + asyncio.run(run_example(dsn)) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/by-language/python-sqlalchemy/async_table.py b/by-language/python-sqlalchemy/async_table.py new file mode 100644 index 00000000..ff26838a --- /dev/null +++ b/by-language/python-sqlalchemy/async_table.py @@ -0,0 +1,194 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API, in +asynchronous mode. + +Both the PostgreSQL drivers `asyncpg` and `psycopg` can be used. +The corresponding SQLAlchemy dialect identifiers are:: + + # PostgreSQL protocol on port 5432, using `asyncpg` + crate+asyncpg://crate@localhost:5432/doc + + # PostgreSQL protocol on port 5432, using `psycopg` + crate+psycopg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use PostgreSQL protocol, with `asyncpg` + python async_table.py asyncpg + + # Use PostgreSQL protocol, with asynchronous support of `psycopg` + python async_table.py psycopg + + # Use with both variants + python async_table.py asyncpg psycopg + +""" +import asyncio +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import create_async_engine + + +class AsynchronousTableExample: + """ + Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode, + using the `asyncpg` and `psycopg` drivers. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True) + + @property + @lru_cache + def table(self): + """ + Provide an SQLAlchemy table object. + """ + metadata = sa.MetaData() + return sa.Table( + "testdrive", + metadata, + sa.Column("x", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("y", sa.Integer), + ) + + async def conn_run_sync(self, func: t.Callable, *args, **kwargs): + """ + To support SQLAlchemy DDL methods as well as legacy functions, the + AsyncConnection.run_sync() awaitable method will pass a "sync" + version of the AsyncConnection object to any synchronous method, + where synchronous IO calls will be transparently translated for + await. + + https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html + """ + # `conn` is an instance of `AsyncConnection` + async with self.engine.begin() as conn: + return await conn.run_sync(func, *args, **kwargs) + + async def run(self): + """ + Run the whole recipe, returning the result from the "read" step. + """ + await self.create() + await self.insert(sync=True) + return await self.read() + + async def create(self): + """ + Create table schema, completely dropping it upfront. + """ + await self.conn_run_sync(self.table.drop, checkfirst=True) + await self.conn_run_sync(self.table.create) + + async def insert(self, sync: bool = False): + """ + Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account. + """ + async with self.engine.begin() as conn: + stmt = self.table.insert().values(x=1, y=42) + await conn.execute(stmt) + stmt = self.table.insert().values(x=2, y=42) + await conn.execute(stmt) + if sync and self.dsn.startswith("crate"): + await conn.execute(sa.text("REFRESH TABLE testdrive;")) + + async def read(self): + """ + Read data from the database. + """ + async with self.engine.begin() as conn: + cursor = await conn.execute(sa.text("SELECT * FROM testdrive;")) + return cursor.fetchall() + + async def reflect(self): + """ + Reflect the table schema from the database. + """ + + # Optionally enable tracing SQLAlchemy calls. + # self.trace() + + def reflect(session): + """ + A function written in "synchronous" style that will be invoked + within the asyncio event loop. + + The session object passed is a traditional orm.Session object with + synchronous interface. + + https://docs.sqlalchemy.org/en/20/_modules/asyncio/greenlet_orm.html + """ + meta = sa.MetaData() + reflected_table = sa.Table("testdrive", meta, autoload_with=session) + print("Table information:") + print(f"Table: {reflected_table}") + print(f"Columns: {reflected_table.columns}") + print(f"Constraints: {reflected_table.constraints}") + print(f"Primary key: {reflected_table.primary_key}") + + return await self.conn_run_sync(reflect) + + @staticmethod + def trace(): + """ + Trace execution flow through SQLAlchemy. + + pip install hunter + """ + from hunter import Q, trace + + constraint = Q(module_startswith="sqlalchemy") + trace(constraint) + + +async def run_example(dsn: str): + example = AsynchronousTableExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + result = await example.run() + print(result) + + # Reflect the table schema. + await example.reflect() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "asyncpg": + dsn = "crate+asyncpg://crate@localhost:5432/doc" + elif driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + asyncio.run(run_example(dsn)) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/by-language/python-sqlalchemy/requirements.txt b/by-language/python-sqlalchemy/requirements.txt index ddd8db2d..bde3120c 100644 --- a/by-language/python-sqlalchemy/requirements.txt +++ b/by-language/python-sqlalchemy/requirements.txt @@ -1,6 +1,6 @@ click<9 colorlog<7 -crate[sqlalchemy] dask==2023.12.1 pandas<2.2 sqlalchemy>=2,<2.1 +sqlalchemy-cratedb[all] @ git+https://github.com/crate-workbench/sqlalchemy-cratedb@amo/postgresql-async diff --git a/by-language/python-sqlalchemy/sync_table.py b/by-language/python-sqlalchemy/sync_table.py new file mode 100644 index 00000000..3d2b8110 --- /dev/null +++ b/by-language/python-sqlalchemy/sync_table.py @@ -0,0 +1,167 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API. + +Both the HTTP driver based on `urllib3`, and the PostgreSQL driver based on +`psycopg` are exercised. The corresponding SQLAlchemy dialect identifiers are:: + + # CrateDB HTTP API on port 4200 + crate+urllib3://localhost:4200/doc + + # PostgreSQL protocol on port 5432 + crate+psycopg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use HTTP API + python sync_table.py urllib3 + + # Use PostgreSQL protocol + python sync_table.py psycopg + + # Use with both variants + python sync_table.py urllib3 psycopg + +""" +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa + + +class SynchronousTableExample: + """ + Demonstrate the CrateDB SQLAlchemy dialect with the `urllib3` and `psycopg` drivers. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return sa.create_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True) + + @property + @lru_cache + def table(self): + """ + Provide an SQLAlchemy table object. + """ + metadata = sa.MetaData() + return sa.Table( + "testdrive", + metadata, + # TODO: When omitting `autoincrement`, SA's DDL generator will use `SERIAL`. + # (psycopg.errors.InternalError_) Cannot find data type: serial + # This is probably one more thing to redirect to the CrateDialect. + sa.Column("x", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("y", sa.Integer), + ) + + def run(self): + """ + Run the whole recipe, returning the result from the "read" step. + """ + self.create() + self.insert(sync=True) + return self.read() + + def create(self): + """ + Create table schema, completely dropping it upfront. + """ + self.table.drop(bind=self.engine, checkfirst=True) + self.table.create(bind=self.engine) + + def insert(self, sync: bool = False): + """ + Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account. + """ + with self.engine.begin() as session: + stmt = self.table.insert().values(x=1, y=42) + session.execute(stmt) + stmt = self.table.insert().values(x=2, y=42) + session.execute(stmt) + if sync and self.dsn.startswith("crate"): + session.execute(sa.text("REFRESH TABLE testdrive;")) + + def read(self): + """ + Read data from the database. + """ + with self.engine.begin() as session: + cursor = session.execute(sa.text("SELECT * FROM testdrive;")) + return cursor.fetchall() + + def reflect(self): + """ + Reflect the table schema from the database. + """ + meta = sa.MetaData() + + # Optionally enable tracing SQLAlchemy calls. + # self.trace() + + reflected_table = sa.Table("testdrive", meta, autoload_with=self.engine) + print("Table information:") + print(f"Table: {reflected_table}") + print(f"Columns: {reflected_table.columns}") + print(f"Constraints: {reflected_table.constraints}") + print(f"Primary key: {reflected_table.primary_key}") + + @staticmethod + def trace(): + """ + Trace execution flow through SQLAlchemy. + + pip install hunter + """ + from hunter import Q, trace + + constraint = Q(module_startswith="sqlalchemy") + trace(constraint) + + +def run_example(dsn: str): + example = SynchronousTableExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + result = example.run() + print(result) + + # Reflect the table schema. + # example.reflect() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "urllib3": + dsn = "crate+urllib3://localhost:4200/doc" + elif driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + run_example(dsn) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/by-language/python-sqlalchemy/test.py b/by-language/python-sqlalchemy/test.py index 3c21b292..fd168a29 100644 --- a/by-language/python-sqlalchemy/test.py +++ b/by-language/python-sqlalchemy/test.py @@ -1,4 +1,3 @@ -import os import shlex import subprocess import pytest @@ -34,3 +33,18 @@ def test_insert_pandas(): def test_insert_dask(): cmd = "time python insert_dask.py" run(cmd) + + +def test_sync_table(): + cmd = "time python sync_table.py urllib3 psycopg" + run(cmd) + + +def test_async_table(): + cmd = "time python async_table.py psycopg asyncpg" + run(cmd) + + +def test_async_streaming(): + cmd = "time python async_streaming.py psycopg asyncpg" + run(cmd)