Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](create tablet) default create tablet round robin #28911

Merged
merged 7 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2063,7 +2063,13 @@ public class Config extends ConfigBase {
public static boolean skip_localhost_auth_check = true;

@ConfField(mutable = true)
public static boolean enable_round_robin_create_tablet = false;
public static boolean enable_round_robin_create_tablet = true;

@ConfField(mutable = true, masterOnly = true, description = {
"创建分区时,总是从第一个 BE 开始创建。注意:这种方式可能造成BE不均衡",
"When creating tablet of a partition, always start from the first BE. "
+ "Note: This method may cause BE imbalance"})
public static boolean create_tablet_round_robin_from_start = false;

/**
* To prevent different types (V1, V2, V3) of behavioral inconsistencies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT
long visibleVersion = remotePart.getVisibleVersion();

// tablets
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
for (MaterializedIndex remoteIdx : remotePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
int remotetabletSize = remoteIdx.getTablets().size();
Expand All @@ -1127,7 +1128,7 @@ private Partition resetPartitionForRestore(OlapTable localTbl, OlapTable remoteT
// replicas
try {
Map<Tag, List<Long>> beIds = Env.getCurrentSystemInfo()
.selectBackendIdsForReplicaCreation(replicaAlloc, null, false, false);
.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, false, false);
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
for (Long beId : entry.getValue()) {
long newReplicaId = env.getNextId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore
}

// for each partition, reset rollup index map
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
for (Map.Entry<Long, Partition> entry : idToPartition.entrySet()) {
Partition partition = entry.getValue();
// entry.getKey() is the new partition id, use it to get the restore specified replica allocation
Expand Down Expand Up @@ -616,7 +617,7 @@ public Status resetIdsForRestore(Env env, Database db, ReplicaAllocation restore
try {
Map<Tag, List<Long>> tag2beIds =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, null, false, false);
replicaAlloc, nextIndexs, null, false, false);
for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
for (Long beId : entry3.getValue()) {
long newReplicaId = env.getNextId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -228,7 +230,8 @@ private static void checkReplicationNum(String val, Database db) throws DdlExcep
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT, val);
}
ReplicaAllocation replicaAlloc = new ReplicaAllocation(Short.valueOf(val));
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true);
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, Maps.newHashMap(),
null, false, true);
}

private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int hotPartitionNum,
Expand All @@ -237,14 +240,16 @@ private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int h
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO);
}

Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true);
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null,
false, true);
if (hotPartitionNum <= 0) {
return;
}

try {
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD, false,
true);
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs,
TStorageMedium.SSD, false, true);
} catch (DdlException e) {
throw new DdlException("Failed to find enough backend for ssd storage medium. When setting "
+ DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the hot partitions will store "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,7 @@ private static ReplicaAllocation analyzeReplicaAllocationImpl(Map<String, String
allocationVal = allocationVal.replaceAll(" ", "");
String[] locations = allocationVal.split(",");
int totalReplicaNum = 0;
Map<Tag, Integer> nextIndexs = Maps.newHashMap();
for (String location : locations) {
String[] parts = location.split(":");
if (parts.length != 2) {
Expand All @@ -1092,7 +1093,7 @@ private static ReplicaAllocation analyzeReplicaAllocationImpl(Map<String, String
try {
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
systemInfoService.selectBackendIdsForReplicaCreation(
replicaAlloc, null, false, true);
replicaAlloc, nextIndexs, null, false, true);
} catch (DdlException ddlException) {
throw new AnalysisException(ddlException.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2738,6 +2738,7 @@ public void createTablets(MaterializedIndex index, ReplicaState replicaState,
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified)
throws DdlException {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
GroupId groupId = null;
if (colocateIndex.isColocateTable(tabletMeta.getTableId())) {
Expand All @@ -2757,16 +2758,18 @@ public void createTablets(MaterializedIndex index, ReplicaState replicaState,
backendsPerBucketSeq = Maps.newHashMap();
}

TStorageMedium storageMedium = Config.disable_storage_medium_check ? null : tabletMeta.getStorageMedium();
Map<Tag, Integer> nextIndexs = new HashMap<>();

if (Config.enable_round_robin_create_tablet) {
for (Map.Entry<Tag, Short> entry : replicaAlloc.getAllocMap().entrySet()) {
int startPos = Env.getCurrentSystemInfo().getStartPosOfRoundRobin(entry.getKey(),
tabletMeta.getStorageMedium());
if (startPos == -1) {
throw new DdlException("The number of BEs that match the policy is insufficient");
for (Tag tag : replicaAlloc.getAllocMap().keySet()) {
int startPos = -1;
if (Config.create_tablet_round_robin_from_start) {
startPos = 0;
} else {
startPos = systemInfoService.getStartPosOfRoundRobin(tag, storageMedium,
isStorageMediumSpecified);
}
nextIndexs.put(entry.getKey(), startPos);
nextIndexs.put(tag, startPos);
}
}

Expand All @@ -2783,27 +2786,8 @@ public void createTablets(MaterializedIndex index, ReplicaState replicaState,
if (chooseBackendsArbitrary) {
// This is the first colocate table in the group, or just a normal table,
// choose backends
if (Config.enable_round_robin_create_tablet) {
if (!Config.disable_storage_medium_check) {
chosenBackendIds = Env.getCurrentSystemInfo()
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(),
nextIndexs);
} else {
chosenBackendIds = Env.getCurrentSystemInfo()
.getBeIdRoundRobinForReplicaCreation(replicaAlloc, null,
nextIndexs);
}
} else {
if (!Config.disable_storage_medium_check) {
chosenBackendIds = Env.getCurrentSystemInfo()
.selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(),
isStorageMediumSpecified, false);
} else {
chosenBackendIds = Env.getCurrentSystemInfo()
.selectBackendIdsForReplicaCreation(replicaAlloc, null,
isStorageMediumSpecified, false);
}
}
chosenBackendIds = systemInfoService.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs,
storageMedium, isStorageMediumSpecified, false);

for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class BeSelectionPolicy {
public boolean preferComputeNode = false;
public int expectBeNum = 0;

public boolean enableRoundRobin = false;
// if enable round robin, choose next be from nextRoundRobinIndex
// call SystemInfoService::selectBackendIdsByPolicy will update nextRoundRobinIndex
public int nextRoundRobinIndex = -1;

public List<String> preferredLocations = new ArrayList<>();

private BeSelectionPolicy() {
Expand Down Expand Up @@ -114,6 +119,16 @@ public Builder addPreLocations(List<String> preferredLocations) {
return this;
}

public Builder setEnableRoundRobin(boolean enableRoundRobin) {
policy.enableRoundRobin = enableRoundRobin;
return this;
}

public Builder setNextRoundRobinIndex(int nextRoundRobinIndex) {
policy.nextRoundRobinIndex = nextRoundRobinIndex;
return this;
}

public BeSelectionPolicy build() {
return policy;
}
Expand Down
Loading
Loading