From 9ecbd300d3f80156ff236da8912e4c9c1e2577fc Mon Sep 17 00:00:00 2001
From: Mingyu Chen
Date: Tue, 19 Dec 2023 22:29:35 +0800
Subject: [PATCH] [opt](task-assignment) use consistent hash as default task
 assigner and cache the consistent hash ring (#28522)

1. Use the consistent hash algorithm as the default assigner for the file query scan node.
   A consistent assignment can better utilize the page cache of BE nodes.
2. Cache the consistent hash ring.
   Initializing a consistent hash ring is time-consuming because thousands of
   virtual nodes need to be added, so cache it for better performance.
---
 .../external/FederationBackendPolicy.java    | 63 +++++++++++++++++--
 .../planner/external/FileQueryScanNode.java  | 11 ++--
 .../planner/FederationBackendPolicyTest.java | 45 ++++++++++++-
 3 files changed, 107 insertions(+), 12 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index 9e23463235f2d3..d1d7e90d35ad16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -30,6 +30,9 @@
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -46,7 +49,9 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 public class FederationBackendPolicy {
@@ -59,6 +64,53 @@ public class FederationBackendPolicy {
     private int nextBe = 0;
     private boolean initialized = false;
 
+    // Creating a ConsistentHash ring may be a time-consuming operation, so we cache it.
+    private static LoadingCache<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>> consistentHashCache;
+
+    static {
+        consistentHashCache = CacheBuilder.newBuilder().maximumSize(5)
+                .build(new CacheLoader<HashCacheKey, ConsistentHash<TScanRangeLocations, Backend>>() {
+                    @Override
+                    public ConsistentHash<TScanRangeLocations, Backend> load(HashCacheKey key) {
+                        return new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
+                                new BackendHash(), key.bes, Config.virtual_node_number);
+                    }
+                });
+    }
+
+    private static class HashCacheKey {
+        // sorted backend ids as key
+        private List<Long> beIds;
+        // backends are not part of the key, just an attachment
+        private List<Backend> bes;
+
+        HashCacheKey(List<Backend> backends) {
+            this.bes = backends;
+            this.beIds = backends.stream().map(b -> b.getId()).sorted().collect(Collectors.toList());
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof HashCacheKey)) {
+                return false;
+            }
+            return Objects.equals(beIds, ((HashCacheKey) obj).beIds);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(beIds);
+        }
+
+        @Override
+        public String toString() {
+            return "HashCache{" + "beIds=" + beIds + '}';
+        }
+    }
+
     public void init() throws UserException {
         if (!initialized) {
             init(Collections.emptyList());
@@ -96,8 +148,11 @@ public void init(BeSelectionPolicy policy) throws UserException {
             throw new UserException("No available backends");
         }
         backendMap.putAll(backends.stream().collect(Collectors.groupingBy(Backend::getHost)));
-        consistentHash = new ConsistentHash<>(Hashing.murmur3_128(), new ScanRangeHash(),
-                new BackendHash(), backends, Config.virtual_node_number);
+        try {
+            consistentHash = consistentHashCache.get(new HashCacheKey(backends));
+        } catch (ExecutionException e) {
+            throw new UserException("failed to get consistent hash", e);
+        }
     }
 
     public Backend getNextBe() {
@@ -111,7 +166,7 @@ public Backend getNextConsistentBe(TScanRangeLocations scanRangeLocations) {
     }
 
     // Try to find a local BE, if not exists, use `getNextBe` instead
-    public Backend getNextLocalBe(List<String> hosts) {
+    public Backend getNextLocalBe(List<String> hosts, TScanRangeLocations scanRangeLocations) {
         List<Backend> candidateBackends = Lists.newArrayListWithCapacity(hosts.size());
         for (String host : hosts) {
            List<Backend> backends = backendMap.get(host);
@@ -121,7 +176,7 @@ public Backend getNextLocalBe(List<String> hosts) {
         }
 
         return CollectionUtils.isEmpty(candidateBackends)
-                ? getNextBe()
+                ? getNextConsistentBe(scanRangeLocations)
                 : candidateBackends.get(random.nextInt(candidateBackends.size()));
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 85245de1020ddd..e11299df876364 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -288,7 +288,6 @@ public void createScanRangeLocations() throws UserException {
             params.setProperties(locationProperties);
         }
 
-        boolean enableSqlCache = ConnectContext.get().getSessionVariable().enableFileCache;
         boolean enableShortCircuitRead = HdfsResource.enableShortCircuitRead(locationProperties);
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
@@ -346,14 +345,12 @@ public void createScanRangeLocations() throws UserException {
             curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             TScanRangeLocation location = new TScanRangeLocation();
             Backend selectedBackend;
-            if (enableSqlCache) {
-                // Use consistent hash to assign the same scan range into the same backend among different queries
-                selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
-            } else if (enableShortCircuitRead) {
+            if (enableShortCircuitRead) {
                 // Try to find a local BE if enable hdfs short circuit read
-                selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()));
+                selectedBackend = backendPolicy.getNextLocalBe(Arrays.asList(fileSplit.getHosts()), curLocations);
             } else {
-                selectedBackend = backendPolicy.getNextBe();
+                // Use consistent hash to assign the same scan range into the same backend among different queries
+                selectedBackend = backendPolicy.getNextConsistentBe(curLocations);
             }
             setLocationPropertiesIfNecessary(selectedBackend, locationType, locationProperties);
             location.setBackendId(selectedBackend.getId());
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
index 05a7f6985df381..ef65d1b6165d92 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/FederationBackendPolicyTest.java
@@ -22,6 +22,11 @@
 import org.apache.doris.planner.external.FederationBackendPolicy;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TExternalScanRange;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileScanRange;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Stopwatch;
 import mockit.Mock;
@@ -93,12 +98,50 @@ public void testGetNextLocalBe() throws UserException {
         int invokeTimes = 1000000;
         Assertions.assertEquals(policy.numBackends(), backendNum);
         List<String> localHosts = Arrays.asList("192.168.1.0", "192.168.1.1", "192.168.1.2");
+        TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100);
         Stopwatch sw = Stopwatch.createStarted();
         for (int i = 0; i < invokeTimes; i++) {
-            Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts).getHost()));
+            Assertions.assertTrue(localHosts.contains(policy.getNextLocalBe(localHosts, scanRangeLocations).getHost()));
         }
         sw.stop();
getNextLocalBe() " + invokeTimes + " times cost [" + sw.elapsed(TimeUnit.MILLISECONDS) + "] ms"); } + + @Test + public void testConsistentHash() throws UserException { + FederationBackendPolicy policy = new FederationBackendPolicy(); + policy.init(); + int backendNum = 200; + Assertions.assertEquals(policy.numBackends(), backendNum); + + TScanRangeLocations scanRangeLocations = getScanRangeLocations("path1", 0, 100); + Assertions.assertEquals(39, policy.getNextConsistentBe(scanRangeLocations).getId()); + + scanRangeLocations = getScanRangeLocations("path2", 0, 100); + Assertions.assertEquals(78, policy.getNextConsistentBe(scanRangeLocations).getId()); + } + + private TScanRangeLocations getScanRangeLocations(String path, long startOffset, long size) { + // Generate on file scan range + TFileScanRange fileScanRange = new TFileScanRange(); + // Scan range + TExternalScanRange externalScanRange = new TExternalScanRange(); + externalScanRange.setFileScanRange(fileScanRange); + TScanRange scanRange = new TScanRange(); + scanRange.setExtScanRange(externalScanRange); + scanRange.getExtScanRange().getFileScanRange().addToRanges(createRangeDesc(path, startOffset, size)); + // Locations + TScanRangeLocations locations = new TScanRangeLocations(); + locations.setScanRange(scanRange); + return locations; + } + + private TFileRangeDesc createRangeDesc(String path, long startOffset, long size) { + TFileRangeDesc rangeDesc = new TFileRangeDesc(); + rangeDesc.setPath(path); + rangeDesc.setStartOffset(startOffset); + rangeDesc.setSize(size); + return rangeDesc; + } }