Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DynamoDB: Add ctk load table interface for processing CDC events #247

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/dynamodb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ jobs:
os: ["ubuntu-latest"]
# TODO: yarl, dependency of influxio, is currently not available on Python 3.12.
# https://github.com/aio-libs/yarl/pull/942
python-version: ["3.8", "3.11"]
localstack-version: ["3.7"]
python-version: ["3.9", "3.11"]
localstack-version: ["3.6"]

env:
OS: ${{ matrix.os }}
Expand Down Expand Up @@ -78,7 +78,7 @@ jobs:
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,test,develop]
pip install --use-pep517 --prefer-binary --editable=.[dynamodb,kinesis,test,develop]

- name: Run linter and software tests
run: |
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability
- DynamoDB CDC: Add `ctk load table` interface for processing CDC events
- DynamoDB CDC: Accept a few more options for the Kinesis Stream:
batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
9 changes: 9 additions & 0 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,18 @@
logger.error("Data loading failed or incomplete")
return False

elif source_url_obj.scheme.startswith("kinesis"):
if "+cdc" in source_url_obj.scheme:
from cratedb_toolkit.io.kinesis.api import kinesis_relay

Check warning on line 143 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L142-L143

Added lines #L142 - L143 were not covered by tests

return kinesis_relay(str(source_url_obj), target_url)

Check warning on line 145 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L145

Added line #L145 was not covered by tests
else:
raise NotImplementedError("Loading full data via Kinesis not implemented yet")

Check warning on line 147 in cratedb_toolkit/api/main.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/api/main.py#L147

Added line #L147 was not covered by tests

elif source_url_obj.scheme in ["file+bson", "http+bson", "https+bson", "mongodb", "mongodb+srv"]:
if "+cdc" in source_url_obj.scheme:
source_url_obj.scheme = source_url_obj.scheme.replace("+cdc", "")

from cratedb_toolkit.io.mongodb.api import mongodb_relay_cdc

