Skip to content

Commit

Permalink
DynamoDB: Add ctk load table interface for processing CDC events
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Aug 28, 2024
1 parent 2295b91 commit a7c703d
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


## Unreleased
- DynamoDB: Add `ctk load table` interface for processing CDC events

## 2024/08/27 v0.0.20
- DMS/DynamoDB: Fix table name quoting within CDC processor handler
Expand Down
7 changes: 7 additions & 0 deletions cratedb_toolkit/api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ def load_table(self, resource: InputOutputResource, target: TableAddress, transf
msg = "Data loading failed"
logger.error(msg)
raise OperationFailed(msg)
elif source_url.startswith("kinesis"):
if "+cdc" in source_url:
from cratedb_toolkit.io.kinesis.api import kinesis_relay

kinesis_relay(source_url, target_url)
else:
raise NotImplementedError("Loading full data via Kinesis not implemented yet")
elif source_url.startswith("mongodb"):
if "+cdc" in source_url:
source_url = source_url.replace("+cdc", "")
Expand Down
Empty file.
56 changes: 56 additions & 0 deletions cratedb_toolkit/io/kinesis/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import asyncio
import typing as t

from aiobotocore.session import AioSession
from kinesis import Consumer, JsonProcessor, Producer
from yarl import URL


class KinesisAdapter:
def __init__(self, kinesis_url: URL):
self.session = AioSession()
self.session.set_credentials(access_key=kinesis_url.user, secret_key=kinesis_url.password)
self.endpoint_url = None
if kinesis_url.host and kinesis_url.host.lower() != "aws":
self.endpoint_url = f"http://{kinesis_url.host}:{kinesis_url.port}"
self.kinesis_url = kinesis_url
self.region_name = kinesis_url.query.get("region")
self.stream_name = self.kinesis_url.path.lstrip("/")

def consume_forever(self, handler: t.Callable):
asyncio.run(self._consume_forever(handler))

async def _consume_forever(self, handler: t.Callable):
"""
Consume items from a Kinesis stream.
"""
async with Consumer(
stream_name=self.stream_name,
session=self.session,
endpoint_url=self.endpoint_url,
region_name=self.region_name,
# TODO: Make configurable.
create_stream=True,
iterator_type="TRIM_HORIZON",
sleep_time_no_records=0.2,
processor=JsonProcessor(),
) as consumer:
while True:
async for item in consumer:
handler(item)

def produce(self, data: t.Dict[str, t.Any]):
asyncio.run(self._produce(data))

async def _produce(self, data: t.Dict[str, t.Any]):
# Put item onto queue to be flushed via `put_records()`.
async with Producer(
stream_name=self.stream_name,
session=self.session,
endpoint_url=self.endpoint_url,
region_name=self.region_name,
# TODO: Make configurable.
create_stream=True,
buffer_time=0.01,
) as producer:
await producer.put(data)
6 changes: 6 additions & 0 deletions cratedb_toolkit/io/kinesis/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from cratedb_toolkit.io.kinesis.relay import KinesisRelay


def kinesis_relay(source_url, target_url):
ka = KinesisRelay(source_url, target_url)
ka.start()
79 changes: 79 additions & 0 deletions cratedb_toolkit/io/kinesis/relay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import base64
import json
import logging

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoDBCDCTranslator
from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
from yarl import URL

from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)


class KinesisRelay:
"""
Relay events from Kinesis into CrateDB table.
"""

def __init__(
self,
kinesis_url: str,
cratedb_url: str,
):
cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname

self.kinesis_url = URL(kinesis_url)
self.kinesis_adapter = KinesisAdapter(self.kinesis_url)
self.cratedb_adapter = DatabaseAdapter(str(cratedb_sqlalchemy_url), echo=False)
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)

if "dynamodb+cdc" in self.kinesis_url.scheme:
self.translator = DynamoDBCDCTranslator(table_name=self.cratedb_table)
else:
raise NotImplementedError(f"Data processing not implemented for {self.kinesis_url}")

self.connection: sa.Connection
self.progress_bar: tqdm

def start(self):
"""
Read events from Kinesis stream, convert to SQL statements, and submit to CrateDB.
"""
logger.info(f"Source: Kinesis stream={self.kinesis_adapter.stream_name} count=unknown")
self.connection = self.cratedb_adapter.engine.connect()
if not self.cratedb_adapter.table_exists(self.cratedb_table):
self.connection.execute(sa.text(self.translator.sql_ddl))
self.connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
# Harmonize logging and progress bar.
# https://github.com/tqdm/tqdm#redirecting-logging
self.progress_bar = tqdm()
with logging_redirect_tqdm():
self.kinesis_adapter.consume_forever(self.process)

def process(self, event):
try:
record = json.loads(base64.b64decode(event["kinesis"]["data"]).decode("utf-8"))
operation = self.translator.to_sql(record)
except Exception:
logger.exception("Decoding Kinesis event failed")
return
try:
# Process record.
self.connection.execute(sa.text(operation.statement), operation.parameters)

# Processing alternating CDC events requires write synchronization.
self.connection.execute(sa.text(f"REFRESH TABLE {self.translator.quote_table_name(self.cratedb_table)}"))

self.connection.commit()
except sa.exc.ProgrammingError as ex:
logger.warning(f"Running query failed: {ex}")
self.progress_bar.update()
14 changes: 14 additions & 0 deletions examples/aws/kinesis_put.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from yarl import URL

from cratedb_toolkit.io.kinesis.adapter import KinesisAdapter
from tests.io.test_processor import DYNAMODB_CDC_INSERT_NESTED, DYNAMODB_CDC_MODIFY_NESTED, wrap_kinesis


def main():
ka = KinesisAdapter(URL("kinesis://LSIAQAAAAAAVNCBMPNSG:dummy@localhost:4566/cdc-stream?region=eu-central-1"))
ka.produce(wrap_kinesis(DYNAMODB_CDC_INSERT_NESTED))
ka.produce(wrap_kinesis(DYNAMODB_CDC_MODIFY_NESTED))


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ io = [
"sqlalchemy>=2",
]
kinesis = [
"async-kinesis<1.2",
"commons-codec>=0.0.12",
"lorrystream[carabas]",
]
Expand Down

0 comments on commit a7c703d

Please sign in to comment.