Skip to content

Commit

Permalink
KAFKA-15586: Clean shutdown detection - server side (apache#14706)
Browse files Browse the repository at this point in the history
If a broker registers with the same broker epoch as its previous session, the restart is recognized as a clean shutdown. Otherwise, it is treated as an unclean shutdown, and the broker's replica will be removed from any ELR that contains it.

Reviewers: Artem Livshits <[email protected]>, David Arthur <[email protected]>
  • Loading branch information
CalvinConfluent authored Apr 4, 2024
1 parent ef7f823 commit 376e9e2
Show file tree
Hide file tree
Showing 11 changed files with 590 additions and 22 deletions.
162 changes: 162 additions & 0 deletions metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

import static org.apache.kafka.metadata.Replicas.NONE;

/**
 * Reverse index from a broker ID to the partitions whose ELR contains that broker.
 *
 * The index is stored in timeline data structures created against the
 * {@link SnapshotRegistry} passed to the constructor.
 */
public class BrokersToElrs {
    private final SnapshotRegistry snapshotRegistry;

    // Maps a broker ID to a map from topic ID to the IDs of that topic's partitions
    // whose ELR contains the broker. remove() drops a broker's entry once its last
    // topic has been removed.
    private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> elrMembers;

    BrokersToElrs(SnapshotRegistry snapshotRegistry) {
        this.snapshotRegistry = snapshotRegistry;
        this.elrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /**
     * Update our records of a partition's ELR.
     *
     * Brokers that appear only in prevElr are unregistered for the partition, brokers
     * that appear only in nextElr are registered, and brokers present in both are left
     * untouched.
     *
     * @param topicId       The topic ID of the partition.
     * @param partitionId   The partition ID of the partition.
     * @param prevElr       The previous ELR, or null if the partition is new.
     * @param nextElr       The new ELR, or null if the partition is being removed.
     *
     * @throws RuntimeException if a broker listed in prevElr is not currently recorded
     *                          as an ELR member of the partition.
     */
    void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) {
        // Work on sorted clones so both arrays can be walked in lock-step below.
        int[] prev;
        if (prevElr == null) {
            prev = NONE;
        } else {
            prev = Replicas.clone(prevElr);
            Arrays.sort(prev);
        }
        int[] next;
        if (nextElr == null) {
            next = NONE;
        } else {
            next = Replicas.clone(nextElr);
            Arrays.sort(next);
        }

        // Merge-style walk over the two sorted arrays: i indexes prev, j indexes next.
        int i = 0, j = 0;
        while (true) {
            if (i == prev.length) {
                if (j == next.length) {
                    break;
                }
                // prev is exhausted: everything left in next is a new ELR member.
                int newReplica = next[j];
                add(newReplica, topicId, partitionId);
                j++;
            } else if (j == next.length) {
                // next is exhausted: everything left in prev has left the ELR.
                int prevReplica = prev[i];
                remove(prevReplica, topicId, partitionId);
                i++;
            } else {
                int prevReplica = prev[i];
                int newReplica = next[j];
                if (prevReplica < newReplica) {
                    // prevReplica is missing from next, so it has left the ELR.
                    remove(prevReplica, topicId, partitionId);
                    i++;
                } else if (prevReplica > newReplica) {
                    // newReplica is missing from prev, so it has joined the ELR.
                    add(newReplica, topicId, partitionId);
                    j++;
                } else {
                    // Present in both: nothing to change.
                    i++;
                    j++;
                }
            }
        }
    }

    /**
     * Drop every partition of the given topic from the given broker's entry.
     * Does nothing if the broker has no entry.
     *
     * @param topicId   The topic ID to remove.
     * @param brokerId  The broker whose entry should be updated.
     */
    void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
        Map<Uuid, int[]> topicMap = elrMembers.get(brokerId);
        if (topicMap != null) {
            topicMap.remove(topicId);
        }
    }

    /**
     * Record that the broker is in the ELR of the given partition, creating the
     * broker's entry if it does not exist yet.
     *
     * @param brokerId      The broker to register.
     * @param topicId       The topic ID of the partition.
     * @param newPartition  The partition ID to append to the broker's list for the topic.
     */
    private void add(int brokerId, Uuid topicId, int newPartition) {
        TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId);
        if (topicMap == null) {
            topicMap = new TimelineHashMap<>(snapshotRegistry, 0);
            elrMembers.put(brokerId, topicMap);
        }
        int[] partitions = topicMap.get(topicId);
        int[] newPartitions;
        if (partitions == null) {
            newPartitions = new int[1];
        } else {
            newPartitions = new int[partitions.length + 1];
            System.arraycopy(partitions, 0, newPartitions, 0, partitions.length);
        }
        // A fresh array is stored rather than mutating the existing one in place.
        newPartitions[newPartitions.length - 1] = newPartition;
        topicMap.put(topicId, newPartitions);
    }

    /**
     * Record that the broker is no longer in the ELR of the given partition.
     * The topic entry is removed when its last partition goes away, and the broker
     * entry is removed when its last topic goes away.
     *
     * @param brokerId          The broker to unregister.
     * @param topicId           The topic ID of the partition.
     * @param removedPartition  The partition ID to remove from the broker's list.
     *
     * @throws RuntimeException if the broker, the topic, or the partition is not
     *                          currently recorded.
     */
    private void remove(int brokerId, Uuid topicId, int removedPartition) {
        TimelineHashMap<Uuid, int[]> topicMap = elrMembers.get(brokerId);
        if (topicMap == null) {
            throw new RuntimeException("Broker " + brokerId + " has no elrMembers " +
                "entry, so we can't remove " + topicId + ":" + removedPartition);
        }
        int[] partitions = topicMap.get(topicId);
        if (partitions == null) {
            throw new RuntimeException("Broker " + brokerId + " has no " +
                "entry in elrMembers for topic " + topicId);
        }
        if (partitions.length == 1) {
            if (partitions[0] != removedPartition) {
                throw new RuntimeException("Broker " + brokerId + " has no " +
                    "entry in elrMembers for " + topicId + ":" + removedPartition);
            }
            topicMap.remove(topicId);
            if (topicMap.isEmpty()) {
                elrMembers.remove(brokerId);
            }
        } else {
            int[] newPartitions = new int[partitions.length - 1];
            int j = 0;
            for (int partition : partitions) {
                if (partition != removedPartition) {
                    if (j == newPartitions.length) {
                        // Every element was kept, so removedPartition is not recorded.
                        // Fail with a descriptive message instead of running off the
                        // end of newPartitions.
                        throw new RuntimeException("Broker " + brokerId + " has no " +
                            "entry in elrMembers for " + topicId + ":" + removedPartition);
                    }
                    newPartitions[j++] = partition;
                }
            }
            topicMap.put(topicId, newPartitions);
        }
    }

    /**
     * Return an iterator over the partitions recorded for the given broker, i.e. the
     * partitions whose ELR contains it. The iterator is built over an empty map when
     * the broker has no entry.
     *
     * @param brokerId  The broker to look up.
     */
    BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithBrokerInElr(int brokerId) {
        Map<Uuid, int[]> topicMap = elrMembers.get(brokerId);
        if (topicMap == null) {
            topicMap = Collections.emptyMap();
        }
        // NOTE(review): the second argument is presumably the iterator's "leaders only"
        // flag; confirm against BrokersToIsrs.PartitionsOnReplicaIterator.
        return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ static class Builder {
private ReplicaPlacer replicaPlacer = null;
private FeatureControlManager featureControl = null;
private boolean zkMigrationEnabled = false;
private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler = null;

Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
Expand Down Expand Up @@ -132,6 +133,11 @@ Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
return this;
}

/**
 * Set the handler that the built ClusterControlManager will use for unclean broker shutdowns.
 * build() throws a RuntimeException if no handler has been set.
 *
 * @param brokerUncleanShutdownHandler The handler to store in this builder.
 * @return This builder, so that calls can be chained.
 */
Builder setBrokerUncleanShutdownHandler(BrokerUncleanShutdownHandler brokerUncleanShutdownHandler) {
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
return this;
}

ClusterControlManager build() {
if (logContext == null) {
logContext = new LogContext();
Expand All @@ -148,14 +154,18 @@ ClusterControlManager build() {
if (featureControl == null) {
throw new RuntimeException("You must specify FeatureControlManager");
}
if (brokerUncleanShutdownHandler == null) {
throw new RuntimeException("You must specify BrokerUncleanShutdownHandler");
}
return new ClusterControlManager(logContext,
clusterId,
time,
snapshotRegistry,
sessionTimeoutNs,
replicaPlacer,
featureControl,
zkMigrationEnabled
zkMigrationEnabled,
brokerUncleanShutdownHandler
);
}
}
Expand Down Expand Up @@ -247,6 +257,8 @@ boolean check() {
*/
private final boolean zkMigrationEnabled;

private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;

/**
* Maps controller IDs to controller registrations.
*/
Expand All @@ -265,7 +277,8 @@ private ClusterControlManager(
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
FeatureControlManager featureControl,
boolean zkMigrationEnabled
boolean zkMigrationEnabled,
BrokerUncleanShutdownHandler brokerUncleanShutdownHandler
) {
this.logContext = logContext;
this.clusterId = clusterId;
Expand All @@ -281,6 +294,7 @@ private ClusterControlManager(
this.zkMigrationEnabled = zkMigrationEnabled;
this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
}

ReplicaPlacer replicaPlacer() {
Expand Down Expand Up @@ -336,10 +350,11 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
", but got cluster ID " + request.clusterId());
}
int brokerId = request.brokerId();
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) {
// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown.
log.debug("Received an unclean shutdown request");
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
}
if (existing != null) {
if (heartbeatManager.hasValidSession(brokerId)) {
Expand Down Expand Up @@ -410,7 +425,6 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(

heartbeatManager.register(brokerId, record.fenced());

List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(new ApiMessageAndVersion(record, featureControl.metadataVersion().
registerBrokerRecordVersion()));
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
Expand Down Expand Up @@ -780,4 +794,9 @@ public Entry<Integer, Map<String, VersionRange>> next() {
}
};
}

/**
 * Callback that registerBroker invokes when it treats a broker registration as following
 * an unclean shutdown.
 */
@FunctionalInterface
interface BrokerUncleanShutdownHandler {
/**
 * Handle an unclean shutdown of the given broker.
 *
 * @param brokerId The ID of the broker that is registering.
 * @param records  The record list being built for the registration; implementations are
 *                 presumably expected to append to it (confirm against the implementation).
 */
void addRecordsForShutdown(int brokerId, List<ApiMessageAndVersion> records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
partition.lastKnownElr[0] != partition.leader)) {
// Only update the last known leader the first time the partition becomes leaderless.
record.setLastKnownElr(Arrays.asList(partition.leader));
} else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))
&& partition.lastKnownElr.length > 0) {
// Clear the LastKnownElr field if the partition will have or continues to have a valid leader.
record.setLastKnownElr(Collections.emptyList());
}
}

