From 54118b38a6a305d87d61764ac3d5d788b87b047e Mon Sep 17 00:00:00 2001 From: Jack Luo Date: Thu, 21 Nov 2024 05:09:24 +0800 Subject: [PATCH] Use `value.length` instead of `record.data().asByteArray().length` --- .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 086d89722d08..de876b3071f4 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -150,8 +150,9 @@ private BytesStreamMessage extractStreamMessage(Record record, String shardId) { String sequenceNumber = record.sequenceNumber(); KinesisPartitionGroupOffset offset = new KinesisPartitionGroupOffset(shardId, sequenceNumber); // NOTE: Use the same offset as next offset because the consumer starts consuming AFTER the start sequence number. - StreamMessageMetadata.Builder builder = new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp) - .setSerializedValueSize(record.data().asByteArray().length).setOffset(offset, offset); + StreamMessageMetadata.Builder builder = + new StreamMessageMetadata.Builder().setRecordIngestionTimeMs(timestamp).setSerializedValueSize(value.length) + .setOffset(offset, offset); if (_config.isPopulateMetadata()) { builder.setMetadata(Map.of(KinesisStreamMessageMetadata.APPRX_ARRIVAL_TIMESTAMP_KEY, String.valueOf(timestamp), KinesisStreamMessageMetadata.SEQUENCE_NUMBER_KEY, sequenceNumber));