From 1c28285fa3a45e315ce900198ccea8022b863cab Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 16 Nov 2023 17:32:55 -0600 Subject: [PATCH] ShardId is not passed to ShardConsumer, resulting in logs saying shard is null on shutdown (#3683) Signed-off-by: Taylor Gray --- .../plugins/source/dynamodb/stream/ShardConsumer.java | 8 +------- .../source/dynamodb/stream/ShardConsumerFactory.java | 1 + 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 049065885e..554496b99d 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -233,9 +233,7 @@ public void run() { } if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) { - if (shardId != null) { - LOG.info("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); - } + LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId); checkpointer.checkpoint(sequenceNumber); lastCheckpointTime = System.currentTimeMillis(); } @@ -289,10 +287,6 @@ public void run() { acknowledgementSet.complete(); } - if (shardId != null) { - LOG.info("Completed writing shard {} to buffer after reaching the end of the shard", shardId); - } - if (waitForExport) { waitForExport(); } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index 6f17087c97..aea98740c5 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -96,6 +96,7 @@ public Runnable createConsumer(final StreamPartition streamPartition, .tableInfo(tableInfo) .checkpointer(checkpointer) .shardIterator(shardIterator) + .shardId(streamPartition.getShardId()) .lastShardIterator(lastShardIterator) .startTime(startTime) .waitForExport(waitForExport)