From 75dee5d20d4a98fb111033170eba836876fcac14 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sat, 23 Dec 2023 03:31:07 +0100 Subject: [PATCH] Python/SQLAlchemy: Demonstrate support for `asyncpg` and `psycopg` The `sqlalchemy-cratedb` package supports the vanilla HTTP-based transport using urllib3, and the standard PostgreSQL drivers `asyncpg` and `psycopg`. --- by-language/python-sqlalchemy/README.rst | 4 + .../python-sqlalchemy/async_streaming.py | 174 ++++++++++++++++ by-language/python-sqlalchemy/async_table.py | 193 ++++++++++++++++++ .../python-sqlalchemy/requirements.txt | 2 +- by-language/python-sqlalchemy/sync_table.py | 165 +++++++++++++++ by-language/python-sqlalchemy/test.py | 15 ++ 6 files changed, 552 insertions(+), 1 deletion(-) create mode 100644 by-language/python-sqlalchemy/async_streaming.py create mode 100644 by-language/python-sqlalchemy/async_table.py create mode 100644 by-language/python-sqlalchemy/sync_table.py diff --git a/by-language/python-sqlalchemy/README.rst b/by-language/python-sqlalchemy/README.rst index 3397d1e0..365fe20e 100644 --- a/by-language/python-sqlalchemy/README.rst +++ b/by-language/python-sqlalchemy/README.rst @@ -92,6 +92,10 @@ Run example programs:: time python insert_dask.py + time python sync_table.py urllib3 psycopg + time python async_table.py psycopg asyncpg + time python async_streaming.py psycopg asyncpg + Use ``insert_pandas.py`` to connect to any other database instance:: export DBURI="crate://crate@localhost:4200/" diff --git a/by-language/python-sqlalchemy/async_streaming.py b/by-language/python-sqlalchemy/async_streaming.py new file mode 100644 index 00000000..c1e5b673 --- /dev/null +++ b/by-language/python-sqlalchemy/async_streaming.py @@ -0,0 +1,174 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API, this +time in asynchronous mode. 
+ +Specific to the asynchronous mode of SQLAlchemy is the streaming of results: + +> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()` +> method that returns an `AsyncResult` object. This result object uses a server-side cursor +> and provides an async/await API, such as an async iterator. +> +> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core + +Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised. +The corresponding SQLAlchemy dialect identifiers are:: + + # PostgreSQL protocol on port 5432, using `psycopg` + crate+psycopg://crate@localhost:5432/doc + + # PostgreSQL protocol on port 5432, using `asyncpg` + crate+asyncpg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use PostgreSQL protocol, with asynchronous support of `psycopg` + python async_streaming.py psycopg + + # Use PostgreSQL protocol, with `asyncpg` + python async_streaming.py asyncpg + + # Use with both variants + python async_streaming.py psycopg asyncpg + +Bugs +==== + +When using the `psycopg` driver, the program currently croaks like:: + + sqlalchemy.exc.InternalError: (psycopg.errors.InternalError_) Cannot find portal: c_10479c0a0_1 + +""" +import asyncio +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import create_async_engine + +metadata = sa.MetaData() +table = sa.Table( + "t1", + metadata, + sa.Column("id", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("name", sa.String), +) + + +class AsynchronousTableStreamingExample: + """ + Demonstrate reading streamed results when using the CrateDB SQLAlchemy + dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers. 
+ + - https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core + - https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return create_async_engine(self.dsn, echo=True) + + async def run(self): + """ + Run the whole recipe. + """ + await self.create_and_insert() + await self.read_buffered() + await self.read_streaming() + + async def create_and_insert(self): + """ + Create table schema, completely dropping it upfront, and insert a few records. + """ + # conn is an instance of AsyncConnection + async with self.engine.begin() as conn: + # to support SQLAlchemy DDL methods as well as legacy functions, the + # AsyncConnection.run_sync() awaitable method will pass a "sync" + # version of the AsyncConnection object to any synchronous method, + # where synchronous IO calls will be transparently translated for + # await. + await conn.run_sync(metadata.drop_all, checkfirst=True) + await conn.run_sync(metadata.create_all) + + # for normal statement execution, a traditional "await execute()" + # pattern is used. + await conn.execute( + table.insert(), + [{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}], + ) + + # CrateDB specifics to flush/synchronize the write operation. + await conn.execute(sa.text("REFRESH TABLE t1;")) + + async def read_buffered(self): + """ + Read data from the database, in buffered mode. + """ + async with self.engine.connect() as conn: + # the default result object is the + # sqlalchemy.engine.Result object + result = await conn.execute(table.select()) + + # the results are buffered so no await call is necessary + # for this case. + print(result.fetchall()) + + async def read_streaming(self): + """ + Read data from the database, in streaming mode. 
+ """ + async with self.engine.connect() as conn: + + # for a streaming result that buffers only segments of the + # result at a time, the AsyncConnection.stream() method is used. + # this returns a sqlalchemy.ext.asyncio.AsyncResult object. + async_result = await conn.stream(table.select()) + + # this object supports async iteration and awaitable + # versions of methods like .all(), fetchmany(), etc. + async for row in async_result: + print(row) + + +async def run_example(dsn: str): + example = AsynchronousTableStreamingExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `metadata.drop_all(checkfirst=True)`. + await example.run() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + elif driver == "asyncpg": + dsn = "crate+asyncpg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + asyncio.run(run_example(dsn)) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/by-language/python-sqlalchemy/async_table.py b/by-language/python-sqlalchemy/async_table.py new file mode 100644 index 00000000..80b56131 --- /dev/null +++ b/by-language/python-sqlalchemy/async_table.py @@ -0,0 +1,193 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API, this +time in asynchronous mode. + +Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised. 
+The corresponding SQLAlchemy dialect identifiers are:: + + # PostgreSQL protocol on port 5432, using `psycopg` + crate+psycopg://crate@localhost:5432/doc + + # PostgreSQL protocol on port 5432, using `asyncpg` + crate+asyncpg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use PostgreSQL protocol, with asynchronous support of `psycopg` + python async_table.py psycopg + + # Use PostgreSQL protocol, with `asyncpg` + python async_table.py asyncpg + + # Use with both variants + python async_table.py psycopg asyncpg + +""" +import asyncio +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import create_async_engine + + +class AsynchronousTableExample: + """ + Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True) + + @property + @lru_cache + def table(self): + """ + Provide an SQLAlchemy table object. + """ + metadata = sa.MetaData() + return sa.Table( + "testdrive", + metadata, + sa.Column("x", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("y", sa.Integer), + ) + + async def conn_run_sync(self, func: t.Callable, *args, **kwargs): + """ + To support SQLAlchemy DDL methods as well as legacy functions, the + AsyncConnection.run_sync() awaitable method will pass a "sync" + version of the AsyncConnection object to any synchronous method, + where synchronous IO calls will be transparently translated for + await. 
+ + https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html + """ + # `conn` is an instance of `AsyncConnection` + async with self.engine.begin() as conn: + return await conn.run_sync(func, *args, **kwargs) + + async def run(self): + """ + Run the whole recipe, returning the result from the "read" step. + """ + await self.create() + await self.insert(sync=True) + return await self.read() + + async def create(self): + """ + Create table schema, completely dropping it upfront. + """ + await self.conn_run_sync(self.table.drop, checkfirst=True) + await self.conn_run_sync(self.table.create) + + async def insert(self, sync: bool = False): + """ + Write data to the database, taking CrateDB-specific `REFRESH TABLE` into account. + """ + async with self.engine.begin() as conn: + stmt = self.table.insert().values(x=1, y=42) + await conn.execute(stmt) + stmt = self.table.insert().values(x=2, y=42) + await conn.execute(stmt) + if sync and self.dsn.startswith("crate"): + await conn.execute(sa.text("REFRESH TABLE testdrive;")) + + async def read(self): + """ + Read data from the database. + """ + async with self.engine.begin() as conn: + cursor = await conn.execute(sa.text("SELECT * FROM testdrive;")) + return cursor.fetchall() + + async def reflect(self): + """ + Reflect the table schema from the database. + """ + + # Debugging. + # self.trace() + + def reflect(session): + """ + A function written in "synchronous" style that will be invoked + within the asyncio event loop. + + The `session` object passed is the "sync" version of the `AsyncConnection` + object, not an `orm.Session`; it has a synchronous interface. 
+ + https://docs.sqlalchemy.org/en/20/_modules/asyncio/greenlet_orm.html + """ + meta = sa.MetaData() + reflected_table = sa.Table("testdrive", meta, autoload_with=session) + print("Table information:") + print(f"Table: {reflected_table}") + print(f"Columns: {reflected_table.columns}") + print(f"Constraints: {reflected_table.constraints}") + print(f"Primary key: {reflected_table.primary_key}") + + return await self.conn_run_sync(reflect) + + @staticmethod + def trace(): + """ + Trace execution flow through SQLAlchemy. + + pip install hunter + """ + from hunter import Q, trace + + constraint = Q(module_startswith="sqlalchemy") + trace(constraint) + + +async def run_example(dsn: str): + example = AsynchronousTableExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + result = await example.run() + print(result) + + # Reflect the table schema. + await example.reflect() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + elif driver == "asyncpg": + dsn = "crate+asyncpg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + asyncio.run(run_example(dsn)) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/by-language/python-sqlalchemy/requirements.txt b/by-language/python-sqlalchemy/requirements.txt index ddd8db2d..bde3120c 100644 --- a/by-language/python-sqlalchemy/requirements.txt +++ b/by-language/python-sqlalchemy/requirements.txt @@ -1,6 +1,6 @@ click<9 colorlog<7 -crate[sqlalchemy] dask==2023.12.1 pandas<2.2 sqlalchemy>=2,<2.1 +sqlalchemy-cratedb[all] @ git+https://github.com/crate-workbench/sqlalchemy-cratedb@amo/postgresql-async diff --git a/by-language/python-sqlalchemy/sync_table.py b/by-language/python-sqlalchemy/sync_table.py new file mode 100644 index 
00000000..424b299c --- /dev/null +++ b/by-language/python-sqlalchemy/sync_table.py @@ -0,0 +1,165 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API. + +Both the HTTP driver based on `urllib3`, and the PostgreSQL driver based on +`psycopg` are exercised. The corresponding SQLAlchemy dialect identifiers are:: + + # CrateDB HTTP API on port 4200 + crate+urllib3://localhost:4200/doc + + # PostgreSQL protocol on port 5432 + crate+psycopg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use HTTP API + python sync_table.py urllib3 + + # Use PostgreSQL protocol + python sync_table.py psycopg + + # Use with both variants + python sync_table.py urllib3 psycopg + +""" +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa + + +class SynchronousTableExample: + """ + Demonstrate the CrateDB SQLAlchemy dialect with the `urllib3` and `psycopg` drivers. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return sa.create_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True) + + @property + @lru_cache + def table(self): + """ + Provide an SQLAlchemy table object. + """ + metadata = sa.MetaData() + return sa.Table( + "testdrive", + metadata, + # TODO: When omitting `autoincrement`, SA's DDL generator will use `SERIAL`. + # (psycopg.errors.InternalError_) Cannot find data type: serial + # This is probably one more thing to redirect to the CrateDialect. + sa.Column("x", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("y", sa.Integer), + ) + + def run(self): + """ + Run the whole recipe, returning the result from the "read" step. 
+ """ + self.create() + self.insert(sync=True) + return self.read() + + def create(self): + """ + Create table schema, completely dropping it upfront. + """ + self.table.drop(bind=self.engine, checkfirst=True) + self.table.create(bind=self.engine) + + def insert(self, sync: bool = False): + """ + Write data to the database, taking CrateDB-specific `REFRESH TABLE` into account. + """ + with self.engine.begin() as session: + stmt = self.table.insert().values(x=1, y=42) + session.execute(stmt) + stmt = self.table.insert().values(x=2, y=42) + session.execute(stmt) + if sync and self.dsn.startswith("crate"): + session.execute(sa.text("REFRESH TABLE testdrive;")) + + def read(self): + """ + Read data from the database. + """ + with self.engine.begin() as session: + cursor = session.execute(sa.text("SELECT * FROM testdrive;")) + return cursor.fetchall() + + def reflect(self): + """ + Reflect the table schema from the database. + """ + meta = sa.MetaData() + # Debugging. + # self.trace() + reflected_table = sa.Table("testdrive", meta, autoload_with=self.engine) + print("Table information:") + print(f"Table: {reflected_table}") + print(f"Columns: {reflected_table.columns}") + print(f"Constraints: {reflected_table.constraints}") + print(f"Primary key: {reflected_table.primary_key}") + + @staticmethod + def trace(): + """ + Trace execution flow through SQLAlchemy. + + pip install hunter + """ + from hunter import Q, trace + + constraint = Q(module_startswith="sqlalchemy") + trace(constraint) + + +def run_example(dsn: str): + example = SynchronousTableExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + result = example.run() + print(result) + + # Reflect the table schema. 
+ # example.reflect() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "urllib3": + dsn = "crate+urllib3://localhost:4200/doc" + elif driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + run_example(dsn) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/by-language/python-sqlalchemy/test.py b/by-language/python-sqlalchemy/test.py index 3c21b292..f0345f06 100644 --- a/by-language/python-sqlalchemy/test.py +++ b/by-language/python-sqlalchemy/test.py @@ -34,3 +34,18 @@ def test_insert_pandas(): def test_insert_dask(): cmd = "time python insert_dask.py" run(cmd) + + +def test_sync_table(): + cmd = "time python sync_table.py urllib3 psycopg" + run(cmd) + + +def test_async_table(): + cmd = "time python async_table.py psycopg asyncpg" + run(cmd) + + +def test_async_streaming(): + cmd = "time python async_streaming.py psycopg asyncpg" + run(cmd)