From bb0408ebeda878face21e7d52861e4e7afb1a8d7 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Thu, 5 Sep 2024 11:59:26 +0800 Subject: [PATCH] fix(auto_balancer): fix broker status change (#1991) Signed-off-by: Shichao Nie --- .../kafka/autobalancer/LoadRetriever.java | 29 +++++++++---------- .../java/kafka/autobalancer/common/Utils.java | 29 +++++++++++++++++++ .../ControllerActionExecutorService.java | 25 +++++++++------- .../model/RecordClusterModel.java | 23 ++++----------- .../controller/ReplicationControlManager.java | 8 ++++- 5 files changed, 70 insertions(+), 44 deletions(-) diff --git a/core/src/main/java/kafka/autobalancer/LoadRetriever.java b/core/src/main/java/kafka/autobalancer/LoadRetriever.java index 7bc97fdb38..4cea520ee8 100644 --- a/core/src/main/java/kafka/autobalancer/LoadRetriever.java +++ b/core/src/main/java/kafka/autobalancer/LoadRetriever.java @@ -12,6 +12,7 @@ package kafka.autobalancer; import com.automq.stream.utils.LogContext; +import java.util.Optional; import kafka.autobalancer.common.AutoBalancerThreadFactory; import kafka.autobalancer.common.Utils; import kafka.autobalancer.common.types.MetricTypes; @@ -48,8 +49,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.controller.Controller; import org.apache.kafka.controller.ControllerRequestContext; -import org.apache.kafka.metadata.BrokerRegistrationFencingChange; -import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; import java.time.Duration; import java.util.Collections; @@ -203,7 +202,6 @@ public boolean isValid() { public void onBrokerRegister(RegisterBrokerRecord record) { lock.lock(); try { - boolean isFenced = record.fenced() || record.inControlledShutdown(); Set endpoints = new HashSet<>(); for (RegisterBrokerRecord.BrokerEndpoint endpoint : record.endPoints()) { if ("CONTROLLER".equals(endpoint.name())) { @@ -218,7 +216,7 @@ public void onBrokerRegister(RegisterBrokerRecord record) { logger.warn("No valid endpoint found for broker {} of name {}", record.brokerId(), listenerName); } BrokerEndpoints brokerEndpoints = new BrokerEndpoints(record.brokerId()); - brokerEndpoints.setFenced(isFenced); + brokerEndpoints.setFenced(Utils.isBrokerFenced(record)); brokerEndpoints.setEndpoints(endpoints); this.bootstrapServerMap.put(record.brokerId(), brokerEndpoints); cond.signal(); @@ -240,18 +238,19 @@ public void onBrokerUnregister(UnregisterBrokerRecord record) { @Override public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) { - boolean isFenced = record.fenced() == BrokerRegistrationFencingChange.FENCE.value() - || record.inControlledShutdown() == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value(); - lock.lock(); - try { - BrokerEndpoints brokerEndpoints = this.bootstrapServerMap.get(record.brokerId()); - if (brokerEndpoints != null) { - brokerEndpoints.setFenced(isFenced); + Optional isBrokerFenced = Utils.isBrokerFenced(record); + isBrokerFenced.ifPresent(isFenced -> { + lock.lock(); + try { + BrokerEndpoints brokerEndpoints = this.bootstrapServerMap.get(record.brokerId()); + if (brokerEndpoints != null) { + brokerEndpoints.setFenced(isFenced); + } + cond.signal(); + } finally { + lock.unlock(); } - cond.signal(); - } finally { - lock.unlock(); - } + }); } private boolean hasAvailableBrokerInUse() { diff --git a/core/src/main/java/kafka/autobalancer/common/Utils.java b/core/src/main/java/kafka/autobalancer/common/Utils.java index 4130f47cd8..95830a00cd 100644 --- a/core/src/main/java/kafka/autobalancer/common/Utils.java +++ b/core/src/main/java/kafka/autobalancer/common/Utils.java @@ -12,6 +12,11 @@ package kafka.autobalancer.common; import java.text.DecimalFormat; +import java.util.Optional; +import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; +import org.apache.kafka.metadata.BrokerRegistrationFencingChange; +import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; public class Utils { @@ -53,4 +58,28 @@ public static boolean checkListenerName(String listenerName, String expect) { } return expect.isEmpty() || listenerName.equals(expect); } + + public static Optional isBrokerFenced(BrokerRegistrationChangeRecord record) { + BrokerRegistrationFencingChange fencingChange = + BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow( + () -> new IllegalStateException(String.format("Unable to replay %s: unknown " + + "value for fenced field: %x", record, record.fenced()))); + BrokerRegistrationInControlledShutdownChange inControlledShutdownChange = + BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow( + () -> new IllegalStateException(String.format("Unable to replay %s: unknown " + + "value for inControlledShutdown field: %x", record, record.inControlledShutdown()))); + if (fencingChange == BrokerRegistrationFencingChange.FENCE + || inControlledShutdownChange == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN) { + return Optional.of(true); + } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) { + return Optional.of(false); + } else { + // broker status unchanged + return Optional.empty(); + } + } + + public static boolean isBrokerFenced(RegisterBrokerRecord record) { + return record.fenced() || record.inControlledShutdown(); + } } diff --git a/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java b/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java index 9361733d6b..5859c57f9a 100644 --- a/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java +++ b/core/src/main/java/kafka/autobalancer/executor/ControllerActionExecutorService.java @@ -12,9 +12,11 @@ package kafka.autobalancer.executor; import com.automq.stream.utils.LogContext; +import java.util.Optional; import kafka.autobalancer.common.Action; import kafka.autobalancer.common.ActionType; import kafka.autobalancer.common.AutoBalancerConstants; +import kafka.autobalancer.common.Utils; import kafka.autobalancer.listeners.BrokerStatusListener; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ApiException; @@ -27,8 +29,6 @@ import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.controller.Controller; import org.apache.kafka.controller.ControllerRequestContext; -import org.apache.kafka.metadata.BrokerRegistrationFencingChange; -import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; import org.slf4j.Logger; import java.util.ArrayList; @@ -170,7 +170,11 @@ private AlterPartitionReassignmentsRequestData.ReassignablePartition buildPartit @Override public void onBrokerRegister(RegisterBrokerRecord record) { - fencedBrokers.remove(record.brokerId()); + if (Utils.isBrokerFenced(record)) { + fencedBrokers.add(record.brokerId()); + } else { + fencedBrokers.remove(record.brokerId()); + } } @Override @@ -180,13 +184,14 @@ public void onBrokerUnregister(UnregisterBrokerRecord record) { @Override public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) { - boolean fenced = record.fenced() == BrokerRegistrationFencingChange.FENCE.value() - || record.inControlledShutdown() == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value(); - if (fenced) { - fencedBrokers.add(record.brokerId()); - } else { - fencedBrokers.remove(record.brokerId()); - } + Optional isBrokerFenced = Utils.isBrokerFenced(record); + isBrokerFenced.ifPresent(isFenced -> { + if (isFenced) { + fencedBrokers.add(record.brokerId()); + } else { + fencedBrokers.remove(record.brokerId()); + } + }); } private static class Task { diff --git a/core/src/main/java/kafka/autobalancer/model/RecordClusterModel.java b/core/src/main/java/kafka/autobalancer/model/RecordClusterModel.java index 9880001fd3..d760bcf48b 100644 --- a/core/src/main/java/kafka/autobalancer/model/RecordClusterModel.java +++ b/core/src/main/java/kafka/autobalancer/model/RecordClusterModel.java @@ -12,6 +12,8 @@ package kafka.autobalancer.model; import com.automq.stream.utils.LogContext; +import java.util.Optional; +import kafka.autobalancer.common.Utils; import kafka.autobalancer.listeners.BrokerStatusListener; import kafka.autobalancer.listeners.TopicPartitionStatusListener; import org.apache.kafka.common.Uuid; @@ -22,8 +24,6 @@ import org.apache.kafka.common.metadata.RemoveTopicRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; -import org.apache.kafka.metadata.BrokerRegistrationFencingChange; -import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; import org.apache.kafka.metadata.LeaderConstants; public class RecordClusterModel extends ClusterModel implements BrokerStatusListener, TopicPartitionStatusListener { @@ -38,8 +38,7 @@ public RecordClusterModel(LogContext logContext) { @Override public void onBrokerRegister(RegisterBrokerRecord record) { - boolean isActive = !record.fenced() && !record.inControlledShutdown(); - registerBroker(record.brokerId(), record.rack(), isActive); + registerBroker(record.brokerId(), record.rack(), !Utils.isBrokerFenced(record)); } @Override @@ -49,20 +48,8 @@ public void onBrokerUnregister(UnregisterBrokerRecord record) { @Override public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) { - BrokerRegistrationFencingChange fencingChange = - BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow( - () -> new IllegalStateException(String.format("Unable to replay %s: unknown " + - "value for fenced field: %x", record, record.fenced()))); - BrokerRegistrationInControlledShutdownChange inControlledShutdownChange = - BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow( - () -> new IllegalStateException(String.format("Unable to replay %s: unknown " + - "value for inControlledShutdown field: %x", record, record.inControlledShutdown()))); - if (fencingChange == BrokerRegistrationFencingChange.FENCE - || inControlledShutdownChange == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN) { - changeBrokerStatus(record.brokerId(), false); - } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) { - changeBrokerStatus(record.brokerId(), true); - } + Optional isBrokerFenced = Utils.isBrokerFenced(record); + isBrokerFenced.ifPresent(isFenced -> changeBrokerStatus(record.brokerId(), !isFenced)); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 57388b1c42..bcff2f8b6c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -2026,11 +2026,17 @@ void validateManualPartitionAssignment( sortedBrokerIds.sort(Integer::compare); Integer prevBrokerId = null; for (Integer brokerId : sortedBrokerIds) { - if (!clusterControl.brokerRegistrations().containsKey(brokerId)) { + BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); + if (brokerRegistration == null) { throw new InvalidReplicaAssignmentException("The manual partition " + "assignment includes broker " + brokerId + ", but no such broker is " + "registered."); } + if (brokerRegistration.fenced()) { + throw new InvalidReplicaAssignmentException("The manual partition " + + "assignment includes broker " + brokerId + ", but this broker is " + + "currently fenced."); + } if (brokerId.equals(prevBrokerId)) { throw new InvalidReplicaAssignmentException("The manual partition " + "assignment includes the broker " + prevBrokerId + " more than " +