Skip to content

Commit

Permalink
ShardId is not passed to ShardConsumer, resulting in logs saying shar…
Browse files Browse the repository at this point in the history
…d is null on shutdown (opensearch-project#3683)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Nov 16, 2023
1 parent 92224c2 commit 1c28285
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,7 @@ public void run() {
}

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
if (shardId != null) {
LOG.info("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId);
}
LOG.debug("{} records written to buffer for shard {}", recordsWrittenToBuffer, shardId);
checkpointer.checkpoint(sequenceNumber);
lastCheckpointTime = System.currentTimeMillis();
}
Expand Down Expand Up @@ -289,10 +287,6 @@ public void run() {
acknowledgementSet.complete();
}

if (shardId != null) {
LOG.info("Completed writing shard {} to buffer after reaching the end of the shard", shardId);
}

if (waitForExport) {
waitForExport();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,
.tableInfo(tableInfo)
.checkpointer(checkpointer)
.shardIterator(shardIterator)
.shardId(streamPartition.getShardId())
.lastShardIterator(lastShardIterator)
.startTime(startTime)
.waitForExport(waitForExport)
Expand Down

0 comments on commit 1c28285

Please sign in to comment.