From dc09b3059ba63b43d67d8f60358223a8db770144 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Sat, 23 Dec 2023 10:27:06 +0800 Subject: [PATCH 1/7] default create tablet round robin --- fe/fe-common/src/main/java/org/apache/doris/common/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 04d33fd4cc0bd0..d75095204bc054 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2063,7 +2063,7 @@ public class Config extends ConfigBase { public static boolean skip_localhost_auth_check = true; @ConfField(mutable = true) - public static boolean enable_round_robin_create_tablet = false; + public static boolean enable_round_robin_create_tablet = true; /** * To prevent different types (V1, V2, V3) of behavioral inconsistencies, From 04357796caae1b87be2ac67348cfe2e4acdb76c7 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Sat, 23 Dec 2023 15:44:58 +0800 Subject: [PATCH 2/7] refactor --- .../org/apache/doris/backup/RestoreJob.java | 3 +- .../org/apache/doris/catalog/OlapTable.java | 3 +- .../common/util/DynamicPartitionUtil.java | 13 +- .../doris/common/util/PropertyAnalyzer.java | 3 +- .../doris/datasource/InternalCatalog.java | 37 +--- .../doris/system/BeSelectionPolicy.java | 14 ++ .../doris/system/SystemInfoService.java | 194 +++++++----------- .../apache/doris/backup/RestoreJobTest.java | 10 +- .../doris/catalog/ReplicaAllocationTest.java | 3 +- .../load/sync/canal/CanalSyncDataTest.java | 2 +- .../doris/system/SystemInfoServiceTest.java | 2 +- 11 files changed, 125 insertions(+), 159 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 6d4f68e985d7b2..23ca4a335fdd90 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1113,6 +1113,7 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT long visibleVersion = remotePart.getVisibleVersion(); // tablets + Map nextIndexs = Maps.newHashMap(); for (MaterializedIndex remoteIdx : remotePart.getMaterializedIndices(IndexExtState.VISIBLE)) { int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId()); int remotetabletSize = remoteIdx.getTablets().size(); @@ -1127,7 +1128,7 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT // replicas try { Map> beIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, null, false, false); + .selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, false, false); for (Map.Entry> entry : beIds.entrySet()) { for (Long beId : entry.getValue()) { long newReplicaId = env.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 0ce2df5e6f64f4..d12dc9512b5b40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -589,6 +589,7 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore } // for each partition, reset rollup index map + Map nextIndexs = Maps.newHashMap(); for (Map.Entry entry : idToPartition.entrySet()) { Partition partition = entry.getValue(); // entry.getKey() is the new partition id, use it to get the restore specified replica allocation @@ -616,7 +617,7 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore try { Map> tag2beIds = Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation( - replicaAlloc, null, false, false); + replicaAlloc, nextIndexs, null, false, false); for (Map.Entry> entry3 : tag2beIds.entrySet()) { for (Long beId : entry3.getValue()) { long newReplicaId = env.getNextId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java index de6101181f48ef..ff2c9433057347 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java @@ -40,10 +40,12 @@ import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.policy.StoragePolicy; +import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TStorageMedium; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import com.google.common.collect.Range; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -228,7 +230,8 @@ private static void checkReplicationNum(String val, Database db) throws DdlExcep ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT, val); } ReplicaAllocation replicaAlloc = new ReplicaAllocation(Short.valueOf(val)); - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, Maps.newHashMap(), + null, false, true); } private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int hotPartitionNum, @@ -237,14 +240,16 @@ private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int h ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO); } - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true); + Map nextIndexs = Maps.newHashMap(); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, + false, true); if (hotPartitionNum <= 0) { return; } try { - Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD, false, - true); + Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, + TStorageMedium.SSD, false, true); } catch (DdlException e) { throw new DdlException("Failed to find enough backend for ssd storage medium. When setting " + DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the hot partitions will store " diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index ca0310769ee277..c8c0978ca74015 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -1069,6 +1069,7 @@ private static ReplicaAllocation analyzeReplicaAllocationImpl(Map nextIndexs = Maps.newHashMap(); for (String location : locations) { String[] parts = location.split(":"); if (parts.length != 2) { @@ -1092,7 +1093,7 @@ private static ReplicaAllocation analyzeReplicaAllocationImpl(Map tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified) throws DdlException { ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); + SystemInfoService systemInfoService = Env.getCurrentSystemInfo(); Map>> backendsPerBucketSeq = null; GroupId groupId = null; if (colocateIndex.isColocateTable(tabletMeta.getTableId())) { @@ -2757,16 +2758,13 @@ public void createTablets(MaterializedIndex index, ReplicaState replicaState, backendsPerBucketSeq = Maps.newHashMap(); } + TStorageMedium storageMedium = Config.disable_storage_medium_check ? null : tabletMeta.getStorageMedium(); Map nextIndexs = new HashMap<>(); - if (Config.enable_round_robin_create_tablet) { - for (Map.Entry entry : replicaAlloc.getAllocMap().entrySet()) { - int startPos = Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(), - tabletMeta.getStorageMedium()); - if (startPos == -1) { - throw new DdlException("The number of BEs that match the policy is insufficient"); - } - nextIndexs.put(entry.getKey(), startPos); + for (Tag tag : replicaAlloc.getAllocMap().keySet()) { + int startPos = systemInfoService.getStartPosOfRoundRobin(tag, storageMedium, + isStorageMediumSpecified); + nextIndexs.put(tag, startPos); } } @@ -2783,27 +2781,8 @@ public void createTablets(MaterializedIndex index, ReplicaState replicaState, if (chooseBackendsArbitrary) { // This is the first colocate table in the group, or just a normal table, // choose backends - if (Config.enable_round_robin_create_tablet) { - if (!Config.disable_storage_medium_check) { - chosenBackendIds = Env.getCurrentSystemInfo() - .getBeIdRoundRobinForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(), - nextIndexs); - } else { - chosenBackendIds = Env.getCurrentSystemInfo() - .getBeIdRoundRobinForReplicaCreation(replicaAlloc, null, - nextIndexs); - } - } else { - if (!Config.disable_storage_medium_check) { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(), - isStorageMediumSpecified, false); - } else { - chosenBackendIds = Env.getCurrentSystemInfo() - .selectBackendIdsForReplicaCreation(replicaAlloc, null, - isStorageMediumSpecified, false); - } - } + chosenBackendIds = systemInfoService.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, + storageMedium, isStorageMediumSpecified, false); for (Map.Entry> entry : chosenBackendIds.entrySet()) { backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 3a711307bd5595..573a1853b17621 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -51,6 +51,10 @@ public class BeSelectionPolicy { public boolean preferComputeNode = false; public int expectBeNum = 0; + public boolean enableRoundRobin = false; + // if enable round robin, choose next be from nextRoundRobinIndex + public int nextRoundRobinIndex = -1; + public List preferredLocations = new ArrayList<>(); private BeSelectionPolicy() { @@ -114,6 +118,16 @@ public Builder addPreLocations(List preferredLocations) { return this; } + public Builder setEnableRoundRobin(boolean enableRoundRobin) { + policy.enableRoundRobin = enableRoundRobin; + return this; + } + + public Builder setNextRoundRobinIndex(int nextRoundRobinIndex) { + policy.nextRoundRobinIndex = nextRoundRobinIndex; + return this; + } + public BeSelectionPolicy build() { return policy; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 3ccb6d6345642b..9fcbe73fbbc5cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -54,6 +54,7 @@ import java.io.DataInputStream; import java.io.IOException; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -415,86 +416,42 @@ public List getCnBackends() { return idToBackendRef.values().stream().filter(backend -> backend.isComputeNode()).collect(Collectors.toList()); } - class BeComparator implements Comparator { + class BeIdComparator implements Comparator { public int compare(Backend a, Backend b) { return (int) (a.getId() - b.getId()); } } - public List selectBackendIdsRoundRobinByPolicy(BeSelectionPolicy policy, int number, - int nextIndex) { - Preconditions.checkArgument(number >= -1); - List candidates = getCandidates(policy); - if (number != -1 && candidates.size() < number) { - LOG.info("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); - return Lists.newArrayList(); - } - - int realIndex = nextIndex % candidates.size(); - List partialOrderList = new ArrayList(); - partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) - .stream().map(b -> b.getId()).collect(Collectors.toList())); - partialOrderList.addAll(candidates.subList(0, realIndex) - .stream().map(b -> b.getId()).collect(Collectors.toList())); - - if (number == -1) { - return partialOrderList; - } else { - return partialOrderList.subList(0, number); - } - } - - public List getCandidates(BeSelectionPolicy policy) { - List candidates = policy.getCandidateBackends(idToBackendRef.values()); - if (candidates.isEmpty()) { - LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); - return Lists.newArrayList(); - } - - if (!policy.allowOnSameHost) { - Map> backendMaps = Maps.newHashMap(); - for (Backend backend : candidates) { - if (backendMaps.containsKey(backend.getHost())) { - backendMaps.get(backend.getHost()).add(backend); - } else { - List list = Lists.newArrayList(); - list.add(backend); - backendMaps.put(backend.getHost(), list); - } - } - candidates.clear(); - for (List list : backendMaps.values()) { - candidates.add(list.get(0)); - } - } - - if (candidates.isEmpty()) { - LOG.info("Not match policy: {}. candidates num: {}", policy, candidates.size()); - return Lists.newArrayList(); + class BeHostComparator implements Comparator { + public int compare(Backend a, Backend b) { + return a.getHost().compareTo(b.getHost()); } - - Collections.sort(candidates, new BeComparator()); - return candidates; } // Select the smallest number of tablets as the starting position of // round robin in the BE that match the policy - public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium) { + public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium, boolean isStorageMediumSpecified) { BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() - .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(tag)) + .needScheduleAvailable() + .needCheckDiskUsage() + .addTags(Sets.newHashSet(tag)) .setStorageMedium(storageMedium); if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { builder.allowOnSameHost(); } BeSelectionPolicy policy = builder.build(); - List candidates = getCandidates(policy); + List beIds = selectBackendIdsByPolicy(policy, -1); + if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified) { + storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD; + policy = builder.setStorageMedium(storageMedium).build(); + beIds = selectBackendIdsByPolicy(policy, -1); + } long minBeTabletsNum = Long.MAX_VALUE; int minIndex = -1; - for (int i = 0; i < candidates.size(); ++i) { - long tabletsNum = Env.getCurrentInvertedIndex() - .getTabletIdsByBackendId(candidates.get(i).getId()).size(); + for (int i = 0; i < beIds.size(); ++i) { + long tabletsNum = Env.getCurrentInvertedIndex().getTabletIdsByBackendId(beIds.get(i)).size(); if (tabletsNum < minBeTabletsNum) { minBeTabletsNum = tabletsNum; minIndex = i; @@ -503,40 +460,12 @@ public int getStartPosOfRoundRobin(Tag tag, TStorageMedium storageMedium) { return minIndex; } - public Map> getBeIdRoundRobinForReplicaCreation( - ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, - Map nextIndexs) throws DdlException { - Map> chosenBackendIds = Maps.newHashMap(); - Map allocMap = replicaAlloc.getAllocMap(); - short totalReplicaNum = 0; - for (Map.Entry entry : allocMap.entrySet()) { - BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() - .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) - .setStorageMedium(storageMedium); - if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { - builder.allowOnSameHost(); - } - - BeSelectionPolicy policy = builder.build(); - int nextIndex = nextIndexs.get(entry.getKey()); - List beIds = selectBackendIdsRoundRobinByPolicy(policy, entry.getValue(), nextIndex); - nextIndexs.put(entry.getKey(), nextIndex + beIds.size()); - - if (beIds.isEmpty()) { - throw new DdlException("Failed to find " + entry.getValue() + " backend(s) for policy: " + policy); - } - chosenBackendIds.put(entry.getKey(), beIds); - totalReplicaNum += beIds.size(); - } - Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum()); - return chosenBackendIds; - } - /** * Select a set of backends for replica creation. * The following parameters need to be considered when selecting backends. * * @param replicaAlloc + * @param nextIndexs create tablet round robin next be index, when enable_round_robin_create_tablet * @param storageMedium * @param isStorageMediumSpecified * @param isOnlyForCheck set true if only used for check available backend @@ -544,7 +473,8 @@ public Map> getBeIdRoundRobinForReplicaCreation( * @throws DdlException */ public Map> selectBackendIdsForReplicaCreation( - ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, boolean isStorageMediumSpecified, + ReplicaAllocation replicaAlloc, Map nextIndexs, + TStorageMedium storageMedium, boolean isStorageMediumSpecified, boolean isOnlyForCheck) throws DdlException { Map copiedBackends = Maps.newHashMap(idToBackendRef); @@ -561,12 +491,17 @@ public Map> selectBackendIdsForReplicaCreation( List failedEntries = Lists.newArrayList(); for (Map.Entry entry : allocMap.entrySet()) { + Tag tag = entry.getKey(); BeSelectionPolicy.Builder builder = new BeSelectionPolicy.Builder() .needScheduleAvailable().needCheckDiskUsage().addTags(Sets.newHashSet(entry.getKey())) .setStorageMedium(storageMedium); if (FeConstants.runningUnitTest || Config.allow_replica_on_same_host) { builder.allowOnSameHost(); } + if (Config.enable_round_robin_create_tablet) { + builder.setEnableRoundRobin(true); + builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1)); + } BeSelectionPolicy policy = builder.build(); List beIds = selectBackendIdsByPolicy(policy, entry.getValue()); @@ -577,6 +512,9 @@ public Map> selectBackendIdsForReplicaCreation( policy = builder.setStorageMedium(storageMedium).build(); beIds = selectBackendIdsByPolicy(policy, entry.getValue()); } + if (Config.enable_round_robin_create_tablet) { + nextIndexs.put(tag, policy.nextRoundRobinIndex + beIds.size()); + } // after retry different storage medium, it's still empty if (beIds.isEmpty()) { LOG.error("failed backend(s) for policy:" + policy); @@ -613,50 +551,72 @@ public Map> selectBackendIdsForReplicaCreation( public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) { Preconditions.checkArgument(number >= -1); List candidates = policy.getCandidateBackends(idToBackendRef.values()); - if ((number != -1 && candidates.size() < number) || candidates.isEmpty()) { + if (candidates.size() < number || candidates.isEmpty()) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } + // If only need one Backend, just return a random one. - if (number == 1) { + if (number == 1 && !policy.enableRoundRobin) { Collections.shuffle(candidates); return Lists.newArrayList(candidates.get(0).getId()); } - if (policy.allowOnSameHost) { - Collections.shuffle(candidates); - if (number == -1) { - return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); - } else { - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + if (!policy.allowOnSameHost) { + // for each host, random select one backend. + Map> backendMaps = Maps.newHashMap(); + for (Backend backend : candidates) { + if (backendMaps.containsKey(backend.getHost())) { + backendMaps.get(backend.getHost()).add(backend); + } else { + List list = Lists.newArrayList(); + list.add(backend); + backendMaps.put(backend.getHost(), list); + } } - } - // for each host, random select one backend. - Map> backendMaps = Maps.newHashMap(); - for (Backend backend : candidates) { - if (backendMaps.containsKey(backend.getHost())) { - backendMaps.get(backend.getHost()).add(backend); - } else { - List list = Lists.newArrayList(); - list.add(backend); - backendMaps.put(backend.getHost(), list); + candidates.clear(); + for (List list : backendMaps.values()) { + Collections.shuffle(list); + candidates.add(list.get(0)); } } - candidates.clear(); - for (List list : backendMaps.values()) { - Collections.shuffle(list); - candidates.add(list.get(0)); - } + if (number != -1 && candidates.size() < number) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } - Collections.shuffle(candidates); - if (number != -1) { - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + + if (policy.enableRoundRobin) { + if (policy.allowOnSameHost) { + Collections.sort(candidates, new BeIdComparator()); + } else { + Collections.sort(candidates, new BeHostComparator()); + } + + if (policy.nextRoundRobinIndex < 0) { + policy.nextRoundRobinIndex = new SecureRandom().nextInt(candidates.size()); + } + + int realIndex = policy.nextRoundRobinIndex % candidates.size(); + List partialOrderList = new ArrayList(); + partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) + .stream().map(b -> b.getId()).collect(Collectors.toList())); + partialOrderList.addAll(candidates.subList(0, realIndex) + .stream().map(b -> b.getId()).collect(Collectors.toList())); + + if (number == -1) { + return partialOrderList; + } else { + return partialOrderList.subList(0, number); + } } else { - return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); + Collections.shuffle(candidates); + if (number != -1) { + return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + } else { + return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); + } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index 64e92b35c83079..54341d41d39213 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -40,6 +40,7 @@ import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.persist.EditLog; import org.apache.doris.system.SystemInfoService; +import org.apache.doris.resource.Tag; import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Lists; @@ -54,6 +55,7 @@ import org.junit.Test; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.zip.Adler32; @@ -153,12 +155,14 @@ public void setUp() throws Exception { new Expectations() { { - systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, - false, true); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, + Maps.newHashMap(), (TStorageMedium) any, false, true); minTimes = 0; result = new Delegate() { public synchronized List selectBackendIdsForReplicaCreation( - ReplicaAllocation replicaAlloc, String clusterName, TStorageMedium medium) { + ReplicaAllocation replicaAlloc, Map nextIndexs, + TStorageMedium medium, boolean isStorageMediumSpecified, + boolean isOnlyForCheck) { List beIds = Lists.newArrayList(); beIds.add(CatalogMocker.BACKEND1_ID); beIds.add(CatalogMocker.BACKEND2_ID); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java index 14367ea731ec96..c53715cd817ed2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/ReplicaAllocationTest.java @@ -52,7 +52,8 @@ public class ReplicaAllocationTest { public void setUp() throws DdlException { new Expectations() { { - systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any, false, true); + systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, Maps.newHashMap(), + (TStorageMedium) any, false, true); minTimes = 0; result = new Delegate() { Map> selectBackendIdsForReplicaCreation() { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java index bf57f21f02fe93..61228c821a8e02 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java @@ -150,7 +150,7 @@ public void setUp() throws Exception { result = execPlanFragmentParams; systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, - (TStorageMedium) any, false, true); + Maps.newHashMap(), (TStorageMedium) any, false, true); minTimes = 0; result = backendIds; diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java index 7c5556e8cf2d49..22e12b37da392c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java @@ -404,7 +404,7 @@ public void testSelectBackendIdsForReplicaCreation() throws Exception { Map beCounterMap = Maps.newHashMap(); for (int i = 0; i < 10000; ++i) { Map> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc, - TStorageMedium.HDD, false, false); + Maps.newHashMap(), TStorageMedium.HDD, false, false); Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size()); for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) { beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1); From e1805db2c6e444f2d516b11c8ec5976f2ab00d65 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Sat, 23 Dec 2023 15:58:44 +0800 Subject: [PATCH 3/7] refactor --- .../org/apache/doris/system/SystemInfoService.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index 9fcbe73fbbc5cc..b17ae315ef0043 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -582,7 +582,7 @@ public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) } } - if (number != -1 && candidates.size() < number) { + if (candidates.size() < number) { LOG.debug("Not match policy: {}. candidates num: {}, expected: {}", policy, candidates.size(), number); return Lists.newArrayList(); } @@ -601,9 +601,9 @@ public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) int realIndex = policy.nextRoundRobinIndex % candidates.size(); List partialOrderList = new ArrayList(); partialOrderList.addAll(candidates.subList(realIndex, candidates.size()) - .stream().map(b -> b.getId()).collect(Collectors.toList())); + .stream().map(Backend::getId).collect(Collectors.toList())); partialOrderList.addAll(candidates.subList(0, realIndex) - .stream().map(b -> b.getId()).collect(Collectors.toList())); + .stream().map(Backend::getId).collect(Collectors.toList())); if (number == -1) { return partialOrderList; @@ -613,9 +613,9 @@ public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) } else { Collections.shuffle(candidates); if (number != -1) { - return candidates.subList(0, number).stream().map(b -> b.getId()).collect(Collectors.toList()); + return candidates.subList(0, number).stream().map(Backend::getId).collect(Collectors.toList()); } else { - return candidates.stream().map(b -> b.getId()).collect(Collectors.toList()); + return candidates.stream().map(Backend::getId).collect(Collectors.toList()); } } } From 8719a14529dff4ce5d3e27e197da3634e1917613 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Sat, 23 Dec 2023 16:16:58 +0800 Subject: [PATCH 4/7] update test --- .../src/test/java/org/apache/doris/backup/RestoreJobTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index 54341d41d39213..d361777fdd56c3 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -39,8 +39,8 @@ import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.persist.EditLog; -import org.apache.doris.system.SystemInfoService; import org.apache.doris.resource.Tag; +import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; import com.google.common.collect.Lists; From e531f11f36be4fc70cacbca4f957597ffa7ecdcc Mon Sep 17 00:00:00 2001 From: yujun777 Date: Sat, 23 Dec 2023 17:06:05 +0800 Subject: [PATCH 5/7] update test --- .../src/main/java/org/apache/doris/common/Config.java | 6 ++++++ .../org/apache/doris/datasource/InternalCatalog.java | 9 +++++++-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index d75095204bc054..2d860f21c87183 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2065,6 +2065,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean enable_round_robin_create_tablet = true; + @ConfField(mutable = true, masterOnly = true, description = { + "创建分区时,总是从第一个 BE 开始创建。注意:这种方式可能造成BE不均衡", + "When creating tablet of a partition, always start from the first BE. " + + "Note: This method may cause BE imbalance"}) + public static boolean create_tablet_round_robin_from_start = false; + /** * To prevent different types (V1, V2, V3) of behavioral inconsistencies, * we may delete the DecimalV2 and DateV1 types in the future. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index c3e83a7d4ded7c..40bd7ff30d1b67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2762,8 +2762,13 @@ public void createTablets(MaterializedIndex index, ReplicaState replicaState, Map nextIndexs = new HashMap<>(); if (Config.enable_round_robin_create_tablet) { for (Tag tag : replicaAlloc.getAllocMap().keySet()) { - int startPos = systemInfoService.getStartPosOfRoundRobin(tag, storageMedium, - isStorageMediumSpecified); + int startPos = -1; + if (Config.create_tablet_round_robin_from_start) { + startPos = 0; + } else { + startPos = systemInfoService.getStartPosOfRoundRobin(tag, storageMedium, + isStorageMediumSpecified); + } nextIndexs.put(tag, startPos); } } From 1ce3f19d07b1251c8362004f7d717b0c5e117ffd Mon Sep 17 00:00:00 2001 From: yujun777 Date: Sun, 24 Dec 2023 00:33:36 +0800 Subject: [PATCH 6/7] update policy.nextRoundRobinIndex --- .../doris/system/SystemInfoService.java | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java index b17ae315ef0043..9c1e196923f40d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java @@ -509,11 +509,15 @@ public Map> selectBackendIdsForReplicaCreation( // if only for check, no need to retry different storage medium to get backend if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified && !isOnlyForCheck) { storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD; - policy = builder.setStorageMedium(storageMedium).build(); + builder.setStorageMedium(storageMedium); + if (Config.enable_round_robin_create_tablet) { + builder.setNextRoundRobinIndex(nextIndexs.getOrDefault(tag, -1)); + } + policy = builder.build(); beIds = selectBackendIdsByPolicy(policy, entry.getValue()); } if (Config.enable_round_robin_create_tablet) { - nextIndexs.put(tag, policy.nextRoundRobinIndex + beIds.size()); + nextIndexs.put(tag, policy.nextRoundRobinIndex); } // after retry different storage medium, it's still empty if (beIds.isEmpty()) { @@ -543,7 +547,7 @@ public Map> selectBackendIdsForReplicaCreation( /** * Select a set of backends by the given policy. * - * @param policy + * @param policy if policy is enableRoundRobin, will update its nextRoundRobinIndex * @param number number of backends which need to be selected. -1 means return as many as possible. * @return return #number of backend ids, * or empty set if no backends match the policy, or the number of matched backends is less than "number"; @@ -562,6 +566,7 @@ public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) return Lists.newArrayList(candidates.get(0).getId()); } + boolean hasSameHost = false; if (!policy.allowOnSameHost) { // for each host, random select one backend. Map> backendMaps = Maps.newHashMap(); @@ -577,7 +582,10 @@ public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) candidates.clear(); for (List list : backendMaps.values()) { - Collections.shuffle(list); + if (list.size() > 1) { + Collections.shuffle(list); + hasSameHost = true; + } candidates.add(list.get(0)); } } @@ -588,10 +596,12 @@ public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) } if (policy.enableRoundRobin) { - if (policy.allowOnSameHost) { - Collections.sort(candidates, new BeIdComparator()); - } else { + if (!policy.allowOnSameHost && hasSameHost) { + // not allow same host and has same host, + // then we compare them with their host Collections.sort(candidates, new BeHostComparator()); + } else { + Collections.sort(candidates, new BeIdComparator()); } if (policy.nextRoundRobinIndex < 0) { @@ -605,11 +615,10 @@ public List selectBackendIdsByPolicy(BeSelectionPolicy policy, int number) partialOrderList.addAll(candidates.subList(0, realIndex) .stream().map(Backend::getId).collect(Collectors.toList())); - if (number == -1) { - return partialOrderList; - } else { - return partialOrderList.subList(0, number); - } + List result = number == -1 ? partialOrderList : partialOrderList.subList(0, number); + policy.nextRoundRobinIndex = realIndex + result.size(); + + return result; } else { Collections.shuffle(candidates); if (number != -1) { From 25aeaa4f241c42c7d279afccb498a1a777bf8054 Mon Sep 17 00:00:00 2001 From: yujun777 Date: Sun, 24 Dec 2023 00:36:48 +0800 Subject: [PATCH 7/7] update policy.nextRoundRobinIndex --- .../src/main/java/org/apache/doris/system/BeSelectionPolicy.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java index 573a1853b17621..ace2ab3e1e4b73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/system/BeSelectionPolicy.java @@ -53,6 +53,7 @@ public class BeSelectionPolicy { public boolean enableRoundRobin = false; // if enable round robin, choose next be from nextRoundRobinIndex + // call SystemInfoService::selectBackendIdsByPolicy will update nextRoundRobinIndex public int nextRoundRobinIndex = -1; public List preferredLocations = new ArrayList<>();