
fix(auto_balancer): avoid broker load affected by excluded topics (#1086)

Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Apr 3, 2024
1 parent c07ef17 commit f960426
Showing 3 changed files with 70 additions and 21 deletions.
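The heart of the fix: a broker's load must be accumulated over every replica it hosts, including replicas of topics excluded from balancing; the exclusion set should only keep those replicas out of the snapshot's set of movable candidates. Previously, ClusterModelSnapshot.aggregate() recomputed broker load from the snapshot's replica map, which already had excluded topics filtered out, so excluded-topic traffic vanished from the broker totals. A minimal sketch of the corrected ordering, using hypothetical stand-in types (Replica, plain resource-name strings) rather than the project's actual classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the auto-balancer's replica/resource types.
record Replica(String topic, double bytesIn, double bytesOut) {}

final class SnapshotSketch {

    // Broker load is accumulated over ALL replicas first ...
    static Map<String, Double> brokerLoad(List<Replica> replicas) {
        Map<String, Double> totals = new HashMap<>();
        for (Replica r : replicas) {
            totals.merge("NW_IN", r.bytesIn(), Double::sum);
            totals.merge("NW_OUT", r.bytesOut(), Double::sum);
        }
        return totals;
    }

    // ... and the excluded-topic filter only trims the movable candidates.
    static List<Replica> candidates(List<Replica> replicas, Set<String> excludedTopics) {
        List<Replica> movable = new ArrayList<>();
        for (Replica r : replicas) {
            if (!excludedTopics.contains(r.topic())) {
                movable.add(r);
            }
        }
        return movable;
    }
}

With the numbers from the test below, brokerLoad reports NW_IN = 70 even when "testTopic-1" is excluded, while candidates contains only the "testTopic" replica.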
20 changes: 12 additions & 8 deletions core/src/main/java/kafka/autobalancer/model/ClusterModel.java
@@ -13,6 +13,7 @@
 
 import com.automq.stream.utils.LogContext;
 import kafka.autobalancer.common.AutoBalancerConstants;
+import kafka.autobalancer.common.Resource;
 import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -112,9 +113,11 @@ public ClusterModelSnapshot snapshot(Set<Integer> excludedBrokerIds, Set<String>
             }
             for (Map.Entry<Integer, Map<TopicPartition, TopicPartitionReplicaUpdater>> entry : brokerReplicaMap.entrySet()) {
                 int brokerId = entry.getKey();
-                if (snapshot.broker(brokerId) == null) {
+                BrokerUpdater.Broker broker = snapshot.broker(brokerId);
+                if (broker == null) {
                     continue;
                 }
+                Map<Resource, Double> totalLoads = new HashMap<>();
                 for (Map.Entry<TopicPartition, TopicPartitionReplicaUpdater> tpEntry : entry.getValue().entrySet()) {
                     TopicPartition tp = tpEntry.getKey();
                     TopicPartitionReplicaUpdater.TopicPartitionReplica replica =
@@ -124,30 +127,31 @@ public ClusterModelSnapshot snapshot(Set<Integer> excludedBrokerIds, Set<String>
                         snapshot.removeBroker(brokerIdToRackMap.get(brokerId), brokerId);
                         break;
                     }
+                    replica.processMetrics();
+                    for (Resource resource : Resource.cachedValues()) {
+                        double load = replica.load(resource);
+                        totalLoads.put(resource, totalLoads.getOrDefault(resource, 0.0) + load);
+                    }
                     if (excludedTopics.contains(tp.topic())) {
                         continue;
                     }
-                    replica.processMetrics();
                     snapshot.addTopicPartition(brokerId, tp, replica);
                 }
+                for (Map.Entry<Resource, Double> loadEntry : totalLoads.entrySet()) {
+                    broker.setLoad(loadEntry.getKey(), loadEntry.getValue());
+                }
             }
         } finally {
             clusterLock.unlock();
         }
 
-        postProcess(snapshot);
-
         return snapshot;
     }
 
     protected ClusterModelSnapshot createSnapshot() {
         return new ClusterModelSnapshot();
     }
 
-    public void postProcess(ClusterModelSnapshot snapshot) {
-        snapshot.aggregate();
-    }
-
     public boolean updateBrokerMetrics(int brokerId, Map<Byte, Double> metricsMap, long time) {
         BrokerUpdater brokerUpdater = null;
         clusterLock.lock();
core/src/main/java/kafka/autobalancer/model/ClusterModelSnapshot.java
@@ -13,7 +13,6 @@
 
 import kafka.autobalancer.common.Action;
 import kafka.autobalancer.common.ActionType;
-import kafka.autobalancer.common.Resource;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
@@ -32,17 +31,6 @@ public ClusterModelSnapshot() {
         brokerToReplicaMap = new HashMap<>();
     }
 
-    public void aggregate() {
-        // Override broker load with sum of replicas
-        for (Map.Entry<Integer, Map<TopicPartition, TopicPartitionReplicaUpdater.TopicPartitionReplica>> entry : brokerToReplicaMap.entrySet()) {
-            int brokerId = entry.getKey();
-            for (Resource resource : Resource.cachedValues()) {
-                double sum = entry.getValue().values().stream().mapToDouble(e -> e.load(resource)).sum();
-                brokerMap.get(brokerId).setLoad(resource, sum);
-            }
-        }
-    }
-
     public void addBroker(int brokerId, String rack, BrokerUpdater.Broker broker) {
         rackToBrokerMap.putIfAbsent(rack, brokerId);
         brokerMap.putIfAbsent(brokerId, broker);
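Design note: ClusterModel.snapshot() now accumulates the per-broker totals itself, under clusterLock and over every replica it visits, so the aggregate() pass above loses its only caller (the removed postProcess) and is deleted. Had it stayed, it would have overwritten the correct totals with sums taken over the already-filtered replica map, reintroducing the bug.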
core/src/test/java/kafka/autobalancer/model/ClusterModelTest.java
@@ -17,6 +17,7 @@
 
 package kafka.autobalancer.model;
 
+import kafka.autobalancer.common.Resource;
 import kafka.autobalancer.common.types.RawMetricTypes;
 import kafka.autobalancer.metricsreporter.metric.TopicPartitionMetrics;
 import org.apache.kafka.common.TopicPartition;
@@ -31,8 +32,11 @@
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 @Tag("S3Unit")
 public class ClusterModelTest {
@@ -226,8 +230,61 @@ public void testUpdatePartition() {
     }
 
     @Test
-    public void testSnapshot() {
+    public void testExcludeTopics() {
         RecordClusterModel clusterModel = new RecordClusterModel();
         String topicName = "testTopic";
         Uuid topicId = Uuid.randomUuid();
+        String topicName1 = "testTopic-1";
+        Uuid topicId1 = Uuid.randomUuid();
         int partition = 0;
+        int brokerId = 1;
+
+        RegisterBrokerRecord registerBrokerRecord = new RegisterBrokerRecord()
+            .setBrokerId(brokerId);
+        clusterModel.onBrokerRegister(registerBrokerRecord);
+        TopicRecord topicRecord = new TopicRecord()
+            .setName(topicName)
+            .setTopicId(topicId);
+        clusterModel.onTopicCreate(topicRecord);
+        PartitionRecord partitionRecord = new PartitionRecord()
+            .setLeader(brokerId)
+            .setTopicId(topicId)
+            .setPartitionId(partition);
+        clusterModel.onPartitionCreate(partitionRecord);
+
+        TopicRecord topicRecord1 = new TopicRecord()
+            .setName(topicName1)
+            .setTopicId(topicId1);
+        clusterModel.onTopicCreate(topicRecord1);
+        PartitionRecord partitionRecord1 = new PartitionRecord()
+            .setLeader(brokerId)
+            .setTopicId(topicId1)
+            .setPartitionId(partition);
+        clusterModel.onPartitionCreate(partitionRecord1);
+        long now = System.currentTimeMillis();
+        clusterModel.updateBrokerMetrics(brokerId, new HashMap<>(), now);
+
+        TopicPartitionMetrics topicPartitionMetrics = new TopicPartitionMetrics(now, brokerId, "", topicName, partition);
+        topicPartitionMetrics.put(RawMetricTypes.TOPIC_PARTITION_BYTES_IN, 10);
+        topicPartitionMetrics.put(RawMetricTypes.TOPIC_PARTITION_BYTES_OUT, 10);
+        topicPartitionMetrics.put(RawMetricTypes.PARTITION_SIZE, 10);
+        clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics.brokerId(),
+            new TopicPartition(topicName, partition), topicPartitionMetrics.getMetricValueMap(), topicPartitionMetrics.time());
+
+        TopicPartitionMetrics topicPartitionMetrics1 = new TopicPartitionMetrics(now, brokerId, "", topicName1, partition);
+        topicPartitionMetrics1.put(RawMetricTypes.TOPIC_PARTITION_BYTES_IN, 60);
+        topicPartitionMetrics1.put(RawMetricTypes.TOPIC_PARTITION_BYTES_OUT, 50);
+        topicPartitionMetrics1.put(RawMetricTypes.PARTITION_SIZE, 10);
+        clusterModel.updateTopicPartitionMetrics(topicPartitionMetrics1.brokerId(),
+            new TopicPartition(topicName1, partition), topicPartitionMetrics1.getMetricValueMap(), topicPartitionMetrics1.time());
+
+        ClusterModelSnapshot snapshot = clusterModel.snapshot(Collections.emptySet(), Set.of(topicName1), 10000);
+        Collection<TopicPartitionReplicaUpdater.TopicPartitionReplica> replicas = snapshot.replicasFor(brokerId);
+        Assertions.assertEquals(1, replicas.size());
+        Assertions.assertEquals(topicName, replicas.iterator().next().getTopicPartition().topic());
+        Assertions.assertEquals(partition, replicas.iterator().next().getTopicPartition().partition());
+        Assertions.assertEquals(70, snapshot.broker(brokerId).load(Resource.NW_IN));
+        Assertions.assertEquals(60, snapshot.broker(brokerId).load(Resource.NW_OUT));
+    }
+
+    @Test
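The assertions capture the intended behavior: testTopic-1 is in the excluded set, so replicasFor(brokerId) returns only the testTopic partition, yet the broker's load still reflects both partitions: NW_IN = 10 + 60 = 70 and NW_OUT = 10 + 50 = 60. Under the old aggregate() pass, these would have read 10 and 10, the sums over non-excluded replicas alone.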
