---
.../internals/HeartbeatRequestManager.java | 7 ++++++-
.../HeartbeatRequestManagerTest.java | 21 ++++++++++++++++++-
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
index a956ef3a93..d31d412c65 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java
@@ -45,6 +45,7 @@
import java.util.TreeSet;
import java.util.stream.Collectors;
+
/**
* Manages the request creation and response handling for the heartbeat. The module creates a
* {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link MembershipManager} and enqueue it to
@@ -208,7 +209,11 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat));
}
- boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
+ // Case 1: The member is leaving
+ boolean heartbeatNow = membershipManager.state() == MemberState.LEAVING ||
+ // Case 2: The member state indicates it should send a heartbeat without waiting for the interval, and there is no heartbeat request currently in-flight
+ (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight());
+
if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
index 8334fb2360..f63dd55754 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java
@@ -277,7 +277,7 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() {
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " +
"previous one is in-flight");
-
+
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " +
@@ -752,6 +752,25 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
assertEquals(1, result.unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING");
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) {
+ mockStableMember();
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(1, result.unsentRequests.size());
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight");
+
+ membershipManager.leaveGroup();
+
+ ConsumerGroupHeartbeatRequest heartbeatToLeave = getHeartbeatRequest(heartbeatRequestManager, version);
+ assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, heartbeatToLeave.data().memberEpoch());
+
+ NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
+ assertEquals(0, pollAgain.unsentRequests.size());
+ }
+
private void assertHeartbeat(HeartbeatRequestManager hrm, int nextPollMs) {
NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());
From 34f5d5bab26c59fef325a72725c05d7563b38f44 Mon Sep 17 00:00:00 2001
From: Colin Patrick McCabe
Date: Sat, 1 Jun 2024 15:51:39 -0700
Subject: [PATCH 005/128] KAFKA-16757: Fix broker re-registration issues around
MV 3.7-IV2 (#15945)
When upgrading from a MetadataVersion older than 3.7-IV2, we need to resend the broker registration so that the controller can record the storage directories. The current code for doing this has several problems, however. One is that it tends to trigger even in cases where we don't actually need it. Another is that when re-registering the broker, the broker is marked as fenced.
This PR moves the handling of the re-registration case out of BrokerMetadataPublisher and into BrokerRegistrationTracker. The re-registration code there will only trigger in the case where the broker sees an existing registration for itself with no directories set. This is much more targeted than the original code.
Additionally, in ClusterControlManager, when re-registering the same broker, we now preserve its fencing and shutdown state rather than clearing them. (There isn't any good reason re-registering the same broker should clear these things... this was purely an oversight.) Note that we can tell the broker is "the same" because it has the same IncarnationId.
Reviewers: Gaurav Narula, Igor Soarez
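In outline, the rule the new tracker applies can be condensed as follows; this is a simplified sketch of the brokerRegistrationNeedsRefresh method added below (MetadataVersion and BrokerRegistration are the real types, the shortened method name is illustrative):
boolean needsRefresh(MetadataVersion metadataVersion, BrokerRegistration registration) {
    if (registration == null) {
        // BrokerLifecycleManager is still sending the initial registration.
        return false;
    }
    // Resend only when the MV supports directory assignments (JBOD, 3.7-IV2+)
    // but the existing registration lists no directories.
    return metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)
        && registration.directories().isEmpty();
}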
---
.../kafka/server/BrokerLifecycleManager.scala | 6 +-
.../scala/kafka/server/BrokerServer.scala | 8 +-
.../metadata/BrokerMetadataPublisher.scala | 16 --
.../server/BrokerLifecycleManagerTest.scala | 2 +-
.../BrokerMetadataPublisherTest.scala | 101 +-----------
.../controller/ClusterControlManager.java | 7 +
.../publisher/BrokerRegistrationTracker.java | 136 ++++++++++++++++
.../BrokerRegistrationTrackerTest.java | 151 ++++++++++++++++++
8 files changed, 306 insertions(+), 121 deletions(-)
create mode 100644 metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
create mode 100644 metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index 5f3fdc8188..51bc16fb09 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -264,11 +264,11 @@ class BrokerLifecycleManager(
new OfflineDirBrokerFailureEvent(directory))
}
- def handleKraftJBODMetadataVersionUpdate(): Unit = {
- eventQueue.append(new KraftJBODMetadataVersionUpdateEvent())
+ def resendBrokerRegistrationUnlessZkMode(): Unit = {
+ eventQueue.append(new ResendBrokerRegistrationUnlessZkModeEvent())
}
- private class KraftJBODMetadataVersionUpdateEvent extends EventQueue.Event {
+ private class ResendBrokerRegistrationUnlessZkModeEvent extends EventQueue.Event {
override def run(): Unit = {
if (!isZkBroker) {
registered = false
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 112a03c50a..64a4fd7474 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
-import org.apache.kafka.image.publisher.MetadataPublisher
+import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{AssignmentsManager, ClientMetricsManager, NodeToControllerChannelManager}
@@ -139,6 +139,8 @@ class BrokerServer(
var brokerMetadataPublisher: BrokerMetadataPublisher = _
+ var brokerRegistrationTracker: BrokerRegistrationTracker = _
+
val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault(config.unstableFeatureVersionsEnabled)
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
@@ -482,6 +484,10 @@ class BrokerServer(
lifecycleManager
)
metadataPublishers.add(brokerMetadataPublisher)
+ brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId,
+ logManager.directoryIdsSet.toList.asJava,
+ () => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
+ metadataPublishers.add(brokerRegistrationTracker)
// Register parts of the broker that can be reconfigured via dynamic configs. This needs to
// be done before we publish the dynamic configs, so that we don't miss anything.
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 048a665757..ee7bfa2157 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -29,7 +29,6 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
-import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.fault.FaultHandler
import java.util.concurrent.CompletableFuture
@@ -129,21 +128,6 @@ class BrokerMetadataPublisher(
debug(s"Publishing metadata at offset $highestOffsetAndEpoch with $metadataVersionLogMsg.")
}
- Option(delta.featuresDelta()).foreach { featuresDelta =>
- featuresDelta.metadataVersionChange().ifPresent{ metadataVersion =>
- info(s"Updating metadata.version to ${metadataVersion.featureLevel()} at offset $highestOffsetAndEpoch.")
- val currentMetadataVersion = delta.image().features().metadataVersion()
- if (currentMetadataVersion.isLessThan(MetadataVersion.IBP_3_7_IV2) && metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2)) {
- info(
- s"""Resending BrokerRegistration with existing incarnation-id to inform the
- |controller about log directories in the broker following metadata update:
- |previousMetadataVersion: ${delta.image().features().metadataVersion()}
- |newMetadataVersion: $metadataVersion""".stripMargin.linesIterator.mkString(" ").trim)
- brokerLifecycleManager.handleKraftJBODMetadataVersionUpdate()
- }
- }
- }
-
// Apply topic deltas.
Option(delta.topicsDelta()).foreach { topicsDelta =>
try {
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 34f9d139a0..b0162dc635 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -285,7 +285,7 @@ class BrokerLifecycleManagerTest {
assertEquals(1000L, manager.brokerEpoch)
// Trigger JBOD MV update
- manager.handleKraftJBODMetadataVersionUpdate()
+ manager.resendBrokerRegistrationUnlessZkMode()
// Accept new registration, response sets epoch to 1200
nextRegistrationRequest(1200L)
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index c2926c3b67..26f4fb3dae 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.BROKER
-import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance}
@@ -43,7 +42,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito
-import org.mockito.Mockito.{clearInvocations, doThrow, mock, times, verify, verifyNoInteractions}
+import org.mockito.Mockito.{doThrow, mock, verify}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@@ -221,102 +220,4 @@ class BrokerMetadataPublisherTest {
verify(groupCoordinator).onNewMetadataImage(image, delta)
}
-
- @Test
- def testMetadataVersionUpdateToIBP_3_7_IV2OrAboveTriggersBrokerReRegistration(): Unit = {
- val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, ""))
- val metadataCache = new KRaftMetadataCache(0)
- val logManager = mock(classOf[LogManager])
- val replicaManager = mock(classOf[ReplicaManager])
- val groupCoordinator = mock(classOf[GroupCoordinator])
- val faultHandler = mock(classOf[FaultHandler])
- val brokerLifecycleManager = mock(classOf[BrokerLifecycleManager])
-
- val metadataPublisher = new BrokerMetadataPublisher(
- config,
- metadataCache,
- logManager,
- replicaManager,
- groupCoordinator,
- mock(classOf[TransactionCoordinator]),
- mock(classOf[DynamicConfigPublisher]),
- mock(classOf[DynamicClientQuotaPublisher]),
- mock(classOf[ScramPublisher]),
- mock(classOf[DelegationTokenPublisher]),
- mock(classOf[AclPublisher]),
- faultHandler,
- faultHandler,
- brokerLifecycleManager,
- )
-
- var image = MetadataImage.EMPTY
- var delta = new MetadataDelta.Builder()
- .setImage(image)
- .build()
-
- // We first upgrade metadata version to 3_6_IV2
- delta.replay(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(MetadataVersion.IBP_3_6_IV2.featureLevel()))
- var newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
-
- metadataPublisher.onMetadataUpdate(delta, newImage,
- LogDeltaManifest.newBuilder()
- .provenance(MetadataProvenance.EMPTY)
- .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
- .numBatches(1)
- .elapsedNs(100)
- .numBytes(42)
- .build())
-
- // This should NOT trigger broker reregistration
- verifyNoInteractions(brokerLifecycleManager)
-
- // We then upgrade to IBP_3_7_IV2
- image = newImage
- delta = new MetadataDelta.Builder()
- .setImage(image)
- .build()
- delta.replay(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()))
- newImage = delta.apply(new MetadataProvenance(100, 4, 2000))
-
- metadataPublisher.onMetadataUpdate(delta, newImage,
- LogDeltaManifest.newBuilder()
- .provenance(MetadataProvenance.EMPTY)
- .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
- .numBatches(1)
- .elapsedNs(100)
- .numBytes(42)
- .build())
-
- // This SHOULD trigger a broker registration
- verify(brokerLifecycleManager, times(1)).handleKraftJBODMetadataVersionUpdate()
- clearInvocations(brokerLifecycleManager)
-
- // Finally upgrade to IBP_3_8_IV0
- image = newImage
- delta = new MetadataDelta.Builder()
- .setImage(image)
- .build()
- delta.replay(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(MetadataVersion.IBP_3_8_IV0.featureLevel()))
- newImage = delta.apply(new MetadataProvenance(200, 4, 3000))
-
- metadataPublisher.onMetadataUpdate(delta, newImage,
- LogDeltaManifest.newBuilder()
- .provenance(MetadataProvenance.EMPTY)
- .leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
- .numBatches(1)
- .elapsedNs(100)
- .numBytes(42)
- .build())
-
- // This should NOT trigger broker reregistration
- verify(brokerLifecycleManager, times(0)).handleKraftJBODMetadataVersionUpdate()
-
- metadataPublisher.close()
- }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 0974c31d1b..8b9c5b19ea 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -408,6 +408,13 @@ public ControllerResult registerBroker(
setBrokerEpoch(brokerEpoch).
setRack(request.rack()).
setEndPoints(listenerInfo.toBrokerRegistrationRecord());
+
+ if (existing != null && request.incarnationId().equals(existing.incarnationId())) {
+ log.info("Amending registration of broker {}", request.brokerId());
+ record.setFenced(existing.fenced());
+ record.setInControlledShutdown(existing.inControlledShutdown());
+ }
+
for (BrokerRegistrationRequestData.Feature feature : request.features()) {
record.features().add(processRegistrationFeature(brokerId, finalizedFeatures, feature));
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
new file mode 100644
index 0000000000..51ac2bdfa4
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/BrokerRegistrationTracker.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+
+import java.util.List;
+
+/**
+ * Tracks the registration of a specific broker, and executes a callback if it should be refreshed.
+ *
+ * This tracker handles cases where we might want to re-register the broker. The only such case
+ * right now is during the transition from non-JBOD mode to JBOD mode. In other words, the
+ * transition from a MetadataVersion less than 3.7-IV2 to one greater than or equal to 3.7-IV2.
+ * In this case, the broker registration will start out containing no directories, and we need to
+ * resend the BrokerRegistrationRequest to fix that.
+ *
+ * As much as possible, the goal here is to keep things simple. We just compare the desired state
+ * with the actual state, and try to make changes only if necessary.
+ */
+public class BrokerRegistrationTracker implements MetadataPublisher {
+ private final Logger log;
+ private final int id;
+ private final Runnable refreshRegistrationCallback;
+
+ /**
+ * Create the tracker.
+ *
+ * @param id The ID of this broker.
+ * @param targetDirectories The directories managed by this broker.
+ * @param refreshRegistrationCallback Callback to run if we need to refresh the registration.
+ */
+ public BrokerRegistrationTracker(
+ int id,
+ List<Uuid> targetDirectories,
+ Runnable refreshRegistrationCallback
+ ) {
+ this.log = new LogContext("[BrokerRegistrationTracker id=" + id + "] ").
+ logger(BrokerRegistrationTracker.class);
+ this.id = id;
+ this.refreshRegistrationCallback = refreshRegistrationCallback;
+ }
+
+ @Override
+ public String name() {
+ return "BrokerRegistrationTracker(id=" + id + ")";
+ }
+
+ @Override
+ public void onMetadataUpdate(
+ MetadataDelta delta,
+ MetadataImage newImage,
+ LoaderManifest manifest
+ ) {
+ boolean checkBrokerRegistration = false;
+ if (delta.featuresDelta() != null) {
+ if (delta.metadataVersionChanged().isPresent()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Metadata version change is present: {}",
+ delta.metadataVersionChanged());
+ }
+ checkBrokerRegistration = true;
+ }
+ }
+ if (delta.clusterDelta() != null) {
+ if (delta.clusterDelta().changedBrokers().get(id) != null) {
+ if (log.isTraceEnabled()) {
+ log.trace("Broker change is present: {}",
+ delta.clusterDelta().changedBrokers().get(id));
+ }
+ checkBrokerRegistration = true;
+ }
+ }
+ if (checkBrokerRegistration) {
+ if (brokerRegistrationNeedsRefresh(newImage.features().metadataVersion(),
+ delta.clusterDelta().broker(id))) {
+ refreshRegistrationCallback.run();
+ }
+ }
+ }
+
+ /**
+ * Check if the current broker registration needs to be refreshed.
+ *
+ * @param metadataVersion The current metadata version.
+ * @param registration The current broker registration, or null if there is none.
+ * @return True only if we should refresh.
+ */
+ boolean brokerRegistrationNeedsRefresh(
+ MetadataVersion metadataVersion,
+ BrokerRegistration registration
+ ) {
+ // If there is no existing registration, the BrokerLifecycleManager must still be sending it.
+ // So we don't need to do anything yet.
+ if (registration == null) {
+ log.debug("No current broker registration to check.");
+ return false;
+ }
+ // Check to see if the directory list has changed. Note that this check could certainly be
+ // triggered spuriously. For example, if the broker's directory list has been changed in the
+ // past, and we are in the process of replaying that change log, we will end up here.
+ // That's fine because resending the broker registration does not cause any problems. And,
+ // of course, as soon as a snapshot is made, we will no longer need to worry about those
+ // old metadata log entries being replayed on startup.
+ if (metadataVersion.isAtLeast(MetadataVersion.IBP_3_7_IV2) &&
+ registration.directories().isEmpty()) {
+ log.info("Current directory set is empty, but MV supports JBOD. Resending " +
+ "broker registration.");
+ return true;
+ }
+ log.debug("Broker registration does not need to be resent.");
+ return false;
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java
new file mode 100644
index 0000000000..855a96cd8a
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/BrokerRegistrationTrackerTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Timeout(value = 40)
+public class BrokerRegistrationTrackerTest {
+ static final Uuid INCARNATION_ID = Uuid.fromString("jyjLbk31Tpa53pFrU9Y-Ng");
+
+ static final Uuid A = Uuid.fromString("Ahw3vXfnThqeZbb7HD1w6Q");
+
+ static final Uuid B = Uuid.fromString("BjOacT0OTNqIvUWIlKhahg");
+
+ static final Uuid C = Uuid.fromString("CVHi_iv2Rvy5_1rtPdasfg");
+
+ static class BrokerRegistrationTrackerTestContext {
+ AtomicInteger numCalls = new AtomicInteger(0);
+ BrokerRegistrationTracker tracker = new BrokerRegistrationTracker(1,
+ Arrays.asList(B, A), () -> numCalls.incrementAndGet());
+
+ MetadataImage image = MetadataImage.EMPTY;
+
+ void onMetadataUpdate(MetadataDelta delta) {
+ MetadataProvenance provenance = new MetadataProvenance(0, 0, 0);
+ image = delta.apply(provenance);
+ LogDeltaManifest manifest = new LogDeltaManifest.Builder().
+ provenance(provenance).
+ leaderAndEpoch(LeaderAndEpoch.UNKNOWN).
+ numBatches(1).
+ elapsedNs(1).
+ numBytes(1).
+ build();
+ tracker.onMetadataUpdate(delta, image, manifest);
+ }
+
+ MetadataDelta newDelta() {
+ return new MetadataDelta.Builder().
+ setImage(image).
+ build();
+ }
+ }
+
+ @Test
+ public void testTrackerName() {
+ BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+ assertEquals("BrokerRegistrationTracker(id=1)", ctx.tracker.name());
+ }
+
+ @Test
+ public void testMetadataVersionUpdateWithoutRegistrationDoesNothing() {
+ BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+ MetadataDelta delta = ctx.newDelta();
+ delta.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(MetadataVersion.IBP_3_7_IV2.featureLevel()));
+ ctx.onMetadataUpdate(delta);
+ assertEquals(0, ctx.numCalls.get());
+ }
+
+ @Test
+ public void testBrokerUpdateWithoutNewMvDoesNothing() {
+ BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+ MetadataDelta delta = ctx.newDelta();
+ delta.replay(new RegisterBrokerRecord().
+ setBrokerId(1).
+ setIncarnationId(INCARNATION_ID).
+ setLogDirs(Arrays.asList(A, B, C)));
+ ctx.onMetadataUpdate(delta);
+ assertEquals(0, ctx.numCalls.get());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBrokerUpdateWithNewMv(boolean jbodMv) {
+ BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+ MetadataDelta delta = ctx.newDelta();
+ delta.replay(new RegisterBrokerRecord().
+ setBrokerId(1).
+ setIncarnationId(INCARNATION_ID).
+ setLogDirs(Arrays.asList()));
+ delta.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(jbodMv ? MetadataVersion.IBP_3_7_IV2.featureLevel() :
+ MetadataVersion.IBP_3_7_IV1.featureLevel()));
+ ctx.onMetadataUpdate(delta);
+ if (jbodMv) {
+ assertEquals(1, ctx.numCalls.get());
+ } else {
+ assertEquals(0, ctx.numCalls.get());
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBrokerUpdateWithNewMvWithTwoDeltas(boolean jbodMv) {
+ BrokerRegistrationTrackerTestContext ctx = new BrokerRegistrationTrackerTestContext();
+ MetadataDelta delta = ctx.newDelta();
+ delta.replay(new RegisterBrokerRecord().
+ setBrokerId(1).
+ setIncarnationId(INCARNATION_ID).
+ setLogDirs(Arrays.asList()));
+ ctx.onMetadataUpdate(delta);
+ // No calls are made because MetadataVersion is 3.0-IV1 initially
+ assertEquals(0, ctx.numCalls.get());
+
+ delta = ctx.newDelta();
+ delta.replay(new FeatureLevelRecord().
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(jbodMv ? MetadataVersion.IBP_3_7_IV2.featureLevel() :
+ MetadataVersion.IBP_3_7_IV1.featureLevel()));
+ ctx.onMetadataUpdate(delta);
+ if (jbodMv) {
+ assertEquals(1, ctx.numCalls.get());
+ } else {
+ assertEquals(0, ctx.numCalls.get());
+ }
+ }
+}
From dc5a22bf830b57ad0688fd8d544d631523e1fa26 Mon Sep 17 00:00:00 2001
From: Ken Huang <100591800+m1a2st@users.noreply.github.com>
Date: Sun, 2 Jun 2024 18:33:02 +0900
Subject: [PATCH 006/128] KAFKA-16807:
DescribeLogDirsResponseData#results#topics has unexpected topics with
empty partitions (#16042)
Reviewers: Chia-Ping Tsai
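The gist of the fix, sketched in Java for illustration (the actual change below is in the Scala ReplicaManager; dropEmptyTopics is a hypothetical helper name):
import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic;
import java.util.List;
import java.util.stream.Collectors;
class DescribeLogDirsSketch {
    // A topic entry whose partition list ended up empty after filtering carries
    // no information, so it is dropped rather than returned to the client.
    static List<DescribeLogDirsTopic> dropEmptyTopics(List<DescribeLogDirsTopic> topics) {
        return topics.stream()
                .filter(topic -> !topic.partitions().isEmpty())
                .collect(Collectors.toList());
    }
}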
---
.../scala/kafka/server/ReplicaManager.scala | 25 +++++++-------
.../kafka/server/ReplicaManagerTest.scala | 33 +++++++++++++++++++
2 files changed, 47 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index aa56269a2f..a2a070bcd0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -33,6 +33,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
+import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsTopic
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaderAndIsrResponseData.{LeaderAndIsrPartitionError, LeaderAndIsrTopicError}
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderTopic
@@ -67,7 +68,7 @@ import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
-import java.util.{Optional, OptionalInt, OptionalLong}
+import java.util.{Collections, Optional, OptionalInt, OptionalLong}
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
@@ -1249,9 +1250,9 @@ class ReplicaManager(val config: KafkaConfig,
val fileStore = Files.getFileStore(file)
val totalBytes = adjustForLargeFileSystems(fileStore.getTotalSpace)
val usableBytes = adjustForLargeFileSystems(fileStore.getUsableSpace)
- logsByDir.get(absolutePath) match {
+ val topicInfos = logsByDir.get(absolutePath) match {
case Some(logs) =>
- val topicInfos = logs.groupBy(_.topicPartition.topic).map{case (topic, logs) =>
+ logs.groupBy(_.topicPartition.topic).map { case (topic, logs) =>
new DescribeLogDirsResponseData.DescribeLogDirsTopic().setName(topic).setPartitions(
logs.filter { log =>
partitions.contains(log.topicPartition)
@@ -1262,17 +1263,19 @@ class ReplicaManager(val config: KafkaConfig,
.setOffsetLag(getLogEndOffsetLag(log.topicPartition, log.logEndOffset, log.isFuture))
.setIsFutureKey(log.isFuture)
}.toList.asJava)
- }.toList.asJava
-
- new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
- .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
- .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
+ }.filterNot(_.partitions().isEmpty).toList.asJava
case None =>
- new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
- .setErrorCode(Errors.NONE.code)
- .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
+ Collections.emptyList[DescribeLogDirsTopic]()
}
+ val describeLogDirsResult = new DescribeLogDirsResponseData.DescribeLogDirsResult()
+ .setLogDir(absolutePath)
+ .setErrorCode(Errors.NONE.code)
+ .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
+ if (!topicInfos.isEmpty)
+ describeLogDirsResult.setTopics(topicInfos)
+ describeLogDirsResult
+
} catch {
case e: KafkaStorageException =>
warn("Unable to describe replica dirs for %s".format(absolutePath), e)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 6b655ea783..97ba10d8be 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -6450,6 +6450,39 @@ class ReplicaManagerTest {
assertEquals(Errors.NONE.code, response.errorCode)
assertTrue(response.totalBytes > 0)
assertTrue(response.usableBytes >= 0)
+ assertFalse(response.topics().isEmpty)
+ response.topics().forEach(t => assertFalse(t.partitions().isEmpty))
+ }
+ } finally {
+ replicaManager.shutdown(checkpointHW = false)
+ }
+ }
+
+ @Test
+ def testDescribeLogDirsWithoutAnyPartitionTopic(): Unit = {
+ val noneTopic = "none-topic"
+ val topicPartition = 0
+ val topicId = Uuid.randomUuid()
+ val followerBrokerId = 0
+ val leaderBrokerId = 1
+ val leaderEpoch = 1
+ val leaderEpochIncrement = 2
+ val countDownLatch = new CountDownLatch(1)
+ val offsetFromLeader = 5
+
+ // Prepare the mocked components for the test
+ val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+ topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+ expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, topicId = Some(topicId))
+
+ try {
+ val responses = replicaManager.describeLogDirs(Set(new TopicPartition(noneTopic, topicPartition)))
+ assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
+ responses.foreach { response =>
+ assertEquals(Errors.NONE.code, response.errorCode)
+ assertTrue(response.totalBytes > 0)
+ assertTrue(response.usableBytes >= 0)
+ assertTrue(response.topics().isEmpty)
}
} finally {
replicaManager.shutdown(checkpointHW = false)
From 495ec16fb23310ccf863beecb32cbf0687114af2 Mon Sep 17 00:00:00 2001
From: Ken Huang <100591800+m1a2st@users.noreply.github.com>
Date: Tue, 4 Jun 2024 02:34:58 +0900
Subject: [PATCH 007/128] KAFKA-16881: InitialState type leaks into the Connect
REST API OpenAPI spec (#16175)
Reviewers: Chris Egerton
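For context: a bare @JsonProperty makes Jackson derive the JSON field name from the method name (initialState), while the documented REST field is initial_state, so the create-connector request leaked the Java-style name into the OpenAPI spec. A minimal standalone illustration (the class here is hypothetical):
import com.fasterxml.jackson.annotation.JsonProperty;
public class ConnectorStateExample {
    // Serializes as {"initial_state":"PAUSED"}; with a bare @JsonProperty,
    // Jackson would emit {"initialState":"PAUSED"} instead.
    @JsonProperty("initial_state")
    public String initialState() {
        return "PAUSED";
    }
}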
---
.../connect/runtime/rest/entities/CreateConnectorRequest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
index da8e235e42..9d338936db 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
@@ -47,7 +47,7 @@ public Map config() {
return config;
}
- @JsonProperty
+ @JsonProperty("initial_state")
public InitialState initialState() {
return initialState;
}
From 1b11cf0fe3514fb4c3642516012898740f4000d4 Mon Sep 17 00:00:00 2001
From: David Jacot
Date: Mon, 3 Jun 2024 20:32:39 +0200
Subject: [PATCH 008/128] MINOR: Small refactor in TargetAssignmentBuilder
(#16174)
This patch is a small refactoring that mainly aims to avoid constructing a copy of the new target assignment in the TargetAssignmentBuilder, because the copy is not used by the caller. The change relies on the existing tests, and it does not really have an impact on performance (validated with TargetAssignmentBuilderBenchmark).
Reviewers: Chia-Ping Tsai
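Condensed, the simplified build() loop relies on a single equals check: newMemberAssignment.equals(oldMemberAssignment) is false when oldMemberAssignment is null, so the former "member had no assignment" branch is subsumed, and the result now exposes the assignor's own map instead of a defensive copy. A sketch of the loop as it reads after this patch:
for (String memberId : memberSpecs.keySet()) {
    Assignment oldMemberAssignment = targetAssignment.get(memberId);   // null for a brand-new member
    Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
    if (!newMemberAssignment.equals(oldMemberAssignment)) {
        // Covers both "no previous assignment" and "assignment changed".
        records.add(newTargetAssignmentRecord(groupId, memberId, newMemberAssignment.partitions()));
    }
}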
---
.../group/GroupMetadataManager.java | 27 ++++++----
.../consumer/TargetAssignmentBuilder.java | 30 ++++-------
.../consumer/TargetAssignmentBuilderTest.java | 50 +++++++++----------
3 files changed, 50 insertions(+), 57 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index b912cb6ac3..ef1abb4e83 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -59,6 +59,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
@@ -1904,24 +1905,28 @@ private Assignment updateTargetAssignment(
.withInvertedTargetAssignment(group.invertedTargetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
- TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
- // A new static member is replacing an older one with the same subscriptions.
- // We just need to remove the older member and add the newer one. The new member should
- // reuse the target assignment of the older member.
+
if (staticMemberReplaced) {
- assignmentResult = assignmentResultBuilder
- .removeMember(member.memberId())
- .build();
- } else {
- assignmentResult = assignmentResultBuilder
- .build();
+ // A new static member is replacing an older one with the same subscriptions.
+ // We just need to remove the older member and add the newer one. The new member should
+ // reuse the target assignment of the older member.
+ assignmentResultBuilder.removeMember(member.memberId());
}
+ TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+ assignmentResultBuilder.build();
+
log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor: {}.",
group.groupId(), groupEpoch, preferredServerAssignor, assignmentResult.targetAssignment());
records.addAll(assignmentResult.records());
- return assignmentResult.targetAssignment().get(updatedMember.memberId());
+
+ MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
+ if (newMemberAssignment != null) {
+ return new Assignment(newMemberAssignment.targetPartitions());
+ } else {
+ return Assignment.EMPTY;
+ }
} catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target assignment for epoch %d: %s",
groupEpoch, ex.getMessage());
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
index 57d6039fa0..daea9938bf 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java
@@ -64,11 +64,11 @@ public static class TargetAssignmentResult {
/**
* The new target assignment for the group.
*/
- private final Map<String, Assignment> targetAssignment;
+ private final Map<String, MemberAssignment> targetAssignment;
TargetAssignmentResult(
List records,
- Map<String, Assignment> targetAssignment
+ Map<String, MemberAssignment> targetAssignment
) {
Objects.requireNonNull(records);
Objects.requireNonNull(targetAssignment);
@@ -86,7 +86,7 @@ public List records() {
/**
* @return The target assignment.
*/
- public Map<String, Assignment> targetAssignment() {
+ public Map<String, MemberAssignment> targetAssignment() {
return targetAssignment;
}
}
@@ -347,38 +347,26 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
// Compute delta from previous to new target assignment and create the
// relevant records.
List records = new ArrayList<>();
- Map<String, Assignment> newTargetAssignment = new HashMap<>();
- memberSpecs.keySet().forEach(memberId -> {
+ for (String memberId : memberSpecs.keySet()) {
Assignment oldMemberAssignment = targetAssignment.get(memberId);
Assignment newMemberAssignment = newMemberAssignment(newGroupAssignment, memberId);
- newTargetAssignment.put(memberId, newMemberAssignment);
-
- if (oldMemberAssignment == null) {
- // If the member had no assignment, we always create a record for it.
+ if (!newMemberAssignment.equals(oldMemberAssignment)) {
+ // If the member had no assignment or had a different assignment, we
+ // create a record for the new assignment.
records.add(newTargetAssignmentRecord(
groupId,
memberId,
newMemberAssignment.partitions()
));
- } else {
- // If the member had an assignment, we only create a record if the
- // new assignment is different.
- if (!newMemberAssignment.equals(oldMemberAssignment)) {
- records.add(newTargetAssignmentRecord(
- groupId,
- memberId,
- newMemberAssignment.partitions()
- ));
- }
}
- });
+ }
// Bump the target assignment epoch.
records.add(newTargetAssignmentEpochRecord(groupId, groupEpoch));
- return new TargetAssignmentResult(records, newTargetAssignment);
+ return new TargetAssignmentResult(records, newGroupAssignment.members());
}
private Assignment newMemberAssignment(
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
index d5ba038f31..e2e572b6bf 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java
@@ -337,12 +337,12 @@ public void testAssignmentHasNotChanged() {
20
)), result.records());
- Map<String, Assignment> expectedAssignment = new HashMap<>();
- expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3)
)));
- expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6)
)));
@@ -400,12 +400,12 @@ public void testAssignmentSwapped() {
20
), result.records().get(2));
- Map<String, Assignment> expectedAssignment = new HashMap<>();
- expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3)
)));
- expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6)
)));
@@ -474,16 +474,16 @@ public void testNewMember() {
20
), result.records().get(3));
- Map<String, Assignment> expectedAssignment = new HashMap<>();
- expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
- expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4)
)));
- expectedAssignment.put("member-3", new Assignment(mkAssignment(
+ expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
)));
@@ -561,16 +561,16 @@ public void testUpdateMember() {
20
), result.records().get(3));
- Map<String, Assignment> expectedAssignment = new HashMap<>();
- expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
- expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4)
)));
- expectedAssignment.put("member-3", new Assignment(mkAssignment(
+ expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
)));
@@ -639,16 +639,16 @@ public void testPartialAssignmentUpdate() {
20
), result.records().get(2));
- Map<String, Assignment> expectedAssignment = new HashMap<>();
- expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
- expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4, 5),
mkTopicAssignment(barTopicId, 3, 4, 5)
)));
- expectedAssignment.put("member-3", new Assignment(mkAssignment(
+ expectedAssignment.put("member-3", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 6),
mkTopicAssignment(barTopicId, 6)
)));
@@ -713,12 +713,12 @@ public void testDeleteMember() {
20
), result.records().get(2));
- Map<String, Assignment> expectedAssignment = new HashMap<>();
- expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2, 3),
mkTopicAssignment(barTopicId, 1, 2, 3)
)));
- expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 4, 5, 6),
mkTopicAssignment(barTopicId, 4, 5, 6)
)));
@@ -788,17 +788,17 @@ public void testReplaceStaticMember() {
20
), result.records().get(1));
- Map<String, Assignment> expectedAssignment = new HashMap<>();
- expectedAssignment.put("member-1", new Assignment(mkAssignment(
+ Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+ expectedAssignment.put("member-1", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 1, 2),
mkTopicAssignment(barTopicId, 1, 2)
)));
- expectedAssignment.put("member-2", new Assignment(mkAssignment(
+ expectedAssignment.put("member-2", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 3, 4),
mkTopicAssignment(barTopicId, 3, 4)
)));
- expectedAssignment.put("member-3-a", new Assignment(mkAssignment(
+ expectedAssignment.put("member-3-a", new MemberAssignment(mkAssignment(
mkTopicAssignment(fooTopicId, 5, 6),
mkTopicAssignment(barTopicId, 5, 6)
)));
From 9c72048a883c0bc21911998741fff29ccd2a4260 Mon Sep 17 00:00:00 2001
From: TingIāu "Ting" Kì <51072200+frankvicky@users.noreply.github.com>
Date: Tue, 4 Jun 2024 02:36:07 +0800
Subject: [PATCH 009/128] KAFKA-16861: Don't convert group to classic if the
size is larger than group max size. (#16163)
Fix the bug where the group is downgraded to a classic one when a member leaves, even though the consumer group size is still larger than `classicGroupMaxSize`.
Reviewers: Chia-Ping Tsai, David Jacot
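The root cause was a missing early return: validateOnlineDowngrade logged that the group was too large to downgrade, but then fell through and returned true anyway. A simplified sketch of the corrected guard (the real method performs additional checks before this one):
private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
    if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
        log.info("Cannot downgrade consumer group {} to classic group because its group size is " +
            "greater than classic group max size.", consumerGroup.groupId());
        return false; // the statement this patch adds
    }
    return true;
}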
---
.../group/GroupMetadataManager.java | 1 +
.../group/GroupMetadataManagerTest.java | 43 +++++++++++++++++++
2 files changed, 44 insertions(+)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ef1abb4e83..a1295397a8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -809,6 +809,7 @@ private boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memb
} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
consumerGroup.groupId());
+ return false;
}
return true;
}
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3664a7a61d..abf48fd641 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -13166,6 +13166,49 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() {
assertEquals(Collections.emptyList(), leaveResult.records());
}
+ @Test
+ public void testNoConversionWhenSizeExceedsClassicMaxGroupSize() throws Exception {
+ String groupId = "group-id";
+ String nonClassicMemberId = "1";
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols = Collections.singletonList(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName("range")
+ .setMetadata(new byte[0])
+ );
+
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(nonClassicMemberId).build();
+ ConsumerGroupMember classicMember1 = new ConsumerGroupMember.Builder("2")
+ .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata().setSupportedProtocols(protocols))
+ .build();
+ ConsumerGroupMember classicMember2 = new ConsumerGroupMember.Builder("3")
+ .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata().setSupportedProtocols(protocols))
+ .build();
+
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .withClassicGroupMaxSize(1)
+ .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE)
+ .withConsumerGroup(
+ new ConsumerGroupBuilder(groupId, 10)
+ .withMember(member)
+ .withMember(classicMember1)
+ .withMember(classicMember2)
+ )
+ .build();
+
+ assertEquals(Group.GroupType.CONSUMER, context.groupMetadataManager.group(groupId).type());
+
+ context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(nonClassicMemberId)
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setRebalanceTimeoutMs(5000)
+ );
+
+ assertEquals(Group.GroupType.CONSUMER, context.groupMetadataManager.group(groupId).type());
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
From cd52f33746de68c497843e4cd94416d5ec7270ee Mon Sep 17 00:00:00 2001
From: Anatoly Popov
Date: Mon, 3 Jun 2024 23:46:40 +0300
Subject: [PATCH 010/128] KAFKA-16105: Reset read offsets when seeking to
beginning in TBRLMM (#15165)
Reviewers: Greg Harris, Luke Chen, Kamal Chandraprakash
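The idea of the fix: when a metadata partition is rewound to the beginning on behalf of a newly assigned user partition, the task must also drop its remembered last-read offset for that partition; otherwise a later reassignment of the same metadata partition would seek back to the stale offset and skip the records in between. A sketch of the pattern (readOffsetsByMetadataPartition and toRemoteLogPartition are the real names; the stream source is simplified):
Set<TopicPartition> seekToBeginOffsetPartitions = newlyAssignedUserPartitions.stream()
        .map(utp -> utp.metadataPartition)
        // Drop the stale bookmark before rewinding, so a future reassignment
        // of this metadata partition is not resumed from the old offset.
        .peek(readOffsetsByMetadataPartition::remove)
        .map(ConsumerTask::toRemoteLogPartition)
        .collect(Collectors.toSet());
consumer.seekToBeginning(seekToBeginOffsetPartitions);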
---
.../remote/metadata/storage/ConsumerTask.java | 15 +++--
.../metadata/storage/ConsumerTaskTest.java | 62 ++++++++++++++++++-
2 files changed, 71 insertions(+), 6 deletions(-)
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index f36909b66b..a328b256ee 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -238,10 +238,15 @@ void maybeWaitForPartitionAssignments() throws InterruptedException {
this.assignedMetadataPartitions = Collections.unmodifiableSet(metadataPartitionSnapshot);
// for newly assigned user-partitions, read from the beginning of the corresponding metadata partition
final Set seekToBeginOffsetPartitions = assignedUserTopicIdPartitionsSnapshot
- .stream()
- .filter(utp -> !utp.isAssigned)
- .map(utp -> toRemoteLogPartition(utp.metadataPartition))
- .collect(Collectors.toSet());
+ .stream()
+ .filter(utp -> !utp.isAssigned)
+ .map(utp -> utp.metadataPartition)
+ // When reset to beginning is happening, we also need to reset the last read offset
+ // Otherwise if the next reassignment request for the same metadata partition comes in
+ // before the record of already assigned topic has been read, then the reset will happen again to the last read offset
+ .peek(readOffsetsByMetadataPartition::remove)
+ .map(ConsumerTask::toRemoteLogPartition)
+ .collect(Collectors.toSet());
consumer.seekToBeginning(seekToBeginOffsetPartitions);
// for other metadata partitions, read from the offset where the processing left last time.
remoteLogPartitions.stream()
@@ -463,4 +468,4 @@ public String toString() {
'}';
}
}
-}
\ No newline at end of file
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
index 424c86b6df..cef1d335b9 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicIdPartition;
@@ -64,8 +65,12 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
public class ConsumerTaskTest {
@@ -85,7 +90,7 @@ public class ConsumerTaskTest {
public void beforeEach() {
final Map offsets = remoteLogPartitions.stream()
.collect(Collectors.toMap(Function.identity(), e -> 0L));
- consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
consumer.updateBeginningOffsets(offsets);
consumerTask = new ConsumerTask(handler, partitioner, consumer, 10L, 300_000L, new SystemTime());
thread = new Thread(consumerTask);
@@ -254,6 +259,61 @@ public void testCanProcessRecord() throws InterruptedException {
assertEquals(3, handler.metadataCounter);
}
+ @Test
+ public void testCanReprocessSkippedRecords() throws InterruptedException {
+ final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+ final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+ final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+ final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3));
+ assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
+ assertNotEquals(partitioner.metadataPartition(tpId3), partitioner.metadataPartition(tpId0));
+
+ final int metadataPartition = partitioner.metadataPartition(tpId0);
+ final int anotherMetadataPartition = partitioner.metadataPartition(tpId3);
+
+ // Mocking the consumer to be able to wait for the second reassignment
+ doAnswer(invocation -> {
+ if (consumerTask.isUserPartitionAssigned(tpId1) && !consumerTask.isUserPartitionAssigned(tpId3)) {
+ return ConsumerRecords.empty();
+ } else {
+ return invocation.callRealMethod();
+ }
+ }).when(consumer).poll(any());
+
+ consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(metadataPartition), 0L));
+ consumer.updateEndOffsets(Collections.singletonMap(toRemoteLogPartition(anotherMetadataPartition), 0L));
+ final Set<TopicIdPartition> assignments = Collections.singleton(tpId0);
+ consumerTask.addAssignmentsForPartitions(assignments);
+ thread.start();
+ TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId0), "Timed out waiting for " + tpId0 + " to be assigned");
+
+ // Adding metadata records in the order opposite to the order of assignments
+ addRecord(consumer, metadataPartition, tpId1, 0);
+ addRecord(consumer, metadataPartition, tpId0, 1);
+ TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
+ // Only one record is processed, tpId1 record is skipped as unassigned
+ // but read offset is 1 e.g., record for tpId1 has been read by consumer
+ assertEquals(1, handler.metadataCounter);
+
+ // Adding assignment for tpId1 after related metadata records have already been read
+ consumerTask.addAssignmentsForPartitions(Collections.singleton(tpId1));
+ TestUtils.waitForCondition(() -> consumerTask.isUserPartitionAssigned(tpId1), "Timed out waiting for " + tpId1 + " to be assigned");
+
+ // Adding assignment for tpId0 to trigger the reset to last read offset
+ // and assignment for tpId3 that has different metadata partition to trigger the update of metadata snapshot
+ HashSet<TopicIdPartition> partitions = new HashSet<>();
+ partitions.add(tpId0);
+ partitions.add(tpId3);
+ consumerTask.addAssignmentsForPartitions(partitions);
+ // Explicitly re-adding the records since MockConsumer drops them on poll.
+ addRecord(consumer, metadataPartition, tpId1, 0);
+ addRecord(consumer, metadataPartition, tpId0, 1);
+ // Waiting for all metadata records to be re-read from the first metadata partition number
+ TestUtils.waitForCondition(() -> consumerTask.readOffsetForMetadataPartition(metadataPartition).equals(Optional.of(1L)), "Couldn't read record");
+ // Verifying that all the metadata records from the first metadata partition were processed properly.
+ TestUtils.waitForCondition(() -> handler.metadataCounter == 2, "Couldn't read record");
+ }
+
@Test
public void testMaybeMarkUserPartitionsAsReady() throws InterruptedException {
final TopicIdPartition tpId = getIdPartitions("hello", 1).get(0);
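The reprocessing test above gates the spied MockConsumer's poll() with doAnswer so that no records are consumed until the second reassignment has completed. Below is a minimal standalone sketch of that spy/doAnswer gating pattern, assuming only a plain Mockito + kafka-clients test classpath; the class and variable names (SpyPollGateExample, gateOpen, tp) are illustrative and not from the patch. Note that doAnswer(...).when(spy).poll(...) is used rather than when(spy.poll(...)), since the latter would invoke the real method while the stub is being installed.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class SpyPollGateExample {
    public static void main(String[] args) {
        AtomicBoolean gateOpen = new AtomicBoolean(false);
        MockConsumer<String, String> consumer = spy(new MockConsumer<>(OffsetResetStrategy.EARLIEST));
        TopicPartition tp = new TopicPartition("sample", 0);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        // Withhold records until the gate opens, then defer to the real poll.
        doAnswer(invocation -> gateOpen.get()
                ? invocation.callRealMethod()
                : ConsumerRecords.<String, String>empty())
                .when(consumer).poll(any(Duration.class));

        System.out.println(consumer.poll(Duration.ZERO).count()); // 0: gate closed
        gateOpen.set(true);
        System.out.println(consumer.poll(Duration.ZERO).count()); // real poll runs now
    }
}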
From 961c28ae711d91682d708189056e79a249dd2fdb Mon Sep 17 00:00:00 2001
From: David Jacot
Date: Tue, 4 Jun 2024 05:48:04 +0200
Subject: [PATCH 011/128] MINOR: Fix typo in MetadataVersion.IBP_4_0_IV0
(#16181)
This patch fixes a typo in MetadataVersion.IBP_4_0_IV0. It should be 0, not O.
Reviewers: Justine Olshan, Jun Rao, Chia-Ping Tsai
---
.../test/java/kafka/test/ClusterTestExtensionsTest.java | 2 +-
core/src/test/java/kafka/test/annotation/ClusterTest.java | 2 +-
.../integration/kafka/zk/ZkMigrationIntegrationTest.scala | 2 +-
.../scala/unit/kafka/server/ApiVersionsRequestTest.scala | 4 ++--
.../scala/unit/kafka/server/ReplicationQuotasTest.scala | 2 +-
.../org/apache/kafka/controller/QuorumControllerTest.java | 8 ++++----
.../java/org/apache/kafka/server/common/GroupVersion.java | 2 +-
.../org/apache/kafka/server/common/MetadataVersion.java | 2 +-
.../apache/kafka/server/common/MetadataVersionTest.java | 6 +++---
9 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index c8b53f8b8a..aba96eccdd 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -185,7 +185,7 @@ public void testNoAutoStart() {
@ClusterTest
public void testDefaults(ClusterInstance clusterInstance) {
- Assertions.assertEquals(MetadataVersion.IBP_4_0_IVO, clusterInstance.config().metadataVersion());
+ Assertions.assertEquals(MetadataVersion.IBP_4_0_IV0, clusterInstance.config().metadataVersion());
}
@ClusterTests({
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index bd95249b4f..5557abeb33 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -40,7 +40,7 @@
AutoStart autoStart() default AutoStart.DEFAULT;
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
- MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IVO;
+ MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IV0;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test
String[] tags() default {};
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index c0b6d916ec..e98a8fdecc 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -73,7 +73,7 @@ object ZkMigrationIntegrationTest {
MetadataVersion.IBP_3_7_IV2,
MetadataVersion.IBP_3_7_IV4,
MetadataVersion.IBP_3_8_IV0,
- MetadataVersion.IBP_4_0_IVO
+ MetadataVersion.IBP_4_0_IV0
).map { mv =>
val serverProperties = new util.HashMap[String, String]()
serverProperties.put("inter.broker.listener.name", "EXTERNAL")
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index a7415b5d50..d17872099b 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -47,7 +47,7 @@ object ApiVersionsRequestTest {
List(ClusterConfig.defaultBuilder()
.setTypes(java.util.Collections.singleton(Type.ZK))
.setServerProperties(serverProperties)
- .setMetadataVersion(MetadataVersion.IBP_4_0_IVO)
+ .setMetadataVersion(MetadataVersion.IBP_4_0_IV0)
.build()).asJava
}
@@ -83,7 +83,7 @@ object ApiVersionsRequestTest {
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@ClusterTemplate("testApiVersionsRequestTemplate")
- @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_4_0_IVO, serverProperties = Array(
+ @ClusterTest(types = Array(Type.KRAFT, Type.CO_KRAFT), metadataVersion = MetadataVersion.IBP_4_0_IV0, serverProperties = Array(
new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
new ClusterConfigProperty(key = "unstable.feature.versions.enable", value = "true")
))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 03623bab41..1f947dd8fa 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -299,7 +299,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(MetadataVersion.IBP_3_0_IV1.featureLevel())
- .setMaxSupportedVersion(MetadataVersion.IBP_4_0_IVO.featureLevel()))
+ .setMaxSupportedVersion(MetadataVersion.IBP_4_0_IV0.featureLevel()))
controllerServer.controller.registerBroker(
ControllerRequestContextUtil.ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData()
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 4498778592..e494ca3e9a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -196,7 +196,7 @@ public void testConfigurationOperations() throws Throwable {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("iiaQjkRPQcuMULNII0MUeA"))).
setClusterId(logEnv.clusterId())).get();
@@ -240,7 +240,7 @@ public void testDelayedConfigurationOperations() throws Throwable {
) {
controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
new BrokerRegistrationRequestData().
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setBrokerId(0).
setLogDirs(Collections.singletonList(Uuid.fromString("sTbzRAMnTpahIyIPNjiLhw"))).
setClusterId(logEnv.clusterId())).get();
@@ -298,7 +298,7 @@ public void testFenceMultipleBrokers() throws Throwable {
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
setClusterId(active.clusterId()).
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
@@ -717,7 +717,7 @@ public void testUnregisterBroker() throws Throwable {
setBrokerId(0).
setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
- setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IVO)).
+ setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_4_0_IV0)).
setLogDirs(Collections.singletonList(Uuid.fromString("vBpaRsZVSaGsQT53wtYGtg"))).
setListeners(listeners));
assertEquals(5L, reply.get().epoch());
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java
index 002d7ef33f..b59df64666 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/GroupVersion.java
@@ -22,7 +22,7 @@
public enum GroupVersion implements FeatureVersion {
// Version 1 enables the consumer rebalance protocol (KIP-848).
- GV_1(1, MetadataVersion.IBP_4_0_IVO, Collections.emptyMap());
+ GV_1(1, MetadataVersion.IBP_4_0_IV0, Collections.emptyMap());
public static final String FEATURE_NAME = "group.version";
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 26b67321e4..f867edb54e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -205,7 +205,7 @@ public enum MetadataVersion {
IBP_3_8_IV0(20, "3.8", "IV0", true),
// Introduce version 1 of the GroupVersion feature (KIP-848).
- IBP_4_0_IVO(21, "4.0", "IV0", false);
+ IBP_4_0_IV0(21, "4.0", "IV0", false);
// NOTES when adding a new version:
// Update the default version in @ClusterTest annotation to point to the latest version
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index cfcdcf3afe..d8397ce615 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -186,7 +186,7 @@ public void testFromVersionString() {
assertEquals(IBP_3_8_IV0, MetadataVersion.fromVersionString("3.8-IV0"));
- assertEquals(IBP_4_0_IVO, MetadataVersion.fromVersionString("4.0-IV0"));
+ assertEquals(IBP_4_0_IV0, MetadataVersion.fromVersionString("4.0-IV0"));
}
@Test
@@ -247,7 +247,7 @@ public void testShortVersion() {
assertEquals("3.7", IBP_3_7_IV3.shortVersion());
assertEquals("3.7", IBP_3_7_IV4.shortVersion());
assertEquals("3.8", IBP_3_8_IV0.shortVersion());
- assertEquals("4.0", IBP_4_0_IVO.shortVersion());
+ assertEquals("4.0", IBP_4_0_IV0.shortVersion());
}
@Test
@@ -297,7 +297,7 @@ public void testVersion() {
assertEquals("3.7-IV3", IBP_3_7_IV3.version());
assertEquals("3.7-IV4", IBP_3_7_IV4.version());
assertEquals("3.8-IV0", IBP_3_8_IV0.version());
- assertEquals("4.0-IV0", IBP_4_0_IVO.version());
+ assertEquals("4.0-IV0", IBP_4_0_IV0.version());
}
@Test
From 0aa0a01d9cb44e89e4e86cece11022e866cad2b3 Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Tue, 4 Jun 2024 02:27:35 -0400
Subject: [PATCH 012/128] KAFKA-16664; Re-add EventAccumulator.poll(long,
TimeUnit) (#16144)
We revamped the thread idle ratio metric in https://github.com/apache/kafka/pull/15835. https://github.com/apache/kafka/pull/15835#discussion_r1588068337 describes a case where the metric loses accuracy. To put a lower bound on that accuracy, this patch re-adds a poll with a timeout, which had been removed as part of https://github.com/apache/kafka/pull/15430.
Reviewers: David Jacot
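For context, the essence of the change is replacing an unbounded Condition.await() with awaitNanos so the call returns once the timeout is exhausted. A minimal standalone sketch of that timed-wait pattern follows; SimpleQueue and its members are illustrative only, not Kafka classes, and unlike the patch (which swallows InterruptedException) the sketch propagates it. awaitNanos returns the remaining wait time, so a spurious wakeup shrinks the deadline rather than resetting it.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public final class SimpleQueue<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    public void add(T item) {
        lock.lock();
        try {
            items.addLast(item);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Returns the next item, or null once the timeout elapses.
    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            while (items.isEmpty() && nanos > 0) {
                nanos = notEmpty.awaitNanos(nanos); // remaining wait time
            }
            return items.pollFirst(); // null if still empty
        } finally {
            lock.unlock();
        }
    }
}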
---
.../group/runtime/EventAccumulator.java | 35 +++++++------------
.../runtime/MultiThreadedEventProcessor.java | 8 ++++-
.../group/runtime/EventAccumulatorTest.java | 18 ++++++----
.../MultiThreadedEventProcessorTest.java | 4 +--
4 files changed, 33 insertions(+), 32 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
index 2c22232c47..cc1ab69cd7 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
@@ -27,6 +27,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -162,43 +163,31 @@ public void addFirst(T event) throws RejectedExecutionException {
}
/**
- * Returns the next {{@link Event}} available or null if no event is
- * available.
+ * Immediately returns the next {@link Event} available, or null
+ * if the accumulator is empty.
*
* @return The next event available or null.
*/
public T poll() {
- lock.lock();
- try {
- K key = randomKey();
- if (key == null) return null;
-
- Deque<T> queue = queues.get(key);
- T event = queue.poll();
-
- if (queue.isEmpty()) queues.remove(key);
- inflightKeys.add(key);
- size--;
-
- return event;
- } finally {
- lock.unlock();
- }
+ return poll(0, TimeUnit.MILLISECONDS);
}
/**
- * Returns the next {{@link Event}} available. This method blocks until an
- * event is available or accumulator is closed.
+ * Returns the next {@link Event} available. This method blocks for the provided
+ * time and returns null if no event is available.
*
+ * @param timeout The timeout.
+ * @param unit The timeout unit.
* @return The next event available or null.
*/
- public T take() {
+ public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
K key = randomKey();
- while (key == null && !closed) {
+ long nanos = unit.toNanos(timeout);
+ while (key == null && !closed && nanos > 0) {
try {
- condition.await();
+ nanos = condition.awaitNanos(nanos);
} catch (InterruptedException e) {
// Ignore.
}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index 31fa52ea7d..6265334872 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -34,6 +35,11 @@
*/
public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
+ /**
+ * The timeout used by the EventProcessorThread when polling for an event.
+ */
+ private static final long POLL_TIMEOUT_MS = 300L;
+
/**
* The logger.
*/
@@ -129,7 +135,7 @@ private void handleEvents() {
// time should be discounted by # threads.
long idleStartTimeMs = time.milliseconds();
- CoordinatorEvent event = accumulator.take();
+ CoordinatorEvent event = accumulator.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
long idleEndTimeMs = time.milliseconds();
long idleTimeMs = idleEndTimeMs - idleStartTimeMs;
metrics.recordThreadIdleTime(idleTimeMs / threads.size());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
index faac0f46f6..602614414a 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
@@ -190,9 +190,12 @@ public void testDoneUnblockWaitingThreads() throws ExecutionException, Interrupt
MockEvent event1 = new MockEvent(1, 1);
MockEvent event2 = new MockEvent(1, 2);
- CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take);
- CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take);
- CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take);
+ CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(() ->
+ accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+ CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(() ->
+ accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+ CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(() ->
+ accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0, future1, future2);
assertFalse(future0.isDone());
@@ -245,9 +248,12 @@ public void testDoneUnblockWaitingThreads() throws ExecutionException, Interrupt
public void testCloseUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException {
EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>();
- CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take);
- CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take);
- CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take);
+ CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(() ->
+ accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+ CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(() ->
+ accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+ CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(() ->
+ accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
assertFalse(future0.isDone());
assertFalse(future1.isDone());
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
index 0f2801daec..0b8f04fe34 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
@@ -60,8 +60,8 @@ public DelayEventAccumulator(Time time, long takeDelayMs) {
}
@Override
- public CoordinatorEvent take() {
- CoordinatorEvent event = super.take();
+ public CoordinatorEvent poll(long timeout, TimeUnit unit) {
+ CoordinatorEvent event = super.poll(timeout, unit);
time.sleep(takeDelayMs);
return event;
}
From c295feff3c9e6a7764fee6ff52d4130f8fef10b9 Mon Sep 17 00:00:00 2001
From: Edoardo Comar
Date: Tue, 4 Jun 2024 11:45:11 +0100
Subject: [PATCH 013/128] KAFKA-16047: Use REQUEST_TIMEOUT_MS_CONFIG in
AdminClient.fenceProducers (#16151)
Use REQUEST_TIMEOUT_MS_CONFIG in AdminClient.fenceProducers,
or options.timeoutMs if specified, as the transaction timeout.
No transaction is actually started with this timeout, but
ReplicaManager.appendRecords uses this value as its timeout.
Using REQUEST_TIMEOUT_MS_CONFIG, as a regular producer append
does, leaves enough time for replication to take place.
Co-Authored-By: Adrian Preston
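A hedged usage sketch of the resulting behavior: the transaction timeout sent in InitProducerId comes from options.timeoutMs() when set, and otherwise falls back to the admin client's request.timeout.ms. The bootstrap address and transactional id below are placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.FenceProducersOptions;

public class FenceProducersExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); // fallback transaction timeout
        try (Admin admin = Admin.create(props)) {
            FenceProducersOptions options = new FenceProducersOptions();
            options.timeoutMs(50000); // overrides request.timeout.ms for this call
            // Fences any producer using this transactional id by bumping its epoch.
            admin.fenceProducers(Collections.singleton("my-transactional-id"), options).all().get();
        }
    }
}

Either way, the timeout only bounds how long the coordinator may take to append the fencing record to the transaction log; no transaction runs for that duration.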
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../internals/FenceProducersHandler.java | 12 ++++++----
.../internals/FenceProducersHandlerTest.java | 23 ++++++++++++++-----
3 files changed, 26 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 71d39900cd..92ba6ad3d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4569,7 +4569,7 @@ public ListTransactionsResult listTransactions(ListTransactionsOptions options)
public FenceProducersResult fenceProducers(Collection<String> transactionalIds, FenceProducersOptions options) {
AdminApiFuture.SimpleAdminApiFuture future =
FenceProducersHandler.newFuture(transactionalIds);
- FenceProducersHandler handler = new FenceProducersHandler(logContext);
+ FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
invokeDriver(handler, future, options.timeoutMs);
return new FenceProducersResult(future.all());
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
index 23572dd441..9a12bc1959 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;
+import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
@@ -38,12 +39,16 @@
public class FenceProducersHandler extends AdminApiHandler.Unbatched {
private final Logger log;
+ private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+ private final int txnTimeoutMs;
public FenceProducersHandler(
- LogContext logContext
+ FenceProducersOptions options,
+ LogContext logContext,
+ int requestTimeoutMs
) {
this.log = logContext.logger(FenceProducersHandler.class);
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.TRANSACTION, logContext);
+ this.txnTimeoutMs = options.timeoutMs() != null ? options.timeoutMs() : requestTimeoutMs;
}
public static AdminApiFuture.SimpleAdminApiFuture newFuture(
@@ -82,9 +87,8 @@ InitProducerIdRequest.Builder buildSingleRequest(int brokerId, CoordinatorKey ke
.setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
.setProducerId(ProducerIdAndEpoch.NONE.producerId)
.setTransactionalId(key.idValue)
- // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
- // and shouldn't be used for any actual record writes
- .setTransactionTimeoutMs(1);
+ // This timeout is used by the coordinator to append the record with the new producer epoch to the transaction log.
+ .setTransactionTimeoutMs(txnTimeoutMs);
return new InitProducerIdRequest.Builder(data);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
index 34ed2e6772..9665bd0bdf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/FenceProducersHandlerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.admin.internals;
+import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.admin.internals.AdminApiHandler.ApiResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.message.InitProducerIdResponseData;
@@ -39,11 +40,21 @@
public class FenceProducersHandlerTest {
private final LogContext logContext = new LogContext();
private final Node node = new Node(1, "host", 1234);
+ private final int requestTimeoutMs = 30000;
+ private final FenceProducersOptions options = new FenceProducersOptions();
@Test
public void testBuildRequest() {
- FenceProducersHandler handler = new FenceProducersHandler(logContext);
- mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId));
+ FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
+ mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, requestTimeoutMs));
+ }
+
+ @Test
+ public void testBuildRequestOptionsTimeout() {
+ final int optionsTimeoutMs = 50000;
+ options.timeoutMs(optionsTimeoutMs);
+ FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
+ mkSet("foo", "bar", "baz").forEach(transactionalId -> assertLookup(handler, transactionalId, optionsTimeoutMs));
}
@Test
@@ -51,7 +62,7 @@ public void testHandleSuccessfulResponse() {
String transactionalId = "foo";
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
- FenceProducersHandler handler = new FenceProducersHandler(logContext);
+ FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
short epoch = 57;
long producerId = 7;
@@ -73,7 +84,7 @@ public void testHandleSuccessfulResponse() {
@Test
public void testHandleErrorResponse() {
String transactionalId = "foo";
- FenceProducersHandler handler = new FenceProducersHandler(logContext);
+ FenceProducersHandler handler = new FenceProducersHandler(options, logContext, requestTimeoutMs);
assertFatalError(handler, transactionalId, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.CLUSTER_AUTHORIZATION_FAILED);
assertFatalError(handler, transactionalId, Errors.UNKNOWN_SERVER_ERROR);
@@ -136,10 +147,10 @@ private ApiResult handleResponseError(
return result;
}
- private void assertLookup(FenceProducersHandler handler, String transactionalId) {
+ private void assertLookup(FenceProducersHandler handler, String transactionalId, int txnTimeoutMs) {
CoordinatorKey key = CoordinatorKey.byTransactionalId(transactionalId);
InitProducerIdRequest.Builder request = handler.buildSingleRequest(1, key);
assertEquals(transactionalId, request.data.transactionalId());
- assertEquals(1, request.data.transactionTimeoutMs());
+ assertEquals(txnTimeoutMs, request.data.transactionTimeoutMs());
}
}
From 7404fdffa671ed55188444fc96319b4dc301da74 Mon Sep 17 00:00:00 2001
From: Chris Egerton
Date: Tue, 4 Jun 2024 15:36:24 +0200
Subject: [PATCH 014/128] KAFKA-16837, KAFKA-16838: Ignore task configs for
deleted connectors, and compare raw task configs before publishing them
(#16122)
Reviewers: Mickael Maison
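As a rough sketch of the "compare raw task configs before publishing them" idea from the subject (names below are illustrative, not Connect's actual internals): the comparison happens on the raw, pre-ConfigProvider configs, and publication is skipped when nothing changed.

import java.util.List;
import java.util.Map;

final class TaskConfigComparison {
    private TaskConfigComparison() { }

    // Publish only when the raw (untransformed) task configs differ from
    // what is already stored; comparing raw values avoids resolving
    // externalized secrets just to decide whether to write.
    static boolean shouldPublish(List<Map<String, String>> storedRaw,
                                 List<Map<String, String>> proposedRaw) {
        return storedRaw == null || !storedRaw.equals(proposedRaw);
    }
}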
---
.../kafka/connect/runtime/AbstractHerder.java | 10 +-
.../distributed/DistributedHerder.java | 4 +-
.../runtime/standalone/StandaloneHerder.java | 4 +-
.../storage/KafkaConfigBackingStore.java | 29 ++-
.../ConnectWorkerIntegrationTest.java | 199 ++++++++++++++++++
.../connect/runtime/AbstractHerderTest.java | 27 +++
.../KafkaConfigBackingStoreMockitoTest.java | 53 ++++-
.../util/clusters/EmbeddedKafkaCluster.java | 16 +-
8 files changed, 325 insertions(+), 17 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 2a27103079..c6aeea80a2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -124,7 +124,7 @@
*/
public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
- private final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
+ private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
private final String workerId;
protected final Worker worker;
@@ -1039,16 +1039,16 @@ public static List