Skip to content

Commit

Permalink
DynamoDB CDC: Accept a few more options for the Kinesis Stream
Browse files Browse the repository at this point in the history
New options: batch-size, create, create-shards, start, seqno,
idle-sleep, buffer-time.
  • Loading branch information
amotl committed Sep 13, 2024
1 parent dbbce50 commit 07b1974
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability
- DynamoDB CDC: Add `ctk load table` interface for processing CDC events
- DynamoDB CDC: Accept a few more options for the Kinesis Stream:
batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
82 changes: 64 additions & 18 deletions cratedb_toolkit/io/kinesis/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,20 @@
from kinesis import Consumer, JsonProcessor, Producer
from yarl import URL

from cratedb_toolkit.util.data import asbool


class KinesisAdapter:
# Configuration for Kinesis shard iterators.
# https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html
# Map `start` option to `ShardIteratorType`.
start_iterator_type_map = {
"earliest": "TRIM_HORIZON",
"latest": "LATEST",
"seqno-at": "AT_SEQUENCE_NUMBER",
"seqno-after": "AFTER_SEQUENCE_NUMBER",
}

def __init__(self, kinesis_url: URL):
self.async_session = AioSession()
self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password)
Expand All @@ -21,18 +33,48 @@ def __init__(self, kinesis_url: URL):
self.endpoint_url = None
if kinesis_url.host and kinesis_url.host.lower() != "aws":
self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}"

self.kinesis_url = kinesis_url
self.region_name = kinesis_url.query.get("region")
self.stream_name = self.kinesis_url.path.lstrip("/")

self.region_name: str = self.kinesis_url.query.get("region", "us-east-1")
self.batch_size: int = int(self.kinesis_url.query.get("batch-size", 100))
self.create: bool = asbool(self.kinesis_url.query.get("create", "false"))
self.create_shards: int = int(self.kinesis_url.query.get("create-shards", 1))
self.start: str = self.kinesis_url.query.get("start", "earliest")
self.seqno: int = int(self.kinesis_url.query.get("seqno", 0))
self.idle_sleep: float = float(self.kinesis_url.query.get("idle-sleep", 0.5))
self.buffer_time: float = float(self.kinesis_url.query.get("buffer-time", 0.5))

self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url)
self.stopping: bool = False

@property
def iterator_type(self):
"""
Map `start` option to Kinesis' `ShardIteratorType`.
"""
if self.start.startswith("seqno"):
raise NotImplementedError(

Check warning on line 58 in cratedb_toolkit/io/kinesis/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/kinesis/adapter.py#L58

Added line #L58 was not covered by tests
"Consuming Kinesis Stream from sequence number " "not implemented yet, please file an issue."
)
try:
return self.start_iterator_type_map[self.start]
except KeyError as ex:
raise KeyError(f"Value for 'start' option unknown: {self.start}") from ex

Check warning on line 64 in cratedb_toolkit/io/kinesis/adapter.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/kinesis/adapter.py#L63-L64

Added lines #L63 - L64 were not covered by tests

def consumer_factory(self, **kwargs):
return Consumer(
stream_name=self.stream_name,
session=self.async_session,
endpoint_url=self.endpoint_url,
region_name=self.region_name,
max_queue_size=self.batch_size,
sleep_time_no_records=self.idle_sleep,
iterator_type=self.iterator_type,
processor=JsonProcessor(),
create_stream=self.create,
create_stream_shards=self.create_shards,
**kwargs,
)

Expand All @@ -42,42 +84,46 @@ def consume_forever(self, handler: t.Callable):
def consume_once(self, handler: t.Callable):
asyncio.run(self._consume_once(handler))

def stop(self):
self.stopping = True

async def _consume_forever(self, handler: t.Callable):
"""
Consume items from a Kinesis stream.
Consume items from a Kinesis stream, forever.
"""
async with self.consumer_factory(
# TODO: Make configurable.
create_stream=True,
iterator_type="TRIM_HORIZON",
sleep_time_no_records=0.2,
) as consumer:
async with self.consumer_factory() as consumer:
while True:
async for item in consumer:
handler(item)
if self.stopping:
self.stopping = False
break

