Skip to content

Commit

Permalink
[fix](auto bucket) fix auto buckets calc using the first k partition (#…
Browse files Browse the repository at this point in the history
…41675)

If the first k (at most 7) partitions' data sizes are ascending, the result
will be partition_size[k-1] + ema(deltas of the first k partitions).

This is a bug: the calculation should use the last k partitions,
not the first k partitions.
  • Loading branch information
yujun777 committed Oct 14, 2024
1 parent 033220e commit e657fd0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
numOfLowPaths += pathSlot.getTotalAvailBalanceSlotNum();
}
}
LOG.info("get number of low load paths: {}, with medium: {}", numOfLowPaths, medium);
LOG.info("get number of low load paths: {}, with medium: {}, tag: {}, isUrgent {}",
numOfLowPaths, medium, clusterStat.getTag(), isUrgent);

List<String> alternativeTabletInfos = Lists.newArrayList();

Expand All @@ -131,6 +132,8 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
.map(beStat -> Sets.newHashSet(invertedIndex.getTabletIdsByBackendId(beStat.getBeId())))
.collect(Collectors.toList());

boolean hasCandidateTablet = false;

// choose tablets from high load backends.
// BackendLoadStatistic is sorted by load score in ascend order,
// so we need to traverse it from last to first
Expand Down Expand Up @@ -234,6 +237,8 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
continue;
}

hasCandidateTablet = true;

// for urgent disk, pick tablets order by size,
// then it may always pick tablets that was on the low backends.
if (!lowBETablets.isEmpty()
Expand Down Expand Up @@ -287,6 +292,9 @@ protected List<TabletSchedCtx> selectAlternativeTabletsForCluster(
if (!alternativeTablets.isEmpty()) {
LOG.info("select alternative tablets, medium: {}, is urgent: {}, num: {}, detail: {}",
medium, isUrgent, alternativeTablets.size(), alternativeTabletInfos);
} else if (isUrgent && !hasCandidateTablet) {
LOG.info("urgent balance cann't found candidate tablets. medium: {}, tag: {}",
medium, clusterStat.getTag());
}
return alternativeTablets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,22 +164,19 @@ private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize)
return historyPartitionsSize.get(0);
}

int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();

boolean isAscending = true;
for (int i = 1; i < size; i++) {
if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
ArrayList<Long> ascendingDeltaSize = new ArrayList<Long>();
for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) {
long delta = historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1);
if (delta < 0) {
isAscending = false;
break;
}
ascendingDeltaSize.add(delta);
}

if (isAscending) {
ArrayList<Long> historyDeltaSize = Lists.newArrayList();
for (int i = 1; i < size; i++) {
historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
}
return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
return historyPartitionsSize.get(historyPartitionsSize.size() - 1) + ema(ascendingDeltaSize, 7);
} else {
return ema(historyPartitionsSize, 7);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,8 @@ public void testAutoBuckets() throws Exception {
+ " PROPERTIES (\n"
+ " \"dynamic_partition.enable\" = \"true\",\n"
+ " \"dynamic_partition.time_unit\" = \"YEAR\",\n"
+ " \"dynamic_partition.start\" = \"-50\",\n"
+ " \"dynamic_partition.create_history_partition\" = \"true\",\n"
+ " \"dynamic_partition.end\" = \"1\",\n"
+ " \"dynamic_partition.prefix\" = \"p\",\n"
+ " \"replication_allocation\" = \"tag.location.default: 1\"\n"
Expand All @@ -1744,22 +1746,59 @@ public void testAutoBuckets() throws Exception {
Env.getCurrentInternalCatalog().getDbOrAnalysisException("default_cluster:test");
OlapTable table = (OlapTable) db.getTableOrAnalysisException("test_autobucket_dynamic_partition");
List<Partition> partitions = Lists.newArrayList(table.getAllPartitions());
Assert.assertEquals(2, partitions.size());
Assert.assertEquals(52, partitions.size());
for (Partition partition : partitions) {
Assert.assertEquals(FeConstants.default_bucket_num, partition.getDistributionInfo().getBucketNum());
partition.setVisibleVersionAndTime(2L, System.currentTimeMillis());
}
RebalancerTestUtil.updateReplicaDataSize(1, 1, 1);

String alterStmt =
String alterStmt1 =
"alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '2')";
ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt));
ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt1));
List<Pair<Long, Long>> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(db.getId(), table.getId()));
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false);

partitions = Lists.newArrayList(table.getAllPartitions());
partitions.sort(Comparator.comparing(Partition::getId));
Assert.assertEquals(3, partitions.size());
Assert.assertEquals(1, partitions.get(2).getDistributionInfo().getBucketNum());
Assert.assertEquals(53, partitions.size());
Assert.assertEquals(1, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum());

table.readLock();
try {
// first 40 partitions with size 0, then 13 partitions with size 100GB(10GB * 10 buckets)
for (int i = 0; i < 52; i++) {
Partition partition = partitions.get(i);
partition.updateVisibleVersion(2L);
for (MaterializedIndex idx : partition.getMaterializedIndices(
MaterializedIndex.IndexExtState.VISIBLE)) {
Assert.assertEquals(10, idx.getTablets().size());
for (Tablet tablet : idx.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
replica.updateVersion(2L);
replica.setDataSize(i < 40 ? 0L : 10L << 30);
replica.setRowCount(1000L);
}
}
}
if (i >= 40) {
// first 52 partitions are 10 buckets(FeConstants.default_bucket_num)
Assert.assertEquals(10 * (10L << 30), partition.getAllDataSize(true));
}
}
} finally {
table.readUnlock();
}

String alterStmt2 =
"alter table test.test_autobucket_dynamic_partition set ('dynamic_partition.end' = '3')";
ExceptionChecker.expectThrowsNoException(() -> alterTable(alterStmt2));
Env.getCurrentEnv().getDynamicPartitionScheduler().executeDynamicPartition(tempDynamicPartitionTableInfo, false);

partitions = Lists.newArrayList(table.getAllPartitions());
partitions.sort(Comparator.comparing(Partition::getId));
Assert.assertEquals(54, partitions.size());
// 100GB total, 1GB per bucket, should 100 buckets.
Assert.assertEquals(100, partitions.get(partitions.size() - 1).getDistributionInfo().getBucketNum());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,8 @@ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setPathHash(be.getId());
diskInfo1.setTotalCapacityB(10L << 30);
diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setTotalCapacityB(10L << 40);
diskInfo1.setAvailableCapacityB(5L << 40);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
Map<String, DiskInfo> disks = Maps.newHashMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort
Backend be = new Backend(Env.getCurrentEnv().getNextId(), backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path" + be.getId());
diskInfo1.setTotalCapacityB(10L << 30);
diskInfo1.setAvailableCapacityB(5L << 30);
diskInfo1.setTotalCapacityB(10L << 40);
diskInfo1.setAvailableCapacityB(5L << 40);
diskInfo1.setDataUsedCapacityB(480000);
diskInfo1.setPathHash(be.getId());
disks.put(diskInfo1.getRootPath(), diskInfo1);
Expand Down

0 comments on commit e657fd0

Please sign in to comment.