Skip to content

Commit

Permalink
[fix][broker] Fix out-of-order issues with ConsistentHashingStickyKey…
Browse files Browse the repository at this point in the history
…ConsumerSelector (apache#23327)
  • Loading branch information
lhotari authored Oct 2, 2024
1 parent c41c7e9 commit adb9014
Show file tree
Hide file tree
Showing 8 changed files with 853 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.apache.pulsar.broker.service;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
Expand All @@ -44,7 +42,9 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

// Consistent-Hash ring
private final NavigableMap<Integer, List<Consumer>> hashRing;
private final NavigableMap<Integer, ConsumerIdentityWrapper> hashRing;
// Tracks the used consumer name indexes for each consumer name
private final ConsumerNameIndexTracker consumerNameIndexTracker = new ConsumerNameIndexTracker();

private final int numberOfPoints;

Expand All @@ -57,51 +57,57 @@ public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
public CompletableFuture<Void> addConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
// Insert multiple points on the hash ring for every consumer
// The points are deterministically added based on the hash of the consumer name
for (int i = 0; i < numberOfPoints; i++) {
int hash = calculateHashForConsumerAndIndex(consumer, i);
hashRing.compute(hash, (k, v) -> {
if (v == null) {
return Lists.newArrayList(consumer);
} else {
if (!v.contains(consumer)) {
v.add(consumer);
v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
}
return v;
}
});
int consumerNameIndex =
consumerNameIndexTracker.increaseConsumerRefCountAndReturnIndex(consumerIdentityWrapper);
int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i);
// When there's a collision, the new consumer will replace the old one.
// This is a rare case, and it is acceptable to replace the old consumer since there
// are multiple points for each consumer. This won't affect the overall distribution significantly.
ConsumerIdentityWrapper removed = hashRing.put(hash, consumerIdentityWrapper);
if (removed != null) {
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
}
}
return CompletableFuture.completedFuture(null);
} finally {
rwLock.writeLock().unlock();
}
}

private static int calculateHashForConsumerAndIndex(Consumer consumer, int index) {
String key = consumer.consumerName() + KEY_SEPARATOR + index;
/**
* Calculate the hash for a consumer and hash ring point.
* The hash is calculated based on the consumer name, consumer name index, and hash ring point index.
* The resulting hash is used as the key to insert the consumer into the hash ring.
*
* @param consumer the consumer
* @param consumerNameIndex the index of the consumer name
* @param hashRingPointIndex the index of the hash ring point
* @return the hash value
*/
private static int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
int hashRingPointIndex) {
String key = consumer.consumerName() + KEY_SEPARATOR + consumerNameIndex + KEY_SEPARATOR + hashRingPointIndex;
return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
}

