diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 02629315f7..a0a5003eb9 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -826,10 +826,9 @@ public void replay(UpdateNextNodeIdRecord record) { nextNodeId.set(record.nodeId()); } - public List getActiveBrokers() { + public List getActiveBrokers() { return brokerRegistrations.values().stream() - .map(BrokerRegistration::id) - .filter(this::isActive) + .filter(b -> isActive(b.id())) .collect(Collectors.toList()); } // AutoMQ inject end diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 3945fdd497..713725c008 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -2008,6 +2008,7 @@ void generateLeaderAndIsrUpdates(String context, IntPredicate isAcceptableLeader = fencing ? r -> false : r -> (r != brokerToRemove) && (r == brokerToAdd || clusterControl.isActive(r)); + BrokerRegistration brokerRegistrationToRemove = clusterControl.brokerRegistrations().get(brokerToRemove); PartitionLeaderSelector partitionLeaderSelector = null; // AutoMQ for Kafka inject end @@ -2047,8 +2048,7 @@ void generateLeaderAndIsrUpdates(String context, builder.setTargetNode(brokerToAdd); } else { if (partitionLeaderSelector == null) { - partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(), - brokerId -> brokerId != brokerToRemove); + partitionLeaderSelector = new LoadAwarePartitionLeaderSelector(clusterControl.getActiveBrokers(), brokerRegistrationToRemove); } partitionLeaderSelector .select(new TopicPartition(topic.name(), topicIdPart.partitionId())) diff --git a/metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java b/metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java index 9927509915..979de860cb 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java +++ b/metadata/src/main/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelector.java @@ -11,7 +11,9 @@ package org.apache.kafka.controller.es; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.metadata.BrokerRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +24,6 @@ import java.util.Optional; import java.util.PriorityQueue; import java.util.Set; -import java.util.function.Predicate; public class LoadAwarePartitionLeaderSelector implements PartitionLeaderSelector { private static final Logger LOGGER = LoggerFactory.getLogger(LoadAwarePartitionLeaderSelector.class); @@ -30,14 +31,14 @@ public class LoadAwarePartitionLeaderSelector implements PartitionLeaderSelector private final RandomPartitionLeaderSelector randomSelector; private final Map brokerLoadMap; - public LoadAwarePartitionLeaderSelector(List aliveBrokers, Predicate brokerPredicate) { + public LoadAwarePartitionLeaderSelector(List aliveBrokers, BrokerRegistration brokerToRemove) { Set excludedBrokers = ClusterLoads.getInstance().excludedBrokers(); if (excludedBrokers == null) { excludedBrokers = new HashSet<>(); } - List availableBrokers = new ArrayList<>(); - for (int broker : aliveBrokers) { - if (!excludedBrokers.contains(broker)) { + List availableBrokers = new ArrayList<>(); + for (BrokerRegistration broker : aliveBrokers) { + if (!excludedBrokers.contains(broker.id()) && broker.id() != brokerToRemove.id()) { availableBrokers.add(broker); } } @@ -47,14 +48,14 @@ public LoadAwarePartitionLeaderSelector(List aliveBrokers, Predicate(); - for (int brokerId : availableBrokers) { - if (!brokerPredicate.test(brokerId)) { + for (BrokerRegistration broker : availableBrokers) { + if (!broker.rack().equals(brokerToRemove.rack())) { continue; } - brokerLoads.offer(new BrokerLoad(brokerId, brokerLoadMap.getOrDefault(brokerId, 0.0))); + brokerLoads.offer(new BrokerLoad(broker.id(), brokerLoadMap.getOrDefault(broker.id(), 0.0))); } } - this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers, brokerPredicate); + this.randomSelector = new RandomPartitionLeaderSelector(availableBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toList()), id -> true); } @Override diff --git a/metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java b/metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java index 3800ccb595..712a007522 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/es/LoadAwarePartitionLeaderSelectorTest.java @@ -11,12 +11,14 @@ package org.apache.kafka.controller.es; +import java.util.Optional; +import java.util.stream.Collectors; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.metadata.BrokerRegistration; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -25,21 +27,30 @@ public class LoadAwarePartitionLeaderSelectorTest { @Test public void testLoadAwarePartitionLeaderSelector() { - List aliveBrokers = List.of(0, 1, 2, 3, 4, 5); - Set brokerSet = new HashSet<>(aliveBrokers); - int brokerToRemove = 5; - LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove); + List aliveBrokers = List.of( + new BrokerRegistration.Builder().setId(0).build(), + new BrokerRegistration.Builder().setId(1).build(), + new BrokerRegistration.Builder().setId(2).build(), + new BrokerRegistration.Builder().setId(3).build(), + new BrokerRegistration.Builder().setId(4).build(), + new BrokerRegistration.Builder().setId(5).build()); + + Set brokerSet = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet()); + BrokerRegistration brokerToRemove = aliveBrokers.get(aliveBrokers.size() - 1); + LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove); // fallback to random selector - int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1); - Assertions.assertTrue(brokerSet.contains(brokerId)); - Assertions.assertTrue(brokerId != brokerToRemove); + for (int i = 0; i < 100; i++) { + int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1); + Assertions.assertTrue(brokerSet.contains(brokerId)); + Assertions.assertTrue(brokerId != brokerToRemove.id()); + } // load aware selector Map brokerLoads = setUpCluster(); - loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove); + loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove); - brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1); + int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1); Assertions.assertTrue(brokerSet.contains(brokerId)); Assertions.assertEquals(0, brokerId); brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 1)).orElse(-1); @@ -62,7 +73,7 @@ public void testLoadAwarePartitionLeaderSelector() { // tests exclude broker brokerLoads = setUpCluster(); ClusterLoads.getInstance().updateExcludedBrokers(Set.of(1)); - loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, broker -> broker != brokerToRemove); + loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove); brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1); Assertions.assertTrue(brokerSet.contains(brokerId)); @@ -85,6 +96,55 @@ public void testLoadAwarePartitionLeaderSelector() { Assertions.assertEquals(50.0, brokerLoads.get(5)); } + @Test + public void testLoadAwarePartitionLeaderSelectorWithRack() { + String rackA = "rack-a"; + String rackB = "rack-b"; + List aliveBrokers = List.of( + new BrokerRegistration.Builder().setId(0).setRack(Optional.of(rackA)).build(), + new BrokerRegistration.Builder().setId(1).setRack(Optional.of(rackB)).build(), + new BrokerRegistration.Builder().setId(2).setRack(Optional.of(rackB)).build(), + new BrokerRegistration.Builder().setId(3).setRack(Optional.of(rackB)).build(), + new BrokerRegistration.Builder().setId(4).setRack(Optional.of(rackB)).build()); + + Set brokerSet = aliveBrokers.stream().map(BrokerRegistration::id).collect(Collectors.toSet()); + setUpCluster(); + BrokerRegistration brokerToRemove = aliveBrokers.get(0); + LoadAwarePartitionLeaderSelector loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove); + + // fallback to random selector + for (int i = 0; i < 100; i++) { + int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1); + Assertions.assertTrue(brokerSet.contains(brokerId)); + Assertions.assertTrue(brokerId != brokerToRemove.id()); + } + + // load aware selector + Map brokerLoads = setUpCluster(); + BrokerRegistration brokerToRemove1 = aliveBrokers.get(1); + loadAwarePartitionLeaderSelector = new LoadAwarePartitionLeaderSelector(aliveBrokers, brokerToRemove1); + + int brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 0)).orElse(-1); + Assertions.assertTrue(brokerSet.contains(brokerId)); + Assertions.assertEquals(2, brokerId); + brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 1)).orElse(-1); + Assertions.assertTrue(brokerSet.contains(brokerId)); + Assertions.assertEquals(2, brokerId); + brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 2)).orElse(-1); + Assertions.assertTrue(brokerSet.contains(brokerId)); + Assertions.assertEquals(3, brokerId); + brokerId = loadAwarePartitionLeaderSelector.select(new TopicPartition("topic", 3)).orElse(-1); + Assertions.assertTrue(brokerSet.contains(brokerId)); + Assertions.assertEquals(2, brokerId); + + Assertions.assertNull(brokerLoads.get(0)); + Assertions.assertEquals(10.0, brokerLoads.get(1)); + Assertions.assertEquals(55.0, brokerLoads.get(2)); + Assertions.assertEquals(45.0, brokerLoads.get(3)); + Assertions.assertEquals(40.0, brokerLoads.get(4)); + Assertions.assertEquals(50.0, brokerLoads.get(5)); + } + private Map setUpCluster() { Map brokerLoads = new HashMap<>(); brokerLoads.put(1, 10.0);