async def _consume_once(self, handler: t.Callable):
async with self.consumer_factory(
# TODO: Make configurable.
create_stream=True,
iterator_type="TRIM_HORIZON",
sleep_time_no_records=0.2,
) as consumer:
"""
Consume items from a Kinesis stream, one-shot.
"""
async with self.consumer_factory() as consumer:
async for item in consumer:
handler(item)

def produce(self, data: t.Dict[str, t.Any]):
"""
Produce an item to a Kinesis stream.
"""
asyncio.run(self._produce(data))

async def _produce(self, data: t.Dict[str, t.Any]):
# Put item onto queue to be flushed via `put_records()`.
"""
Put item onto queue to be flushed via `put_records()`.
"""
async with Producer(
stream_name=self.stream_name,
session=self.async_session,
endpoint_url=self.endpoint_url,
region_name=self.region_name,
# TODO: Make configurable.
create_stream=True,
buffer_time=0.01,
buffer_time=self.buffer_time,
create_stream=self.create,
create_stream_shards=self.create_shards,
) as producer:
await producer.put(data)
7 changes: 7 additions & 0 deletions cratedb_toolkit/io/kinesis/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ def start(self, once: bool = False):
else:
self.kinesis_adapter.consume_forever(self.process_event)

def stop(self):
self.progress_bar.close()
self.kinesis_adapter.stop()

def process_event(self, event):
try:
record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8"))
Expand All @@ -80,3 +84,6 @@ def process_event(self, event):
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")

Check warning on line 85 in cratedb_toolkit/io/kinesis/relay.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/kinesis/relay.py#L84-L85

Added lines #L84 - L85 were not covered by tests
self.progress_bar.update()