Expand All @@ -517,6 +521,10 @@ private void maybeUpdateRecordElr(PartitionChangeRecord record) {
targetLastKnownElr = Replicas.toList(partition.lastKnownElr);
}

// If the last known ELR is expected to store the last known leader, the lastKnownElr field should be updated
// later in maybeUpdateLastKnownLeader.
if (useLastKnownLeaderInBalancedRecovery) return;

if (!targetLastKnownElr.equals(Replicas.toList(partition.lastKnownElr))) {
record.setLastKnownElr(targetLastKnownElr);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1861,6 +1861,7 @@ private QuorumController(
setReplicaPlacer(replicaPlacer).
setFeatureControlManager(featureControl).
setZkMigrationEnabled(zkMigrationEnabled).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
build();
this.producerIdControlManager = new ProducerIdControlManager.Builder().
setLogContext(logContext).
Expand Down Expand Up @@ -2355,4 +2356,8 @@ void setNewNextWriteOffset(long newNextWriteOffset) {
offsetControl.setNextWriteOffset(newNextWriteOffset);
});
}

/**
 * Forward an unclean broker shutdown to the replication control manager.
 * This method is passed to the ClusterControlManager builder as its BrokerUncleanShutdownHandler.
 *
 * @param brokerId The ID of the broker that shut down uncleanly.
 * @param records  The record list handed on to replicationControl.handleBrokerUncleanShutdown.
 */
void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
}
}
Loading

0 comments on commit 376e9e2

Please sign in to comment.