Skip to content

Commit

Permalink
[fix][broker] Avoid introducing delay when there are delayed messages…
Browse files Browse the repository at this point in the history
… or marker messages (apache#23343)
  • Loading branch information
lhotari authored Sep 25, 2024
1 parent 4ce0c75 commit 5ea4252
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;
protected int lastNumberOfEntriesDispatched;
// tracks how many entries were processed by consumers in the last trySendMessagesToConsumers call
// the number includes also delayed messages, marker messages, aborted txn messages and filtered messages
// When no messages were processed, the value is 0. This is also an indication that the dispatcher didn't
// make progress in the last trySendMessagesToConsumers call.
protected int lastNumberOfEntriesProcessed;
protected boolean skipNextBackoff;
private final Backoff retryBackoff;
protected enum ReadType {
Expand Down Expand Up @@ -727,19 +731,22 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType,
boolean needAcquireSendInProgress,
long totalBytesSize) {
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
int entriesDispatched = lastNumberOfEntriesDispatched;
int entriesProcessed = lastNumberOfEntriesProcessed;
updatePendingBytesToDispatch(-totalBytesSize);
if (entriesDispatched > 0) {
// Reset the backoff when we successfully dispatched messages
boolean canReadMoreImmediately = false;
if (entriesProcessed > 0 || skipNextBackoff) {
// Reset the backoff when messages were processed
retryBackoff.reset();
// Reset the possible flag to skip the backoff delay
skipNextBackoff = false;
canReadMoreImmediately = true;
}
if (triggerReadingMore) {
if (entriesDispatched > 0 || skipNextBackoff) {
skipNextBackoff = false;
if (canReadMoreImmediately) {
// Call readMoreEntries in the same thread to trigger the next read
readMoreEntries();
} else if (entriesDispatched == 0) {
// If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
} else {
// reschedule a new read with an increasing backoff delay
reScheduleReadWithBackoff();
}
}
Expand Down Expand Up @@ -779,7 +786,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
lastNumberOfEntriesDispatched = 0;
lastNumberOfEntriesProcessed = 0;

int entriesToDispatch = entries.size();
// Trigger read more messages
Expand Down Expand Up @@ -809,6 +816,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

// If the dispatcher is closed, firstAvailableConsumerPermits will be 0, which skips dispatching the
Expand All @@ -820,6 +828,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
log.info("[{}] rewind because no available consumer found from total {}", name, consumerList.size());
entries.subList(start, entries.size()).forEach(Entry::release);
cursor.rewind();
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
return false;
}

Expand Down Expand Up @@ -863,6 +872,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalEntries += filterEntriesForConsumer(metadataArray, start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);
totalEntriesProcessed += entriesForThisConsumer.size();

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);
Expand All @@ -882,7 +892,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
Expand Down Expand Up @@ -917,6 +927,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
final AtomicInteger numConsumers = new AtomicInteger(assignResult.size());
for (Map.Entry<Consumer, List<EntryAndMetadata>> current : assignResult.entrySet()) {
final Consumer consumer = current.getKey();
Expand Down Expand Up @@ -947,6 +958,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,

totalEntries += filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);
totalEntriesProcessed += entryAndMetadataList.size();
consumer.sendMessages(entryAndMetadataList, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker()
Expand All @@ -962,7 +974,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

return numConsumers.get() == 0; // trigger a new readMoreEntries() call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,11 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE

@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
lastNumberOfEntriesDispatched = 0;
lastNumberOfEntriesProcessed = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
int entriesCount = entries.size();

// Trigger read more messages
Expand Down Expand Up @@ -233,6 +234,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
} else if (readType == ReadType.Replay) {
entries.forEach(Entry::release);
}
skipNextBackoff = true;
return true;
}
}
Expand Down Expand Up @@ -298,6 +300,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForConsumer.size());
totalEntries += filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);
totalEntriesProcessed += entriesForConsumer.size();
consumer.sendMessages(entriesForConsumer, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
Expand Down Expand Up @@ -368,7 +371,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}

lastNumberOfEntriesDispatched = (int) totalEntries;
lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;

// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
Expand All @@ -387,8 +390,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
return true;
}

// if no messages were sent, we should retry after a backoff delay
if (entriesByConsumerForDispatching.size() == 0) {
// if no messages were sent to consumers, we should retry
if (totalEntries == 0) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -996,6 +998,86 @@ protected void reScheduleReadInMs(long readAfterMs) {
);
}


@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testNoBackoffDelayWhenDelayedMessages(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
throws Exception {
persistentDispatcher.close();

doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
.isDispatcherDispatchMessagesInSubscriptionThread();

AtomicInteger readMoreEntriesCalled = new AtomicInteger(0);
AtomicInteger reScheduleReadInMsCalled = new AtomicInteger(0);
AtomicBoolean delayAllMessages = new AtomicBoolean(true);

PersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
reScheduleReadInMsCalled.incrementAndGet();
}

@Override
public synchronized void readMoreEntries() {
readMoreEntriesCalled.incrementAndGet();
}

@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (delayAllMessages.get()) {
// simulate delayed message
return true;
}
return super.trackDelayedDelivery(ledgerId, entryId, msgMetadata);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
reScheduleReadInMsCalled.incrementAndGet();
}

@Override
public synchronized void readMoreEntries() {
readMoreEntriesCalled.incrementAndGet();
}

@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (delayAllMessages.get()) {
// simulate delayed message
return true;
}
return super.trackDelayedDelivery(ledgerId, entryId, msgMetadata);
}
};
}

doAnswer(invocationOnMock -> {
GenericFutureListener<Future<Void>> listener = invocationOnMock.getArgument(0);
Future<Void> future = mock(Future.class);
when(future.isDone()).thenReturn(true);
listener.operationComplete(future);
return channelMock;
}).when(channelMock).addListener(any());

// add a consumer with permits
consumerMockAvailablePermits.set(1000);
dispatcher.addConsumer(consumerMock);

List<Entry> entries = new ArrayList<>(List.of(EntryImpl.create(1, 1, createMessage("message1", 1))));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(reScheduleReadInMsCalled.get(), 0, "reScheduleReadInMs should not be called");
assertTrue(readMoreEntriesCalled.get() >= 1);
});
}

private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
Expand Down

0 comments on commit 5ea4252

Please sign in to comment.