[DRAFT] SA20: Add compatibility adapters for psycopg3 and asyncpg dialects #532

Closed
7 changes: 7 additions & 0 deletions CHANGES.txt
@@ -5,6 +5,13 @@ Changes for crate
Unreleased
==========

- SQLAlchemy: Add compatibility adapter for SQLAlchemy's ``psycopg`` and ``asyncpg``
  dialects, introducing the ``crate+psycopg://``, ``crate+asyncpg://``, and
  ``crate+urllib3://`` dialect identifiers. The asynchronous variant of ``psycopg``
  is also supported.
- SQLAlchemy: Add example demonstrating asynchronous streaming mode, using server-side
  cursors.
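
The dialect identifiers in this entry follow SQLAlchemy's `dialect+driver://` URL convention. As a rough, stdlib-only sketch of how such a URL decomposes, the following takes one apart with `urllib.parse`. The `describe_dsn` helper is hypothetical and not part of the `crate` package; SQLAlchemy has its own URL parsing, which is not used here.

```python
from urllib.parse import urlsplit


def describe_dsn(dsn: str) -> dict:
    """Split a SQLAlchemy-style URL into its parts (hypothetical helper)."""
    parts = urlsplit(dsn)
    # The scheme has the form `dialect+driver`; the driver part is optional.
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,
        "username": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "path": parts.path.lstrip("/") or None,
    }


info = describe_dsn("crate+asyncpg://crate@localhost:5432/doc")
print(info)
```

The same helper applied to `crate+psycopg://crate@localhost:5432/doc` would differ only in the `driver` field, which is the part that selects the underlying PostgreSQL client library.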


2023/03/02 0.30.1
=================
172 changes: 172 additions & 0 deletions examples/async_streaming.py
@@ -0,0 +1,172 @@
"""
About
=====

Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
dialect, and exercise a few basic examples using the low-level table API, this
time in asynchronous mode.

Specific to the asynchronous mode of SQLAlchemy is the streaming of results:

> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()`
> method that returns an `AsyncResult` object. This result object uses a server-side cursor
> and provides an async/await API, such as an async iterator.
>
> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core

Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised.
The corresponding SQLAlchemy dialect identifiers are::

    # PostgreSQL protocol on port 5432, using `psycopg`
    crate+psycopg://crate@localhost:5432/doc

    # PostgreSQL protocol on port 5432, using `asyncpg`
    crate+asyncpg://crate@localhost:5432/doc

Synopsis
========
::

    # Run CrateDB
    docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate

    # Use PostgreSQL protocol, with asynchronous support of `psycopg`
    python examples/async_streaming.py psycopg

    # Use PostgreSQL protocol, with `asyncpg`
    python examples/async_streaming.py asyncpg

    # Use with both variants
    python examples/async_streaming.py psycopg asyncpg

Bugs
====

When using the `psycopg` driver, the program currently croaks like::

    sqlalchemy.exc.InternalError: (psycopg.errors.InternalError_) Cannot find portal: c_10479c0a0_1

"""
import asyncio
import sys
import typing as t
from functools import lru_cache

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine

metadata = sa.MetaData()
table = sa.Table(
    "t1",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True, autoincrement=False),
    sa.Column("name", sa.String),
)


class AsynchronousTableStreamingExample:
    """
    Demonstrate reading streamed results when using the CrateDB SQLAlchemy
    dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.

    - https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
    - https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/basic.html
    """

    def __init__(self, dsn: str):
        self.dsn = dsn

    @property
    @lru_cache
    def engine(self):
        """
        Provide an SQLAlchemy engine object.
        """
        return create_async_engine(self.dsn, echo=True)

    async def run(self):
        """
        Run the whole recipe.
        """
        await self.create_and_insert()
        await self.read_buffered()
        await self.read_streaming()

    async def create_and_insert(self):
        """
        Create table schema, completely dropping it upfront, and insert a few records.
        """
        # conn is an instance of AsyncConnection
        async with self.engine.begin() as conn:
            # to support SQLAlchemy DDL methods as well as legacy functions, the
            # AsyncConnection.run_sync() awaitable method will pass a "sync"
            # version of the AsyncConnection object to any synchronous method,
            # where synchronous IO calls will be transparently translated for
            # await.
            await conn.run_sync(metadata.drop_all, checkfirst=True)
            await conn.run_sync(metadata.create_all)

            # for normal statement execution, a traditional "await execute()"
            # pattern is used.
            await conn.execute(
                table.insert(),
                [{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}],
            )

            # CrateDB specifics to flush/synchronize the write operation.
            await conn.execute(sa.text("REFRESH TABLE t1;"))

    async def read_buffered(self):
        """
        Read data from the database, in buffered mode.
        """
        async with self.engine.connect() as conn:
            # the default result object is the
            # sqlalchemy.engine.Result object
            result = await conn.execute(table.select())

            # the results are buffered so no await call is necessary
            # for this case.
            print(result.fetchall())

    async def read_streaming(self):
        """
        Read data from the database, in streaming mode.
        """
        async with self.engine.connect() as conn:
            # for a streaming result that buffers only segments of the
            # result at a time, the AsyncConnection.stream() method is used.
            # this returns a sqlalchemy.ext.asyncio.AsyncResult object.
            async_result = await conn.stream(table.select())

            # this object supports async iteration and awaitable
            # versions of methods like .all(), .fetchmany(), etc.
            async for row in async_result:
                print(row)


async def run_example(dsn: str):
    example = AsynchronousTableStreamingExample(dsn)

    # Run a basic conversation.
    # It also includes a catalog inquiry at `metadata.drop_all(checkfirst=True)`.
    await example.run()


def run_drivers(drivers: t.List[str]):
    for driver in drivers:
        if driver == "psycopg":
            dsn = "crate+psycopg://crate@localhost:5432/doc"
        elif driver == "asyncpg":
            dsn = "crate+asyncpg://crate@localhost:5432/doc"
        else:
            raise ValueError(f"Unknown driver: {driver}")

        asyncio.run(run_example(dsn))


if __name__ == "__main__":
    drivers = sys.argv[1:]
    run_drivers(drivers)
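
The difference between the buffered and the streaming read in this example can be illustrated without a database. The following is a stdlib-only analogy, not SQLAlchemy code: `fetch_batch`, `ROWS`, and the batch size are made up for illustration, and the async generator merely stands in for what a server-side cursor provides, namely rows arriving in segments while the consumer iterates with `async for`.

```python
import asyncio
from typing import AsyncIterator, List, Tuple

Row = Tuple[int, str]

# Stand-in for rows held on the server; no database is involved here.
ROWS: List[Row] = [(i, f"some name {i}") for i in range(1, 6)]


async def fetch_batch(offset: int, size: int) -> List[Row]:
    """Pretend to fetch one batch of rows from a server-side cursor."""
    await asyncio.sleep(0)  # yield to the event loop, like a network round trip
    return ROWS[offset:offset + size]


async def read_buffered() -> List[Row]:
    """Load the complete result into memory before returning it."""
    return await fetch_batch(0, len(ROWS))


async def read_streaming(batch_size: int = 2) -> AsyncIterator[Row]:
    """Yield rows batch by batch, holding at most `batch_size` rows at once."""
    offset = 0
    while True:
        batch = await fetch_batch(offset, batch_size)
        if not batch:
            break
        for row in batch:
            yield row
        offset += len(batch)


async def main() -> Tuple[List[Row], List[Row]]:
    buffered = await read_buffered()
    streamed = [row async for row in read_streaming()]
    return buffered, streamed


buffered, streamed = asyncio.run(main())
print(buffered == streamed)
```

Both paths deliver the same rows. The streaming variant trades extra round trips for bounded memory use, which is the motivation for `AsyncConnection.stream()` in the example file.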
191 changes: 191 additions & 0 deletions examples/async_table.py
@@ -0,0 +1,191 @@
"""
About
=====

Example program to demonstrate how to connect to CrateDB using its SQLAlchemy
dialect, and exercise a few basic examples using the low-level table API, this
time in asynchronous mode.

Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised.
The corresponding SQLAlchemy dialect identifiers are::

    # PostgreSQL protocol on port 5432, using `psycopg`
    crate+psycopg://crate@localhost:5432/doc

    # PostgreSQL protocol on port 5432, using `asyncpg`
    crate+asyncpg://crate@localhost:5432/doc

Synopsis
========
::

    # Run CrateDB
    docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate

    # Use PostgreSQL protocol, with asynchronous support of `psycopg`
    python examples/async_table.py psycopg

    # Use PostgreSQL protocol, with `asyncpg`
    python examples/async_table.py asyncpg

    # Use with both variants
    python examples/async_table.py psycopg asyncpg

"""
import asyncio
import sys
import typing as t
from functools import lru_cache

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine


class AsynchronousTableExample:
    """
    Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers.
    """

    def __init__(self, dsn: str):
        self.dsn = dsn

    @property
    @lru_cache
    def engine(self):
        """
        Provide an SQLAlchemy engine object.
        """
        return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True)

    @property
    @lru_cache
    def table(self):
        """
        Provide an SQLAlchemy table object.
        """
        metadata = sa.MetaData()
        return sa.Table(
            "testdrive",
            metadata,
            sa.Column("x", sa.Integer, primary_key=True, autoincrement=False),
            sa.Column("y", sa.Integer),
        )

    async def conn_run_sync(self, func: t.Callable, *args, **kwargs):
        """
        To support SQLAlchemy DDL methods as well as legacy functions, the
        AsyncConnection.run_sync() awaitable method will pass a "sync"
        version of the AsyncConnection object to any synchronous method,
        where synchronous IO calls will be transparently translated for
        await.

        https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/basic.html
        """
        # `conn` is an instance of `AsyncConnection`
        async with self.engine.begin() as conn:
            return await conn.run_sync(func, *args, **kwargs)

    async def run(self):
        """
        Run the whole recipe, returning the result from the "read" step.
        """
        await self.create()
        await self.insert(sync=True)
        return await self.read()

    async def create(self):
        """
        Create table schema, completely dropping it upfront.
        """
        await self.conn_run_sync(self.table.drop, checkfirst=True)
        await self.conn_run_sync(self.table.create)

    async def insert(self, sync: bool = False):
        """
        Write data to the database, taking CrateDB-specific `REFRESH TABLE` into account.
        """
        async with self.engine.begin() as conn:
            stmt = self.table.insert().values(x=1, y=42)
            await conn.execute(stmt)
            stmt = self.table.insert().values(x=2, y=42)
            await conn.execute(stmt)
            if sync and self.dsn.startswith("crate"):
                await conn.execute(sa.text("REFRESH TABLE testdrive;"))

    async def read(self):
        """
        Read data from the database.
        """
        async with self.engine.begin() as conn:
            cursor = await conn.execute(sa.text("SELECT * FROM testdrive;"))
            return cursor.fetchall()

    async def reflect(self):
        """
        Reflect the table schema from the database.
        """

        # Debugging.
        # self.trace()

        def reflect(connection):
            """
            A function written in "synchronous" style that will be invoked
            within the asyncio event loop.

            The object passed in by `AsyncConnection.run_sync()` is a
            traditional `Connection` object with synchronous interface.

            https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/greenlet_orm.html
            """
            meta = sa.MetaData()
            reflected_table = sa.Table("testdrive", meta, autoload_with=connection)
            print("Table information:")
            print(f"Table: {reflected_table}")
            print(f"Columns: {reflected_table.columns}")
            print(f"Constraints: {reflected_table.constraints}")
            print(f"Primary key: {reflected_table.primary_key}")

        return await self.conn_run_sync(reflect)

    @staticmethod
    def trace():
        """
        Trace execution flow through SQLAlchemy.

        pip install hunter
        """
        from hunter import Q, trace

        constraint = Q(module_startswith="sqlalchemy")
        trace(constraint)


async def run_example(dsn: str):
    example = AsynchronousTableExample(dsn)

    # Run a basic conversation.
    # It also includes a catalog inquiry at `table.drop(checkfirst=True)`.
    result = await example.run()
    print(result)

    # Reflect the table schema.
    await example.reflect()


def run_drivers(drivers: t.List[str]):
    for driver in drivers:
        if driver == "psycopg":
            dsn = "crate+psycopg://crate@localhost:5432/doc"
        elif driver == "asyncpg":
            dsn = "crate+asyncpg://crate@localhost:5432/doc"
        else:
            raise ValueError(f"Unknown driver: {driver}")

        asyncio.run(run_example(dsn))


if __name__ == "__main__":
    drivers = sys.argv[1:]
    run_drivers(drivers)
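
Both example classes stack `@property` on top of `@lru_cache` to create the engine lazily. That works, but `lru_cache` keeps a cache on the function itself, keyed by `self`, so it holds references to instances. One possible alternative, sketched here under the assumption of Python 3.8 or newer, is `functools.cached_property`, which stores the value on the instance. `FakeEngine` and `Example` are hypothetical stand-ins so the sketch runs without SQLAlchemy or a database.

```python
from functools import cached_property


class FakeEngine:
    """Stand-in for an SQLAlchemy engine; counts how often it is created."""

    created = 0

    def __init__(self, dsn: str):
        FakeEngine.created += 1
        self.dsn = dsn


class Example:
    def __init__(self, dsn: str):
        self.dsn = dsn

    @cached_property
    def engine(self) -> FakeEngine:
        # Runs once per instance; the result is stored on the instance itself.
        return FakeEngine(self.dsn)


first = Example("crate+psycopg://crate@localhost:5432/doc")
second = Example("crate+asyncpg://crate@localhost:5432/doc")

same_object = first.engine is first.engine
distinct_instances = first.engine is not second.engine
print(same_object, distinct_instances, FakeEngine.created)
```

Whether this is preferable for the example programs is a judgment call. The existing decorator pair behaves the same for short-lived scripts like these.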