From f9604266442c7810cabef5a3d30633014f94ab5e Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Wed, 3 Apr 2024 19:32:26 +0800 Subject: [PATCH] fix(auto_balancer): avoid broker load affected by excluded topics (#1086) Signed-off-by: Shichao Nie --- .../autobalancer/model/ClusterModel.java | 20 ++++--- .../model/ClusterModelSnapshot.java | 12 ---- .../autobalancer/model/ClusterModelTest.java | 59 ++++++++++++++++++- 3 files changed, 70 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java index eb530e3a5a..2cf628c3e6 100644 --- a/core/src/main/java/kafka/autobalancer/model/ClusterModel.java +++ b/core/src/main/java/kafka/autobalancer/model/ClusterModel.java @@ -13,6 +13,7 @@ import com.automq.stream.utils.LogContext; import kafka.autobalancer.common.AutoBalancerConstants; +import kafka.autobalancer.common.Resource; import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -112,9 +113,11 @@ public ClusterModelSnapshot snapshot(Set excludedBrokerIds, Set } for (Map.Entry> entry : brokerReplicaMap.entrySet()) { int brokerId = entry.getKey(); - if (snapshot.broker(brokerId) == null) { + BrokerUpdater.Broker broker = snapshot.broker(brokerId); + if (broker == null) { continue; } + Map totalLoads = new HashMap<>(); for (Map.Entry tpEntry : entry.getValue().entrySet()) { TopicPartition tp = tpEntry.getKey(); TopicPartitionReplicaUpdater.TopicPartitionReplica replica = @@ -124,19 +127,24 @@ public ClusterModelSnapshot snapshot(Set excludedBrokerIds, Set snapshot.removeBroker(brokerIdToRackMap.get(brokerId), brokerId); break; } + replica.processMetrics(); + for (Resource resource : Resource.cachedValues()) { + double load = replica.load(resource); + totalLoads.put(resource, totalLoads.getOrDefault(resource, 0.0) + load); + } if (excludedTopics.contains(tp.topic())) { continue; } - replica.processMetrics(); snapshot.addTopicPartition(brokerId, tp, replica); } + for (Map.Entry loadEntry : totalLoads.entrySet()) { + broker.setLoad(loadEntry.getKey(), loadEntry.getValue()); + } } } finally { clusterLock.unlock(); } - postProcess(snapshot); - return snapshot; } @@ -144,10 +152,6 @@ protected ClusterModelSnapshot createSnapshot() { return new ClusterModelSnapshot(); } - public void postProcess(ClusterModelSnapshot snapshot) { - snapshot.aggregate(); - } - public boolean updateBrokerMetrics(int brokerId, Map metricsMap, long time) { BrokerUpdater brokerUpdater = null; clusterLock.lock(); diff --git a/core/src/main/java/kafka/autobalancer/model/ClusterModelSnapshot.java b/core/src/main/java/kafka/autobalancer/model/ClusterModelSnapshot.java index e17035d4d3..20654eb13c 100644 --- a/core/src/main/java/kafka/autobalancer/model/ClusterModelSnapshot.java +++ b/core/src/main/java/kafka/autobalancer/model/ClusterModelSnapshot.java @@ -13,7 +13,6 @@ import kafka.autobalancer.common.Action; import kafka.autobalancer.common.ActionType; -import kafka.autobalancer.common.Resource; import org.apache.kafka.common.TopicPartition; import java.util.Collection; @@ -32,17 +31,6 @@ public ClusterModelSnapshot() { brokerToReplicaMap = new HashMap<>(); } - public void aggregate() { - // Override broker load with sum of replicas - for (Map.Entry> entry : brokerToReplicaMap.entrySet()) { - int brokerId = entry.getKey(); - for (Resource resource : Resource.cachedValues()) { - double sum = entry.getValue().values().stream().mapToDouble(e -> e.load(resource)).sum(); - brokerMap.get(brokerId).setLoad(resource, sum); - } - } - } - public void addBroker(int brokerId, String rack, BrokerUpdater.Broker broker) { rackToBrokerMap.putIfAbsent(rack, brokerId); brokerMap.putIfAbsent(brokerId, broker); diff --git a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java index f1c1e299d9..d14946df27 100644 --- a/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java +++ b/core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java @@ -17,6 +17,7 @@ package kafka.autobalancer.model; +import kafka.autobalancer.common.Resource; import kafka.autobalancer.common.types.RawMetricTypes; import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics; import org.apache.kafka.common.TopicPartition; @@ -31,8 +32,11 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; @Tag("S3Unit") public class ClusterModelTest { @@ -226,8 +230,61 @@ public void testUpdatePartition() { } @Test - public void testSnapshot() { + public void testExcludeTopics() { + RecordClusterModel clusterModel = new RecordClusterModel(); + String topicName = "testTopic"; + Uuid topicId = Uuid.randomUuid(); + String topicName1 = "testTopic-1"; + Uuid topicId1 = Uuid.randomUuid(); + int partition = 0; + int brokerId = 1; + + RegisterBrokerRecord registerBrokerRecord = new RegisterBrokerRecord() + .setBrokerId(brokerId); + clusterModel.onBrokerRegister(registerBrokerRecord); + TopicRecord topicRecord = new TopicRecord() + .setName(topicName) + .setTopicId(topicId); + clusterModel.onTopicCreate(topicRecord); + PartitionRecord partitionRecord = new PartitionRecord() + .setLeader(brokerId) + .setTopicId(topicId) + .setPartitionId(partition); + clusterModel.onPartitionCreate(partitionRecord); + + TopicRecord topicRecord1 = new TopicRecord() + .setName(topicName1) + .setTopicId(topicId1); + clusterModel.onTopicCreate(topicRecord1); + PartitionRecord partitionRecord1 = new PartitionRecord() + .setLeader(brokerId) + .setTopicId(topicId1) + .setPartitionId(partition); + clusterModel.onPartitionCreate(partitionRecord1); + long now = System.currentTimeMillis(); + clusterModel.updateBrokerMetrics(brokerId, new HashMap<>(), now); + TopicPartitionMetrics topicPartitionMetrics = new TopicPartitionMetrics(now, brokerId, "", topicName, partition); + topicPartitionMetrics.put(RawMetricTypes.TOPIC_PARTITION_BYTES_IN, 10); + topicPartitionMetrics.put(RawMetricTypes.TOPIC_PARTITION_BYTES_OUT, 10); + topicPartitionMetrics.put(RawMetricTypes.PARTITION_SIZE, 10); + clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics.brokerId(), + new TopicPartition(topicName, partition), topicPartitionMetrics.getMetricValueMap(), topicPartitionMetrics.time()); + + TopicPartitionMetrics topicPartitionMetrics1 = new TopicPartitionMetrics(now, brokerId, "", topicName1, partition); + topicPartitionMetrics1.put(RawMetricTypes.TOPIC_PARTITION_BYTES_IN, 60); + topicPartitionMetrics1.put(RawMetricTypes.TOPIC_PARTITION_BYTES_OUT, 50); + topicPartitionMetrics1.put(RawMetricTypes.PARTITION_SIZE, 10); + clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics1.brokerId(), + new TopicPartition(topicName1, partition), topicPartitionMetrics1.getMetricValueMap(), topicPartitionMetrics1.time()); + + ClusterModelSnapshot snapshot = clusterModel.snapshot(Collections.emptySet(), Set.of(topicName1), 10000); + Collection replicas = snapshot.replicasFor(brokerId); + Assertions.assertEquals(1, replicas.size()); + Assertions.assertEquals(topicName, replicas.iterator().next().getTopicPartition().topic()); + Assertions.assertEquals(partition, replicas.iterator().next().getTopicPartition().partition()); + Assertions.assertEquals(70, snapshot.broker(brokerId).load(Resource.NW_IN)); + Assertions.assertEquals(60, snapshot.broker(brokerId).load(Resource.NW_OUT)); } @Test