@Override
public void removeConsumer(Consumer consumer) {
rwLock.writeLock().lock();
try {
// Remove all the points that were added for this consumer
for (int i = 0; i < numberOfPoints; i++) {
int hash = calculateHashForConsumerAndIndex(consumer, i);
hashRing.compute(hash, (k, v) -> {
if (v == null) {
return null;
} else {
v.removeIf(c -> c.equals(consumer));
if (v.isEmpty()) {
v = null;
}
return v;
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
int consumerNameIndex = consumerNameIndexTracker.getTrackedIndex(consumerIdentityWrapper);
if (consumerNameIndex > -1) {
// Remove all the points that were added for this consumer
for (int i = 0; i < numberOfPoints; i++) {
int hash = calculateHashForConsumerAndIndex(consumer, consumerNameIndex, i);
if (hashRing.remove(hash, consumerIdentityWrapper)) {
consumerNameIndexTracker.decreaseConsumerRefCount(consumerIdentityWrapper);
}
});
}
}
} finally {
rwLock.writeLock().unlock();
Expand All @@ -115,33 +121,41 @@ public Consumer select(int hash) {
if (hashRing.isEmpty()) {
return null;
}

List<Consumer> consumerList;
Map.Entry<Integer, List<Consumer>> ceilingEntry = hashRing.ceilingEntry(hash);
Map.Entry<Integer, ConsumerIdentityWrapper> ceilingEntry = hashRing.ceilingEntry(hash);
if (ceilingEntry != null) {
consumerList = ceilingEntry.getValue();
return ceilingEntry.getValue().consumer;
} else {
consumerList = hashRing.firstEntry().getValue();
// Handle wrap-around in the hash ring, return the first consumer
return hashRing.firstEntry().getValue().consumer;
}

return consumerList.get(hash % consumerList.size());
} finally {
rwLock.readLock().unlock();
}
}

@Override
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new LinkedHashMap<>();
Map<Consumer, List<Range>> result = new IdentityHashMap<>();
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
return result;
}
int start = 0;
for (Map.Entry<Integer, List<Consumer>> entry: hashRing.entrySet()) {
for (Consumer consumer: entry.getValue()) {
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
}
start = entry.getKey() + 1;
int lastKey = 0;
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().consumer;
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
lastKey = entry.getKey();
start = lastKey + 1;
}
// Handle wrap-around in the hash ring, the first consumer will also contain the range from the last key
// to the maximum value of the hash range
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
List<Range> ranges = result.get(firstConsumer);
if (lastKey != Integer.MAX_VALUE - 1) {
ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
}
} finally {
rwLock.readLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

/**
 * Wraps a {@link Consumer} so that it is compared by reference instead of by
 * {@link Consumer#equals(Object)}.
 *
 * <p>{@link Consumer} provides its own {@code equals} implementation, which makes it
 * unsuitable when a map key or a set element has to be matched by instance. Two wrappers
 * are equal only when they hold the very same {@link Consumer} object.</p>
 */
class ConsumerIdentityWrapper {
    // Package-private on purpose: collaborators in this package read the wrapped consumer directly.
    final Consumer consumer;

    public ConsumerIdentityWrapper(Consumer consumer) {
        this.consumer = consumer;
    }

    /**
     * Checks whether the given object is a wrapper around the same {@link Consumer} instance.
     *
     * @param obj the object to compare with this wrapper, may be {@code null}
     * @return {@code true} only when {@code obj} is a {@code ConsumerIdentityWrapper} whose
     *         wrapped consumer is the same instance as this wrapper's consumer
     */
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof ConsumerIdentityWrapper)) {
            // Also covers null, since instanceof is false for null.
            return false;
        }
        // Reference comparison is intentional; Consumer#equals is deliberately bypassed.
        return ((ConsumerIdentityWrapper) obj).consumer == consumer;
    }

    /**
     * Returns the hash code of the wrapped {@link Consumer}. Wrappers around the same
     * instance therefore always produce the same hash code.
     *
     * @return the hash code of the wrapped consumer
     */
    @Override
    public int hashCode() {
        return consumer.hashCode();
    }

    /**
     * Returns the string representation of the wrapped {@link Consumer}.
     *
     * @return the result of calling {@code toString()} on the wrapped consumer
     */
    @Override
    public String toString() {
        return consumer.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.lang3.mutable.MutableInt;
import org.roaringbitmap.RoaringBitmap;

/**
 * Tracks the used consumer name indexes for each consumer name.
 *
 * <p>This is used by {@link ConsistentHashingStickyKeyConsumerSelector} to get a unique "consumer name index"
 * for each consumer name. It is useful when there are multiple consumers with the same name, but they are
 * different consumers. The purpose of the index is to prevent collisions in the hash ring.</p>
 *
 * <p>The consumer name index serves as an additional key for the hash ring assignment. The logic keeps track of
 * used "index slots" for each consumer name and assigns the first unused index when a new consumer is added.
 * This approach minimizes hash collisions due to using the same consumer name.</p>
 *
 * <p>An added benefit of this tracking approach is that a consumer that leaves and then rejoins immediately will
 * get the same index and therefore the same assignments in the hash ring. This improves stability since the hash
 * assignment changes are minimized over time, although a better solution would be to avoid reusing the same
 * consumer name in the first place.</p>
 *
 * <p>When a consumer is removed, the index is deallocated. RoaringBitmap is used to keep track of the used
 * indexes. The data structure to track a consumer name is removed when no index of that consumer name is in
 * use anymore.</p>
 *
 * <p>This class is not thread-safe and should be used in a synchronized context in the caller.</p>
 */
@NotThreadSafe
class ConsumerNameIndexTracker {
    // tracks the used index slots for each consumer name
    private final Map<String, ConsumerNameIndexSlots> consumerNameIndexSlotsMap = new HashMap<>();
    // tracks the active consumer entries
    private final Map<ConsumerIdentityWrapper, ConsumerEntry> consumerEntries = new HashMap<>();

    /**
     * Represents a consumer entry in the tracker, including the consumer name, index, and reference count.
     *
     * @param consumerName the name of the tracked consumer
     * @param nameIndex the index slot allocated to the consumer for its name
     * @param refCount the mutable reference count of the consumer
     */
    record ConsumerEntry(String consumerName, int nameIndex, MutableInt refCount) {
    }

    /**
     * Tracks the used indexes for a consumer name using a RoaringBitmap.
     * A specific index slot is used when the bit is set.
     * When all bits are cleared, the consumer name can be removed from tracking.
     */
    static class ConsumerNameIndexSlots {
        private final RoaringBitmap indexSlots = new RoaringBitmap();

        /**
         * Allocates the first index slot that is not in use and marks it as used.
         *
         * @return the allocated index
         */
        public int allocateIndexSlot() {
            // find the first index that is not set; a result of -1 is treated as "no absent value found",
            // in which case a new index is appended after the existing ones
            int index = (int) indexSlots.nextAbsentValue(0);
            if (index == -1) {
                index = indexSlots.getCardinality();
            }
            indexSlots.add(index);
            return index;
        }

        /**
         * Marks the given index slot as unused.
         *
         * @param index the index to release
         * @return {@code true} when no index slot is in use after the removal
         */
        public boolean deallocateIndexSlot(int index) {
            indexSlots.remove(index);
            return indexSlots.isEmpty();
        }
    }

    /**
     * Adds a reference to the consumer and returns the index assigned to this consumer.
     * The index is allocated when the consumer is seen for the first time and is reused for
     * subsequent references for as long as the consumer stays tracked.
     *
     * @param wrapper the identity wrapper of the consumer
     * @return the consumer name index assigned to the consumer
     */
    public int increaseConsumerRefCountAndReturnIndex(ConsumerIdentityWrapper wrapper) {
        ConsumerEntry entry = consumerEntries.computeIfAbsent(wrapper, k -> {
            String consumerName = k.consumer.consumerName();
            return new ConsumerEntry(consumerName, allocateConsumerNameIndex(consumerName), new MutableInt(0));
        });
        entry.refCount().increment();
        return entry.nameIndex();
    }

    private int allocateConsumerNameIndex(String consumerName) {
        return getConsumerNameIndexBitmap(consumerName).allocateIndexSlot();
    }

    // Returns the slot tracker for the consumer name, creating it when it doesn't exist yet.
    private ConsumerNameIndexSlots getConsumerNameIndexBitmap(String consumerName) {
        return consumerNameIndexSlotsMap.computeIfAbsent(consumerName, k -> new ConsumerNameIndexSlots());
    }

    /**
     * Decreases the reference count of the consumer. When the reference count reaches zero, the consumer's
     * index is deallocated and the consumer is removed from tracking. The consumer name is removed from
     * tracking as well when none of its indexes remain in use.
     *
     * <p>The consumer must currently be tracked; passing an untracked consumer results in a
     * {@link NullPointerException}.</p>
     *
     * @param removed the identity wrapper of the consumer whose reference is released
     */
    public void decreaseConsumerRefCount(ConsumerIdentityWrapper removed) {
        ConsumerEntry consumerEntry = consumerEntries.get(removed);
        int refCount = consumerEntry.refCount().decrementAndGet();
        if (refCount == 0) {
            deallocateConsumerNameIndex(consumerEntry.consumerName(), consumerEntry.nameIndex());
            consumerEntries.remove(removed, consumerEntry);
        }
    }

    private void deallocateConsumerNameIndex(String consumerName, int index) {
        // Plain lookup: releasing an index must never create a slot tracker for the consumer name.
        ConsumerNameIndexSlots slots = consumerNameIndexSlotsMap.get(consumerName);
        if (slots != null && slots.deallocateIndexSlot(index)) {
            consumerNameIndexSlotsMap.remove(consumerName);
        }
    }

    /**
     * Returns the currently tracked index for the consumer.
     *
     * @param wrapper the identity wrapper of the consumer
     * @return the tracked consumer name index, or {@code -1} when the consumer isn't tracked
     */
    public int getTrackedIndex(ConsumerIdentityWrapper wrapper) {
        ConsumerEntry consumerEntry = consumerEntries.get(wrapper);
        return consumerEntry != null ? consumerEntry.nameIndex() : -1;
    }

    // Returns the number of consumer names that currently have at least one index slot tracked.
    int getTrackedConsumerNamesCount() {
        return consumerNameIndexSlotsMap.size();
    }

    // Returns the number of consumers that are currently tracked.
    int getTrackedConsumersCount() {
        return consumerEntries.size();
    }
}
Loading

0 comments on commit adb9014

Please sign in to comment.