Skip to content

Commit

Permalink
feat(auto_balancer): assign partitions within rack on broker fence (#1778) (#1783)
Browse files Browse the repository at this point in the history

Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Aug 14, 2024
1 parent 7d91cb7 commit 242e7b3
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -849,10 +849,9 @@ public void replay(UpdateNextNodeIdRecord record) {
nextNodeId.set(record.nodeId());
}

public List<Integer> getActiveBrokers() {
public List<BrokerRegistration> getActiveBrokers() {
return brokerRegistrations.values().stream()
.map(BrokerRegistration::id)
.filter(this::isActive)
.filter(b -> isActive(b.id()))
.collect(Collectors.toList());
}
// AutoMQ inject end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,7 @@ void generateLeaderAndIsrUpdates(String context,
IntPredicate isAcceptableLeader = fencing ? r -> false :
r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r));

BrokerRegistration brokerRegistrationToRemove = clusterControl.brokerRegistrations().get(brokerToRemove);
PartitionLeaderSelector partitionLeaderSelector = null;
// AutoMQ for Kafka inject end

Expand Down Expand Up @@ -2157,8 +2158,7 @@ void generateLeaderAndIsrUpdates(String context,
builder.setTargetNode(brokerToAdd);
} else {
if (partitionLeaderSelector == null) {
partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(),
brokerId -> brokerId != brokerToRemove);
partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(), brokerRegistrationToRemove);
}
partitionLeaderSelector
.select(new TopicPartition(topic.name(), topicIdPart.partitionId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

package org.apache.kafka.controller.es;

import java.util.stream.Collectors;
import java.util.Random;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.common.WeightedRandomList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -23,46 +25,46 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

public class LoadAwarePartitionLeaderSelector implements PartitionLeaderSelector {
private static final Logger LOGGER = LoggerFactory.getLogger(LoadAwarePartitionLeaderSelector.class);
private final WeightedRandomList<Integer> brokerLoads;
private final RandomPartitionLeaderSelector randomSelector;

/**
 * Creates a selector seeded with a fresh {@link Random}.
 *
 * @param aliveBrokers   registrations of all currently active brokers
 * @param brokerToRemove registration of the broker being fenced/removed;
 *                       it is never chosen as a new leader
 */
public LoadAwarePartitionLeaderSelector(List<BrokerRegistration> aliveBrokers, BrokerRegistration brokerToRemove) {
    this(new Random(), aliveBrokers, brokerToRemove);
}

/**
 * Builds a load-aware leader selector restricted to brokers in the same rack as the
 * broker being removed, weighting candidates inversely to their reported load.
 *
 * @param r              random source used for weighted selection (injectable for tests)
 * @param aliveBrokers   registrations of all currently active brokers
 * @param brokerToRemove registration of the broker being fenced/removed; never selected
 */
public LoadAwarePartitionLeaderSelector(Random r, List<BrokerRegistration> aliveBrokers, BrokerRegistration brokerToRemove) {
    Map<Integer, Double> brokerLoadMap = ClusterStats.getInstance().brokerLoads();
    if (brokerLoadMap == null) {
        // No load metrics available yet: degrade to a uniform random pick among
        // every alive broker except the one being removed.
        this.brokerLoads = null;
        LOGGER.warn("No broker loads available, using random partition leader selector");
        this.randomSelector = new RandomPartitionLeaderSelector(aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toList()), brokerId -> brokerId != brokerToRemove.id());
        return;
    }
    Set<Integer> excludedBrokers = ClusterStats.getInstance().excludedBrokers();
    if (excludedBrokers == null) {
        excludedBrokers = new HashSet<>();
    }
    // Candidates: alive, not administratively excluded, and not the broker being removed.
    List<BrokerRegistration> availableBrokers = new ArrayList<>();
    for (BrokerRegistration broker : aliveBrokers) {
        if (!excludedBrokers.contains(broker.id()) && broker.id() != brokerToRemove.id()) {
            availableBrokers.add(broker);
        }
    }
    brokerLoads = new WeightedRandomList<>(r);
    for (BrokerRegistration broker : availableBrokers) {
        int brokerId = broker.id();
        // Keep only brokers in the same rack as the removed broker, and only those
        // with a known load. NOTE(review): assumes rack() never returns null (empty
        // Optional compares fine) — confirm against BrokerRegistration.
        if (!broker.rack().equals(brokerToRemove.rack()) || !brokerLoadMap.containsKey(brokerId)) {
            continue;
        }
        double load = Math.max(1, brokerLoadMap.get(brokerId));
        // allocation weight is inversely proportional to the load
        brokerLoads.add(new WeightedRandomList.Entity<>(brokerId, 1 / load));
    }
    brokerLoads.update();
    // Rack-agnostic fallback over all available brokers; the rack filter above can
    // leave the weighted list empty (e.g. no same-rack peer reported a load).
    this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toList()), id -> true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,48 @@

package org.apache.kafka.controller.es;

import java.util.Collections;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.server.util.MockRandom;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class LoadAwarePartitionLeaderSelectorTest {

@AfterEach
public void tearDown() {
    // Reset the shared ClusterStats singleton so state does not leak between tests.
    ClusterStats stats = ClusterStats.getInstance();
    stats.updateExcludedBrokers(Collections.emptySet());
    stats.updateBrokerLoads(Collections.emptyMap());
    stats.updatePartitionLoads(Collections.emptyMap());
}

@Test
public void testLoadAwarePartitionLeaderSelector() {
List<Integer> aliveBrokers = List.of(0, 1, 2, 3, 4, 5);
Set<Integer> brokerSet = new HashSet<>(aliveBrokers);
int brokerToRemove = 5;
List<BrokerRegistration> aliveBrokers = List.of(
new BrokerRegistration.Builder().setId(0).build(),
new BrokerRegistration.Builder().setId(1).build(),
new BrokerRegistration.Builder().setId(2).build(),
new BrokerRegistration.Builder().setId(3).build(),
new BrokerRegistration.Builder().setId(4).build(),
new BrokerRegistration.Builder().setId(5).build());
Set<Integer> brokerSet = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet());
BrokerRegistration brokerToRemove = aliveBrokers.get(aliveBrokers.size() - 1);
MockRandom random = new MockRandom();
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random,
aliveBrokers, broker -> broker != brokerToRemove);
LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);

// fallback to random selector
setUpCluster();
Map<Integer, Double> brokerLoads = new HashMap<>();
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
Assertions.assertEquals(4000, brokerLoads.get(0));
Assertions.assertEquals(4000, brokerLoads.get(1));
Assertions.assertEquals(4000, brokerLoads.get(2));
Expand All @@ -45,8 +61,8 @@ public void testLoadAwarePartitionLeaderSelector() {

// load aware selector
brokerLoads = setUpCluster();
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, broker -> broker != brokerToRemove);
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
Assertions.assertEquals(5990, brokerLoads.get(0));
Assertions.assertEquals(7660, brokerLoads.get(1));
Assertions.assertEquals(6720, brokerLoads.get(2));
Expand All @@ -57,8 +73,8 @@ public void testLoadAwarePartitionLeaderSelector() {
brokerLoads = setUpCluster();
brokerLoads.remove(1);
ClusterStats.getInstance().updateBrokerLoads(brokerLoads);
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, broker -> broker != brokerToRemove);
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
Assertions.assertEquals(6840, brokerLoads.get(0));
Assertions.assertEquals(7280, brokerLoads.get(2));
Assertions.assertEquals(7950, brokerLoads.get(3));
Expand All @@ -67,8 +83,8 @@ public void testLoadAwarePartitionLeaderSelector() {
// tests exclude broker
brokerLoads = setUpCluster();
ClusterStats.getInstance().updateExcludedBrokers(Set.of(1));
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, broker -> broker != brokerToRemove);
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove, brokerLoads);
loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, brokerToRemove);
randomSelect(loadAwarePartitionLeaderSelector, 2000, brokerSet, brokerToRemove.id(), brokerLoads);
Assertions.assertEquals(6970, brokerLoads.get(0));
Assertions.assertEquals(5000, brokerLoads.get(1));
Assertions.assertEquals(7210, brokerLoads.get(2));
Expand All @@ -93,6 +109,46 @@ private void randomSelect(LoadAwarePartitionLeaderSelector selector, int count,
}
}

