feat(issues1169): optimize partition reassignment logic to avoid NotLeaderOrFollowerException #1180

Merged: 3 commits, Apr 28, 2024
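
This PR has no description. From the diff, it makes three related changes: a new OpenStreamChecker lets a broker opening a reassigned partition wait until the partition's streams are CLOSED in the KRaft metadata image before opening them, instead of racing the old leader and surfacing a NotLeaderOrFollowerException; a new AutoMQVersion feature level (bootstrapped by StorageTool at format time) gates the reassignment-V1 behavior; and broker-triggered leader election moves from the partition-close path to the partition-open path in ElasticReplicaManager.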
New file: kafka/log/streamaspect/DefaultOpenStreamChecker.java
@@ -0,0 +1,36 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.streamaspect;

import com.automq.stream.s3.metadata.StreamState;
import kafka.server.metadata.KRaftMetadataCache;
import org.apache.kafka.common.errors.s3.StreamFencedException;
import org.apache.kafka.image.S3StreamMetadataImage;

public class DefaultOpenStreamChecker implements OpenStreamChecker {
private final KRaftMetadataCache metadataCache;

public DefaultOpenStreamChecker(KRaftMetadataCache metadataCache) {
this.metadataCache = metadataCache;
}

@Override
public boolean check(long streamId, long epoch) throws StreamFencedException {
S3StreamMetadataImage stream = metadataCache.currentImage().streamsMetadata().getStreamMetadata(streamId);
if (stream == null) {
throw new StreamFencedException(String.format("streamId=%d cannot be found, it may be deleted or not created yet", streamId));
}
if (stream.getEpoch() > epoch)
throw new StreamFencedException(String.format("streamId=%d with epoch=%d is fenced by new epoch=%d", streamId, epoch, stream.getEpoch()));
return StreamState.CLOSED.equals(stream.state());
}
}
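
A minimal caller-side sketch (in Scala) of the contract above, with hypothetical streamId/epoch values and a KRaftMetadataCache assumed to be in scope as metadataCache: check returns true when the stream is CLOSED and safe to open, false when it is still open elsewhere, and throws StreamFencedException when a newer epoch owns the stream.

import kafka.log.streamaspect.{DefaultOpenStreamChecker, OpenStreamChecker}
import org.apache.kafka.common.errors.s3.StreamFencedException

val checker: OpenStreamChecker = new DefaultOpenStreamChecker(metadataCache)
try {
  if (checker.check(42L, 3L)) {
    // stream 42 is CLOSED in the metadata image: proceed to openStream(...)
  } else {
    // still open on another broker: back off and retry
  }
} catch {
  case _: StreamFencedException =>
    // epoch 3 has been superseded by a newer epoch: abort this open attempt
}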
21 changes: 19 additions & 2 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -19,11 +19,11 @@ import kafka.log._
import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
import kafka.metrics.KafkaMetricsUtil
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{ThreadUtils, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
@@ -610,7 +610,9 @@ object ElasticLog extends Logging {
maxTransactionTimeoutMs: Int,
producerStateManagerConfig: ProducerStateManagerConfig,
topicId: Option[Uuid],
leaderEpoch: Long): ElasticLog = {
leaderEpoch: Long,
openStreamChecker: OpenStreamChecker
): ElasticLog = {
// TODO: better error mark for elastic log
logDirFailureChannel.clearOfflineLogDirRecord(dir.getPath)
val logIdent = s"[ElasticLog partition=$topicPartition epoch=$leaderEpoch] "
@@ -634,6 +636,7 @@
stream
} else {
val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
awaitStreamReadyForOpen(openStreamChecker, metaStreamId, leaderEpoch, logIdent = logIdent)
// open partition meta stream
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
.thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
@@ -862,4 +865,18 @@ object ElasticLog extends Logging {
})
resultCf
}

private def awaitStreamReadyForOpen(checker: OpenStreamChecker, streamId: Long, epoch: Long, logIdent: String): Unit = {
var round = 0
while(true) {
if (checker.check(streamId, epoch)) {
return
}
round += 1
if (round % 10 == 0) {
info(s"$logIdent streamId=$streamId is not ready for open, epoch=$epoch")
}
Thread.sleep(100)
}
}
}
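
Note that awaitStreamReadyForOpen has no timeout: it returns only when the checker reports the stream CLOSED, and it aborts only via the StreamFencedException thrown inside check. A bounded variant is sketched below purely to illustrate the trade-off; it is NOT what this PR implements, and the name and default are hypothetical.

// Hypothetical bounded wait (not the PR's behavior): give up after maxRounds
// polls of 100 ms instead of waiting indefinitely for the stream to close.
private def awaitWithDeadline(checker: OpenStreamChecker, streamId: Long, epoch: Long,
                              maxRounds: Int = 600): Unit = {
  var round = 0
  while (!checker.check(streamId, epoch)) {
    round += 1
    if (round >= maxRounds)
      throw new IllegalStateException(s"streamId=$streamId not ready after $maxRounds polls")
    Thread.sleep(100)
  }
}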
core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
@@ -29,7 +29,7 @@ import java.io.File
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala

class ElasticLogManager(val client: Client) extends Logging {
class ElasticLogManager(val client: Client, val openStreamChecker: OpenStreamChecker) extends Logging {
this.logIdent = s"[ElasticLogManager] "
private val elasticLogs = new ConcurrentHashMap[TopicPartition, ElasticUnifiedLog]()

@@ -70,7 +70,8 @@ class ElasticLogManager(val client: Client) extends Logging {
leaderEpoch,
logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
client,
NAMESPACE
NAMESPACE,
openStreamChecker
)
}
}, s"Failed to create elastic log for $topicPartition", this)
@@ -139,7 +140,12 @@ object ElasticLogManager {
val context = new Context()
context.config = config
context.brokerServer = broker
INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context)))
val openStreamChecker = if (broker != null) {
new DefaultOpenStreamChecker(broker.metadataCache)
} else {
OpenStreamChecker.NOOP
}
INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context), openStreamChecker))
INSTANCE.foreach(_.startup())
ElasticLogSegment.txnCache = new FileCache(config.logDirs.head + "/" + "txnindex-cache", 100 * 1024 * 1024)
ElasticLogSegment.timeCache = new FileCache(config.logDirs.head + "/" + "timeindex-cache", 100 * 1024 * 1024)
@@ -176,7 +182,19 @@
logDirFailureChannel: LogDirFailureChannel,
topicId: Option[Uuid],
leaderEpoch: Long = 0): ElasticUnifiedLog = {
INSTANCE.get.getOrCreateLog(dir, config, scheduler, time, maxTransactionTimeoutMs, producerStateManagerConfig, brokerTopicStats, producerIdExpirationCheckIntervalMs, logDirFailureChannel, topicId, leaderEpoch)
INSTANCE.get.getOrCreateLog(
dir,
config,
scheduler,
time,
maxTransactionTimeoutMs,
producerStateManagerConfig,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
logDirFailureChannel,
topicId,
leaderEpoch
)
}

def shutdown(): Unit = {
core/src/main/scala/kafka/log/streamaspect/ElasticUnifiedLog.scala
@@ -262,13 +262,14 @@ object ElasticUnifiedLog extends Logging {

client: Client,
namespace: String,
openStreamChecker: OpenStreamChecker,
): ElasticUnifiedLog = {
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val partitionLogDirFailureChannel = new PartitionLogDirFailureChannel(logDirFailureChannel, dir.getPath);
LocalLog.maybeHandleIOException(partitionLogDirFailureChannel, dir.getPath, s"failed to open ElasticUnifiedLog $topicPartition in dir $dir") {
val start = System.currentTimeMillis()
val localLog = ElasticLog(client, namespace, dir, config, scheduler, time, topicPartition, partitionLogDirFailureChannel,
new ConcurrentHashMap[String, Int](), maxTransactionTimeoutMs, producerStateManagerConfig, topicId, leaderEpoch)
new ConcurrentHashMap[String, Int](), maxTransactionTimeoutMs, producerStateManagerConfig, topicId, leaderEpoch, openStreamChecker)
val leaderEpochFileCache = ElasticUnifiedLog.maybeCreateLeaderEpochCache(topicPartition, config.recordVersion, new ElasticLeaderEpochCheckpoint(localLog.leaderEpochCheckpointMeta, localLog.saveLeaderEpochCheckpoint))
// The real logStartOffset should be set by loaded offsets from ElasticLogLoader.
// Since the real value has been passed to localLog, we just pass it to ElasticUnifiedLog.
New file: kafka/log/streamaspect/OpenStreamChecker.java
@@ -0,0 +1,27 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.streamaspect;

import org.apache.kafka.common.errors.s3.StreamFencedException;

/**
* Check whether a stream is ready for open.
*/
public interface OpenStreamChecker {
OpenStreamChecker NOOP = (streamId, epoch) -> true;

/**
* Check whether a stream is ready for open.
*/
boolean check(long streamId, long epoch) throws StreamFencedException;

}
core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -407,6 +407,7 @@ class DefaultAlterPartitionManager(
controllerChannelManager.sendRequest(request, new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
inflightElectLeadersRequest.set(false)
unsentElectLeaders.addAll(topicPartitions)
tryElectLeader(null)
throw new IllegalStateException("Encountered unexpected timeout when sending AlterPartition to the controller")
}
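
The fix here is the added unsentElectLeaders.addAll(topicPartitions): previously the timeout handler cleared the in-flight flag and retried without re-queuing, so partitions in a timed-out ElectLeaders request could be silently dropped. A condensed, self-contained sketch of the pattern, with hypothetical names rather than the real DefaultAlterPartitionManager API:

import java.util.concurrent.atomic.AtomicBoolean
import java.util.{ArrayDeque, Collection => JCollection}

class RetrySketch[T] {
  private val unsent = new ArrayDeque[T]()
  private val inflight = new AtomicBoolean(false)

  def onTimeout(batch: JCollection[T]): Unit = {
    inflight.set(false)  // the request is dead; allow the next send
    unsent.addAll(batch) // re-queue everything before retrying, so nothing is lost
    trySend()
  }

  private def trySend(): Unit = {
    // drain `unsent` into the next request if no request is in flight
  }
}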
22 changes: 12 additions & 10 deletions core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -20,6 +20,7 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.automq.AutoMQVersion

import java.util
import scala.jdk.CollectionConverters._
@@ -75,16 +76,17 @@
}

def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
Features.supportedFeatures(
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableMetadataVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
}
)))
val features = new util.HashMap[String, SupportedVersionRange]()
features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableMetadataVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
}
))
features.put(AutoMQVersion.FEATURE_NAME, new SupportedVersionRange(AutoMQVersion.V0.featureLevel(), AutoMQVersion.LATEST.featureLevel()))
Features.supportedFeatures(features)
}

def createEmpty(): BrokerFeatures = {
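
A sketch of what the rewritten method now advertises, assuming Kafka's Features.get and the SupportedVersionRange min/max accessors: the broker declares an AutoMQ version range alongside the metadata.version range.

val supported = BrokerFeatures.defaultSupportedFeatures(unstableMetadataVersionsEnabled = false)
val automq = supported.get(AutoMQVersion.FEATURE_NAME)
assert(automq.min() == AutoMQVersion.V0.featureLevel())
assert(automq.max() == AutoMQVersion.LATEST.featureLevel())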
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.server.common.automq.AutoMQVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}

import java.util
@@ -110,6 +111,8 @@ trait MetadataCache {
def getRandomAliveBrokerId: Option[Int]

def features(): Features

def autoMQVersion(): AutoMQVersion
}

object MetadataCache {
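
The new accessor gives brokers a uniform way to gate behavior on the cluster-wide AutoMQ feature level. The intended call pattern (see the ElasticReplicaManager diff below) looks roughly like this sketch, with metadataCache assumed in scope:

if (metadataCache.autoMQVersion().isReassignmentV1Supported) {
  // reassignment V1: skip broker-triggered election (see the close path below)
} else {
  // legacy path: the broker must trigger leader election itself
}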
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, Replicas}
import org.apache.kafka.server.common.automq.AutoMQVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}

import java.util
@@ -546,5 +547,12 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
image.highestOffsetAndEpoch().offset,
true)
}

// AutoMQ inject start
override def autoMQVersion(): AutoMQVersion = {
_currentImage.features().autoMQVersion()
}
// AutoMQ inject end

}

core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -40,6 +40,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.automq.AutoMQVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}

import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -719,4 +720,11 @@ class ZkMetadataCache(
}

override def getFeatureOption: Option[Features] = _features

// AutoMQ inject start
override def autoMQVersion(): AutoMQVersion = {
AutoMQVersion.LATEST
}
// AutoMQ inject end

}
ElasticReplicaManager.scala
@@ -192,8 +192,11 @@ class ElasticReplicaManager(
// instead of delete the partition.
val start = System.currentTimeMillis()
hostedPartition.partition.close().get()
info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms, trigger leader election")
alterPartitionManager.tryElectLeader(topicPartition)
info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms")
if (!metadataCache.autoMQVersion().isReassignmentV1Supported) {
// TODO: https://github.com/AutoMQ/automq/issues/1153 add schedule check when leader isn't successfully set
alterPartitionManager.tryElectLeader(topicPartition)
}
} else {
// Logs are not deleted here. They are deleted in a single batch later on.
// This is done to avoid having to checkpoint for every deletions.
@@ -900,6 +903,7 @@
throw new IllegalStateException(s"Topic $tp exists, but its ID is " +
s"${partition.topicId.get}, not $topicId as expected")
}
createHook.accept(partition)
Some(partition, false)

case HostedPartition.None =>
@@ -1019,8 +1023,8 @@

def doPartitionLeadingOrFollowing(onlyLeaderChange: Boolean): Unit = {
stateChangeLogger.info(s"Transitioning partition(s) info: $localChanges")
if (!localChanges.leaders.isEmpty) {
localChanges.leaders.forEach((tp, info) => {
for (replicas <- Seq(localChanges.leaders, localChanges.followers)) {
replicas.forEach((tp, info) => {
val prevOp = partitionOpMap.getOrDefault(tp, CompletableFuture.completedFuture(null))
val opCf = new CompletableFuture[Void]()
opCfList.add(opCf)
@@ -1032,6 +1036,10 @@
val leader = mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]()
leader += (tp -> info)
applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, leader, directoryIds)
// Elect the leader to let client can find the partition by metadata.
if (info.partition().leader < 0) {
alterPartitionManager.tryElectLeader(tp)
}
} catch {
case t: Throwable => stateChangeLogger.error(s"Transitioning partition(s) fail: $localChanges", t)
} finally {
@@ -1042,7 +1050,6 @@
})
})
})

}
// skip becoming follower or adding log dir
if (!onlyLeaderChange) {
@@ -1075,7 +1082,7 @@

if (!opCfList.isEmpty) {
val elapsedMs = System.currentTimeMillis() - start
info(s"open ${localChanges.leaders.size()} / close ${localChanges.deletes.size()} partitions cost ${elapsedMs}ms")
info(s"open ${localChanges.followers().size()} / make leader ${localChanges.leaders.size()} / close ${localChanges.deletes.size()} partitions cost ${elapsedMs}ms")
}
})
}
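
Taken together, the hunks above relocate leader election: closing a partition triggers an election only on clusters without reassignment V1, while a newly applied local leader with no leader assigned (leader < 0) now triggers one so clients can find the partition via metadata. A condensed restatement of the two triggers, identifiers as in the diff:

// After closing a partition (legacy clusters only):
if (!metadataCache.autoMQVersion().isReassignmentV1Supported)
  alterPartitionManager.tryElectLeader(topicPartition)

// After applying a local leader delta (all clusters):
if (info.partition().leader < 0)
  alterPartitionManager.tryElectLeader(tp)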
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/tools/StorageTool.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.scram.internals.ScramFormatter
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.automq.AutoMQVersion

import java.util
import java.util.Base64
@@ -397,6 +398,14 @@ object StorageTool extends Logging {
for (record <- metadataArguments) metadataRecords.add(record)
}

// AutoMQ inject start
metadataRecords.add(new ApiMessageAndVersion(
new FeatureLevelRecord().setName(AutoMQVersion.FEATURE_NAME).setFeatureLevel(AutoMQVersion.LATEST.featureLevel()),
0.toShort
))
// AutoMQ inject end


BootstrapMetadata.fromRecords(metadataRecords, source)
}

ElasticLogCleanerTest.scala
@@ -64,6 +64,7 @@ class ElasticLogCleanerTest extends LogCleanerTest {
logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
client,
"test_namespace",
OpenStreamChecker.NOOP,
)
}
}
ElasticLogTest.scala
@@ -618,7 +618,8 @@ class ElasticLogTest {
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
topicId = Some(Uuid.ZERO_UUID),
leaderEpoch = 0
leaderEpoch = 0,
openStreamChecker = OpenStreamChecker.NOOP,
)
}
}
ElasticUnifiedLogTest.scala
@@ -68,6 +68,7 @@ class ElasticUnifiedLogTest extends UnifiedLogTest {

client,
"test_namespace",
OpenStreamChecker.NOOP,
)
}