def __del__(self):
self.stop()
84 changes: 84 additions & 0 deletions doc/io/dynamodb/cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,88 @@ ctk shell --command "SELECT * FROM testdrive.demo;"
ctk show table "testdrive.demo"
```

## Options

### Batch Size
The source URL option `batch-size` configures how many items to consume from
the Kinesis Stream at once. The default value is `100`.
For many datasets, a much larger batch size is applicable for most efficient
data transfers.
```shell
ctk load table .../cdc-stream?batch-size=5000
```

### Create
The source URL option `create` configures whether the designated Kinesis Stream
should be created upfront. The default value is `false`.
```shell
ctk load table .../cdc-stream?create=true
```

### Create Shards
The source URL option `create-shards` configures whether the designated number
of shards when a Kinesis Stream is created before consuming.
The default value is `1`.
```shell
ctk load table .../cdc-stream?create=true&create-shards=4
```

### Region
The source URL accepts the `region` option to configure the AWS region
label. The default value is `us-east-1`.
```shell
ctk load table .../cdc-stream?region=eu-central-1
```

### Start
The source URL accepts the `start` option to configure the DynamoDB [ShardIteratorType].
It accepts the following values, mapping to corresponding original options. The default
value is `earliest`.

```shell
ctk load table .../cdc-stream?start=latest
```

- `start=earliest`

Start reading at the last (untrimmed) stream record, which is the oldest record in the
shard. In DynamoDB Streams, there is a 24 hour limit on data retention. Stream records
whose age exceeds this limit are subject to removal (trimming) from the stream.
This option equals `ShardIteratorType=TRIM_HORIZON`.

- `start=latest`

Start reading just after the most recent stream record in the shard, so that you always
read the most recent data in the shard. This option equals `ShardIteratorType=LATEST`.

- `start=seqno-at&seqno=...`

Start reading exactly from the position denoted by a specific sequence number.
This option equals `ShardIteratorType=AT_SEQUENCE_NUMBER` and `SequenceNumber=...`.

- `start=seqno-after&seqno=...`

Start reading right after the position denoted by a specific sequence number.
This option equals `ShardIteratorType=AFTER_SEQUENCE_NUMBER` and `SequenceNumber=...`.


### SeqNo
The source URL accepts the `seqno` option to configure the DynamoDB [SequenceNumber]
parameter. It accepts the sequence number of a stream record in the shard from which
to start reading.
```shell
ctk load table .../cdc-stream?start=seqno-after&seqno=49590338271490256608559692538361571095921575989136588898
```

### Idle Sleep
The `idle-sleep` option configures the waiting time to hibernate the event loop after
running out of items to consume. The default value is `0.5`.

### Buffer Time
The `buffer-time` option configures the time to wait before flushing produced items
to the wire. The default value is `0.5`.


## Variants

### CrateDB Cloud
Expand Down Expand Up @@ -74,3 +156,5 @@ docker run \
[Credentials for accessing LocalStack AWS API]: https://docs.localstack.cloud/references/credentials/
[Get started with Kinesis on LocalStack]: https://docs.localstack.cloud/user-guide/aws/kinesis/
[Kinesis Data Streams]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html
[SequenceNumber]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-SequenceNumber
[ShardIteratorType]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-ShardIteratorType
77 changes: 72 additions & 5 deletions tests/io/dynamodb/test_relay.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import threading
import time

import botocore
Expand All @@ -14,13 +15,19 @@
from commons_codec.transform.dynamodb import DynamoDBCDCTranslator # noqa: E402


def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
def test_kinesis_earliest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
"""
Roughly verify that the AWS DynamoDB CDC processing works as expected.
Roughly verify that the AWS DynamoDB CDC processing through Kinesis works as expected.
This test case consumes the Kinesis Stream from the "earliest" point, i.e. from the beginning.
No option is configured, because `start=earliest` is the default mode.
"""

# Define source and target URLs.
kinesis_url = f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo?region=us-east-1"
kinesis_url = (
f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo"
f"?region=us-east-1&create=true&buffer-time=0.01&idle-sleep=0.01"
)
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Define target table name.
Expand All @@ -40,7 +47,7 @@ def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):

# Delete stream for blank canvas.
try:
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo")
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo", EnforceConsumerDeletion=True)
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise
Expand All @@ -56,7 +63,67 @@ def test_kinesis_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
# Run transfer command, consuming once not forever.
table_loader.start(once=True)

# Verify data in target database.
# Verify data in target database, more specifically that both events have been processed well.
assert cratedb.database.count_records(table_name) == 1
results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608
assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}]


def test_kinesis_latest_dynamodb_cdc_insert_update(caplog, cratedb, dynamodb):
"""
Roughly verify that the AWS DynamoDB CDC processing through Kinesis works as expected.
This test case consumes the Kinesis Stream from the "latest" point, i.e. from "now".
"""

# Define source and target URLs.
kinesis_url = (
f"{dynamodb.get_connection_url_kinesis_dynamodb_cdc()}/demo"
f"?region=us-east-1&create=true&buffer-time=0.01&idle-sleep=0.01&start=latest"
)
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Define target table name.
table_name = '"testdrive"."demo"'

# Create target table.
cratedb.database.run_sql(DynamoDBCDCTranslator(table_name=table_name).sql_ddl)

# Define two CDC events: INSERT and UPDATE.
events = [
wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED),
wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED),
]

# Initialize table loader.
table_loader = KinesisRelay(kinesis_url=kinesis_url, cratedb_url=cratedb_url)

# Delete stream for blank canvas.
try:
table_loader.kinesis_adapter.kinesis_client.delete_stream(StreamName="demo")
except botocore.exceptions.ClientError as error:
if error.response["Error"]["Code"] != "ResourceNotFoundException":
raise

# LocalStack needs a while when deleting the Stream.
# FIXME: Can this be made more efficient instead of waiting multiple times to orchestrate this sequence?
time.sleep(0.5)

# Start event processor / stream consumer in separate thread, consuming forever.
thread = threading.Thread(target=table_loader.start)
thread.start()
time.sleep(1)

# Populate source database with data.
for event in events:
table_loader.kinesis_adapter.produce(event)

# Stop stream consumer.
table_loader.stop()
thread.join()

# Verify data in target database, more specifically that both events have been processed well.
assert cratedb.database.refresh_table(table_name) is True
assert cratedb.database.count_records(table_name) == 1
results = cratedb.database.run_sql(f"SELECT * FROM {table_name}", records=True) # noqa: S608
assert results[0]["data"]["list_of_objects"] == [{"foo": "bar"}, {"baz": "qux"}]

0 comments on commit 07b1974

Please sign in to comment.