diff --git a/mantis-network/build.gradle b/mantis-network/build.gradle index 006f068ed..314091388 100644 --- a/mantis-network/build.gradle +++ b/mantis-network/build.gradle @@ -22,10 +22,11 @@ ext { dependencies { api "io.netty:netty-handler:$nettyVersion" implementation "io.mantisrx:mql-jvm:$mqlVersion" + implementation "net.openhft:zero-allocation-hashing:0.+" api project(':mantis-common') compileOnly libraries.spectatorApi testImplementation libraries.spectatorApi - + testImplementation libraries.junitJupiter testImplementation libraries.mockitoCore testImplementation libraries.slf4jLog4j12 diff --git a/mantis-network/dependencies.lock b/mantis-network/dependencies.lock index 49a9bc57a..36815288a 100644 --- a/mantis-network/dependencies.lock +++ b/mantis-network/dependencies.lock @@ -4,31 +4,6 @@ "locked": "1.18.20" } }, - "baseline-exact-dependencies-main": { - "io.mantisrx:mql-jvm": { - "locked": "3.4.0" - } - }, - "baseline-exact-dependencies-test": { - "com.netflix.spectator:spectator-api": { - "locked": "1.3.10" - }, - "org.junit.jupiter:junit-jupiter-api": { - "locked": "5.4.2" - }, - "org.junit.jupiter:junit-jupiter-engine": { - "locked": "5.4.2" - }, - "org.junit.jupiter:junit-jupiter-params": { - "locked": "5.4.2" - }, - "org.mockito:mockito-core": { - "locked": "2.0.111-beta" - }, - "org.slf4j:slf4j-log4j12": { - "locked": "1.7.0" - } - }, "compileClasspath": { "com.google.code.findbugs:jsr305": { "firstLevelTransitive": [ @@ -91,6 +66,9 @@ ], "locked": "1.3.8" }, + "net.openhft:zero-allocation-hashing": { + "locked": "0.26ea0" + }, "org.jctools:jctools-core": { "firstLevelTransitive": [ "io.mantisrx:mantis-common" @@ -116,7 +94,7 @@ "firstLevelTransitive": [ "io.mantisrx:mantis-common" ], - "locked": "1.1.10.5" + "locked": "1.1.10.7" } }, "lombok": { @@ -201,6 +179,9 @@ ], "locked": "1.0" }, + "net.openhft:zero-allocation-hashing": { + "locked": "0.26ea0" + }, "org.jctools:jctools-core": { "firstLevelTransitive": [ "io.mantisrx:mantis-common" @@ 
-223,7 +204,7 @@ "firstLevelTransitive": [ "io.mantisrx:mantis-common" ], - "locked": "1.1.10.5" + "locked": "1.1.10.7" } }, "testAnnotationProcessor": { @@ -293,6 +274,9 @@ ], "locked": "1.3.8" }, + "net.openhft:zero-allocation-hashing": { + "locked": "0.26ea0" + }, "org.jctools:jctools-core": { "firstLevelTransitive": [ "io.mantisrx:mantis-common" @@ -330,7 +314,7 @@ "firstLevelTransitive": [ "io.mantisrx:mantis-common" ], - "locked": "1.1.10.5" + "locked": "1.1.10.7" } }, "testRuntimeClasspath": { @@ -413,6 +397,9 @@ ], "locked": "1.0" }, + "net.openhft:zero-allocation-hashing": { + "locked": "0.26ea0" + }, "org.jctools:jctools-core": { "firstLevelTransitive": [ "io.mantisrx:mantis-common" @@ -447,7 +434,7 @@ "firstLevelTransitive": [ "io.mantisrx:mantis-common" ], - "locked": "1.1.10.5" + "locked": "1.1.10.7" } } } \ No newline at end of file diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java index 92a31adfb..4a90ab476 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java @@ -107,6 +107,9 @@ private void computeRing(Set>> connections) { } byte[] connectionBytes = (connectionId + "-" + i).getBytes(); long hash = hashFunction.computeHash(connectionBytes); + if (ring.containsKey(hash)) { + logger.error("Hash collision when computing ring. 
{} hashed to a value already in the ring.", connectionId + "-" + i); + } ring.put(hash, connection); } } @@ -114,17 +117,18 @@ private void computeRing(Set>> connections) { } private SortedMap>> hashConnections(Set>> connections) { + SnapshotCache>>> cache = cachedRingRef.get(); if (cache == null) { - logger.info("Recomputing ring due null reference"); + logger.info("Recomputing ring due to null reference"); computeRing(connections); } else { SortedMap>> cachedRing = cache.getCache(); // determine if need to recompute cache if (cachedRing.size() != (connections.size() * connectionRepetitionOnRing)) { // number of connections not equal - logger.info("Recomputing ring due to difference in number of connections versus cache"); + logger.info("Recomputing ring due to difference in number of connections ({}) versus cache size ({}).", connections.size() * connectionRepetitionOnRing, cachedRing.size()); computeRing(connections); } else { // number of connections equal, check timestamp diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/HashFunctions.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/HashFunctions.java index d8657b57e..823b1ac77 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/HashFunctions.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/HashFunctions.java @@ -18,6 +18,7 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import net.openhft.hashing.LongHashFunction; public class HashFunctions { @@ -37,6 +38,10 @@ public long computeHash(byte[] keyBytes) { }; } + public static HashFunction xxh3() { + return bytes -> LongHashFunction.xx3().hashBytes(bytes); + } + public static byte[] computeMd5(byte[] keyBytes) { MessageDigest md5; try { diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java index 14d47224c..d22225343 100644 
--- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/Routers.java @@ -45,7 +45,7 @@ public byte[] call(KeyValuePair kvp) { .put(valueBytes) // value bytes .array(); } - }, HashFunctions.ketama()); + }, HashFunctions.xxh3()); } private static byte[] dataPayload(byte[] data) { diff --git a/mantis-network/src/test/java/io/reactivex/mantis/network/push/ConsistentHashingRouterTest.java b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ConsistentHashingRouterTest.java new file mode 100644 index 000000000..8701d94c1 --- /dev/null +++ b/mantis-network/src/test/java/io/reactivex/mantis/network/push/ConsistentHashingRouterTest.java @@ -0,0 +1,113 @@ +/* + * Copyright 2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.reactivex.mantis.network.push; + +import org.junit.jupiter.api.Test; +import rx.subjects.PublishSubject; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.*; + +public class ConsistentHashingRouterTest { + + @Test + public void shouldNotHaveHashCollisionsUsingKetamaExtended() { + final AtomicLong hashInvocationCounter = new AtomicLong(0); + HashFunction instrumentedKetama = bytes -> { + hashInvocationCounter.getAndIncrement(); + return HashFunctions.xxh3().computeHash(bytes); + }; + + ConsistentHashingRouter router = new ConsistentHashingRouter<>("test-router", x -> x.getKeyBytes(), instrumentedKetama); + + PublishSubject> subj = PublishSubject.create(); + Set>> connections = generateStageToStageSlots("2", 40, 2).stream() + .map(slot -> new AsyncConnection>("fakehost", 123456, slot, slot, "test-group", subj, x -> true)) + .collect(Collectors.toSet()); + + List> data = new ArrayList<>(); + data.add(new KeyValuePair<>(12345, "12345".getBytes(), "test-value")); + + router.route(connections, data); + router.route(connections, data); + + // We should perform 1000 hashes per connection, and use a cached value on subsequent calls + assertEquals(connections.size() * 1000, hashInvocationCounter.get()); + } + + @Test + public void shouldNotHaveHashCollisionsStageToStage() { + + int numberOfRingEntriesPerSlot = 1000; + List ringEntries = generateStageToStageSlots("2", 40, 2) + .stream() + .flatMap(slot -> IntStream.range(0, numberOfRingEntriesPerSlot) + .boxed() + .map(entryNum -> slot + "-" + entryNum)) + .collect(Collectors.toList()); + + HashFunction hashFunction = HashFunctions.xxh3(); + + Set ring = new HashSet<>(); + 
ringEntries.stream().forEach(entry -> ring.add(hashFunction.computeHash(entry.getBytes()))); + + assertEquals(ringEntries.size(), ring.size()); + } + + @Test + public void shouldNotHaveHashCollisionsLargeJob() { + + int numberOfRingEntriesPerSlot = 1000; + List ringEntries = generateStageToStageSlots("2", 500, 4) + .stream() + .flatMap(slot -> IntStream.range(0, numberOfRingEntriesPerSlot) + .boxed() + .map(entryNum -> slot + "-" + entryNum)) + .collect(Collectors.toList()); + + HashFunction hashFunction = HashFunctions.xxh3(); + + Set ring = new HashSet<>(); + ringEntries.stream().forEach(entry -> ring.add(hashFunction.computeHash(entry.getBytes()))); + + assertEquals(ringEntries.size(), ring.size()); + } + + /** + * Generates slot ids that look the same as those that come from mantis stages. + * Example: stage_2_index_38_partition_1 + * @param stage The stage number as a string. + * @param indices The number of indices. Typically the number of workers in said stage. + * @param partitions The number of partitions the stage uses when connecting upstream. + * @return A List of slotId / AsyncConnection id values for use in testing. + */ + private List generateStageToStageSlots(String stage, int indices, int partitions) { + return IntStream.range(0, indices).boxed().map(index -> "stage_" + stage + "_index_" + index) + .flatMap(prefix -> IntStream.range(1, partitions+1).boxed().map(partition -> prefix + "_partition_" + partition)) + .collect(Collectors.toList()); + } +}