Skip to content

Commit

Permalink
ConsistentHashingRouter: Use XXH3 instead of 4 byte ketama for consis… (
Browse files Browse the repository at this point in the history
#715)

* ConsistentHashingRouter: Use XXH3 instead of 4 byte ketama for consistent hashing

* Add test for 500 workers with 4 partitions

* Remove ketamaExtended and unpin zero-allocation-hashing dep version
  • Loading branch information
crioux-stripe authored Oct 2, 2024
1 parent a59f5f6 commit fe13f9b
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 33 deletions.
3 changes: 2 additions & 1 deletion mantis-network/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ ext {
dependencies {
api "io.netty:netty-handler:$nettyVersion"
implementation "io.mantisrx:mql-jvm:$mqlVersion"
implementation "net.openhft:zero-allocation-hashing:0.+"
api project(':mantis-common')
compileOnly libraries.spectatorApi
testImplementation libraries.spectatorApi

testImplementation libraries.junitJupiter
testImplementation libraries.mockitoCore
testImplementation libraries.slf4jLog4j12
Expand Down
45 changes: 16 additions & 29 deletions mantis-network/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,6 @@
"locked": "1.18.20"
}
},
"baseline-exact-dependencies-main": {
"io.mantisrx:mql-jvm": {
"locked": "3.4.0"
}
},
"baseline-exact-dependencies-test": {
"com.netflix.spectator:spectator-api": {
"locked": "1.3.10"
},
"org.junit.jupiter:junit-jupiter-api": {
"locked": "5.4.2"
},
"org.junit.jupiter:junit-jupiter-engine": {
"locked": "5.4.2"
},
"org.junit.jupiter:junit-jupiter-params": {
"locked": "5.4.2"
},
"org.mockito:mockito-core": {
"locked": "2.0.111-beta"
},
"org.slf4j:slf4j-log4j12": {
"locked": "1.7.0"
}
},
"compileClasspath": {
"com.google.code.findbugs:jsr305": {
"firstLevelTransitive": [
Expand Down Expand Up @@ -91,6 +66,9 @@
],
"locked": "1.3.8"
},
"net.openhft:zero-allocation-hashing": {
"locked": "0.26ea0"
},
"org.jctools:jctools-core": {
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
Expand All @@ -116,7 +94,7 @@
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
],
"locked": "1.1.10.5"
"locked": "1.1.10.7"
}
},
"lombok": {
Expand Down Expand Up @@ -201,6 +179,9 @@
],
"locked": "1.0"
},
"net.openhft:zero-allocation-hashing": {
"locked": "0.26ea0"
},
"org.jctools:jctools-core": {
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
Expand All @@ -223,7 +204,7 @@
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
],
"locked": "1.1.10.5"
"locked": "1.1.10.7"
}
},
"testAnnotationProcessor": {
Expand Down Expand Up @@ -293,6 +274,9 @@
],
"locked": "1.3.8"
},
"net.openhft:zero-allocation-hashing": {
"locked": "0.26ea0"
},
"org.jctools:jctools-core": {
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
Expand Down Expand Up @@ -330,7 +314,7 @@
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
],
"locked": "1.1.10.5"
"locked": "1.1.10.7"
}
},
"testRuntimeClasspath": {
Expand Down Expand Up @@ -413,6 +397,9 @@
],
"locked": "1.0"
},
"net.openhft:zero-allocation-hashing": {
"locked": "0.26ea0"
},
"org.jctools:jctools-core": {
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
Expand Down Expand Up @@ -447,7 +434,7 @@
"firstLevelTransitive": [
"io.mantisrx:mantis-common"
],
"locked": "1.1.10.5"
"locked": "1.1.10.7"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,28 @@ private void computeRing(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {
}
byte[] connectionBytes = (connectionId + "-" + i).getBytes();
long hash = hashFunction.computeHash(connectionBytes);
if (ring.containsKey(hash)) {
logger.error("Hash collision when computing ring. {} hashed to a value already in the ring.", connectionId + "-" + i);
}
ring.put(hash, connection);
}
}
cachedRingRef.set(new SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>>(ring));
}

private SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> hashConnections(Set<AsyncConnection<KeyValuePair<K, V>>> connections) {

SnapshotCache<SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>>> cache = cachedRingRef.get();

if (cache == null) {
logger.info("Recomputing ring due null reference");
logger.info("Recomputing ring due to null reference");
computeRing(connections);
} else {
SortedMap<Long, AsyncConnection<KeyValuePair<K, V>>> cachedRing = cache.getCache();
// determine if need to recompute cache
if (cachedRing.size() != (connections.size() * connectionRepetitionOnRing)) {
// number of connections not equal
logger.info("Recomputing ring due to difference in number of connections versus cache");
logger.info("Recomputing ring due to difference in number of connections ({}) versus cache size ({}).", connections.size() * connectionRepetitionOnRing, cachedRing.size());
computeRing(connections);
} else {
// number of connections equal, check timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import net.openhft.hashing.LongHashFunction;


public class HashFunctions {
Expand All @@ -37,6 +38,10 @@ public long computeHash(byte[] keyBytes) {
};
}

/**
 * Creates a {@link HashFunction} backed by the XXH3 implementation exposed through
 * {@code LongHashFunction.xx3()}.
 *
 * @return a hash function that maps the supplied key bytes to a {@code long} hash value
 */
public static HashFunction xxh3() {
    final LongHashFunction xx3 = LongHashFunction.xx3();
    return xx3::hashBytes;
}

public static byte[] computeMd5(byte[] keyBytes) {
MessageDigest md5;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public byte[] call(KeyValuePair<K, V> kvp) {
.put(valueBytes) // value bytes
.array();
}
}, HashFunctions.ketama());
}, HashFunctions.xxh3());
}

private static byte[] dataPayload(byte[] data) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.reactivex.mantis.network.push;

import org.junit.jupiter.api.Test;
import rx.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.*;

public class ConsistentHashingRouterTest {

@Test
public void shouldNotHaveHashCollisionsUsingKetamaExtended() {
final AtomicLong hashInvocationCounter = new AtomicLong(0);
HashFunction instrumentedKetama = bytes -> {
hashInvocationCounter.getAndIncrement();
return HashFunctions.xxh3().computeHash(bytes);
};

ConsistentHashingRouter<String, String> router = new ConsistentHashingRouter<>("test-router", x -> x.getKeyBytes(), instrumentedKetama);

PublishSubject<List<byte[]>> subj = PublishSubject.create();
Set<AsyncConnection<KeyValuePair<String, String>>> connections = generateStageToStageSlots("2", 40, 2).stream()
.map(slot -> new AsyncConnection<KeyValuePair<String, String>>("fakehost", 123456, slot, slot, "test-group", subj, x -> true))
.collect(Collectors.toSet());

List<KeyValuePair<String, String>> data = new ArrayList<>();
data.add(new KeyValuePair<>(12345, "12345".getBytes(), "test-value"));

router.route(connections, data);
router.route(connections, data);

// We should perform 1000 hashes per connection, and used a cached value on subsequent calls
assertEquals(connections.size() * 1000, hashInvocationCounter.get());
}

@Test
public void shouldNotHaveHashCollisionsStageToStage() {

int numberOfRingEntriesPerSlot = 1000;
List<String> ringEntries = generateStageToStageSlots("2", 40, 2)
.stream()
.flatMap(slot -> IntStream.range(0, numberOfRingEntriesPerSlot)
.boxed()
.map(entryNum -> slot + "-" + entryNum))
.collect(Collectors.toList());

HashFunction hashFunction = HashFunctions.xxh3();

Set<Long> ring = new HashSet<>();
ringEntries.stream().forEach(entry -> ring.add(hashFunction.computeHash(entry.getBytes())));

assertEquals(ringEntries.size(), ring.size());
}

@Test
public void shouldNotHaveHashCollisionsLargeJob() {

int numberOfRingEntriesPerSlot = 1000;
List<String> ringEntries = generateStageToStageSlots("2", 500, 4)
.stream()
.flatMap(slot -> IntStream.range(0, numberOfRingEntriesPerSlot)
.boxed()
.map(entryNum -> slot + "-" + entryNum))
.collect(Collectors.toList());

HashFunction hashFunction = HashFunctions.xxh3();

Set<Long> ring = new HashSet<>();
ringEntries.stream().forEach(entry -> ring.add(hashFunction.computeHash(entry.getBytes())));

assertEquals(ringEntries.size(), ring.size());
}

/**
* Generates slot ids that look the same as those that come from mantis stages.
* Example: stage_2_index_38_partition_1
* @param stage The stage number as a string.
* @param indices The number of indices. Typically the number of workers in said stage.
* @param partitions The number of partitions the stage uses when connecting upstream.
* @return A List of slotId / AsyncConnection id values for use in testing.
*/
private List<String> generateStageToStageSlots(String stage, int indices, int partitions) {
return IntStream.range(0, indices).boxed().map(index -> "stage_" + stage + "_index_" + index)
.flatMap(prefix -> IntStream.range(1, partitions+1).boxed().map(partition -> prefix + "_partition_" + partition))
.collect(Collectors.toList());
}
}

0 comments on commit fe13f9b

Please sign in to comment.