Skip to content

Commit

Permalink
[fix][client] Fix failover consumer-listener stuck with cumulative ac…
Browse files Browse the repository at this point in the history
…k and epoch time (apache#23345)
  • Loading branch information
rdhabalia authored Oct 2, 2024
1 parent adb9014 commit 53e996c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -37,6 +42,8 @@
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
Expand All @@ -49,8 +56,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;

@Test(groups = "broker-impl")
public class MessageRedeliveryTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -539,4 +544,57 @@ public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws
// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);
}

/**
* This test validates that client lib correctly increases permits of individual consumer to retrieve data in case
* of incorrect epoch for partition-topic multi-consumer.
*
* @throws Exception
*/
@Test
public void testRedeliveryWithMultiConsumerAndListenerAddEpoch() throws Exception {
final String topic = "testRedeliveryWithMultiConsumerAndListenerAddEpoch";
final String subName = "my-sub";
int totalMessages = 100;
admin.topics().createPartitionedTopic(topic, 2);

Map<MessageId, String> ids = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(totalMessages);
MessageListener<String> msgListener = (Consumer<String> consumer, Message<String> msg) -> {
String id = msg.getMessageId().toString();
consumer.acknowledgeCumulativeAsync(msg);
if (ids.put(msg.getMessageId(), id) == null) {
latch.countDown();
}
};
@Cleanup
Consumer<String> newConsumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subName)
.messageListener(msgListener).subscriptionType(SubscriptionType.Failover)
.receiverQueueSize(totalMessages / 10).subscribe();

MultiTopicsConsumerImpl<String> consumer = (MultiTopicsConsumerImpl<String>) newConsumer;
long epoch = consumer.getConsumerEpoch() + 1;
consumer.setConsumerEpoch(epoch);
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false)
.create();

for (int i = 0; i < totalMessages; i++) {
producer.sendAsync("test" + i);
}
producer.flush();

// make sure listener has not received any messages until
// we call redelivery with correct epoch
for (int i = 0; i < 2; i++) {
assertTrue(ids.isEmpty());
Thread.sleep(1000);
}
// make epoch valid to consume redelivery message again
consumer.setConsumerEpoch(epoch - 1);
consumer.redeliverUnacknowledgedMessages();

latch.await(10, TimeUnit.SECONDS);
assertEquals(ids.size(), totalMessages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,9 @@ protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Sending permit-cmd to broker with available permits = {}", topic, available);
}
sendFlowPermitsToBroker(currentCnx, available);
break;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,13 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer, boolean batchR
// Process the message, add to the queue and trigger listener or async callback
messages.forEach(msg -> {
final boolean skipDueToSeek = duringSeek;
if (isValidConsumerEpoch((MessageImpl<T>) msg) && !skipDueToSeek) {
MessageImpl<T> msgImpl = (MessageImpl<T>) msg;
ClientCnx cnx = msgImpl.getCnx();
boolean isValidEpoch = isValidConsumerEpoch(msgImpl);
if (isValidEpoch && !skipDueToSeek) {
messageReceived(consumer, msg);
} else if (!isValidEpoch) {
consumer.increaseAvailablePermits(cnx);
} else if (skipDueToSeek) {
log.info("[{}] [{}] Skip processing message {} received during seek", topic, subscription,
msg.getMessageId());
Expand Down

0 comments on commit 53e996c

Please sign in to comment.