@Test
public void testLoadAwarePartitionLeaderSelectorWithRack() {
    // Broker 0 is the only member of rack-a; brokers 1-5 all live in rack-b.
    String rackA = "rack-a";
    String rackB = "rack-b";
    List<BrokerRegistration> aliveBrokers = List.of(
        new BrokerRegistration.Builder().setId(0).setRack(Optional.of(rackA)).build(),
        new BrokerRegistration.Builder().setId(1).setRack(Optional.of(rackB)).build(),
        new BrokerRegistration.Builder().setId(2).setRack(Optional.of(rackB)).build(),
        new BrokerRegistration.Builder().setId(3).setRack(Optional.of(rackB)).build(),
        new BrokerRegistration.Builder().setId(4).setRack(Optional.of(rackB)).build(),
        new BrokerRegistration.Builder().setId(5).setRack(Optional.of(rackB)).build());

    Set<Integer> allBrokerIds = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet());
    setUpCluster();
    // Removing broker 0: no other broker shares rack-a, so the weighted list is
    // empty and selection falls back to the uniform random selector.
    BrokerRegistration fencedBroker = aliveBrokers.get(0);
    MockRandom random = new MockRandom();
    LoadAwarePartitionLeaderSelector selector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, fencedBroker);

    // fallback to random selector
    Map<Integer, Double> observedLoads = new HashMap<>();
    randomSelect(selector, 2000, allBrokerIds, fencedBroker.id(), observedLoads);
    for (int id = 1; id <= 5; id++) {
        Assertions.assertEquals(4000, observedLoads.get(id));
    }

    // Removing broker 1: brokers 2-5 share rack-b, so the load-aware path is taken.
    observedLoads = setUpCluster();
    fencedBroker = aliveBrokers.get(1);
    selector = new LoadAwarePartitionLeaderSelector(random, aliveBrokers, fencedBroker);
    randomSelect(selector, 2000, allBrokerIds, fencedBroker.id(), observedLoads);
    Assertions.assertEquals(0, observedLoads.get(0));
    Assertions.assertEquals(5000, observedLoads.get(1));
    Assertions.assertEquals(7330, observedLoads.get(2));
    Assertions.assertEquals(7840, observedLoads.get(3));
    Assertions.assertEquals(7120, observedLoads.get(4));
    Assertions.assertEquals(6710, observedLoads.get(5));
}

private Map<Integer, Double> setUpCluster() {
Map<Integer, Double> brokerLoads = new HashMap<>();
brokerLoads.put(0, 0.0);
Expand Down

0 comments on commit 242e7b3

Please sign in to comment.