Skip to content

Commit

Permalink
fix(auto_balancer): fix broker status change (#1991)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh authored Sep 5, 2024
1 parent 185ec02 commit bb0408e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 44 deletions.
29 changes: 14 additions & 15 deletions core/src/main/java/kafka/autobalancer/LoadRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package kafka.autobalancer;

import com.automq.stream.utils.LogContext;
import java.util.Optional;
import kafka.autobalancer.common.AutoBalancerThreadFactory;
import kafka.autobalancer.common.Utils;
import kafka.autobalancer.common.types.MetricTypes;
Expand Down Expand Up @@ -48,8 +49,6 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;

import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -203,7 +202,6 @@ public boolean isValid() {
public void onBrokerRegister(RegisterBrokerRecord record) {
lock.lock();
try {
boolean isFenced = record.fenced() || record.inControlledShutdown();
Set<String> endpoints = new HashSet<>();
for (RegisterBrokerRecord.BrokerEndpoint endpoint : record.endPoints()) {
if ("CONTROLLER".equals(endpoint.name())) {
Expand All @@ -218,7 +216,7 @@ public void onBrokerRegister(RegisterBrokerRecord record) {
logger.warn("No valid endpoint found for broker {} of name {}", record.brokerId(), listenerName);
}
BrokerEndpoints brokerEndpoints = new BrokerEndpoints(record.brokerId());
brokerEndpoints.setFenced(isFenced);
brokerEndpoints.setFenced(Utils.isBrokerFenced(record));
brokerEndpoints.setEndpoints(endpoints);
this.bootstrapServerMap.put(record.brokerId(), brokerEndpoints);
cond.signal();
Expand All @@ -240,18 +238,19 @@ public void onBrokerUnregister(UnregisterBrokerRecord record) {

@Override
public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) {
boolean isFenced = record.fenced() == BrokerRegistrationFencingChange.FENCE.value()
|| record.inControlledShutdown() == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value();
lock.lock();
try {
BrokerEndpoints brokerEndpoints = this.bootstrapServerMap.get(record.brokerId());
if (brokerEndpoints != null) {
brokerEndpoints.setFenced(isFenced);
Optional<Boolean> isBrokerFenced = Utils.isBrokerFenced(record);
isBrokerFenced.ifPresent(isFenced -> {
lock.lock();
try {
BrokerEndpoints brokerEndpoints = this.bootstrapServerMap.get(record.brokerId());
if (brokerEndpoints != null) {
brokerEndpoints.setFenced(isFenced);
}
cond.signal();
} finally {
lock.unlock();
}
cond.signal();
} finally {
lock.unlock();
}
});
}

private boolean hasAvailableBrokerInUse() {
Expand Down
29 changes: 29 additions & 0 deletions core/src/main/java/kafka/autobalancer/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
package kafka.autobalancer.common;

import java.text.DecimalFormat;
import java.util.Optional;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;

public class Utils {

Expand Down Expand Up @@ -53,4 +58,28 @@ public static boolean checkListenerName(String listenerName, String expect) {
}
return expect.isEmpty() || listenerName.equals(expect);
}

public static Optional<Boolean> isBrokerFenced(BrokerRegistrationChangeRecord record) {
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for fenced field: %x", record, record.fenced())));
BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for inControlledShutdown field: %x", record, record.inControlledShutdown())));
if (fencingChange == BrokerRegistrationFencingChange.FENCE
|| inControlledShutdownChange == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN) {
return Optional.of(true);
} else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
return Optional.of(false);
} else {
// broker status unchanged
return Optional.empty();
}
}

public static boolean isBrokerFenced(RegisterBrokerRecord record) {
return record.fenced() || record.inControlledShutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
package kafka.autobalancer.executor;

import com.automq.stream.utils.LogContext;
import java.util.Optional;
import kafka.autobalancer.common.Action;
import kafka.autobalancer.common.ActionType;
import kafka.autobalancer.common.AutoBalancerConstants;
import kafka.autobalancer.common.Utils;
import kafka.autobalancer.listeners.BrokerStatusListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
Expand All @@ -27,8 +29,6 @@
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.slf4j.Logger;

import java.util.ArrayList;
Expand Down Expand Up @@ -170,7 +170,11 @@ private AlterPartitionReassignmentsRequestData.ReassignablePartition buildPartit

@Override
public void onBrokerRegister(RegisterBrokerRecord record) {
fencedBrokers.remove(record.brokerId());
if (Utils.isBrokerFenced(record)) {
fencedBrokers.add(record.brokerId());
} else {
fencedBrokers.remove(record.brokerId());
}
}

@Override
Expand All @@ -180,13 +184,14 @@ public void onBrokerUnregister(UnregisterBrokerRecord record) {

@Override
public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) {
boolean fenced = record.fenced() == BrokerRegistrationFencingChange.FENCE.value()
|| record.inControlledShutdown() == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value();
if (fenced) {
fencedBrokers.add(record.brokerId());
} else {
fencedBrokers.remove(record.brokerId());
}
Optional<Boolean> isBrokerFenced = Utils.isBrokerFenced(record);
isBrokerFenced.ifPresent(isFenced -> {
if (isFenced) {
fencedBrokers.add(record.brokerId());
} else {
fencedBrokers.remove(record.brokerId());
}
});
}

private static class Task {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
package kafka.autobalancer.model;

import com.automq.stream.utils.LogContext;
import java.util.Optional;
import kafka.autobalancer.common.Utils;
import kafka.autobalancer.listeners.BrokerStatusListener;
import kafka.autobalancer.listeners.TopicPartitionStatusListener;
import org.apache.kafka.common.Uuid;
Expand All @@ -22,8 +24,6 @@
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.LeaderConstants;

public class RecordClusterModel extends ClusterModel implements BrokerStatusListener, TopicPartitionStatusListener {
Expand All @@ -38,8 +38,7 @@ public RecordClusterModel(LogContext logContext) {

@Override
public void onBrokerRegister(RegisterBrokerRecord record) {
boolean isActive = !record.fenced() && !record.inControlledShutdown();
registerBroker(record.brokerId(), record.rack(), isActive);
registerBroker(record.brokerId(), record.rack(), !Utils.isBrokerFenced(record));
}

@Override
Expand All @@ -49,20 +48,8 @@ public void onBrokerUnregister(UnregisterBrokerRecord record) {

@Override
public void onBrokerRegistrationChanged(BrokerRegistrationChangeRecord record) {
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for fenced field: %x", record, record.fenced())));
BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for inControlledShutdown field: %x", record, record.inControlledShutdown())));
if (fencingChange == BrokerRegistrationFencingChange.FENCE
|| inControlledShutdownChange == BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN) {
changeBrokerStatus(record.brokerId(), false);
} else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
changeBrokerStatus(record.brokerId(), true);
}
Optional<Boolean> isBrokerFenced = Utils.isBrokerFenced(record);
isBrokerFenced.ifPresent(isFenced -> changeBrokerStatus(record.brokerId(), !isFenced));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2026,11 +2026,17 @@ void validateManualPartitionAssignment(
sortedBrokerIds.sort(Integer::compare);
Integer prevBrokerId = null;
for (Integer brokerId : sortedBrokerIds) {
if (!clusterControl.brokerRegistrations().containsKey(brokerId)) {
BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId);
if (brokerRegistration == null) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes broker " + brokerId + ", but no such broker is " +
"registered.");
}
if (brokerRegistration.fenced()) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes broker " + brokerId + ", but this broker is " +
"currently fenced.");
}
if (brokerId.equals(prevBrokerId)) {
throw new InvalidReplicaAssignmentException("The manual partition " +
"assignment includes the broker " + prevBrokerId + " more than " +
Expand Down

0 comments on commit bb0408e

Please sign in to comment.