return mongodb_relay_cdc(str(source_url_obj), target_url, progress=True)
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/dynamodb/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self, dynamodb_url: URL):
def scan(
self,
table_name: str,
page_size: int = 1000,
batch_size: int = 100,
consistent_read: bool = False,
on_error: t.Literal["log", "raise"] = "log",
) -> t.Generator[t.Dict, None, None]:
Expand All @@ -35,7 +35,7 @@ def scan(
key = None
while True:
try:
scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": page_size}
scan_kwargs = {"TableName": table_name, "ConsistentRead": consistent_read, "Limit": batch_size}
if key is not None:
scan_kwargs.update({"ExclusiveStartKey": key})
response = self.dynamodb_client.scan(**scan_kwargs)
Expand Down
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(
self.progress = progress
self.debug = debug

self.page_size: int = int(self.dynamodb_url.query.get("page-size", 1000))
self.batch_size: int = int(self.dynamodb_url.query.get("batch-size", 100))
self.consistent_read: bool = asbool(self.dynamodb_url.query.get("consistent-read", False))

def start(self):
Expand All @@ -63,7 +63,7 @@ def start(self):
for result in self.dynamodb_adapter.scan(
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
page_size=self.page_size,
batch_size=self.batch_size,
):
result_size = len(result["Items"])
try:
Expand Down
Empty file.
129 changes: 129 additions & 0 deletions cratedb_toolkit/io/kinesis/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import asyncio
import typing as t

import boto3
from aiobotocore.session import AioSession
from kinesis import Consumer, JsonProcessor, Producer
from yarl import URL

from cratedb_toolkit.util.data import asbool


class KinesisAdapter:
    """
    Consume from and produce to a Kinesis stream addressed by a URL.

    Connection details and tuning options are read from the URL:
    credentials from user/password, the stream name from the path, and the
    options `region`, `batch-size`, `create`, `create-shards`, `start`,
    `seqno`, `idle-sleep`, and `buffer-time` from the query string.
    """

    # Configuration for Kinesis shard iterators.
    # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html
    # Map `start` option to `ShardIteratorType`.
    start_iterator_type_map = {
        "earliest": "TRIM_HORIZON",
        "latest": "LATEST",
        "seqno-at": "AT_SEQUENCE_NUMBER",
        "seqno-after": "AFTER_SEQUENCE_NUMBER",
    }

    def __init__(self, kinesis_url: "URL"):
        """
        Configure sessions, clients, and options from `kinesis_url`.

        :param kinesis_url: URL carrying credentials, host/port, stream name
                            (path), and options (query string).
        """
        self.kinesis_url = kinesis_url
        self.stream_name = self.kinesis_url.path.lstrip("/")

        # Resolve the region once, so the synchronous boto3 session uses the
        # same default as the asynchronous consumer and producer.
        self.region_name: str = self.kinesis_url.query.get("region", "us-east-1")

        self.async_session = AioSession()
        self.async_session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password)

        self.session = boto3.Session(
            aws_access_key_id=kinesis_url.user,
            aws_secret_access_key=kinesis_url.password,
            region_name=self.region_name,
        )

        self.endpoint_url = self._build_endpoint_url(kinesis_url.host, kinesis_url.port)

        self.batch_size: int = int(self.kinesis_url.query.get("batch-size", 100))
        self.create: bool = asbool(self.kinesis_url.query.get("create", "false"))
        self.create_shards: int = int(self.kinesis_url.query.get("create-shards", 1))
        self.start: str = self.kinesis_url.query.get("start", "earliest")
        self.seqno: int = int(self.kinesis_url.query.get("seqno", 0))
        self.idle_sleep: float = float(self.kinesis_url.query.get("idle-sleep", 0.5))
        self.buffer_time: float = float(self.kinesis_url.query.get("buffer-time", 0.5))

        self.kinesis_client = self.session.client("kinesis", endpoint_url=self.endpoint_url)
        self.stopping: bool = False

    @staticmethod
    def _build_endpoint_url(host, port):
        """
        Derive a custom endpoint URL from host and port.

        Returns `None` when no host is given or the host is `aws` (compared
        case-insensitively), so no custom endpoint is configured. The port is
        only appended when present, to avoid producing `http://host:None`.
        """
        if not host or host.lower() == "aws":
            return None
        if port is None:
            return f"http://{host}"
        return f"http://{host}:{port}"

    @property
    def iterator_type(self):
        """
        Map `start` option to Kinesis' `ShardIteratorType`.

        :raises NotImplementedError: for the `seqno-*` start options.
        :raises KeyError: when the `start` option is unknown.
        """
        if self.start.startswith("seqno"):
            raise NotImplementedError(
                "Consuming Kinesis Stream from sequence number not implemented yet, please file an issue."
            )
        try:
            return self.start_iterator_type_map[self.start]
        except KeyError as ex:
            raise KeyError(f"Value for 'start' option unknown: {self.start}") from ex

    def consumer_factory(self, **kwargs):
        """
        Create a `Consumer` configured from the adapter options.

        Additional keyword arguments are passed through to `Consumer`.
        """
        return Consumer(
            stream_name=self.stream_name,
            session=self.async_session,
            endpoint_url=self.endpoint_url,
            region_name=self.region_name,
            max_queue_size=self.batch_size,
            sleep_time_no_records=self.idle_sleep,
            iterator_type=self.iterator_type,
            processor=JsonProcessor(),
            create_stream=self.create,
            create_stream_shards=self.create_shards,
            **kwargs,
        )

    def consume_forever(self, handler: t.Callable):
        """
        Run the endless consumer loop, invoking `handler` for each item.
        """
        asyncio.run(self._consume_forever(handler))

    def consume_once(self, handler: t.Callable):
        """
        Run a single consumer pass, invoking `handler` for each item.
        """
        asyncio.run(self._consume_once(handler))

    def stop(self):
        """
        Ask the endless consumer loop to terminate.
        """
        self.stopping = True

    async def _consume_forever(self, handler: t.Callable):
        """
        Consume items from a Kinesis stream, forever.
        """
        async with self.consumer_factory() as consumer:
            while True:
                async for item in consumer:
                    handler(item)
                # The stop flag is evaluated each time the consumer iteration
                # ends; it is reset so the adapter can be started again.
                if self.stopping:
                    self.stopping = False
                    break

    async def _consume_once(self, handler: t.Callable):
        """
        Consume items from a Kinesis stream, one-shot.
        """
        async with self.consumer_factory() as consumer:
            async for item in consumer:
                handler(item)

    def produce(self, data: t.Dict[str, t.Any]):
        """
        Produce an item to a Kinesis stream.
        """
        asyncio.run(self._produce(data))

    async def _produce(self, data: t.Dict[str, t.Any]):
        """
        Put item onto queue to be flushed via `put_records()`.
        """
        async with Producer(
            stream_name=self.stream_name,
            session=self.async_session,
            endpoint_url=self.endpoint_url,
            region_name=self.region_name,
            buffer_time=self.buffer_time,
            create_stream=self.create,
            create_stream_shards=self.create_shards,
        ) as producer:
            await producer.put(data)
6 changes: 6 additions & 0 deletions cratedb_toolkit/io/kinesis/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from cratedb_toolkit.io.kinesis.relay import KinesisRelay

Check warning on line 1 in cratedb_toolkit/io/kinesis/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/kinesis/api.py#L1

Added line #L1 was not covered by tests


def kinesis_relay(source_url, target_url):
    """
    Relay events from a Kinesis stream into a CrateDB table.

    Builds a `KinesisRelay` from `source_url` and `target_url`, and invokes
    its `start()` method. Nothing is returned.
    """
    relay = KinesisRelay(source_url, target_url)
    relay.start()

Check warning on line 6 in cratedb_toolkit/io/kinesis/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/kinesis/api.py#L4-L6

Added lines #L4 - L6 were not covered by tests
89 changes: 89 additions & 0 deletions cratedb_toolkit/io/kinesis/relay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import base64
import json
import logging

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoDBCDCTranslator
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from yarl import URL

from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class KinesisRelay:
    """
    Relay events from Kinesis into CrateDB table.
    """

    def __init__(
        self,
        kinesis_url: str,
        cratedb_url: str,
    ):
        """
        Set up source and target adapters, and select the event translator.

        :param kinesis_url: URL of the Kinesis stream to consume from.
        :param cratedb_url: URL of the CrateDB table to write into.
        :raises NotImplementedError: when the URL scheme of `kinesis_url`
                                     does not include `dynamodb+cdc`.
        """
        cratedb_address = DatabaseAddress.from_string(cratedb_url)
        cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
        cratedb_table = cratedb_table_address.fullname

        self.kinesis_url = URL(kinesis_url)
        self.kinesis_adapter = KinesisAdapter(self.kinesis_url)
        self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
        self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)

        if "dynamodb+cdc" in self.kinesis_url.scheme:
            self.translator = DynamoDBCDCTranslator(table_name=self.cratedb_table)
        else:
            raise NotImplementedError(f"Data processing not implemented for {self.kinesis_url}")

        # Both attributes are assigned in `start()`; they are only declared here.
        self.connection: sa.Connection
        self.progress_bar: tqdm

    def start(self, once: bool = False):
        """
        Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB.

        :param once: When true, consume the stream in a single pass instead
                     of running forever.
        """
        logger.info(f"Source: Kinesis stream={self.kinesis_adapter.stream_name} count=unknown")
        self.connection = self.cratedb_adapter.engine.connect()
        # Create the target table from the translator's DDL when it is missing.
        if not self.cratedb_adapter.table_exists(self.cratedb_table):
            self.connection.execute(sa.text(self.translator.sql_ddl))
            self.connection.commit()
        records_target = self.cratedb_adapter.count_records(self.cratedb_table)
        logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
        # Harmonize logging and progress bar.
        # https://github.com/tqdm/tqdm#redirecting-logging
        self.progress_bar = tqdm()
        with logging_redirect_tqdm():
            if once:
                self.kinesis_adapter.consume_once(self.process_event)
            else:
                self.kinesis_adapter.consume_forever(self.process_event)

    def stop(self):
        """
        Close the progress bar and signal the Kinesis adapter to stop.

        Safe to call on a partially initialized instance: `__del__` invokes
        this method even when `start()` never ran or `__init__` raised, in
        which case `progress_bar` or `kinesis_adapter` do not exist yet.
        """
        progress_bar = getattr(self, "progress_bar", None)
        if progress_bar is not None:
            progress_bar.close()
        kinesis_adapter = getattr(self, "kinesis_adapter", None)
        if kinesis_adapter is not None:
            kinesis_adapter.stop()

    def process_event(self, event):
        """
        Decode a single Kinesis event, and apply it to the CrateDB table.

        Events which fail to decode or translate are logged and skipped.
        Statements rejected with `ProgrammingError` are logged as warnings.
        """
        try:
            record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8"))
            operation = self.translator.to_sql(record)
        except Exception:
            logger.exception("Decoding Kinesis event failed")
            return
        try:
            # Process record.
            self.connection.execute(sa.text(operation.statement), operation.parameters)

            # Processing alternating CDC events requires write synchronization.
            self.connection.execute(sa.text(f"REFRESH TABLE {self.cratedb_table}"))

            self.connection.commit()
        except sa.exc.ProgrammingError as ex:
            logger.warning(f"Running query failed: {ex}")
        self.progress_bar.update()

    def __del__(self):
        """
        Stop the relay when the instance is finalized.
        """
        self.stop()
Loading