diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index 289bd343..d5408490 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -59,6 +59,21 @@ DECLARE_LOG_OBJECT() using std::chrono::milliseconds; using std::chrono::seconds; +static boost::optional getStartMessageId(const boost::optional& startMessageId, + bool inclusive) { + if (!inclusive || !startMessageId) { + return startMessageId; + } + // The default ledger id and entry id of a chunked message refer the fields of the last chunk. When the + // start message id is inclusive, we need to start from the first chunk. + auto chunkMsgIdImpl = + dynamic_cast(Commands::getMessageIdImpl(startMessageId.value()).get()); + if (chunkMsgIdImpl) { + return boost::optional{chunkMsgIdImpl->getChunkedMessageIds().front()}; + } + return startMessageId; +} + ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscriptionName, const ConsumerConfiguration& conf, bool isPersistent, const ConsumerInterceptorsPtr& interceptors, @@ -91,7 +106,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, messageListenerRunning_(!conf.isStartPaused()), negativeAcksTracker_(std::make_shared(client, *this, conf)), readCompacted_(conf.isReadCompacted()), - startMessageId_(startMessageId), + startMessageId_(getStartMessageId(startMessageId, conf.isStartMessageIdInclusive())), maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()), autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()), expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs()), @@ -469,7 +484,15 @@ boost::optional ConsumerImpl::processMessageChunk(const SharedBuff auto& chunkedMsgCtx = it->second; if (it == chunkedMessageCache_.end() || !chunkedMsgCtx.validateChunkId(chunkId)) { - if (it == chunkedMessageCache_.end()) { + auto startMessageId = startMessageId_.get().value_or(MessageId::earliest()); + if (!config_.isStartMessageIdInclusive() && startMessageId.ledgerId() == messageId.ledgerId() && + startMessageId.entryId() == messageId.entryId()) { + // When the start message id is not inclusive, the last chunk of the previous chunked message will + // be delivered, which is expected and we only need to filter it out. + chunkedMessageCache_.remove(uuid); + LOG_INFO("Filtered the chunked message before the start message id (uuid: " + << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")"); + } else if (it == chunkedMessageCache_.end()) { LOG_ERROR("Received an uncached chunk (uuid: " << uuid << " chunkId: " << chunkId << ", messageId: " << messageId << ")"); } else { diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc index 92fdf624..ad81949b 100644 --- a/tests/ReaderTest.cc +++ b/tests/ReaderTest.cc @@ -888,5 +888,34 @@ TEST_P(ReaderSeekTest, testHasMessageAvailableAfterSeekTimestamp) { } } +TEST_F(ReaderSeekTest, testSeekInclusiveChunkMessage) { + const auto topic = "test-seek-inclusive-chunk-message-" + std::to_string(time(nullptr)); + + Producer producer; + ProducerConfiguration producerConf; + producerConf.setBatchingEnabled(false); + producerConf.setChunkingEnabled(true); + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer)); + + std::string largeValue(1024 * 1024 * 6, 'a'); + MessageId firstMsgId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeValue).build(), firstMsgId)); + MessageId secondMsgId; + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeValue).build(), secondMsgId)); + + auto assertStartMessageId = [&](bool inclusive, MessageId expectedMsgId) { + Reader reader; + ReaderConfiguration readerConf; + readerConf.setStartMessageIdInclusive(inclusive); + ASSERT_EQ(ResultOk, client.createReader(topic, firstMsgId, readerConf, reader)); + Message msg; + ASSERT_EQ(ResultOk, reader.readNext(msg, 3000)); + ASSERT_EQ(expectedMsgId, msg.getMessageId()); + ASSERT_EQ(ResultOk, reader.close()); + }; + assertStartMessageId(true, firstMsgId); + assertStartMessageId(false, secondMsgId); +} + INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false)); INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderSeekTest, ::testing::Values(true, false));