Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BulkResponse wrapper for improved decoding of HTTP bulk responses #649

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ Unreleased
"Threads may share the module, but not connections."
- Added ``error_trace`` to string representation of an Error to relay
server stacktraces into exception messages.
- Added ``BulkResponse`` wrapper for improved decoding of CrateDB HTTP bulk
responses including ``rowcount=`` items.

.. _Migrate from crate.client to sqlalchemy-cratedb: https://cratedb.com/docs/sqlalchemy-cratedb/migrate-from-crate-client.html
.. _sqlalchemy-cratedb: https://pypi.org/project/sqlalchemy-cratedb/
Expand Down
2 changes: 1 addition & 1 deletion docs/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ SRC_MAKE := $(MAKE) -f $(SRC_DIR)/rules.mk

# Parse the JSON file
BUILD_VERSION := $(shell cat $(BUILD_JSON) | \
python -c 'import json, sys; print(json.load(sys.stdin)["message"])')
python3 -c 'import json, sys; print(json.load(sys.stdin)["message"])')

ifeq ($(BUILD_VERSION),)
$(error No build version specified in `$(BUILD_JSON)`.)
Expand Down
14 changes: 14 additions & 0 deletions docs/query.rst
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ for every tuple. This dictionary always has a ``rowcount`` key, indicating
how many rows were inserted. If an error occurs, the ``rowcount`` value is
``-2``, and the dictionary may additionally have an ``error_message`` key.

The package includes a helper utility ``BulkResponse`` that supports parsing
such responses to bulk operation requests. It works like this::

from crate.client.result import BulkResponse

result = cursor.executemany(statement, records)
bulk_response = BulkResponse(records, result)

It provides properties ``failed_records``, ``record_count``, ``success_count``,
and ``failed_count``. ``failed_records`` lists the records of the bulk operation
that failed, by evaluating ``{'rowcount': -2}`` items and matching them against
the submitted records.


.. _selects:

Selecting data
Expand Down
68 changes: 68 additions & 0 deletions src/crate/client/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import typing as t
from functools import cached_property


class BulkResultItem(t.TypedDict):
    """
    Typed dictionary describing one item of a CrateDB bulk response.

    Each item carries a ``rowcount`` for the corresponding bulk argument.
    """

    rowcount: int


class BulkResponse:
    """
    Manage a response to a CrateDB bulk request.

    Pairs the list of bulk arguments (parameter list) submitted with a
    statement with the list of bulk response items returned for it, so that
    failed records can be identified by position.

    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

    :param records: Bulk arguments as submitted, one entry per record.
    :param results: Bulk response items, one per record, in the same order.
    :raises ValueError: If ``records`` or ``results`` is ``None``, or if they
        differ in length.
    """

    def __init__(
            self,
            records: t.Sequence[t.Any],
            results: t.Sequence["BulkResultItem"]):
        if records is None:
            raise ValueError("Processing a bulk response without records is an invalid operation")
        if results is None:
            raise ValueError("Processing a bulk response without results is an invalid operation")
        # Records and results are matched by position. A length mismatch would
        # be silently truncated by `zip()` and yield wrong success counts.
        if len(records) != len(results):
            raise ValueError(
                "Processing a bulk response with a different number of records "
                f"({len(records)}) and results ({len(results)}) is an invalid operation")
        self.records = records
        self.results = results

    @cached_property
    def failed_records(self) -> t.List[t.Any]:
        """
        Compute list of failed records.

        CrateDB signals failed inserts using `rowcount=-2`.

        https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
        """
        return [
            record
            for record, status in zip(self.records, self.results)
            if status["rowcount"] == -2
        ]

    @cached_property
    def record_count(self) -> int:
        """
        Compute bulk size / length of parameter list.
        """
        # `records` is never `None` here, and `len()` of an empty list is 0,
        # so no special case is needed.
        return len(self.records)

    @cached_property
    def success_count(self) -> int:
        """
        Compute number of succeeding records within a batch.
        """
        return self.record_count - self.failed_count

    @cached_property
    def failed_count(self) -> int:
        """
        Compute number of failed records within a batch.
        """
        return len(self.failed_records)
