Skip to content

Commit

Permalink
feat(auto_balancer): assign partitions within rack on broker fence (#…
Browse files Browse the repository at this point in the history
…1778)

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Aug 13, 2024
1 parent 120c9cd commit 1d7c829
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -826,10 +826,9 @@ public void replay(UpdateNextNodeIdRecord record) {
nextNodeId.set(record.nodeId());
}

public List<Integer> getActiveBrokers() {
public List<BrokerRegistration> getActiveBrokers() {
return brokerRegistrations.values().stream()
.map(BrokerRegistration::id)
.filter(this::isActive)
.filter(b -> isActive(b.id()))
.collect(Collectors.toList());
}
// AutoMQ inject end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,7 @@ void generateLeaderAndIsrUpdates(String context,
IntPredicate isAcceptableLeader = fencing ? r -> false :
r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));

BrokerRegistration brokerRegistrationToRemove = clusterControl.brokerRegistrations().get(brokerToRemove);
PartitionLeaderSelector partitionLeaderSelector = null;
// AutoMQ for Kafka inject end

Expand Down Expand Up @@ -2047,8 +2048,7 @@ void generateLeaderAndIsrUpdates(String context,
builder.setTargetNode(brokerToAdd);
} else {
if (partitionLeaderSelector == null) {
partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(),
brokerId -> brokerId != brokerToRemove);
partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(), brokerRegistrationToRemove);
}
partitionLeaderSelector
.select(new TopicPartition(topic.name(), topicIdPart.partitionId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@

package org.apache.kafka.controller.es;

import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.BrokerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,22 +24,21 @@
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Predicate;

public class LoadAwarePartitionLeaderSelector implements PartitionLeaderSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadAwarePartitionLeaderSelector.class);
private final PriorityQueue<BrokerLoad> brokerLoads;
private final RandomPartitionLeaderSelector randomSelector;
private final Map<Integer, Double> brokerLoadMap;

public LoadAwarePartitionLeaderSelector(List<Integer> aliveBrokers, Predicate<Integer> brokerPredicate) {
public LoadAwarePartitionLeaderSelector(List<BrokerRegistration> aliveBrokers, BrokerRegistration brokerToRemove) {
Set<Integer> excludedBrokers = ClusterLoads.getInstance().excludedBrokers();
if (excludedBrokers == null) {
excludedBrokers = new HashSet<>();
}
List<Integer> availableBrokers = new ArrayList<>();
for (int broker : aliveBrokers) {
if (!excludedBrokers.contains(broker)) {
List<BrokerRegistration> availableBrokers = new ArrayList<>();
for (BrokerRegistration broker : aliveBrokers) {
if (!excludedBrokers.contains(broker.id()) && broker.id() != brokerToRemove.id()) {
availableBrokers.add(broker);
}
}
Expand All @@ -47,14 +48,14 @@ public LoadAwarePartitionLeaderSelector(List<Integer> aliveBrokers, Predicate<In
LOGGER.warn("No broker loads available, using random partition leader selector");
} else {
this.brokerLoads = new PriorityQueue<>();
for (int brokerId : availableBrokers) {
if (!brokerPredicate.test(brokerId)) {
for (BrokerRegistration broker : availableBrokers) {
if (!broker.rack().equals(brokerToRemove.rack())) {
continue;
}
brokerLoads.offer(new BrokerLoad(brokerId, brokerLoadMap.getOrDefault(brokerId, 0.0)));
brokerLoads.offer(new BrokerLoad(broker.id(), brokerLoadMap.getOrDefault(broker.id(), 0.0)));
}
}
this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers, brokerPredicate);
this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toList()), id -> true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

package org.apache.kafka.controller.es;

import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.BrokerRegistration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -25,21 +27,30 @@ public class LoadAwarePartitionLeaderSelectorTest {

@Test
public void testLoadAwarePartitionLeaderSelector() {
List<Integer> aliveBrokers = List.of(0, 1, 2, 3, 4, 5);
Set<Integer> brokerSet = new HashSet<>(aliveBrokers);
int brokerToRemove = 5;
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove);
List<BrokerRegistration> aliveBrokers = List.of(
new BrokerRegistration.Builder().setId(0).build(),
new BrokerRegistration.Builder().setId(1).build(),
new BrokerRegistration.Builder().setId(2).build(),
new BrokerRegistration.Builder().setId(3).build(),
new BrokerRegistration.Builder().setId(4).build(),
new BrokerRegistration.Builder().setId(5).build());

Set<Integer> brokerSet = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet());
BrokerRegistration brokerToRemove = aliveBrokers.get(aliveBrokers.size() - 1);
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove);

// fallback to random selector
int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertTrue(brokerId != brokerToRemove);
for (int i = 0; i < 100; i++) {
int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertTrue(brokerId != brokerToRemove.id());
}

// load aware selector
Map<Integer, Double> brokerLoads = setUpCluster();
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove);
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove);

brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertEquals(0, brokerId);
brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 1)).orElse(-1);
Expand All @@ -62,7 +73,7 @@ public void testLoadAwarePartitionLeaderSelector() {
// tests exclude broker
brokerLoads = setUpCluster();
ClusterLoads.getInstance().updateExcludedBrokers(Set.of(1));
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove);
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove);

brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Expand All @@ -85,6 +96,55 @@ public void testLoadAwarePartitionLeaderSelector() {
Assertions.assertEquals(50.0, brokerLoads.get(5));
}

@Test
public void testLoadAwarePartitionLeaderSelectorWithRack() {
String rackA = "rack-a";
String rackB = "rack-b";
List<BrokerRegistration> aliveBrokers = List.of(
new BrokerRegistration.Builder().setId(0).setRack(Optional.of(rackA)).build(),
new BrokerRegistration.Builder().setId(1).setRack(Optional.of(rackB)).build(),
new BrokerRegistration.Builder().setId(2).setRack(Optional.of(rackB)).build(),
new BrokerRegistration.Builder().setId(3).setRack(Optional.of(rackB)).build(),
new BrokerRegistration.Builder().setId(4).setRack(Optional.of(rackB)).build());

Set<Integer> brokerSet = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet());
setUpCluster();
BrokerRegistration brokerToRemove = aliveBrokers.get(0);
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove);

// fallback to random selector
for (int i = 0; i < 100; i++) {
int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertTrue(brokerId != brokerToRemove.id());
}

// load aware selector
Map<Integer, Double> brokerLoads = setUpCluster();
BrokerRegistration brokerToRemove1 = aliveBrokers.get(1);
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove1);

int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertEquals(2, brokerId);
brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 1)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertEquals(2, brokerId);
brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 2)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertEquals(3, brokerId);
brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 3)).orElse(-1);
Assertions.assertTrue(brokerSet.contains(brokerId));
Assertions.assertEquals(2, brokerId);

Assertions.assertNull(brokerLoads.get(0));
Assertions.assertEquals(10.0, brokerLoads.get(1));
Assertions.assertEquals(55.0, brokerLoads.get(2));
Assertions.assertEquals(45.0, brokerLoads.get(3));
Assertions.assertEquals(40.0, brokerLoads.get(4));
Assertions.assertEquals(50.0, brokerLoads.get(5));
}

private Map<Integer, Double> setUpCluster() {
Map<Integer, Double> brokerLoads = new HashMap<>();
brokerLoads.put(1, 10.0);
Expand Down

0 comments on commit 1d7c829

Please sign in to comment.