Skip to content

Commit

Permalink
MongoDB: Improve error handling wrt. bulk operations vs. usability
Browse files Browse the repository at this point in the history
In order to have both efficient bulk insert operations and on-the-spot
error messages for records that fail to insert, let's introduce a
two-stage approach:

First, try to insert a batch. When it fails, determine invalid records
and insert them one-by-one, in order to relay corresponding error
messages to the user.
  • Loading branch information
amotl committed Sep 13, 2024
1 parent 9e52464 commit 72f608b
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
or from filesystem directory
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ def mongodb_copy(
for task in tasks:
try:
outcome_task = task.start()
except (Exception, PanicException):
logger.exception("Task failed")
except (Exception, PanicException) as ex:
logger.error(f"Task failed: {ex}")

Check warning on line 172 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L171-L172

Added lines #L171 - L172 were not covered by tests
outcome_task = False
outcome = outcome and outcome_task

Expand Down
74 changes: 52 additions & 22 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB
from pymongo.cursor import Cursor
from sqlalchemy.exc import ProgrammingError
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from zyp.model.collection import CollectionAddress
Expand All @@ -16,7 +17,9 @@
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.database import BulkResponse

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,9 +47,7 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if isinstance(data, Cursor):
data = list(data)
if not isinstance(data, list):
if not isinstance(data, Cursor) and not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
Expand All @@ -72,10 +73,12 @@ def __init__(
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "raise",
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Expand Down Expand Up @@ -124,10 +127,14 @@ def start(self):
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out: int = 0

count_success_total: int = 0
count_error_total: int = 0

# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
for documents in self.mongodb_adapter.query():
current_batch_size = len(documents)

progress_bar.set_description("ACQUIRE")

try:
Expand All @@ -138,27 +145,50 @@ def start(self):
raise
continue

# Submit operation to CrateDB.
statement = sa.text(operation.statement)

# Submit operation to CrateDB, using `bulk_args`.
progress_bar.set_description("SUBMIT ")
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise IOError("Unable to insert one or more records")
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(
f"Executing operation failed: {ex}\n"
f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
cursor = connection.execute(statement=statement, parameters=operation.parameters)
connection.commit()
cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
failed_records = bulk_response.failed_records
count_success_local = bulk_response.success_count
count_success_total += bulk_response.success_count
progress_bar.update(n=bulk_response.success_count)

# When a batch is of size one, an exception is raised.
# Just signal the same condition as if a batch would have failed.
except ProgrammingError:
failed_records = [operation.parameters]
count_success_local = 0

Check warning on line 166 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L164-L166

Added lines #L164 - L166 were not covered by tests

# When bulk operations fail, try inserting failed records record-by-record,
# in order to relay proper error messages to the user.
if failed_records:
logger.warning(
f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
f"Falling back to per-record operations."
)
if self.on_error == "raise":
raise
continue
for record in failed_records:
try:
connection.execute(statement=statement, parameters=record)
connection.commit()
count_success_total += 1

Check warning on line 179 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L178-L179

Added lines #L178 - L179 were not covered by tests
except Exception as ex:
logger.warning(f"Operation failed: {ex}")
logger.debug(f"Failing record: {record}")
count_error_total += 1
if self.on_error == "raise":
raise

Check warning on line 185 in cratedb_toolkit/io/mongodb/copy.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/copy.py#L185

Added line #L185 was not covered by tests
progress_bar.update(n=1)

progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:

logger.info(f"Number of records written: success={count_success_total}, error={count_error_total}")
if count_success_total == 0:
logger.warning("No data has been copied")

return True
Empty file.
19 changes: 19 additions & 0 deletions cratedb_toolkit/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from sqlalchemy_cratedb import dialect


def do_executemany(self, cursor, statement, parameters, context=None):
    """
    Run a bulk statement and remember the driver's response on the execution context.

    Variant of `do_executemany` which attaches whatever `cursor.executemany()`
    returns to `context` as `last_executemany_result`, when a context is given.
    The function itself returns nothing.
    TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`.
    """
    bulk_result = cursor.executemany(statement, parameters)
    if context is None:
        # No execution context available, so there is nowhere to keep the response.
        return
    context.last_executemany_result = bulk_result


def monkeypatch_executemany():
    """
    Install the response-recording `do_executemany` on the imported `dialect` object.

    Replaces its `do_executemany` attribute with the improved version
    defined in this module.
    """
    setattr(dialect, "do_executemany", do_executemany)
65 changes: 65 additions & 0 deletions cratedb_toolkit/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import logging
import os
import typing as t
from functools import cached_property
from pathlib import Path

import sqlalchemy as sa
import sqlparse
from attrs import define
from boltons.urlutils import URL
from cratedb_sqlparse import sqlparse as sqlparse_cratedb
from sqlalchemy.exc import ProgrammingError
Expand Down Expand Up @@ -428,3 +430,66 @@ def get_table_names(sql: str) -> t.List[t.List[str]]:
local_names.append(table.name)
names.append(local_names)
return names


class BulkResultItem(t.TypedDict):
    """
    Define the shape of a CrateDB bulk request response item.

    `BulkResponse.failed_records` pairs each item with one entry of the
    bulk parameter list, and treats `rowcount == -2` as a failed insert.
    """

    # Row count reported by CrateDB for the corresponding bulk argument.
    rowcount: int


@define
class BulkResponse:
    """
    Evaluate the outcome of a CrateDB bulk request.

    Takes the list of bulk arguments (parameter list) that was submitted, and the
    list of bulk response items, and derives success and failure figures from them.
    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
    TODO: Think about refactoring this to `sqlalchemy_cratedb.support`.
    """

    # Bulk arguments as submitted with the request, or `None`.
    parameters: t.Union[t.List[t.Dict[str, t.Any]], None]
    # Response items, paired with `parameters` by position, or `None`.
    cratedb_bulk_result: t.Union[t.List[BulkResultItem], None]

    @cached_property
    def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
        """
        Return the submitted records that CrateDB reported as failed.

        A failed insert is signalled by `rowcount=-2` on the response item.
        When either the parameters or the bulk result is missing, no record
        is considered failed.
        https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
        """
        submitted, outcomes = self.parameters, self.cratedb_bulk_result
        if submitted is None or outcomes is None:
            return []
        return [record for record, status in zip(submitted, outcomes) if status["rowcount"] == -2]

    @cached_property
    def parameter_count(self) -> int:
        """
        Return the bulk size, i.e. the length of the parameter list.

        Missing or empty parameters count as zero.
        """
        # NOTE(review): Codecov flagged the empty-parameters path as not covered by tests.
        return len(self.parameters) if self.parameters else 0

    @cached_property
    def success_count(self) -> int:
        """
        Return the number of records within the batch that did not fail.
        """
        total, failed = self.parameter_count, self.failed_count
        return total - failed

    @cached_property
    def failed_count(self) -> int:
        """
        Return the number of records within the batch that failed.
        """
        failures = self.failed_records
        return len(failures)
2 changes: 2 additions & 0 deletions tests/io/mongodb/mixed.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"_id":1,"name":"Foo","date":{"$date":"2011-01-14T08:00:00Z"}}
{"_id":2,"name":"Bar","date":{"$date":"2011-01-15T08:00:00Z"},"nested_array":[[1,2]]}
22 changes: 21 additions & 1 deletion tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
assert cratedb.database.count_records("testdrive.books-relaxed") == 4


def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
def test_mongodb_copy_filesystem_json_relaxed_success(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
"""
Expand Down Expand Up @@ -187,6 +187,26 @@ def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
assert timestamp_type == "bigint"


def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb):
    """
    Verify MongoDB Extended JSON -> CrateDB data transfer,
    which should emit a warning on an invalid record.
    """

    table_name = "testdrive.demo"

    # Source: NDJSON file with two records, the second one carrying a nested array.
    source_url = "file+bson:./tests/io/mongodb/mixed.ndjson"
    # Target: CrateDB table `testdrive.demo`.
    target_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Invoke the data transfer.
    mongodb_copy(source_url, target_url)

    # The table exists, and only a single record made it into the database.
    assert cratedb.database.table_exists(table_name) is True
    assert cratedb.database.refresh_table(table_name) is True
    assert cratedb.database.count_records(table_name) == 1

    # The reason for rejecting the other record has been relayed to the log.
    assert "Dynamic nested arrays are not supported" in caplog.text


def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
Expand Down

0 comments on commit 72f608b

Please sign in to comment.