Skip to content

Commit

Permalink
[improve][broker] PIP-379: Key_Shared Draining Hashes for Improved Me…
Browse files Browse the repository at this point in the history
…ssage Ordering (apache#23352)
  • Loading branch information
lhotari authored Oct 8, 2024
1 parent 4efcc15 commit 3d0625b
Show file tree
Hide file tree
Showing 47 changed files with 3,324 additions and 1,838 deletions.
1 change: 1 addition & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ The Apache Software License, Version 2.0
- com.fasterxml.jackson.module-jackson-module-parameter-names-2.17.2.jar
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
* Fastutil -- it.unimi.dsi-fastutil-8.5.14.jar
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.17.0.jar
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
* Gson
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ flexible messaging model and an intuitive client API.</description>
<bouncycastle.bcpkix-fips.version>1.0.7</bouncycastle.bcpkix-fips.version>
<bouncycastle.bc-fips.version>1.0.2.5</bouncycastle.bc-fips.version>
<jackson.version>2.17.2</jackson.version>
<fastutil.version>8.5.14</fastutil.version>
<reflections.version>0.10.2</reflections.version>
<swagger.version>1.6.2</swagger.version>
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
Expand Down Expand Up @@ -911,6 +912,12 @@ flexible messaging model and an intuitive client API.</description>
<scope>import</scope>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>

<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, when a blocked key hash gets unblocked,"
+ " a redelivery will be attempted after a delay. This setting controls the delay."
+ " The reason to have the delay is to batch multiple unblocking events instead of triggering"
+ " redelivery for each unblocking event.",
dynamic = true
)
private long keySharedUnblockingIntervalMs = 10L;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
Expand Down
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>protobuf-java</artifactId>
</dependency>

<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void run(Timeout timeout) throws Exception {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
dispatcher.readMoreEntries();
dispatcher.readMoreEntriesAsync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3416,7 +3416,7 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
try {
dispatcherList.forEach(dispatcher -> {
dispatcher.unBlockDispatcherOnUnackedMsgs();
executor().execute(() -> dispatcher.readMoreEntries());
dispatcher.readMoreEntriesAsync();
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
blockedDispatchers.remove(dispatcher);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;

import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand All @@ -28,13 +27,10 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.util.Murmur3_32Hash;

/**
* This is a consumer selector based fixed hash range.
*
* The implementation uses consistent hashing to evenly split, the
* number of keys assigned to each consumer.
* This is a consumer selector using consistent hashing to evenly split
* the number of keys assigned to each consumer.
*/
public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
// use NUL character as field separator for hash key calculation
Expand All @@ -47,14 +43,22 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
private final ConsumerNameIndexTracker consumerNameIndexTracker = new ConsumerNameIndexTracker();

private final int numberOfPoints;
private final Range keyHashRange;
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
this(numberOfPoints, DEFAULT_RANGE_SIZE - 1);
}

public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints, int rangeMaxValue) {
this.hashRing = new TreeMap<>();
this.numberOfPoints = numberOfPoints;
this.keyHashRange = Range.of(STICKY_KEY_HASH_NOT_SET + 1, rangeMaxValue);
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
}

@Override
public CompletableFuture<Void> addConsumer(Consumer consumer) {
public CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -72,7 +76,11 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
}
}
return CompletableFuture.completedFuture(null);
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return CompletableFuture.completedFuture(impactedConsumers);
} finally {
rwLock.writeLock().unlock();
}
Expand All @@ -88,14 +96,14 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
* @param hashRingPointIndex the index of the hash ring point
* @return the hash value
*/
private static int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
private int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
int hashRingPointIndex) {
String key = consumer.consumerName() + KEY_SEPARATOR + consumerNameIndex + KEY_SEPARATOR + hashRingPointIndex;
return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
return makeStickyKeyHash(key.getBytes());
}

@Override
public void removeConsumer(Consumer consumer) {
public ImpactedConsumersResult removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
Expand All @@ -109,6 +117,11 @@ public void removeConsumer(Consumer consumer) {
}
}
}
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
ImpactedConsumersResult impactedConsumers =
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
consumerHashAssignmentsSnapshot = assignmentsAfter;
return impactedConsumers;
} finally {
rwLock.writeLock().unlock();
}
Expand All @@ -134,32 +147,58 @@ public Consumer select(int hash) {
}

@Override
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new IdentityHashMap<>();
public Range getKeyHashRange() {
return keyHashRange;
}

@Override
public ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
return result;
}
int start = 0;
int lastKey = 0;
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().consumer;
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
lastKey = entry.getKey();
start = lastKey + 1;
}
// Handle wrap-around in the hash ring, the first consumer will also contain the range from the last key
// to the maximum value of the hash range
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
List<Range> ranges = result.get(firstConsumer);
if (lastKey != Integer.MAX_VALUE - 1) {
ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
}
return consumerHashAssignmentsSnapshot;
} finally {
rwLock.readLock().unlock();
}
return result;
}

private ConsumerHashAssignmentsSnapshot internalGetConsumerHashAssignmentsSnapshot() {
if (hashRing.isEmpty()) {
return ConsumerHashAssignmentsSnapshot.empty();
}
List<HashRangeAssignment> result = new ArrayList<>();
int start = getKeyHashRange().getStart();
int lastKey = -1;
Consumer previousConsumer = null;
Range previousRange = null;
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().consumer;
Range range;
if (consumer == previousConsumer) {
// join ranges
result.remove(result.size() - 1);
range = Range.of(previousRange.getStart(), entry.getKey());
} else {
range = Range.of(start, entry.getKey());
}
result.add(new HashRangeAssignment(range, consumer));
lastKey = entry.getKey();
start = lastKey + 1;
previousConsumer = consumer;
previousRange = range;
}
// Handle wrap-around
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
if (lastKey != getKeyHashRange().getEnd()) {
Range range;
if (firstConsumer == previousConsumer && previousRange.getEnd() == lastKey) {
// join ranges
result.remove(result.size() - 1);
range = Range.of(previousRange.getStart(), getKeyHashRange().getEnd());
} else {
range = Range.of(lastKey + 1, getKeyHashRange().getEnd());
}
result.add(new HashRangeAssignment(range, firstConsumer));
}
return ConsumerHashAssignmentsSnapshot.of(result);
}
}
Loading

0 comments on commit 3d0625b

Please sign in to comment.