88 changes: 88 additions & 0 deletions src/crate/client/test_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import sys
import unittest

from crate import client
from crate.client.exceptions import ProgrammingError
from crate.client.test_support import setUpCrateLayerBaseline, tearDownDropEntitiesBaseline
from crate.testing.settings import crate_host


class BulkOperationTest(unittest.TestCase):

def setUp(self):
    """
    Prepare the test case by delegating to ``setUpCrateLayerBaseline``.
    """
    setUpCrateLayerBaseline(self)

def tearDown(self):
    """
    Clean up after the test case by delegating to ``tearDownDropEntitiesBaseline``.
    """
    tearDownDropEntitiesBaseline(self)

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_with_bulk_response_partial(self):

# Import at runtime is on purpose, to permit skipping the test case.
from crate.client.result import BulkResponse

connection = client.connect(crate_host)
cursor = connection.cursor()

# Run SQL DDL.
cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

# Run a batch insert that only partially succeeds.
invalid_records = [(1, "Hotzenplotz 1"), (1, "Hotzenplotz 2")]
result = cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", invalid_records)

# Verify CrateDB response.
self.assertEqual(result, [{"rowcount": 1}, {"rowcount": -2}])

# Verify decoded response.
bulk_response = BulkResponse(invalid_records, result)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this more carefully, I don't like that BulkResponse is something you need to construct manually. Couldn't we directly return it from the insert execution, instead of a list of BulkResultItem?

Copy link
Member Author

@amotl amotl Oct 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi. I don't think we can do anything like this here, because the Python database driver must adhere to the Python Database API Specification, so the BulkResponse is just meant as an optional extension to it.

Copy link
Member Author

@amotl amotl Oct 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

af32409, just added, provides a bit of documentation for that extension in the section about bulk operations.

self.assertEqual(bulk_response.failed_records, [(1, "Hotzenplotz 2")])
self.assertEqual(bulk_response.record_count, 2)
self.assertEqual(bulk_response.success_count, 1)
self.assertEqual(bulk_response.failed_count, 1)

cursor.execute("REFRESH TABLE foobar;")
cursor.execute("SELECT * FROM foobar;")
result = cursor.fetchall()
self.assertEqual(result, [[1, "Hotzenplotz 1"]])

cursor.close()
connection.close()

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_executemany_empty(self):
    """
    Verify that a batch insert with an empty parameter list raises
    ``ProgrammingError`` with the expected message.
    """

    connection = client.connect(crate_host)
    try:
        cursor = connection.cursor()
        try:
            # Run SQL DDL.
            cursor.execute("CREATE TABLE foobar (id INTEGER PRIMARY KEY, name STRING);")

            # Run a batch insert that is empty.
            with self.assertRaises(ProgrammingError) as cm:
                cursor.executemany("INSERT INTO foobar (id, name) VALUES (?, ?)", [])
            self.assertEqual(
                str(cm.exception),
                "SQLParseException[The query contains a parameter placeholder $1, "
                "but there are only 0 parameter values]")
        finally:
            # Release the cursor even when a statement or assertion fails.
            cursor.close()
    finally:
        # Release the connection even when a statement or assertion fails.
        connection.close()

@unittest.skipIf(sys.version_info < (3, 8), "BulkResponse needs Python 3.8 or higher")
def test_bulk_response_empty_records_or_results(self):
    """
    Verify that ``BulkResponse`` rejects ``None`` for ``records`` and for
    ``results`` with a ``ValueError`` naming the missing argument.
    """

    # Deferred import, so the test case can be skipped before it is attempted.
    from crate.client.result import BulkResponse

    missing_records = "Processing a bulk response without records is an invalid operation"
    missing_results = "Processing a bulk response without results is an invalid operation"

    # Both arguments missing: the records check fires first.
    with self.assertRaises(ValueError) as raised:
        BulkResponse(records=None, results=None)
    self.assertEqual(str(raised.exception), missing_records)

    # Records given, results missing.
    with self.assertRaises(ValueError) as raised:
        BulkResponse(records=[], results=None)
    self.assertEqual(str(raised.exception), missing_results)
Loading