Skip to content

Commit

Permalink
Python/SQLAlchemy: Demonstrate support for asyncpg and psycopg
Browse files Browse the repository at this point in the history
The `sqlalchemy-cratedb` package supports the vanilla HTTP-based
transport using urllib3, and the standard PostgreSQL drivers `asyncpg`
and `psycopg`.
  • Loading branch information
amotl committed Dec 23, 2023
1 parent 3f15707 commit 75dee5d
Show file tree
Hide file tree
Showing 6 changed files with 552 additions and 1 deletion.
4 changes: 4 additions & 0 deletions by-language/python-sqlalchemy/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ Run example programs::

time python insert_dask.py

time python sync_table.py urllib3 psycopg
time python async_table.py psycopg asyncpg
time python async_streaming.py psycopg asyncpg

Use ``insert_pandas.py`` to connect to any other database instance::

export DBURI="crate://crate@localhost:4200/"
Expand Down
174 changes: 174 additions & 0 deletions by-language/python-sqlalchemy/async_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""
About
=====
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
dialect, and exercise a few basic examples using the low-level table API, this
time in asynchronous mode.
Specific to the asynchronous mode of SQLAlchemy is the streaming of results:
> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()`
> method that returns an `AsyncResult` object. This result object uses a server-side cursor
> and provides an async/await API, such as an async iterator.
>
> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised.
The corresponding SQLAlchemy dialect identifiers are::
# PostgreSQL protocol on port 5432, using `psycopg`
crate+psycopg://crate@localhost:5432/doc
# PostgreSQL protocol on port 5432, using `asyncpg`
crate+asyncpg://crate@localhost:5432/doc
Synopsis
========
::
# Run CrateDB
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
python async_streaming.py psycopg
# Use PostgreSQL protocol, with `asyncpg`
python async_streaming.py asyncpg
# Use with both variants
python async_streaming.py psycopg asyncpg
Bugs
====
When using the `psycopg` driver, the program currently fails with an error like::
sqlalchemy.exc.InternalError: (psycopg.errors.InternalError_) Cannot find portal: c_10479c0a0_1
"""
import asyncio
import sys
import typing as t
from functools import lru_cache

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine

# Schema definition shared by all steps of the example: a single table `t1`,
# registered on a module-level `MetaData` collection so that it can be
# dropped and created through `metadata.drop_all` / `metadata.create_all`.
metadata = sa.MetaData()
table = sa.Table(
    "t1",
    metadata,
    # Identifiers are supplied explicitly by the insert statements, so no
    # auto-incrementing behaviour is requested for the primary key.
    sa.Column("id", sa.Integer(), autoincrement=False, primary_key=True),
    sa.Column("name", sa.String()),
)


class AsynchronousTableStreamingExample:
    """
    Demonstrate reading streamed results when using the CrateDB SQLAlchemy
    dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.

    - https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
    - https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html
    """

    def __init__(self, dsn: str):
        """
        Remember the connection URL; the engine itself is created lazily.

        :param dsn: SQLAlchemy connection URL, for example
                    ``crate+asyncpg://crate@localhost:5432/doc``.
        """
        self.dsn = dsn
        # Populated on first access of the `engine` property.
        self._engine = None

    @property
    def engine(self):
        """
        Provide an SQLAlchemy engine object, creating it on first access.

        The engine is memoized on the instance rather than by decorating the
        method with `lru_cache`: a method-level cache is shared between all
        instances, keeps every instance (and its engine) referenced for as
        long as the cache lives, and requires `self` to be hashable.
        """
        if self._engine is None:
            self._engine = create_async_engine(self.dsn, echo=True)
        return self._engine

    async def run(self):
        """
        Run the whole recipe: provision the table, then read it back twice,
        first in buffered mode and then in streaming mode.
        """
        await self.create_and_insert()
        await self.read_buffered()
        await self.read_streaming()

    async def create_and_insert(self):
        """
        Create table schema, completely dropping it upfront, and insert a few records.
        """
        # `conn` is an instance of `AsyncConnection`. `begin()` wraps the
        # block into a transaction that is committed on successful exit.
        async with self.engine.begin() as conn:
            # To support SQLAlchemy DDL methods as well as legacy functions,
            # the `AsyncConnection.run_sync()` awaitable method will pass a
            # "sync" version of the `AsyncConnection` object to any
            # synchronous method, where synchronous IO calls will be
            # transparently translated for await.
            await conn.run_sync(metadata.drop_all, checkfirst=True)
            await conn.run_sync(metadata.create_all)

            # For normal statement execution, a traditional
            # "await execute()" pattern is used.
            await conn.execute(
                table.insert(),
                [{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}],
            )

            # CrateDB specifics to flush/synchronize the write operation.
            await conn.execute(sa.text("REFRESH TABLE t1;"))

    async def read_buffered(self):
        """
        Read data from the database, in buffered mode, and print all rows.
        """
        async with self.engine.connect() as conn:
            # The default result object is the `sqlalchemy.engine.Result`
            # object.
            result = await conn.execute(table.select())

            # The results are buffered, so no await call is necessary for
            # this case.
            print(result.fetchall())

    async def read_streaming(self):
        """
        Read data from the database, in streaming mode, and print each row.
        """
        async with self.engine.connect() as conn:
            # For a streaming result that buffers only segments of the
            # result at a time, the `AsyncConnection.stream()` method is
            # used. It returns a `sqlalchemy.ext.asyncio.AsyncResult` object.
            async_result = await conn.stream(table.select())

            # This object supports async iteration and awaitable versions
            # of methods like `.all()`, `.fetchmany()`, etc.
            async for row in async_result:
                print(row)


async def run_example(dsn: str):
    """
    Run the streaming example against the database addressed by `dsn`.

    :param dsn: SQLAlchemy connection URL handed to the example class.
    """
    example = AsynchronousTableStreamingExample(dsn)
    engine = example.engine

    try:
        # Run a basic conversation.
        # It also includes a catalog inquiry at `metadata.drop_all(checkfirst=True)`.
        await example.run()
    finally:
        # Close pooled connections while the event loop is still running.
        # Otherwise they would outlive the loop that `asyncio.run()` closes
        # right after this coroutine returns.
        await engine.dispose()


def run_drivers(drivers: t.List[str]):
    """
    Run the example once per selected driver, in the given order.

    All driver names are validated before the first example is started, so a
    mistyped later argument is reported before any earlier run has dropped
    and re-created the table.

    :param drivers: Driver names; each one must be `psycopg` or `asyncpg`.
    :raises ValueError: If any of the driver names is not supported.
    """
    dsn_by_driver = {
        # PostgreSQL protocol on port 5432, using `psycopg`.
        "psycopg": "crate+psycopg://crate@localhost:5432/doc",
        # PostgreSQL protocol on port 5432, using `asyncpg`.
        "asyncpg": "crate+asyncpg://crate@localhost:5432/doc",
    }

    # Materialize once, because the names are iterated twice below.
    selected = list(drivers)
    for driver in selected:
        if driver not in dsn_by_driver:
            raise ValueError(f"Unknown driver: {driver}")

    for driver in selected:
        # Each example runs on its own, freshly created event loop.
        asyncio.run(run_example(dsn_by_driver[driver]))


if __name__ == "__main__":
    # Every positional command line argument selects one driver to exercise.
    if len(sys.argv) < 2:
        raise ValueError("Please select driver")
    run_drivers(sys.argv[1:])
193 changes: 193 additions & 0 deletions by-language/python-sqlalchemy/async_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""
About
=====
Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
dialect, and exercise a few basic examples using the low-level table API, this
time in asynchronous mode.
Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised.
The corresponding SQLAlchemy dialect identifiers are::
# PostgreSQL protocol on port 5432, using `psycopg`
crate+psycopg://crate@localhost:5432/doc
# PostgreSQL protocol on port 5432, using `asyncpg`
crate+asyncpg://crate@localhost:5432/doc
Synopsis
========
::
# Run CrateDB
docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate
# Use PostgreSQL protocol, with asynchronous support of `psycopg`
python async_table.py psycopg
# Use PostgreSQL protocol, with `asyncpg`
python async_table.py asyncpg
# Use with both variants
python async_table.py psycopg asyncpg
"""
import asyncio
import sys
import typing as t
from functools import lru_cache

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine


class AsynchronousTableExample:
    """
    Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.
    """

    def __init__(self, dsn: str):
        """
        Remember the connection URL; engine and table are created lazily.

        :param dsn: SQLAlchemy connection URL, for example
                    ``crate+asyncpg://crate@localhost:5432/doc``.
        """
        self.dsn = dsn
        # Populated on first access of the `engine` / `table` properties.
        self._engine = None
        self._table = None

    @property
    def engine(self):
        """
        Provide an SQLAlchemy engine object, creating it on first access.

        The engine is memoized on the instance rather than by decorating the
        method with `lru_cache`: a method-level cache is shared between all
        instances, keeps every instance (and its engine) referenced for as
        long as the cache lives, and requires `self` to be hashable.
        """
        if self._engine is None:
            self._engine = create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True)
        return self._engine

    @property
    def table(self):
        """
        Provide an SQLAlchemy table object, creating it on first access.

        Memoized on the instance, so that all operations of one example share
        the same `Table` and `MetaData` objects.
        """
        if self._table is None:
            metadata = sa.MetaData()
            self._table = sa.Table(
                "testdrive",
                metadata,
                sa.Column("x", sa.Integer, primary_key=True, autoincrement=False),
                sa.Column("y", sa.Integer),
            )
        return self._table

    async def conn_run_sync(self, func: t.Callable, *args, **kwargs):
        """
        Invoke a synchronous callable on a connection of the async engine.

        To support SQLAlchemy DDL methods as well as legacy functions, the
        AsyncConnection.run_sync() awaitable method will pass a "sync"
        version of the AsyncConnection object to any synchronous method,
        where synchronous IO calls will be transparently translated for
        await.

        https://docs.sqlalchemy.org/en/20/_modules/asyncio/basic.html

        :param func: Callable receiving the synchronous connection as its
                     first argument.
        :param args: Additional positional arguments forwarded to `func`.
        :param kwargs: Additional keyword arguments forwarded to `func`.
        :return: Whatever `func` returns.
        """
        # `conn` is an instance of `AsyncConnection`
        async with self.engine.begin() as conn:
            return await conn.run_sync(func, *args, **kwargs)

    async def run(self):
        """
        Run the whole recipe, returning the result from the "read" step.
        """
        await self.create()
        await self.insert(sync=True)
        return await self.read()

    async def create(self):
        """
        Create table schema, completely dropping it upfront.
        """
        await self.conn_run_sync(self.table.drop, checkfirst=True)
        await self.conn_run_sync(self.table.create)

    async def insert(self, sync: bool = False):
        """
        Write data to the database, taking CrateDB-specific `REFRESH TABLE` into account.

        :param sync: When true, and the DSN addresses a `crate` dialect,
                     issue `REFRESH TABLE` after inserting.
        """
        async with self.engine.begin() as conn:
            stmt = self.table.insert().values(x=1, y=42)
            await conn.execute(stmt)
            stmt = self.table.insert().values(x=2, y=42)
            await conn.execute(stmt)
            # `REFRESH TABLE` is CrateDB-specific, so only issue it when the
            # connection URL selects one of the `crate` dialects.
            if sync and self.dsn.startswith("crate"):
                await conn.execute(sa.text("REFRESH TABLE testdrive;"))

    async def read(self):
        """
        Read data from the database.

        :return: All rows of the `testdrive` table.
        """
        async with self.engine.begin() as conn:
            cursor = await conn.execute(sa.text("SELECT * FROM testdrive;"))
            return cursor.fetchall()

    async def reflect(self):
        """
        Reflect the table schema from the database, and print it.
        """

        # Debugging.
        # self.trace()

        def reflect(connection):
            """
            A function written in "synchronous" style that will be invoked
            within the asyncio event loop.

            It is invoked through `AsyncConnection.run_sync()`, so the object
            passed is the synchronous-style counterpart of the connection,
            suitable for `autoload_with`.
            """
            meta = sa.MetaData()
            reflected_table = sa.Table("testdrive", meta, autoload_with=connection)
            print("Table information:")
            print(f"Table: {reflected_table}")
            print(f"Columns: {reflected_table.columns}")
            print(f"Constraints: {reflected_table.constraints}")
            print(f"Primary key: {reflected_table.primary_key}")

        return await self.conn_run_sync(reflect)

    @staticmethod
    def trace():
        """
        Trace execution flow through SQLAlchemy.

        pip install hunter
        """
        # Imported lazily: `hunter` is an optional debugging dependency.
        from hunter import Q, trace

        constraint = Q(module_startswith="sqlalchemy")
        trace(constraint)


async def run_example(dsn: str):
    """
    Run the table example against the database addressed by `dsn`.

    Prints the rows returned by the "read" step, followed by the reflected
    table information.

    :param dsn: SQLAlchemy connection URL handed to the example class.
    """
    example = AsynchronousTableExample(dsn)
    engine = example.engine

    try:
        # Run a basic conversation.
        # It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
        result = await example.run()
        print(result)

        # Reflect the table schema.
        await example.reflect()
    finally:
        # Close pooled connections while the event loop is still running.
        # Otherwise they would outlive the loop that `asyncio.run()` closes
        # right after this coroutine returns.
        await engine.dispose()


def run_drivers(drivers: t.List[str]):
    """
    Run the example once per selected driver, in the given order.

    All driver names are validated before the first example is started, so a
    mistyped later argument is reported before any earlier run has dropped
    and re-created the table.

    :param drivers: Driver names; each one must be `psycopg` or `asyncpg`.
    :raises ValueError: If any of the driver names is not supported.
    """
    dsn_by_driver = {
        # PostgreSQL protocol on port 5432, using `psycopg`.
        "psycopg": "crate+psycopg://crate@localhost:5432/doc",
        # PostgreSQL protocol on port 5432, using `asyncpg`.
        "asyncpg": "crate+asyncpg://crate@localhost:5432/doc",
    }

    # Materialize once, because the names are iterated twice below.
    selected = list(drivers)
    for driver in selected:
        if driver not in dsn_by_driver:
            raise ValueError(f"Unknown driver: {driver}")

    for driver in selected:
        # Each example runs on its own, freshly created event loop.
        asyncio.run(run_example(dsn_by_driver[driver]))


if __name__ == "__main__":
    # Every positional command line argument selects one driver to exercise.
    if len(sys.argv) < 2:
        raise ValueError("Please select driver")
    run_drivers(sys.argv[1:])
2 changes: 1 addition & 1 deletion by-language/python-sqlalchemy/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
click<9
colorlog<7
crate[sqlalchemy]
dask==2023.12.1
pandas<2.2
sqlalchemy>=2,<2.1
sqlalchemy-cratedb[all] @ git+https://github.com/crate-workbench/sqlalchemy-cratedb@amo/postgresql-async
Loading

0 comments on commit 75dee5d

Please sign in to comment.