Skip to content

Commit

Permalink
Use value.length instead of record.data().asByteArray().length
Browse files Browse the repository at this point in the history
  • Loading branch information
jackluo923 committed Nov 20, 2024
1 parent 5496198 commit 54118b3
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,9 @@ private BytesStreamMessage extractStreamMessage(Record record, String shardId) {
String sequenceNumber = record.sequenceNumber();
KinesisPartitionGroupOffset offset = new KinesisPartitionGroupOffset(shardId, sequenceNumber);
// NOTE: Use the same offset as next offset because the consumer starts consuming AFTER the start sequence number.
StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp)
.setSerializedValueSize(record.data().asByteArray().length).setOffset(offset, offset);
StreamMessageMetadata.Builder builder =
new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setSerializedValueSize(value.length)
.setOffset(offset, offset);
if (_config.isPopulateMetadata()) {
builder.setMetadata(Map.of(KinesisStreamMessageMetadata.APPRX_ARRIVAL_TIMESTAMP_KEY, String.valueOf(timestamp),
KinesisStreamMessageMetadata.SEQUENCE_NUMBER_KEY, sequenceNumber));
Expand Down

0 comments on commit 54118b3

Please sign in to comment.