From 1568ed9fa6d17993d093b848d8f0b14678357d95 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Thu, 25 Apr 2024 17:31:57 +0800
Subject: [PATCH 1/3] feat(issues1169): optimize partition reassignment logic to avoid NotLeaderOrFollowerException

Signed-off-by: Robin Han
---
 .../DefaultOpenStreamChecker.java             | 37 +++++++++++++
 .../kafka/log/streamaspect/ElasticLog.scala   | 21 +++++++-
 .../log/streamaspect/ElasticLogManager.scala  | 21 ++++++--
 .../log/streamaspect/ElasticUnifiedLog.scala  |  3 +-
 .../log/streamaspect/OpenStreamChecker.java   | 27 ++++++++++
 .../kafka/server/AlterPartitionManager.scala  |  1 +
 .../scala/kafka/server/BrokerFeatures.scala   | 22 ++++----
 .../scala/kafka/server/MetadataCache.scala    |  3 ++
 .../server/metadata/KRaftMetadataCache.scala  |  8 +++
 .../server/metadata/ZkMetadataCache.scala     |  8 +++
 .../streamaspect/ElasticReplicaManager.scala  | 19 ++++---
 .../main/scala/kafka/tools/StorageTool.scala  |  9 ++++
 .../streamaspect/ElasticLogCleanerTest.scala  |  1 +
 .../log/streamaspect/ElasticLogTest.scala     |  3 +-
 .../streamaspect/ElasticUnifiedLogTest.scala  |  1 +
 .../controller/FeatureControlManager.java     | 28 ++++++++++
 .../kafka/controller/QuorumFeatures.java      |  2 +
 .../controller/ReplicationControlManager.java | 10 ++++
 .../org/apache/kafka/image/FeaturesImage.java | 16 +++++-
 .../kafka/image/S3StreamsMetadataImage.java   |  4 ++
 .../server/common/automq/AutoMQVersion.java   | 54 +++++++++++++++++++
 21 files changed, 273 insertions(+), 25 deletions(-)
 create mode 100644 core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
 create mode 100644 core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java
 create mode 100644 server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java

diff --git a/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java b/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
new file mode 100644
index 0000000000..699fda4974
--- /dev/null
+++ b/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.log.streamaspect;
+
+import com.automq.stream.s3.metadata.StreamState;
+import kafka.server.metadata.KRaftMetadataCache;
+import org.apache.kafka.common.errors.s3.StreamFencedException;
+import org.apache.kafka.image.S3StreamMetadataImage;
+
+public class DefaultOpenStreamChecker implements OpenStreamChecker {
+    private final KRaftMetadataCache metadataCache;
+
+    public DefaultOpenStreamChecker(KRaftMetadataCache metadataCache) {
+        this.metadataCache = metadataCache;
+    }
+
+    @Override
+    public boolean check(long streamId, long epoch) throws StreamFencedException {
+        S3StreamMetadataImage stream = metadataCache.currentImage().streamsMetadata().getStreamMetadata(streamId);
+        if (stream == null) {
+            throw new StreamFencedException("streamId=" + streamId + " cannot be found, it may be deleted or not created yet");
+        }
+        if (stream.getEpoch() > epoch) {
+            throw new StreamFencedException("streamId=" + streamId + " with epoch=" + epoch + " is fenced by new epoch=" + stream.getEpoch());
+        }
+        return StreamState.CLOSED.equals(stream.state());
+    }
+}
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
index 914f1a1a28..c4709b78c4 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -19,11 +19,11 @@ import kafka.log._
 import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
 import kafka.metrics.KafkaMetricsUtil
 import kafka.utils.{CoreUtils, Logging}
-import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.utils.{ThreadUtils, Time}
+import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.Scheduler
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
@@ -610,7 +610,9 @@ object ElasticLog extends Logging {
     maxTransactionTimeoutMs: Int,
     producerStateManagerConfig: ProducerStateManagerConfig,
     topicId: Option[Uuid],
-    leaderEpoch: Long): ElasticLog = {
+    leaderEpoch: Long,
+    openStreamChecker: OpenStreamChecker
+  ): ElasticLog = {
     // TODO: better error mark for elastic log
     logDirFailureChannel.clearOfflineLogDirRecord(dir.getPath)
     val logIdent = s"[ElasticLog partition=$topicPartition epoch=$leaderEpoch] "
@@ -634,6 +636,7 @@ object ElasticLog extends Logging {
       stream
     } else {
       val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
+      awaitStreamReadyForOpen(openStreamChecker, metaStreamId, leaderEpoch, logIdent = logIdent)
       // open partition meta stream
       val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
         .thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
@@ -862,4 +865,18 @@ object ElasticLog extends Logging {
     })
     resultCf
   }
+
+  private def awaitStreamReadyForOpen(checker: OpenStreamChecker, streamId: Long, epoch: Long, logIdent: String): Unit = {
+    var round = 0
+    while (true) {
+      if (checker.check(streamId, epoch)) {
+        return
+      }
+      round += 1
+      if (round % 10 == 0) {
+        info(s"$logIdent streamId=$streamId is not ready to open, epoch=$epoch")
+      }
+      Thread.sleep(100)
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
index fc67f330eb..99282eefc1 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
@@ -29,7 +29,7 @@ import java.io.File
 import java.util.concurrent.ConcurrentHashMap
 import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala
 
-class ElasticLogManager(val client: Client) extends Logging {
+class ElasticLogManager(val client: Client, val openStreamChecker: OpenStreamChecker) extends Logging {
   this.logIdent = s"[ElasticLogManager] "
 
   private val elasticLogs = new ConcurrentHashMap[TopicPartition, ElasticUnifiedLog]()
@@ -70,7 +70,8 @@ class ElasticLogManager(val client: Client) extends Logging {
         leaderEpoch,
         logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
         client,
-        NAMESPACE
+        NAMESPACE,
+        openStreamChecker
       )
     }
   }, s"Failed to create elastic log for $topicPartition", this)
@@ -139,7 +140,7 @@ object ElasticLogManager {
     val context = new Context()
     context.config = config
     context.brokerServer = broker
-    INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context)))
+    INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context), new DefaultOpenStreamChecker(broker.metadataCache)))
     INSTANCE.foreach(_.startup())
     ElasticLogSegment.txnCache = new FileCache(config.logDirs.head + "/" + "txnindex-cache", 100 * 1024 * 1024)
     ElasticLogSegment.timeCache = new FileCache(config.logDirs.head + "/" + "timeindex-cache", 100 * 1024 * 1024)
@@ -176,7 +177,19 @@ object ElasticLogManager {
     logDirFailureChannel: LogDirFailureChannel,
     topicId: Option[Uuid],
     leaderEpoch: Long = 0): ElasticUnifiedLog = {
-    INSTANCE.get.getOrCreateLog(dir, config, scheduler, time, maxTransactionTimeoutMs, producerStateManagerConfig, brokerTopicStats, producerIdExpirationCheckIntervalMs, logDirFailureChannel, topicId, leaderEpoch)
+    INSTANCE.get.getOrCreateLog(
+      dir,
+      config,
+      scheduler,
+      time,
+      maxTransactionTimeoutMs,
+      producerStateManagerConfig,
+      brokerTopicStats,
+      producerIdExpirationCheckIntervalMs,
+      logDirFailureChannel,
+      topicId,
+      leaderEpoch
+    )
   }
 
   def shutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
index b5c2b1ee2b..53eb3d0d6a 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
@@ -262,13 +262,14 @@ object ElasticUnifiedLog extends Logging {
     client: Client,
     namespace: String,
+    openStreamChecker: OpenStreamChecker,
   ): ElasticUnifiedLog = {
     val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
     val partitionLogDirFailureChannel = new PartitionLogDirFailureChannel(logDirFailureChannel, dir.getPath);
     LocalLog.maybeHandleIOException(partitionLogDirFailureChannel, dir.getPath, s"failed to open ElasticUnifiedLog $topicPartition in dir $dir") {
       val start = System.currentTimeMillis()
       val localLog = ElasticLog(client, namespace, dir, config, scheduler, time, topicPartition, partitionLogDirFailureChannel,
-        new ConcurrentHashMap[String, Int](), maxTransactionTimeoutMs, producerStateManagerConfig, topicId, leaderEpoch)
+        new ConcurrentHashMap[String, Int](), maxTransactionTimeoutMs, producerStateManagerConfig, topicId, leaderEpoch, openStreamChecker)
       val leaderEpochFileCache = ElasticUnifiedLog.maybeCreateLeaderEpochCache(topicPartition, config.recordVersion, new ElasticLeaderEpochCheckpoint(localLog.leaderEpochCheckpointMeta, localLog.saveLeaderEpochCheckpoint))
       // The real logStartOffset should be set by loaded offsets from ElasticLogLoader.
       // Since the real value has been passed to localLog, we just pass it to ElasticUnifiedLog.
diff --git a/core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java b/core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java
new file mode 100644
index 0000000000..e5bb4a8794
--- /dev/null
+++ b/core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package kafka.log.streamaspect;
+
+import org.apache.kafka.common.errors.s3.StreamFencedException;
+
+/**
+ * Check whether a stream is ready to be opened.
+ */
+public interface OpenStreamChecker {
+    OpenStreamChecker NOOP = (streamId, epoch) -> true;
+
+    /**
+     * Check whether a stream is ready to be opened.
+     */
+    boolean check(long streamId, long epoch) throws StreamFencedException;
+
+}
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 1203fbc8c6..80eba5e032 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -407,6 +407,7 @@ class DefaultAlterPartitionManager(
     controllerChannelManager.sendRequest(request, new ControllerRequestCompletionHandler {
       override def onTimeout(): Unit = {
         inflightElectLeadersRequest.set(false)
+        unsentElectLeaders.addAll(topicPartitions)
         tryElectLeader(null)
         throw new IllegalStateException("Encountered unexpected timeout when sending AlterPartition to the controller")
       }
diff --git a/core/src/main/scala/kafka/server/BrokerFeatures.scala b/core/src/main/scala/kafka/server/BrokerFeatures.scala
index 41333c2bd3..582192d9ea 100644
--- a/core/src/main/scala/kafka/server/BrokerFeatures.scala
+++ b/core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -20,6 +20,7 @@ package kafka.server
 import kafka.utils.Logging
 import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.common.automq.AutoMQVersion
 
 import java.util
 import scala.jdk.CollectionConverters._
@@ -75,16 +76,17 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
-    Features.supportedFeatures(
-      java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
-        new SupportedVersionRange(
-          MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
-          if (unstableMetadataVersionsEnabled) {
-            MetadataVersion.latestTesting.featureLevel
-          } else {
-            MetadataVersion.latestProduction.featureLevel
-          }
-        )))
+    val features = new util.HashMap[String, SupportedVersionRange]()
+    features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange(
+      MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
+      if (unstableMetadataVersionsEnabled) {
+        MetadataVersion.latestTesting.featureLevel
+      } else {
+        MetadataVersion.latestProduction.featureLevel
+      }
+    ))
+    features.put(AutoMQVersion.FEATURE_NAME, new SupportedVersionRange(AutoMQVersion.V0.featureLevel(), AutoMQVersion.LATEST.featureLevel()))
+    Features.supportedFeatures(features)
   }
 
   def createEmpty(): BrokerFeatures = {
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 015e46a765..3c31ef8ad4 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
+import org.apache.kafka.server.common.automq.AutoMQVersion
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 
 import java.util
@@ -110,6 +111,8 @@ trait MetadataCache {
   def getRandomAliveBrokerId: Option[Int]
 
   def features(): Features
+
+  def autoMQVersion(): AutoMQVersion
 }
 
 object MetadataCache {
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 3dcbdd0df6..9ef6b68b36 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.MetadataResponse
 import org.apache.kafka.image.MetadataImage
 import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, Replicas}
+import org.apache.kafka.server.common.automq.AutoMQVersion
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 
 import java.util
@@ -546,5 +547,12 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
       image.highestOffsetAndEpoch().offset,
       true)
   }
+
+  // AutoMQ inject start
+  override def autoMQVersion(): AutoMQVersion = {
+    _currentImage.features().autoMQVersion()
+  }
+  // AutoMQ inject end
+
 }
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index 2a6a880d54..d12f14400d 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.common.automq.AutoMQVersion
 import org.apache.kafka.server.common.{Features, MetadataVersion}
 
 import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -719,4 +720,11 @@ class ZkMetadataCache(
   }
 
   override def getFeatureOption: Option[Features] = _features
+
+  // AutoMQ inject start
+  override def autoMQVersion(): AutoMQVersion = {
+    throw new UnsupportedOperationException("AutoMQVersion is not supported in ZkMetadataCache")
+  }
+  // AutoMQ inject end
+
 }
diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
index fdcd8e052f..5625538635 100644
--- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
@@ -192,8 +192,11 @@ class ElasticReplicaManager(
           // instead of delete the partition.
           val start = System.currentTimeMillis()
           hostedPartition.partition.close().get()
-          info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms, trigger leader election")
-          alterPartitionManager.tryElectLeader(topicPartition)
+          info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms")
+          if (!metadataCache.autoMQVersion().isReassignmentV1Supported) {
+            // TODO: https://github.com/AutoMQ/automq/issues/1153 add a scheduled check for the case where the leader isn't successfully set
+            alterPartitionManager.tryElectLeader(topicPartition)
+          }
         } else {
           // Logs are not deleted here. They are deleted in a single batch later on.
           // This is done to avoid having to checkpoint for every deletions.
@@ -900,6 +903,7 @@ class ElasticReplicaManager(
             throw new IllegalStateException(s"Topic $tp exists, but its ID is " +
               s"${partition.topicId.get}, not $topicId as expected")
         }
+        createHook.accept(partition)
         Some(partition, false)
 
       case HostedPartition.None =>
@@ -1019,8 +1023,8 @@ class ElasticReplicaManager(
     def doPartitionLeadingOrFollowing(onlyLeaderChange: Boolean): Unit = {
       stateChangeLogger.info(s"Transitioning partition(s) info: $localChanges")
 
-      if (!localChanges.leaders.isEmpty) {
-        localChanges.leaders.forEach((tp, info) => {
+      for (replicas <- Seq(localChanges.leaders, localChanges.followers)) {
+        replicas.forEach((tp, info) => {
           val prevOp = partitionOpMap.getOrDefault(tp, CompletableFuture.completedFuture(null))
           val opCf = new CompletableFuture[Void]()
           opCfList.add(opCf)
@@ -1032,6 +1036,10 @@ class ElasticReplicaManager(
               val leader = mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]()
               leader += (tp -> info)
               applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, leader, directoryIds)
+              // Elect the leader so that clients can find the partition through metadata.
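+              // A negative leader id means no leader has been assigned yet, so ask the controller to elect one.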
+              if (info.partition().leader < 0) {
+                alterPartitionManager.tryElectLeader(tp)
+              }
             } catch {
               case t: Throwable => stateChangeLogger.error(s"Transitioning partition(s) fail: $localChanges", t)
             } finally {
@@ -1042,7 +1050,6 @@ class ElasticReplicaManager(
             })
           })
         })
-      }
 
       // skip becoming follower or adding log dir
       if (!onlyLeaderChange) {
@@ -1075,7 +1082,7 @@ class ElasticReplicaManager(
 
       if (!opCfList.isEmpty) {
         val elapsedMs = System.currentTimeMillis() - start
-        info(s"open ${localChanges.leaders.size()} / close ${localChanges.deletes.size()} partitions cost ${elapsedMs}ms")
+        info(s"open ${localChanges.followers().size()} / make leader ${localChanges.leaders.size()} / close ${localChanges.deletes.size()} partitions cost ${elapsedMs}ms")
       }
     })
   }
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala
index 7c16550301..53f005bc74 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.scram.internals.ScramFormatter
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
+import org.apache.kafka.server.common.automq.AutoMQVersion
 
 import java.util
 import java.util.Base64
@@ -397,6 +398,14 @@ object StorageTool extends Logging {
       for (record <- metadataArguments) metadataRecords.add(record)
     }
 
+    // AutoMQ inject start
+    metadataRecords.add(new ApiMessageAndVersion(
+      new FeatureLevelRecord().setName(AutoMQVersion.FEATURE_NAME).setFeatureLevel(AutoMQVersion.LATEST.featureLevel()),
+      0.toShort
+    ))
+    // AutoMQ inject end
+
+
     BootstrapMetadata.fromRecords(metadataRecords, source)
   }
diff --git a/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala b/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala
index 2a39a11517..d4e2604bf9 100644
--- a/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala
+++ b/core/src/test/scala/kafka/log/streamaspect/ElasticLogCleanerTest.scala
@@ -64,6 +64,7 @@ class ElasticLogCleanerTest extends LogCleanerTest {
       logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
       client,
       "test_namespace",
+      OpenStreamChecker.NOOP,
     )
   }
 }
diff --git a/core/src/test/scala/kafka/log/streamaspect/ElasticLogTest.scala b/core/src/test/scala/kafka/log/streamaspect/ElasticLogTest.scala
index 08c968c023..4318653f86 100644
--- a/core/src/test/scala/kafka/log/streamaspect/ElasticLogTest.scala
+++ b/core/src/test/scala/kafka/log/streamaspect/ElasticLogTest.scala
@@ -618,7 +618,8 @@ class ElasticLogTest {
       maxTransactionTimeoutMs = 5 * 60 * 1000,
       producerStateManagerConfig = producerStateManagerConfig,
       topicId = Some(Uuid.ZERO_UUID),
-      leaderEpoch = 0
+      leaderEpoch = 0,
+      openStreamChecker = OpenStreamChecker.NOOP,
     )
   }
 }
diff --git a/core/src/test/scala/kafka/log/streamaspect/ElasticUnifiedLogTest.scala b/core/src/test/scala/kafka/log/streamaspect/ElasticUnifiedLogTest.scala
index 2275e739d9..def98f2274 100644
--- a/core/src/test/scala/kafka/log/streamaspect/ElasticUnifiedLogTest.scala
+++ b/core/src/test/scala/kafka/log/streamaspect/ElasticUnifiedLogTest.scala
@@ -68,6 +68,7 @@ class ElasticUnifiedLogTest extends UnifiedLogTest {
       client,
       "test_namespace",
+      OpenStreamChecker.NOOP,
     )
   }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index d5911d4f54..3763b0a528 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -39,6 +39,7 @@ import org.apache.kafka.metadata.migration.ZkMigrationState;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.common.automq.AutoMQVersion;
 import org.apache.kafka.server.mutable.BoundedList;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -151,6 +152,10 @@ public FeatureControlManager build() {
      */
     private final ClusterFeatureSupportDescriber clusterSupportDescriber;
 
+    // AutoMQ inject start
+    private final TimelineObject<AutoMQVersion> autoMQVersion;
+    // AutoMQ inject end
+
     private FeatureControlManager(
         LogContext logContext,
         QuorumFeatures quorumFeatures,
@@ -166,6 +171,10 @@ private FeatureControlManager(
         this.minimumBootstrapVersion = minimumBootstrapVersion;
         this.migrationControlState = new TimelineObject<>(snapshotRegistry, ZkMigrationState.NONE);
         this.clusterSupportDescriber = clusterSupportDescriber;
+
+        // AutoMQ inject start
+        this.autoMQVersion = new TimelineObject<>(snapshotRegistry, AutoMQVersion.V0);
+        // AutoMQ inject end
     }
 
     ControllerResult<Map<String, ApiError>> updateFeatures(
@@ -406,8 +415,20 @@ public void replay(FeatureLevelRecord record) {
                 finalizedVersions.put(record.name(), record.featureLevel());
                 log.info("Replayed a FeatureLevelRecord setting feature {} to {}",
                     record.name(), record.featureLevel());
+
+            }
+        }
+
+        // AutoMQ inject start
+        if (record.name().equals(AutoMQVersion.FEATURE_NAME)) {
+            if (record.featureLevel() == 0) {
+                autoMQVersion.set(AutoMQVersion.V0);
+            } else {
+                autoMQVersion.set(AutoMQVersion.from(record.featureLevel()));
             }
         }
+        // AutoMQ inject end
+    }
 
     public void replay(ZkMigrationStateRecord record) {
@@ -426,4 +447,11 @@ public void replay(ZkMigrationStateRecord record) {
     boolean isControllerId(int nodeId) {
         return quorumFeatures.isControllerId(nodeId);
     }
+
+    // AutoMQ inject start
+    public AutoMQVersion autoMQVersion() {
+        return autoMQVersion.get();
+    }
+    // AutoMQ inject end
+
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
index 9b0355c0db..4b0ec87589 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import org.apache.kafka.server.common.automq.AutoMQVersion;
 
 /**
  * A holder class of the local node's supported feature flags as well as the quorum node IDs.
@@ -61,6 +62,7 @@ public static Map<String, VersionRange> defaultFeatureMap(boolean enableUnstable) {
             enableUnstable ?
                 MetadataVersion.latestTesting().featureLevel() :
                 MetadataVersion.latestProduction().featureLevel()));
+        features.put(AutoMQVersion.FEATURE_NAME, VersionRange.of(AutoMQVersion.V0.featureLevel(), AutoMQVersion.LATEST.featureLevel()));
         return features;
     }
 
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index b357a37b5e..04054df29e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1715,6 +1715,13 @@ boolean arePartitionLeadersImbalanced() {
      * @return All of the election records and if there may be more available preferred replicas to elect as leader
      */
     ControllerResult<Boolean> maybeBalancePartitionLeaders() {
+        // AutoMQ inject start
+        if (featureControl.autoMQVersion().isReassignmentV1Supported()) {
+            // AutoMQ ISR/replica sets contain only a single node
+            return ControllerResult.of(Collections.emptyList(), false);
+        }
+        // AutoMQ inject end
+
         List<ApiMessageAndVersion> records = new ArrayList<>();
 
         boolean rescheduleImmediately = false;
@@ -2500,6 +2507,9 @@ Optional changePartitionReassignmentV2(TopicIdPartition tp
      * @param topicPartition the topic partition that needs to be re-elected
     */
     private void addPartitionToReElectTimeouts(TopicPartition topicPartition) {
+        if (featureControl.autoMQVersion().isReassignmentV1Supported()) {
+            return;
+        }
         Timeout timeout = timer.newTimeout(t -> {
             partitionReElectTimeouts.remove(topicPartition);
             if (quorumController != null) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
index 4f22588ed0..df23284f35 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesImage.java
@@ -32,7 +32,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
-
+import org.apache.kafka.server.common.automq.AutoMQVersion;
 
 /**
  * Represents the feature levels in the metadata image.
@@ -52,6 +52,11 @@ public final class FeaturesImage {
 
     private final ZkMigrationState zkMigrationState;
 
+    // AutoMQ inject start
+    // lazily loaded from finalizedVersions
+    private AutoMQVersion autoMQVersion;
+    // AutoMQ inject end
+
     public FeaturesImage(
         Map<String, Short> finalizedVersions,
         MetadataVersion metadataVersion,
@@ -149,4 +154,13 @@ public boolean equals(Object o) {
     public String toString() {
         return new FeaturesImageNode(this).stringify();
     }
+
+    // AutoMQ inject start
+    public AutoMQVersion autoMQVersion() {
+        if (autoMQVersion == null) {
+            autoMQVersion = AutoMQVersion.from(finalizedVersion(AutoMQVersion.FEATURE_NAME).orElse((short) 0));
+        }
+        return autoMQVersion;
+    }
+    // AutoMQ inject end
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index 1983add90e..bed0fe8bd5 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -215,6 +215,10 @@ public List<S3StreamSetObject> getStreamSetObjects(int nodeId) {
         return wal.orderList();
     }
 
+    public S3StreamMetadataImage getStreamMetadata(long streamId) {
+        return streamsMetadata.get(streamId);
+    }
+
     @Override
     public boolean equals(Object obj) {
         if (this == obj) {
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java
new file mode 100644
index 0000000000..af9592f388
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/common/automq/AutoMQVersion.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package org.apache.kafka.server.common.automq;
+
+public enum AutoMQVersion {
+
+    V0((short) 1),
+    // Support reassignment v1: elect the leader after the partition is opened on the new broker
+    V1((short) 2);
+
+    public static final String FEATURE_NAME = "automq.version";
+    public static final AutoMQVersion LATEST = V1;
+
+    private final short level;
+
+    AutoMQVersion(short level) {
+        this.level = level;
+    }
+
+    public static AutoMQVersion from(short level) {
+        for (AutoMQVersion version : AutoMQVersion.values()) {
+            if (version.level == level) {
+                return version;
+            }
+        }
+        if (level == 0) {
+            // when the version is not set, we assume it is V0
+            return V0;
+        }
+        throw new IllegalArgumentException("Unknown AutoMQVersion level: " + level);
+    }
+
+    public short featureLevel() {
+        return level;
+    }
+
+    public boolean isReassignmentV1Supported() {
+        return isAtLeast(V1);
+    }
+
+    public boolean isAtLeast(AutoMQVersion otherVersion) {
+        return this.compareTo(otherVersion) >= 0;
+    }
+
+}
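
Note on patch 1: the following minimal sketch shows how the finalized automq.version
level added above is meant to be consumed. It is illustrative only and not part of the
series; FeatureView is a hypothetical stand-in for FeaturesImage/KRaftMetadataCache,
while AutoMQVersion is the enum introduced by this patch.

    // Sketch (assumption: FeatureView is hypothetical; AutoMQVersion comes from this patch).
    import org.apache.kafka.server.common.automq.AutoMQVersion;

    import java.util.Map;

    final class FeatureGateSketch {
        interface FeatureView {
            Map<String, Short> finalizedVersions(); // e.g. {"automq.version": 2}
        }

        static AutoMQVersion autoMQVersion(FeatureView view) {
            // A missing feature level maps to 0, which AutoMQVersion.from(...) treats
            // as V0 -- the same fallback FeaturesImage#autoMQVersion() uses above.
            short level = view.finalizedVersions().getOrDefault(AutoMQVersion.FEATURE_NAME, (short) 0);
            return AutoMQVersion.from(level);
        }

        // Mirrors ElasticReplicaManager: once reassignment v1 is finalized, the broker
        // that closes a partition no longer triggers the election; the destination
        // broker elects the leader after it has opened the partition.
        static void onPartitionClosed(FeatureView view, Runnable electLeader) {
            if (!autoMQVersion(view).isReassignmentV1Supported()) {
                electLeader.run();
            }
        }
    }
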
From e2dd1f617e08c4b176fd1fbbe2f8f36ec2744f1d Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Fri, 26 Apr 2024 18:07:11 +0800
Subject: [PATCH 2/3] fix: unit test

Signed-off-by: Robin Han
---
 .../scala/kafka/log/streamaspect/ElasticLogManager.scala   | 7 ++++++-
 .../main/scala/kafka/server/metadata/ZkMetadataCache.scala | 2 +-
 .../server/streamaspect/ElasticReplicaManagerTest.scala    | 3 +++
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
index 99282eefc1..ac0f8d14d7 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
@@ -140,7 +140,12 @@ object ElasticLogManager {
     val context = new Context()
     context.config = config
     context.brokerServer = broker
-    INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context), new DefaultOpenStreamChecker(broker.metadataCache)))
+    val openStreamChecker = if (broker != null) {
+      new DefaultOpenStreamChecker(broker.metadataCache)
+    } else {
+      OpenStreamChecker.NOOP
+    }
+    INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context), openStreamChecker))
     INSTANCE.foreach(_.startup())
     ElasticLogSegment.txnCache = new FileCache(config.logDirs.head + "/" + "txnindex-cache", 100 * 1024 * 1024)
     ElasticLogSegment.timeCache = new FileCache(config.logDirs.head + "/" + "timeindex-cache", 100 * 1024 * 1024)
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index d12f14400d..05d2c1ef46 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -723,7 +723,7 @@ class ZkMetadataCache(
 
   // AutoMQ inject start
   override def autoMQVersion(): AutoMQVersion = {
-    throw new UnsupportedOperationException("AutoMQVersion is not supported in ZkMetadataCache")
+    AutoMQVersion.LATEST
   }
   // AutoMQ inject end
 
diff --git a/core/src/test/scala/unit/kafka/server/streamaspect/ElasticReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/streamaspect/ElasticReplicaManagerTest.scala
index 895b21a643..bdbbf8939e 100644
--- a/core/src/test/scala/unit/kafka/server/streamaspect/ElasticReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/streamaspect/ElasticReplicaManagerTest.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{DirectoryId, Node, TopicPartition, Uuid}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
+import org.apache.kafka.server.common.automq.AutoMQVersion
 import org.apache.kafka.server.common.{DirectoryEventHandler, OffsetAndEpoch}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.util.timer.MockTimer
@@ -152,6 +153,7 @@ class ElasticReplicaManagerTest extends ReplicaManagerTest {
     when(metadataCache.topicNamesToIds()).thenReturn(topicIds.asJava)
     when(metadataCache.topicIdsToNames()).thenReturn(topicNames.asJava)
     when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    when(metadataCache.autoMQVersion()).thenReturn(AutoMQVersion.LATEST)
     mockGetAliveBrokerFunctions(metadataCache, aliveBrokers)
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)
@@ -361,6 +363,7 @@ class ElasticReplicaManagerTest extends ReplicaManagerTest {
       thenReturn(Map(leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"),
         followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-b")).toMap)
     when(metadataCache.metadataVersion()).thenReturn(config.interBrokerProtocolVersion)
+    when(metadataCache.autoMQVersion()).thenReturn(AutoMQVersion.LATEST)
     val mockProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
       purgatoryName = "Produce", timer, reaperEnabled = false)
     val mockFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](

From 65d681763d54ffca7ffe85fc90f8e5169c6b9254 Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Sat, 27 Apr 2024 15:24:30 +0800
Subject: [PATCH 3/3] fix(s3stream): code review

Signed-off-by: Robin Han
---
 .../kafka/log/streamaspect/DefaultOpenStreamChecker.java | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java b/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
index 699fda4974..61d6f1f020 100644
--- a/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
+++ b/core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
@@ -27,11 +27,10 @@ public DefaultOpenStreamChecker(KRaftMetadataCache metadataCache) {
     public boolean check(long streamId, long epoch) throws StreamFencedException {
         S3StreamMetadataImage stream = metadataCache.currentImage().streamsMetadata().getStreamMetadata(streamId);
         if (stream == null) {
-            throw new StreamFencedException("streamId=" + streamId + " cannot be found, it may be deleted or not created yet");
-        }
-        if (stream.getEpoch() > epoch) {
-            throw new StreamFencedException("streamId=" + streamId + " with epoch=" + epoch + " is fenced by new epoch=" + stream.getEpoch());
+            throw new StreamFencedException(String.format("streamId=%d cannot be found, it may be deleted or not created yet", streamId));
         }
+        if (stream.getEpoch() > epoch)
+            throw new StreamFencedException(String.format("streamId=%d with epoch=%d is fenced by new epoch=%d", streamId, epoch, stream.getEpoch()));
         return StreamState.CLOSED.equals(stream.state());
     }
 }
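
Closing note for the series: the open-stream handshake that patches 1 and 3 implement
can be summarized by the following self-contained sketch. It is illustrative only;
StreamView and MetadataView are hypothetical stand-ins for S3StreamMetadataImage and
KRaftMetadataCache, and IllegalStateException stands in for StreamFencedException.

    // Sketch of DefaultOpenStreamChecker#check plus ElasticLog.awaitStreamReadyForOpen.
    final class OpenStreamHandshakeSketch {
        interface StreamView {
            long epoch();
            boolean closed(); // true once the previous owner has closed the stream
        }

        interface MetadataView {
            StreamView stream(long streamId); // null if the stream is unknown to the image
        }

        /** Fail fast on fenced epochs; otherwise report whether the stream is CLOSED. */
        static boolean check(MetadataView metadata, long streamId, long epoch) {
            StreamView stream = metadata.stream(streamId);
            if (stream == null) {
                throw new IllegalStateException("streamId=" + streamId + " cannot be found");
            }
            if (stream.epoch() > epoch) {
                // A newer epoch already owns the stream: give up instead of waiting.
                throw new IllegalStateException("epoch=" + epoch + " is fenced by epoch=" + stream.epoch());
            }
            return stream.closed();
        }

        /** Poll every 100 ms until the stream is safe to open, logging every 10th round. */
        static void await(MetadataView metadata, long streamId, long epoch) throws InterruptedException {
            int round = 0;
            while (!check(metadata, streamId, epoch)) {
                if (++round % 10 == 0) {
                    System.out.printf("streamId=%d not ready to open, epoch=%d%n", streamId, epoch);
                }
                Thread.sleep(100);
            }
        }
    }

Waiting for CLOSED before calling openStream ensures the incoming leader only takes over
after the previous replica has released the stream, which removes the window in which
clients would hit NotLeaderOrFollowerException during a reassignment.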