From cd318c313b5cca44977fc5bb15142de0d3b7852b Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 22:11:11 +0200 Subject: [PATCH 1/3] DynamoDB Full: Adjust parameter names, use `batch-size` not `page-size` --- cratedb_toolkit/io/dynamodb/adapter.py | 4 ++-- cratedb_toolkit/io/dynamodb/copy.py | 4 ++-- doc/io/dynamodb/loader.md | 19 +++++++++++++------ tests/io/dynamodb/test_adapter.py | 2 +- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/cratedb_toolkit/io/dynamodb/adapter.py b/cratedb_toolkit/io/dynamodb/adapter.py index c3e55ef1..6d88a5e1 100644 --- a/cratedb_toolkit/io/dynamodb/adapter.py +++ b/cratedb_toolkit/io/dynamodb/adapter.py @@ -23,7 +23,7 @@ def __init__(self, dynamodb_url: URL): def scan( self, table_name: str, - page_size: int = 1000, + batch_size: int = 100, consistent_read: bool = False, on_error: t.Literal["log", "raise"] = "log", ) -> t.Generator[t.Dict, None, None]: @@ -35,7 +35,7 @@ def scan( key = None while True: try: - scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": page_size} + scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": batch_size} if key is not None: scan_kwargs.update({"ExclusiveStartKey": key}) response = self.dynamodb_client.scan(**scan_kwargs) diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py index 8f99bef6..c059faa5 100644 --- a/cratedb_toolkit/io/dynamodb/copy.py +++ b/cratedb_toolkit/io/dynamodb/copy.py @@ -40,7 +40,7 @@ def __init__( self.progress = progress self.debug = debug - self.page_size: int = int(self.dynamodb_url.query.get("page-size", 1000)) + self.batch_size: int = int(self.dynamodb_url.query.get("batch-size", 100)) self.consistent_read: bool = asbool(self.dynamodb_url.query.get("consistent-read", False)) def start(self): @@ -63,7 +63,7 @@ def start(self): for result in self.dynamodb_adapter.scan( table_name=self.dynamodb_table, consistent_read=self.consistent_read, - 
page_size=self.page_size, + batch_size=self.batch_size, ): result_size = len(result["Items"]) try: diff --git a/doc/io/dynamodb/loader.md b/doc/io/dynamodb/loader.md index 9d3a2e78..63ea4d23 100644 --- a/doc/io/dynamodb/loader.md +++ b/doc/io/dynamodb/loader.md @@ -27,18 +27,25 @@ ctk show table "testdrive.demo" ## Options -### `page-size` -The source URL accepts the `page-size` option to configure DynamoDB -[pagination]. The default value is `1000`. +### Batch Size +The source URL accepts the `batch-size` option to configure DynamoDB +[pagination]. The default value is `100`. ```shell -ctk load table .../ProductCatalog?region=us-east-1&page-size=5000 +ctk load table .../ProductCatalog?batch-size=5000 ``` -### `consistent-read` +### Consistent Read The source URL accepts the `consistent-read` option to configure DynamoDB [read consistency]. The default value is `false`. ```shell -ctk load table .../ProductCatalog?region=us-east-1&consistent-read=true +ctk load table .../ProductCatalog?consistent-read=true +``` + +### Region +The source URL accepts the `region` option to configure the AWS region +label. The default value is `us-east-1`. 
+```shell +ctk load table .../ProductCatalog?region=eu-central-1 ``` diff --git a/tests/io/dynamodb/test_adapter.py b/tests/io/dynamodb/test_adapter.py index f8c8e7ca..6da06ae7 100644 --- a/tests/io/dynamodb/test_adapter.py +++ b/tests/io/dynamodb/test_adapter.py @@ -38,5 +38,5 @@ def test_adapter_scan_failure_page_size(dynamodb): adapter = DynamoDBAdapter(URL(dynamodb_url)) with pytest.raises(ParamValidationError) as ex: - next(adapter.scan("demo", page_size=-1, on_error="raise")) + next(adapter.scan("demo", batch_size=-1, on_error="raise")) assert ex.match("Parameter validation failed:\nInvalid value for parameter Limit, value: -1, valid min value: 1") From dbbce504283b35caa816b21ebbec7efe36177603 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 29 Aug 2024 01:23:51 +0200 Subject: [PATCH 2/3] DynamoDB CDC: Add `ctk load table` interface for processing CDC events In contrast to the Lambda-based processor implementation, this one is a standalone one that can be used optimally in any Python environment, managed or not. 
--- .github/workflows/dynamodb.yml | 6 +- CHANGES.md | 1 + cratedb_toolkit/api/main.py | 9 + cratedb_toolkit/io/kinesis/__init__.py | 0 cratedb_toolkit/io/kinesis/adapter.py | 83 ++++++++ cratedb_toolkit/io/kinesis/api.py | 6 + cratedb_toolkit/io/kinesis/relay.py | 82 ++++++++ doc/io/dynamodb/cdc-lambda.md | 250 +++++++++++++++++++++++ doc/io/dynamodb/cdc.md | 270 +++++-------------------- doc/io/dynamodb/index.md | 1 + doc/io/dynamodb/loader.md | 3 + examples/aws/kinesis_put.py | 14 ++ pyproject.toml | 2 + tests/io/dynamodb/conftest.py | 10 +- tests/io/dynamodb/test_adapter.py | 6 +- tests/io/dynamodb/test_cli.py | 2 +- tests/io/dynamodb/test_copy.py | 2 +- tests/io/dynamodb/test_relay.py | 62 ++++++ tests/io/test_processor.py | 4 +- 19 files changed, 577 insertions(+), 236 deletions(-) create mode 100644 cratedb_toolkit/io/kinesis/__init__.py create mode 100644 cratedb_toolkit/io/kinesis/adapter.py create mode 100644 cratedb_toolkit/io/kinesis/api.py create mode 100644 cratedb_toolkit/io/kinesis/relay.py create mode 100644 doc/io/dynamodb/cdc-lambda.md create mode 100644 examples/aws/kinesis_put.py create mode 100644 tests/io/dynamodb/test_relay.py diff --git a/.github/workflows/dynamodb.yml b/.github/workflows/dynamodb.yml index d44d7ae2..cad71307 100644 --- a/.github/workflows/dynamodb.yml +++ b/.github/workflows/dynamodb.yml @@ -42,8 +42,8 @@ jobs: os: ["ubuntu-latest"] # TODO: yarl, dependency of influxio, is currently not available on Python 3.12. # https://github.com/aio-libs/yarl/pull/942 - python-version: ["3.8", "3.11"] - localstack-version: ["3.7"] + python-version: ["3.9", "3.11"] + localstack-version: ["3.6"] env: OS: ${{ matrix.os }} @@ -78,7 +78,7 @@ jobs: pip install "setuptools>=64" --upgrade # Install package in editable mode. 
- pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop] + pip install --use-pep517 --prefer-binary --editable=.[dynamodb,kinesis,test,develop] - name: Run linter and software tests run: | diff --git a/CHANGES.md b/CHANGES.md index f16e011d..17fd9160 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -7,6 +7,7 @@ - MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://` - MongoDB: Optionally filter server collection using MongoDB query expression - MongoDB: Improve error handling wrt. bulk operations vs. usability +- DynamoDB CDC: Add `ctk load table` interface for processing CDC events ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores diff --git a/cratedb_toolkit/api/main.py b/cratedb_toolkit/api/main.py index db516915..b4060744 100644 --- a/cratedb_toolkit/api/main.py +++ b/cratedb_toolkit/api/main.py @@ -138,9 +138,18 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf logger.error("Data loading failed or incomplete") return False + elif source_url_obj.scheme.startswith("kinesis"): + if "+cdc" in source_url_obj.scheme: + from cratedb_toolkit.io.kinesis.api import kinesis_relay + + return kinesis_relay(str(source_url_obj), target_url) + else: + raise NotImplementedError("Loading full data via Kinesis not implemented yet") + elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]: if "+cdc" in source_url_obj.scheme: source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "") + from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True) diff --git a/cratedb_toolkit/io/kinesis/__init__.py b/cratedb_toolkit/io/kinesis/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cratedb_toolkit/io/kinesis/adapter.py b/cratedb_toolkit/io/kinesis/adapter.py new file mode 100644 index 00000000..f6f709a4 --- /dev/null 
+++ b/cratedb_toolkit/io/kinesis/adapter.py @@ -0,0 +1,83 @@ +import asyncio +import typing as t + +import boto3 +from aiobotocore.session import AioSession +from kinesis import Consumer, JsonProcessor, Producer +from yarl import URL + + +class KinesisAdapter: + def __init__(self, kinesis_url: URL): + self.async_session = AioSession() + self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password) + + self.session = boto3.Session( + aws_access_key_id=kinesis_url.user, + aws_secret_access_key=kinesis_url.password, + region_name=kinesis_url.query.get("region"), + ) + + self.endpoint_url = None + if kinesis_url.host and kinesis_url.host.lower() != "aws": + self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}" + self.kinesis_url = kinesis_url + self.region_name = kinesis_url.query.get("region") + self.stream_name = self.kinesis_url.path.lstrip("/") + self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url) + + def consumer_factory(self, **kwargs): + return Consumer( + stream_name=self.stream_name, + session=self.async_session, + endpoint_url=self.endpoint_url, + region_name=self.region_name, + processor=JsonProcessor(), + **kwargs, + ) + + def consume_forever(self, handler: t.Callable): + asyncio.run(self._consume_forever(handler)) + + def consume_once(self, handler: t.Callable): + asyncio.run(self._consume_once(handler)) + + async def _consume_forever(self, handler: t.Callable): + """ + Consume items from a Kinesis stream. + """ + async with self.consumer_factory( + # TODO: Make configurable. + create_stream=True, + iterator_type="TRIM_HORIZON", + sleep_time_no_records=0.2, + ) as consumer: + while True: + async for item in consumer: + handler(item) + + async def _consume_once(self, handler: t.Callable): + async with self.consumer_factory( + # TODO: Make configurable. 
+ create_stream=True, + iterator_type="TRIM_HORIZON", + sleep_time_no_records=0.2, + ) as consumer: + async for item in consumer: + handler(item) + + def produce(self, data: t.Dict[str, t.Any]): + asyncio.run(self._produce(data)) + + async def _produce(self, data: t.Dict[str, t.Any]): + # Put item onto queue to be flushed via `put_records()`. + async with Producer( + stream_name=self.stream_name, + session=self.async_session, + endpoint_url=self.endpoint_url, + region_name=self.region_name, + # TODO: Make configurable. + create_stream=True, + buffer_time=0.01, + ) as producer: + await producer.put(data) diff --git a/cratedb_toolkit/io/kinesis/api.py b/cratedb_toolkit/io/kinesis/api.py new file mode 100644 index 00000000..9d302736 --- /dev/null +++ b/cratedb_toolkit/io/kinesis/api.py @@ -0,0 +1,6 @@ +from cratedb_toolkit.io.kinesis.relay import KinesisRelay + + +def kinesis_relay(source_url, target_url): + ka = KinesisRelay(source_url, target_url) + ka.start() diff --git a/cratedb_toolkit/io/kinesis/relay.py b/cratedb_toolkit/io/kinesis/relay.py new file mode 100644 index 00000000..9a9ec8de --- /dev/null +++ b/cratedb_toolkit/io/kinesis/relay.py @@ -0,0 +1,82 @@ +import base64 +import json +import logging + +import sqlalchemy as sa +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator +from tqdm import tqdm +from tqdm.contrib.logging import logging_redirect_tqdm +from yarl import URL + +from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter +from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.util import DatabaseAdapter + +logger = logging.getLogger(__name__) + + +class KinesisRelay: + """ + Relay events from Kinesis into CrateDB table. 
+ """ + + def __init__( + self, + kinesis_url: str, + cratedb_url: str, + ): + cratedb_address = DatabaseAddress.from_string(cratedb_url) + cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() + cratedb_table = cratedb_table_address.fullname + + self.kinesis_url = URL(kinesis_url) + self.kinesis_adapter = KinesisAdapter(self.kinesis_url) + self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False) + self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) + + if "dynamodb+cdc" in self.kinesis_url.scheme: + self.translator = DynamoDBCDCTranslator(table_name=self.cratedb_table) + else: + raise NotImplementedError(f"Data processing not implemented for {self.kinesis_url}") + + self.connection: sa.Connection + self.progress_bar: tqdm + + def start(self, once: bool = False): + """ + Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB. + """ + logger.info(f"Source: Kinesis stream={self.kinesis_adapter.stream_name} count=unknown") + self.connection = self.cratedb_adapter.engine.connect() + if not self.cratedb_adapter.table_exists(self.cratedb_table): + self.connection.execute(sa.text(self.translator.sql_ddl)) + self.connection.commit() + records_target = self.cratedb_adapter.count_records(self.cratedb_table) + logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") + # Harmonize logging and progress bar. + # https://github.com/tqdm/tqdm#redirecting-logging + self.progress_bar = tqdm() + with logging_redirect_tqdm(): + if once: + self.kinesis_adapter.consume_once(self.process_event) + else: + self.kinesis_adapter.consume_forever(self.process_event) + + def process_event(self, event): + try: + record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8")) + operation = self.translator.to_sql(record) + except Exception: + logger.exception("Decoding Kinesis event failed") + return + try: + # Process record. 
+            self.connection.execute(sa.text(operation.statement), operation.parameters)
+
+            # Processing alternating CDC events requires write synchronization.
+            self.connection.execute(sa.text(f"REFRESH TABLE {self.cratedb_table}"))
+
+            self.connection.commit()
+        except sa.exc.ProgrammingError as ex:
+            logger.warning(f"Running query failed: {ex}")
+        self.progress_bar.update()
diff --git a/doc/io/dynamodb/cdc-lambda.md b/doc/io/dynamodb/cdc-lambda.md
new file mode 100644
index 00000000..30ac67dd
--- /dev/null
+++ b/doc/io/dynamodb/cdc-lambda.md
@@ -0,0 +1,250 @@
+# DynamoDB CDC Relay with AWS Lambda
+
+
+## What's Inside
+- A convenient [Infrastructure as code (IaC)] procedure to define data pipelines on [AWS].
+- Written in Python, using [AWS CloudFormation] stack deployments. To learn
+  what's behind, see also [How CloudFormation works].
+- Code for running on [AWS Lambda] is packaged into [OCI] images, for efficient
+  delta transfers, built-in versioning, and testing purposes.
+
+
+## Details
+- This specific document includes a few general guidelines, and
+  a few specifics coming from `examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py`.
+- That program defines a pipeline which looks like this:
+
+  DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB Cloud
+
+For exercising an AWS pipeline, you need two components: The IaC description,
+and a record processor implementation for the AWS Lambda.
+
+The IaC description will deploy a complete software stack for demonstration
+purposes, including a DynamoDB Table, connected to a Kinesis Stream.
+
+
+## Prerequisites
+
+### CrateDB
+This walkthrough assumes a running CrateDB cluster, and focuses on CrateDB Cloud.
+It does not provide relevant guidelines to set up a cluster, yet.
+
+### OCI image
+In order to package code for AWS Lambda functions into OCI images,
+and use them, you will need to publish them to the AWS ECR container image
+registry.
+
+You will need to authenticate your local Docker environment, and create a
+container image repository once for each project using a different runtime
+image.
+
+Define your AWS ID, region label, and repository name, to be able to use
+the templated commands 1:1.
+```shell
+aws_id=831394476016
+aws_region=eu-central-1
+repository_name=kinesis-cratedb-processor-lambda
+```
+```shell
+aws ecr get-login-password --region=${aws_region} | \
+    docker login --username AWS --password-stdin ${aws_id}.dkr.ecr.${aws_region}.amazonaws.com
+```
+
+(ecr-repository)=
+### ECR Repository
+Just once, before proceeding, create an image repository hosting the runtime
+code for your Lambda function.
+```shell
+aws ecr create-repository --region=${aws_region} \
+  --repository-name=${repository_name} --image-tag-mutability=MUTABLE
+```
+In order to allow others to pull that image, you will need to define a
+[repository policy] using the [set-repository-policy] subcommand of the AWS CLI.
+In order to invoke that command, put the [](project:#ecr-repository-policy)
+JSON definition into a file called `policy.json`.
+```shell
+aws ecr set-repository-policy --repository-name=${repository_name} --policy-text file://policy.json
+```
+
+
+## Install
+In order to exercise the example outlined below, you need to install
+CrateDB Toolkit with the "kinesis" extension, because CDC data will be
+relayed using AWS Kinesis.
+```shell
+pip install 'cratedb-toolkit[kinesis]'
+```
+
+
+## Usage
+
+:::{rubric} Configure
+:::
+```shell
+export CRATEDB_HTTP_URL='https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/'
+export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true'
+```
+
+:::{rubric} CrateDB Table
+:::
+The destination table name in CrateDB, where the CDC record
+processor will re-materialize CDC events into.
+```shell +pip install crash +crash --hosts "${CRATEDB_HTTP_URL}" -c 'CREATE TABLE "demo-sink" (data OBJECT(DYNAMIC));' +``` + +:::{rubric} Invoke pipeline +::: +Package the Lambda function, upload it, and deploy demo software stack. +```shell +python dynamodb_kinesis_lambda_oci_cratedb.py +``` +For example, choose those two variants: + +- IaC driver: [dynamodb_kinesis_lambda_oci_cratedb.py] +- Record processor: [kinesis_lambda.py] + +Putting them next to each other into a directory, and adjusting +`LambdaPythonImage(entrypoint_file=...)` to point to the second, +should be enough to get you started. + + +:::{rubric} Trigger CDC events +::: +Inserting a document into the DynamoDB table, and updating it, will trigger two CDC events. +```shell +READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}" +READING_WHERE="\"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42'" + +aws dynamodb execute-statement --statement \ + "INSERT INTO \"demo-source\" VALUE ${READING_SQL};" + +aws dynamodb execute-statement --statement \ + "UPDATE \"demo-source\" SET temperature=43.59 WHERE ${READING_WHERE};" +``` + +:::{rubric} Query data in CrateDB +::: +When the stream delivered the CDC data to the processor, and everything worked well, +data should have materialized in the target table in CrateDB. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'SELECT * FROM "demo-sink";' +``` + +:::{rubric} Shut down AWS stack +::: +In order to complete the experiment, you may want to shut down the AWS stack again. +```shell +aws cloudformation delete-stack --stack-name testdrive-dynamodb-dev +``` + + +## Appendix + +### Processor +Check status of Lambda function. +```shell +aws lambda get-function \ + --function-name arn:aws:lambda:eu-central-1:831394476016:function:testdrive-dynamodb-dev-lambda-processor +``` +Check status of stream mapping(s). +```shell +aws lambda list-event-source-mappings +``` +Check logs. 
+```shell +aws logs describe-log-groups +aws logs start-live-tail --log-group-identifiers arn:aws:logs:eu-central-1:831394476016:log-group:/aws/lambda/DynamoDBCrateDBProcessor +``` + +### Database + +There are a few utility commands that help you operate the stack, that have not +been absorbed yet. See also [Monitoring and troubleshooting Lambda functions]. + +Query records in CrateDB table. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'SELECT * FROM "demo-sink";' +``` + +Truncate CrateDB table. +```shell +crash --hosts "${CRATEDB_HTTP_URL}" --command \ + 'DELETE FROM "demo-sink";' +``` + +Query documents in DynamoDB table. +```shell +aws dynamodb execute-statement --statement \ + "SELECT * FROM \"demo-source\";" +``` + + +(ecr-repository-policy)= +### ECR Repository Policy +```json +{ + "Version": "2008-10-17", + "Statement": [ + { + "Sid": "allow public pull", + "Effect": "Allow", + "Principal": "*", + "Action": [ + "ecr:BatchCheckLayerAvailability", + "ecr:BatchGetImage", + "ecr:GetDownloadUrlForLayer" + ] + } + ] +} +``` + +## Troubleshooting + +### ECR Repository +If you receive such an error message, your session has expired, and you need +to re-run the authentication step. +```text +denied: Your authorization token has expired. Reauthenticate and try again. +``` + +This error message indicates your ECR repository does not exist. The solution +is to create it, using the command shared above. +```text +name unknown: The repository with name 'kinesis-cratedb-processor-lambda' does +not exist in the registry with id '831394476016' +``` + +### AWS CloudFormation +If you receive such an error, ... +```text +botocore.exceptions.ClientError: An error occurred (ValidationError) when calling +the CreateChangeSet operation: Stack:arn:aws:cloudformation:eu-central-1:931394475905:stack/testdrive-dynamodb-dev/ea8c32e0-492c-11ef-b9b3-06b708ecd03f +is in UPDATE_ROLLBACK_FAILED state and can not be updated. 
+``` +because some detail when deploying or updating the CloudFormation recipe fails, +the CloudFormation stack is stuck, and you will need to [continue rolling back +an update] manually. +```shell +aws cloudformation continue-update-rollback --stack-name testdrive-dynamodb-dev +``` + + + +[AWS]: https://en.wikipedia.org/wiki/Amazon_Web_Services +[AWS CloudFormation]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html +[AWS Lambda]: https://en.wikipedia.org/wiki/AWS_Lambda +[continue rolling back an update]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-continueupdaterollback.html +[dynamodb_kinesis_lambda_oci_cratedb.py]: https://github.com/crate/cratedb-toolkit/blob/main/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py +[example program]: https://github.com/crate/cratedb-toolkit/tree/main/examples/aws +[How CloudFormation works]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-overview.html +[Infrastructure as code (IaC)]: https://en.wikipedia.org/wiki/Infrastructure_as_code +[kinesis_lambda.py]: https://github.com/crate/cratedb-toolkit/blob/main/cratedb_toolkit/io/processor/kinesis_lambda.py +[Monitoring and troubleshooting Lambda functions]: https://docs.aws.amazon.com/lambda/latest/dg/lambda-monitoring.html +[OCI]: https://en.wikipedia.org/wiki/Open_Container_Initiative +[repository policy]: https://docs.aws.amazon.com/lambda/latest/dg/images-create.html#gettingstarted-images-permissions +[set-repository-policy]: https://docs.aws.amazon.com/cli/latest/reference/ecr/set-repository-policy.html diff --git a/doc/io/dynamodb/cdc.md b/doc/io/dynamodb/cdc.md index eb1917cf..fd5aac7b 100644 --- a/doc/io/dynamodb/cdc.md +++ b/doc/io/dynamodb/cdc.md @@ -1,250 +1,76 @@ +(dynamodb-cdc)= # DynamoDB CDC Relay +## About +Relay data changes from DynamoDB into CrateDB using a one-stop command +`ctk load table kinesis+dynamodb+cdc://...`, in order to facilitate +convenient data 
transfers to be used within data pipelines or ad hoc +operations. -## What's Inside -- A convenient [Infrastructure as code (IaC)] procedure to define data pipelines on [AWS]. -- Written in Python, using [AWS CloudFormation] stack deployments. To learn - what's behind, see also [How CloudFormation works]. -- Code for running on [AWS Lambda] is packaged into [OCI] images, for efficient - delta transfers, built-in versioning, and testing purposes. - - -## Details -- This specific document includes a few general guidelines, and a - a few specifics coming from `examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py`. -- That program defines a pipeline which looks like this: - - DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB Cloud - -For exercising an AWS pipeline, you need two components: The IaC description, -and a record processor implementation for the AWS Lambda. - -The IaC description will deploy a complete software stack for demonstration -purposes, including a DynamoDB Table, connected to a Kinesis Stream. - - -## Prerequisites - -### CrateDB -This walkthrough assumes a running CrateDB cluster, and focuses on CrateDB Cloud. -It does not provide relevant guidelines to set up a cluster, yet. - -### OCI image -In order to package code for AWS Lambda functions packages into OCI images, -and use them, you will need to publish them to the AWS ECR container image -registry. - -You will need to authenticate your local Docker environment, and create a -container image repository once for each project using a different runtime -image. - -Define your AWS ID, region label, and repository name, to be able to use -the templated commands 1:1. 
-```shell -aws_id=831394476016 -aws_region=eu-central-1 -repository_name=kinesis-cratedb-processor-lambda -``` -```shell -aws ecr get-login-password --region=${aws_region} | \ - docker login --username AWS --password-stdin ${aws_id}.dkr.ecr.${aws_region}.amazonaws.com -``` - -(ecr-repository)= -### ECR Repository -Just once, before proceeding, create an image repository hosting the runtime -code for your Lambda function. -```shell -aws ecr create-repository --region=${aws_region} \ - --repository-name=${repository_name} --image-tag-mutability=MUTABLE -``` -In order to allow others to pull that image, you will need to define a -[repository policy] using the [set-repository-policy] subcommend of the AWS CLI. -In order to invoke that command, put the [](project:#ecr-repository-policy) -JSON definition into a file called `policy.json`. -```shell -aws ecr set-repository-policy --repository-name=${repository_name} --policy-text file://policy.json -``` - +It taps into [Change data capture for DynamoDB Streams], in this case +using [Kinesis Data Streams]. It is the sister to the corresponding +full-load implementation, [](#dynamodb-loader). ## Install -In order to exercise the example outlined below, you need to install -CrateDB Toolkit with the "kinesis" extension, because CDC data will be -relayed using AWS Kinesis. ```shell -pip install 'cratedb-toolkit[kinesis]' +pip install --upgrade 'cratedb-toolkit[kinesis]' ``` - ## Usage - -:::{rubric} Configure -::: +Consume data from Kinesis Data Stream of DynamoDB CDC events into +CrateDB schema/table. 
```shell -export CRATEDB_HTTP_URL='https://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/' -export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true' +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk load table kinesis+dynamodb+cdc://AWS_ACCESS_KEY:AWS_SECRET_ACCESS_KEY@aws/cdc-stream?region=eu-central-1 ``` -:::{rubric} CrateDB Table -::: -The destination table name in CrateDB, where the CDC record -processor will re-materialize CDC events into. +Query data in CrateDB. ```shell -pip install crash -crash --hosts "${CRATEDB_HTTP_URL}" -c 'CREATE TABLE "demo-sink" (data OBJECT(DYNAMIC));' +export CRATEDB_SQLALCHEMY_URL=crate://crate@localhost:4200/testdrive/demo +ctk shell --command "SELECT * FROM testdrive.demo;" +ctk show table "testdrive.demo" ``` -:::{rubric} Invoke pipeline -::: -Package the Lambda function, upload it, and deploy demo software stack. -```shell -python dynamodb_kinesis_lambda_oci_cratedb.py -``` -For example, choose those two variants: - -- IaC driver: [dynamodb_kinesis_lambda_oci_cratedb.py] -- Record processor: [kinesis_lambda.py] - -Putting them next to each other into a directory, and adjusting -`LambdaPythonImage(entrypoint_file=...)` to point to the second, -should be enough to get you started. - +## Variants -:::{rubric} Trigger CDC events -::: -Inserting a document into the DynamoDB table, and updating it, will trigger two CDC events. +### CrateDB Cloud +When aiming to transfer data to CrateDB Cloud, the shape of the target URL +looks like that. 
```shell -READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}" -READING_WHERE="\"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42'" - -aws dynamodb execute-statement --statement \ - "INSERT INTO \"demo-source\" VALUE ${READING_SQL};" - -aws dynamodb execute-statement --statement \ - "UPDATE \"demo-source\" SET temperature=43.59 WHERE ${READING_WHERE};" -``` - -:::{rubric} Query data in CrateDB -::: -When the stream delivered the CDC data to the processor, and everything worked well, -data should have materialized in the target table in CrateDB. -```shell -crash --hosts "${CRATEDB_HTTP_URL}" --command \ - 'SELECT * FROM "demo-sink";' -``` - -:::{rubric} Shut down AWS stack -::: -In order to complete the experiment, you may want to shut down the AWS stack again. -```shell -aws cloudformation delete-stack --stack-name testdrive-dynamodb-dev -``` - - -## Appendix - -### Processor -Check status of Lambda function. -```shell -aws lambda get-function \ - --function-name arn:aws:lambda:eu-central-1:831394476016:function:testdrive-dynamodb-dev-lambda-processor -``` -Check status of stream mapping(s). -```shell -aws lambda list-event-source-mappings -``` -Check logs. -```shell -aws logs describe-log-groups -aws logs start-live-tail --log-group-identifiers arn:aws:logs:eu-central-1:831394476016:log-group:/aws/lambda/DynamoDBCrateDBProcessor -``` - -### Database - -There are a few utility commands that help you operate the stack, that have not -been absorbed yet. See also [Monitoring and troubleshooting Lambda functions]. - -Query records in CrateDB table. -```shell -crash --hosts "${CRATEDB_HTTP_URL}" --command \ - 'SELECT * FROM "demo-sink";' +export CRATEDB_SQLALCHEMY_URL='crate://admin:dZ...6LqB@testdrive.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true' ``` -Truncate CrateDB table. 
-```shell -crash --hosts "${CRATEDB_HTTP_URL}" --command \ - 'DELETE FROM "demo-sink";' -``` +### LocalStack +In order to exercise data transfers exclusively on your workstation, you can +use LocalStack to run DynamoDB and Kinesis service surrogates locally. See +also the [Get started with Kinesis on LocalStack] tutorial. -Query documents in DynamoDB table. +For addressing a Kinesis Data Stream on LocalStack, use a command of that shape. +See [Credentials for accessing LocalStack AWS API] for further information. ```shell -aws dynamodb execute-statement --statement \ - "SELECT * FROM \"demo-source\";" +ctk load table kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1 ``` +:::{tip} +LocalStack is a cloud service emulator that runs in a single container on your +laptop or in your CI environment. With LocalStack, you can run your AWS +applications or Lambdas entirely on your local machine without connecting to +a remote cloud provider. -(ecr-repository-policy)= -### ECR Repository Policy -```json -{ - "Version": "2008-10-17", - "Statement": [ - { - "Sid": "allow public pull", - "Effect": "Allow", - "Principal": "*", - "Action": [ - "ecr:BatchCheckLayerAvailability", - "ecr:BatchGetImage", - "ecr:GetDownloadUrlForLayer" - ] - } - ] -} -``` - -## Troubleshooting - -### ECR Repository -If you receive such an error message, your session has expired, and you need -to re-run the authentication step. -```text -denied: Your authorization token has expired. Reauthenticate and try again. -``` - -This error message indicates your ECR repository does not exist. The solution -is to create it, using the command shared above. -```text -name unknown: The repository with name 'kinesis-cratedb-processor-lambda' does -not exist in the registry with id '831394476016' -``` - -### AWS CloudFormation -If you receive such an error, ... 
-```text -botocore.exceptions.ClientError: An error occurred (ValidationError) when calling -the CreateChangeSet operation: Stack:arn:aws:cloudformation:eu-central-1:931394475905:stack/testdrive-dynamodb-dev/ea8c32e0-492c-11ef-b9b3-06b708ecd03f -is in UPDATE_ROLLBACK_FAILED state and can not be updated. -``` -because some detail when deploying or updating the CloudFormation recipe fails, -the CloudFormation stack is stuck, and you will need to [continue rolling back -an update] manually. +In order to invoke LocalStack on your workstation, you can use this Docker +command. ```shell -aws cloudformation continue-update-rollback --stack-name testdrive-dynamodb-dev +docker run \ + --rm -it \ + -p 127.0.0.1:4566:4566 \ + -p 127.0.0.1:4510-4559:4510-4559 \ + -v /var/run/docker.sock:/var/run/docker.sock \ + localstack/localstack:latest ``` +::: - -[AWS]: https://en.wikipedia.org/wiki/Amazon_Web_Services -[AWS CloudFormation]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html -[AWS Lambda]: https://en.wikipedia.org/wiki/AWS_Lambda -[continue rolling back an update]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/using-cfn-updating-stacks-continueupdaterollback.html -[dynamodb_kinesis_lambda_oci_cratedb.py]: https://github.com/crate/cratedb-toolkit/blob/main/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py -[example program]: https://github.com/crate/cratedb-toolkit/tree/main/examples/aws -[How CloudFormation works]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-overview.html -[Infrastructure as code (IaC)]: https://en.wikipedia.org/wiki/Infrastructure_as_code -[kinesis_lambda.py]: https://github.com/crate/cratedb-toolkit/blob/main/cratedb_toolkit/io/processor/kinesis_lambda.py -[Monitoring and troubleshooting Lambda functions]: https://docs.aws.amazon.com/lambda/latest/dg/lambda-monitoring.html -[OCI]: https://en.wikipedia.org/wiki/Open_Container_Initiative -[repository policy]: 
https://docs.aws.amazon.com/lambda/latest/dg/images-create.html#gettingstarted-images-permissions -[set-repository-policy]: https://docs.aws.amazon.com/cli/latest/reference/ecr/set-repository-policy.html +[Change data capture for DynamoDB Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html +[Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/ +[Get started with Kinesis on LocalStack]: https://docs.localstack.cloud/user-guide/aws/kinesis/ +[Kinesis Data Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html diff --git a/doc/io/dynamodb/index.md b/doc/io/dynamodb/index.md index 9b37c671..f61dfbcf 100644 --- a/doc/io/dynamodb/index.md +++ b/doc/io/dynamodb/index.md @@ -10,4 +10,5 @@ Using the DynamoDB subsystem, you can transfer data from and to DynamoDB. loader cdc +cdc-lambda ``` diff --git a/doc/io/dynamodb/loader.md b/doc/io/dynamodb/loader.md index 63ea4d23..51757e46 100644 --- a/doc/io/dynamodb/loader.md +++ b/doc/io/dynamodb/loader.md @@ -6,6 +6,9 @@ Load data from DynamoDB into CrateDB using a one-stop command `ctk load table dynamodb://...`, in order to facilitate convenient data transfers to be used within data pipelines or ad hoc operations. +It is the brother to the corresponding cdc-relay implementation, +[](#dynamodb-cdc). 
+ ## Install ```shell pip install --upgrade 'cratedb-toolkit[dynamodb]' diff --git a/examples/aws/kinesis_put.py b/examples/aws/kinesis_put.py new file mode 100644 index 00000000..8e554f6d --- /dev/null +++ b/examples/aws/kinesis_put.py @@ -0,0 +1,14 @@ +from yarl import URL + +from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter +from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis + + +def main(): + ka = KinesisAdapter(URL("kinesis://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1")) + ka.produce(wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED)) + ka.produce(wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED)) + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 135396e3..2db06f9a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -158,6 +158,8 @@ io = [ "sqlalchemy>=2", ] kinesis = [ + "aiobotocore<2.15", + "async-kinesis<1.2", "commons-codec>=0.0.14", "lorrystream[carabas]>=0.0.6", ] diff --git a/tests/io/dynamodb/conftest.py b/tests/io/dynamodb/conftest.py index 8806c508..c21d0e86 100644 --- a/tests/io/dynamodb/conftest.py +++ b/tests/io/dynamodb/conftest.py @@ -29,7 +29,7 @@ def setup(self): from cratedb_toolkit.testing.testcontainers.localstack import LocalStackContainerWithKeepalive self.container = LocalStackContainerWithKeepalive() - self.container.with_services("dynamodb") + self.container.with_services("dynamodb", "kinesis") self.container.start() def finalize(self): @@ -44,10 +44,14 @@ def reset(self): for database_name in RESET_TABLES: self.client.drop_database(database_name) - def get_connection_url(self): + def get_connection_url_dynamodb(self): url = URL(self.container.get_url()) return f"dynamodb://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}" + def get_connection_url_kinesis_dynamodb_cdc(self): + url = URL(self.container.get_url()) + return f"kinesis+dynamodb+cdc://LSIAQAAAAAAVNCBMPNSG:dummy@{url.host}:{url.port}" + 
@pytest.fixture(scope="session") def dynamodb_service(): @@ -71,4 +75,4 @@ def dynamodb(dynamodb_service): @pytest.fixture(scope="session") def dynamodb_test_manager(dynamodb_service): - return DynamoDBTestManager(dynamodb_service.get_connection_url()) + return DynamoDBTestManager(dynamodb_service.get_connection_url_dynamodb()) diff --git a/tests/io/dynamodb/test_adapter.py b/tests/io/dynamodb/test_adapter.py index 6da06ae7..cc419619 100644 --- a/tests/io/dynamodb/test_adapter.py +++ b/tests/io/dynamodb/test_adapter.py @@ -13,7 +13,7 @@ def test_adapter_scan_success(dynamodb): - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" adapter = DynamoDBAdapter(URL(dynamodb_url)) adapter.scan("foo") @@ -22,7 +22,7 @@ def test_adapter_scan_failure_consistent_read(dynamodb): """ Check supplying invalid parameters to `DynamoDBAdapter` fails as expected. """ - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" adapter = DynamoDBAdapter(URL(dynamodb_url)) with pytest.raises(ParamValidationError) as ex: @@ -34,7 +34,7 @@ def test_adapter_scan_failure_page_size(dynamodb): """ Check supplying invalid parameters to `DynamoDBAdapter` fails as expected. """ - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" adapter = DynamoDBAdapter(URL(dynamodb_url)) with pytest.raises(ParamValidationError) as ex: diff --git a/tests/io/dynamodb/test_cli.py b/tests/io/dynamodb/test_cli.py index 71d33e11..45f7e2fe 100644 --- a/tests/io/dynamodb/test_cli.py +++ b/tests/io/dynamodb/test_cli.py @@ -10,7 +10,7 @@ def test_dynamodb_load_table(caplog, cratedb, dynamodb, dynamodb_test_manager): """ CLI test: Invoke `ctk load table` for DynamoDB. 
""" - dynamodb_url = f"{dynamodb.get_connection_url()}/ProductCatalog?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/ProductCatalog?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with sample dataset. diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py index 2ced15b1..609f29b0 100644 --- a/tests/io/dynamodb/test_copy.py +++ b/tests/io/dynamodb/test_copy.py @@ -16,7 +16,7 @@ def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager) """ # Define source and target URLs. - dynamodb_url = f"{dynamodb.get_connection_url()}/demo?region=us-east-1" + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with data. diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py new file mode 100644 index 00000000..d65f8097 --- /dev/null +++ b/tests/io/dynamodb/test_relay.py @@ -0,0 +1,62 @@ +import time + +import botocore +import pytest + +from cratedb_toolkit.io.kinesis.relay import KinesisRelay +from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis + +pytestmark = pytest.mark.kinesis + +pytest.importorskip("commons_codec", reason="Only works with commons-codec installed") +pytest.importorskip("kinesis", reason="Only works with async-kinesis installed") + +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402 + + +def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): + """ + Roughly verify that the AWS DynamoDB CDC processing works as expected. + """ + + # Define source and target URLs. + kinesis_url = f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo?region=us-east-1" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Define target table name. 
+ table_name = '"testdrive"."demo"' + + # Create target table. + cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + + # Define two CDC events: INSERT and UPDATE. + events = [ + wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), + wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), + ] + + # Initialize table loader. + table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url) + + # Delete stream for blank canvas. + try: + table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + + # LocalStack needs a while when deleting the Stream. + # FIXME: Can this be made more efficient? + time.sleep(0.5) + + # Populate source database with data. + for event in events: + table_loader.kinesis_adapter.produce(event) + + # Run transfer command, consuming once not forever. + table_loader.start(once=True) + + # Verify data in target database. + assert cratedb.database.count_records(table_name) == 1 + results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 + assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}] diff --git a/tests/io/test_processor.py b/tests/io/test_processor.py index 6de1bad0..3439380b 100644 --- a/tests/io/test_processor.py +++ b/tests/io/test_processor.py @@ -133,8 +133,6 @@ def test_processor_kinesis_dynamodb_insert_update(cratedb, reset_handler, mocker from cratedb_toolkit.io.processor.kinesis_lambda import handler # Define two CDC events: INSERT and UPDATE. - # They have to be conveyed separately because CrateDB needs a - # `REFRESH TABLE` operation between them. 
event = { "Records": [ wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), @@ -162,6 +160,6 @@ def wrap_kinesis(data): "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "kinesis": { "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", - "data": base64.b64encode(json.dumps(data).encode("utf-8")), + "data": base64.b64encode(json.dumps(data).encode("utf-8")).decode(), }, } From 07b19749ffff2f4c99e000d370fb8983dca52fe1 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 22:10:33 +0200 Subject: [PATCH 3/3] DynamoDB CDC: Accept a few more options for the Kinesis Stream New options: batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time. --- CHANGES.md | 2 + cratedb_toolkit/io/kinesis/adapter.py | 82 ++++++++++++++++++++------ cratedb_toolkit/io/kinesis/relay.py | 7 +++ doc/io/dynamodb/cdc.md | 84 +++++++++++++++++++++++++++ tests/io/dynamodb/test_relay.py | 77 ++++++++++++++++++++++-- 5 files changed, 229 insertions(+), 23 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 17fd9160..c58595f8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,8 @@ - MongoDB: Optionally filter server collection using MongoDB query expression - MongoDB: Improve error handling wrt. bulk operations vs. 
usability - DynamoDB CDC: Add `ctk load table` interface for processing CDC events +- DynamoDB CDC: Accept a few more options for the Kinesis Stream: + batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores diff --git a/cratedb_toolkit/io/kinesis/adapter.py b/cratedb_toolkit/io/kinesis/adapter.py index f6f709a4..93f0d557 100644 --- a/cratedb_toolkit/io/kinesis/adapter.py +++ b/cratedb_toolkit/io/kinesis/adapter.py @@ -6,8 +6,20 @@ from kinesis import Consumer, JsonProcessor, Producer from yarl import URL +from cratedb_toolkit.util.data import asbool + class KinesisAdapter: + # Configuration for Kinesis shard iterators. + # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html + # Map `start` option to `ShardIteratorType`. + start_iterator_type_map = { + "earliest": "TRIM_HORIZON", + "latest": "LATEST", + "seqno-at": "AT_SEQUENCE_NUMBER", + "seqno-after": "AFTER_SEQUENCE_NUMBER", + } + def __init__(self, kinesis_url: URL): self.async_session = AioSession() self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password) @@ -21,10 +33,35 @@ def __init__(self, kinesis_url: URL): self.endpoint_url = None if kinesis_url.host and kinesis_url.host.lower() != "aws": self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}" + self.kinesis_url = kinesis_url - self.region_name = kinesis_url.query.get("region") self.stream_name = self.kinesis_url.path.lstrip("/") + + self.region_name: str = self.kinesis_url.query.get("region", "us-east-1") + self.batch_size: int = int(self.kinesis_url.query.get("batch-size", 100)) + self.create: bool = asbool(self.kinesis_url.query.get("create", "false")) + self.create_shards: int = int(self.kinesis_url.query.get("create-shards", 1)) + self.start: str = self.kinesis_url.query.get("start", "earliest") + self.seqno: int = 
int(self.kinesis_url.query.get("seqno", 0)) + self.idle_sleep: float = float(self.kinesis_url.query.get("idle-sleep", 0.5)) + self.buffer_time: float = float(self.kinesis_url.query.get("buffer-time", 0.5)) + self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url) + self.stopping: bool = False + + @property + def iterator_type(self): + """ + Map `start` option to Kinesis' `ShardIteratorType`. + """ + if self.start.startswith("seqno"): + raise NotImplementedError( + "Consuming Kinesis Stream from sequence number " "not implemented yet, please file an issue." + ) + try: + return self.start_iterator_type_map[self.start] + except KeyError as ex: + raise KeyError(f"Value for 'start' option unknown: {self.start}") from ex def consumer_factory(self, **kwargs): return Consumer( @@ -32,7 +69,12 @@ def consumer_factory(self, **kwargs): session=self.async_session, endpoint_url=self.endpoint_url, region_name=self.region_name, + max_queue_size=self.batch_size, + sleep_time_no_records=self.idle_sleep, + iterator_type=self.iterator_type, processor=JsonProcessor(), + create_stream=self.create, + create_stream_shards=self.create_shards, **kwargs, ) @@ -42,42 +84,46 @@ def consume_forever(self, handler: t.Callable): def consume_once(self, handler: t.Callable): asyncio.run(self._consume_once(handler)) + def stop(self): + self.stopping = True + async def _consume_forever(self, handler: t.Callable): """ - Consume items from a Kinesis stream. + Consume items from a Kinesis stream, forever. """ - async with self.consumer_factory( - # TODO: Make configurable. - create_stream=True, - iterator_type="TRIM_HORIZON", - sleep_time_no_records=0.2, - ) as consumer: + async with self.consumer_factory() as consumer: while True: async for item in consumer: handler(item) + if self.stopping: + self.stopping = False + break async def _consume_once(self, handler: t.Callable): - async with self.consumer_factory( - # TODO: Make configurable. 
- create_stream=True, - iterator_type="TRIM_HORIZON", - sleep_time_no_records=0.2, - ) as consumer: + """ + Consume items from a Kinesis stream, one-shot. + """ + async with self.consumer_factory() as consumer: async for item in consumer: handler(item) def produce(self, data: t.Dict[str, t.Any]): + """ + Produce an item to a Kinesis stream. + """ asyncio.run(self._produce(data)) async def _produce(self, data: t.Dict[str, t.Any]): - # Put item onto queue to be flushed via `put_records()`. + """ + Put item onto queue to be flushed via `put_records()`. + """ async with Producer( stream_name=self.stream_name, session=self.async_session, endpoint_url=self.endpoint_url, region_name=self.region_name, - # TODO: Make configurable. - create_stream=True, - buffer_time=0.01, + buffer_time=self.buffer_time, + create_stream=self.create, + create_stream_shards=self.create_shards, ) as producer: await producer.put(data) diff --git a/cratedb_toolkit/io/kinesis/relay.py b/cratedb_toolkit/io/kinesis/relay.py index 9a9ec8de..6cb4fa45 100644 --- a/cratedb_toolkit/io/kinesis/relay.py +++ b/cratedb_toolkit/io/kinesis/relay.py @@ -62,6 +62,10 @@ def start(self, once: bool = False): else: self.kinesis_adapter.consume_forever(self.process_event) + def stop(self): + self.progress_bar.close() + self.kinesis_adapter.stop() + def process_event(self, event): try: record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8")) @@ -80,3 +84,6 @@ def process_event(self, event): except sa.exc.ProgrammingError as ex: logger.warning(f"Running query failed: {ex}") self.progress_bar.update() + + def __del__(self): + self.stop() diff --git a/doc/io/dynamodb/cdc.md b/doc/io/dynamodb/cdc.md index fd5aac7b..bc766c26 100644 --- a/doc/io/dynamodb/cdc.md +++ b/doc/io/dynamodb/cdc.md @@ -31,6 +31,88 @@ ctk shell --command "SELECT * FROM testdrive.demo;" ctk show table "testdrive.demo" ``` +## Options + +### Batch Size +The source URL option `batch-size` configures how many items to consume from 
+the Kinesis Stream at once. The default value is `100`.
+For many datasets, a much larger batch size is applicable for most efficient
+data transfers.
+```shell
+ctk load table .../cdc-stream?batch-size=5000
+```
+
+### Create
+The source URL option `create` configures whether the designated Kinesis Stream
+should be created upfront. The default value is `false`.
+```shell
+ctk load table .../cdc-stream?create=true
+```
+
+### Create Shards
+The source URL option `create-shards` configures the number
+of shards to use when a Kinesis Stream is created before consuming.
+The default value is `1`.
+```shell
+ctk load table .../cdc-stream?create=true&create-shards=4
+```
+
+### Region
+The source URL accepts the `region` option to configure the AWS region
+label. The default value is `us-east-1`.
+```shell
+ctk load table .../cdc-stream?region=eu-central-1
+```
+
+### Start
+The source URL accepts the `start` option to configure the DynamoDB [ShardIteratorType].
+It accepts the following values, mapping to corresponding original options. The default
+value is `earliest`.
+
+```shell
+ctk load table .../cdc-stream?start=latest
+```
+
+- `start=earliest`
+
+  Start reading at the last (untrimmed) stream record, which is the oldest record in the
+  shard. In DynamoDB Streams, there is a 24 hour limit on data retention. Stream records
+  whose age exceeds this limit are subject to removal (trimming) from the stream.
+  This option equals `ShardIteratorType=TRIM_HORIZON`.
+
+- `start=latest`
+
+  Start reading just after the most recent stream record in the shard, so that you always
+  read the most recent data in the shard. This option equals `ShardIteratorType=LATEST`.
+
+- `start=seqno-at&seqno=...`
+
+  Start reading exactly from the position denoted by a specific sequence number.
+  This option equals `ShardIteratorType=AT_SEQUENCE_NUMBER` and `SequenceNumber=...`. 
+ +- `start=seqno-after&seqno=...` + + Start reading right after the position denoted by a specific sequence number. + This option equals `ShardIteratorType=AFTER_SEQUENCE_NUMBER` and `SequenceNumber=...`. + + +### SeqNo +The source URL accepts the `seqno` option to configure the DynamoDB [SequenceNumber] +parameter. It accepts the sequence number of a stream record in the shard from which +to start reading. +```shell +ctk load table .../cdc-stream?start=seqno-after&seqno=49590338271490256608559692538361571095921575989136588898 +``` + +### Idle Sleep +The `idle-sleep` option configures the waiting time to hibernate the event loop after +running out of items to consume. The default value is `0.5`. + +### Buffer Time +The `buffer-time` option configures the time to wait before flushing produced items +to the wire. The default value is `0.5`. + + ## Variants ### CrateDB Cloud @@ -74,3 +156,5 @@ docker run \ [Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/ [Get started with Kinesis on LocalStack]: https://docs.localstack.cloud/user-guide/aws/kinesis/ [Kinesis Data Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html +[SequenceNumber]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-SequenceNumber +[ShardIteratorType]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-ShardIteratorType diff --git a/tests/io/dynamodb/test_relay.py b/tests/io/dynamodb/test_relay.py index d65f8097..7576cf28 100644 --- a/tests/io/dynamodb/test_relay.py +++ b/tests/io/dynamodb/test_relay.py @@ -1,3 +1,4 @@ +import threading import time import botocore @@ -14,13 +15,19 @@ from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402 -def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): +def 
test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): """ - Roughly verify that the AWS DynamoDB CDC processing works as expected. + Roughly verify that the AWS DynamoDB CDC processing through Kinesis works as expected. + + This test case consumes the Kinesis Stream from the "earliest" point, i.e. from the beginning. + No option is configured, because `start=earliest` is the default mode. """ # Define source and target URLs. - kinesis_url = f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo?region=us-east-1" + kinesis_url = ( + f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo" + f"?region=us-east-1&create=true&buffer-time=0.01&idle-sleep=0.01" + ) cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Define target table name. @@ -40,7 +47,7 @@ def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): # Delete stream for blank canvas. try: - table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") + table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo", EnforceConsumerDeletion=True) except botocore.exceptions.ClientError as error: if error.response["Error"]["Code"] != "ResourceNotFoundException": raise @@ -56,7 +63,67 @@ def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): # Run transfer command, consuming once not forever. table_loader.start(once=True) - # Verify data in target database. + # Verify data in target database, more specifically that both events have been processed well. + assert cratedb.database.count_records(table_name) == 1 + results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 + assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}] + + +def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb): + """ + Roughly verify that the AWS DynamoDB CDC processing through Kinesis works as expected. 
+ + This test case consumes the Kinesis Stream from the "latest" point, i.e. from "now". + """ + + # Define source and target URLs. + kinesis_url = ( + f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo" + f"?region=us-east-1&create=true&buffer-time=0.01&idle-sleep=0.01&start=latest" + ) + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Define target table name. + table_name = '"testdrive"."demo"' + + # Create target table. + cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl) + + # Define two CDC events: INSERT and UPDATE. + events = [ + wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED), + wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED), + ] + + # Initialize table loader. + table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url) + + # Delete stream for blank canvas. + try: + table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo") + except botocore.exceptions.ClientError as error: + if error.response["Error"]["Code"] != "ResourceNotFoundException": + raise + + # LocalStack needs a while when deleting the Stream. + # FIXME: Can this be made more efficient instead of waiting multiple times to orchestrate this sequence? + time.sleep(0.5) + + # Start event processor / stream consumer in separate thread, consuming forever. + thread = threading.Thread(target=table_loader.start) + thread.start() + time.sleep(1) + + # Populate source database with data. + for event in events: + table_loader.kinesis_adapter.produce(event) + + # Stop stream consumer. + table_loader.stop() + thread.join() + + # Verify data in target database, more specifically that both events have been processed well. + assert cratedb.database.refresh_table(table_name) is True assert cratedb.database.count_records(table_name) == 1 results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608 assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}]