feat(issues1169): optimize partition reassignment logic to avoid NotLeaderOrFollowerException

Signed-off-by: Robin Han <[email protected]>
superhx committed Apr 26, 2024
1 parent 30a24c8 commit 1568ed9
Showing 21 changed files with 273 additions and 25 deletions.
37 changes: 37 additions & 0 deletions core/src/main/scala/kafka/log/streamaspect/DefaultOpenStreamChecker.java
@@ -0,0 +1,37 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.streamaspect;

import com.automq.stream.s3.metadata.StreamState;
import kafka.server.metadata.KRaftMetadataCache;
import org.apache.kafka.common.errors.s3.StreamFencedException;
import org.apache.kafka.image.S3StreamMetadataImage;

public class DefaultOpenStreamChecker implements OpenStreamChecker {
    private final KRaftMetadataCache metadataCache;

    public DefaultOpenStreamChecker(KRaftMetadataCache metadataCache) {
        this.metadataCache = metadataCache;
    }

    @Override
    public boolean check(long streamId, long epoch) throws StreamFencedException {
        // Look up the stream in the current KRaft metadata image.
        S3StreamMetadataImage stream = metadataCache.currentImage().streamsMetadata().getStreamMetadata(streamId);
        if (stream == null) {
            throw new StreamFencedException("streamId=" + streamId + " cannot be found, it may be deleted or not created yet");
        }
        if (stream.getEpoch() > epoch) {
            throw new StreamFencedException("streamId=" + streamId + " with epoch=" + epoch + " is fenced by new epoch=" + stream.getEpoch());
        }
        // The stream is ready for open only after the previous owner has closed it.
        return StreamState.CLOSED.equals(stream.state());
    }
}
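Taken together, check() has three possible outcomes: it returns true once the previous owner has closed the stream, returns false while the stream is still open elsewhere, and throws StreamFencedException when the stream is missing or already owned by a newer epoch. A minimal caller-side sketch (illustrative only, not part of this commit; the object and method names are made up):

import kafka.log.streamaspect.OpenStreamChecker
import org.apache.kafka.common.errors.s3.StreamFencedException

object OpenOutcomeSketch {
  // Map the three outcomes of OpenStreamChecker#check onto caller actions.
  def decide(checker: OpenStreamChecker, streamId: Long, epoch: Long): String =
    try {
      if (checker.check(streamId, epoch)) "open now"   // previous owner closed the stream
      else "retry later"                               // stream still open on another broker
    } catch {
      case _: StreamFencedException => "abort"         // deleted, missing, or fenced by a newer epoch
    }
}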
21 changes: 19 additions & 2 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -19,11 +19,11 @@ import kafka.log._
import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
import kafka.metrics.KafkaMetricsUtil
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.utils.{ThreadUtils, Time}
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.Scheduler
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile
@@ -610,7 +610,9 @@ object ElasticLog extends Logging {
maxTransactionTimeoutMs: Int,
producerStateManagerConfig: ProducerStateManagerConfig,
topicId: Option[Uuid],
leaderEpoch: Long): ElasticLog = {
leaderEpoch: Long,
openStreamChecker: OpenStreamChecker
): ElasticLog = {
// TODO: better error mark for elastic log
logDirFailureChannel.clearOfflineLogDirRecord(dir.getPath)
val logIdent = s"[ElasticLog partition=$topicPartition epoch=$leaderEpoch] "
@@ -634,6 +636,7 @@
stream
} else {
val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
awaitStreamReadyForOpen(openStreamChecker, metaStreamId, leaderEpoch, logIdent = logIdent)
// open partition meta stream
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
.thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
@@ -862,4 +865,18 @@ object ElasticLog extends Logging {
})
resultCf
}

  private def awaitStreamReadyForOpen(checker: OpenStreamChecker, streamId: Long, epoch: Long, logIdent: String): Unit = {
    // Block until the stream's previous owner has closed it; if the stream is fenced by a
    // newer epoch, the checker throws StreamFencedException and the open attempt is aborted.
    var round = 0
    while (true) {
      if (checker.check(streamId, epoch)) {
        return
      }
      round += 1
      if (round % 10 == 0) {
        info(s"$logIdent streamId=$streamId is not ready for open, epoch=$epoch")
      }
      Thread.sleep(100)
    }
  }
}
21 changes: 17 additions & 4 deletions core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
@@ -29,7 +29,7 @@ import java.io.File
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters.ConcurrentMapHasAsScala

class ElasticLogManager(val client: Client) extends Logging {
class ElasticLogManager(val client: Client, val openStreamChecker: OpenStreamChecker) extends Logging {
this.logIdent = s"[ElasticLogManager] "
private val elasticLogs = new ConcurrentHashMap[TopicPartition, ElasticUnifiedLog]()

@@ -70,7 +70,8 @@ class ElasticLogManager(val client: Client) extends Logging {
leaderEpoch,
logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
client,
NAMESPACE
NAMESPACE,
openStreamChecker
)
}
}, s"Failed to create elastic log for $topicPartition", this)
@@ -139,7 +140,7 @@ object ElasticLogManager {
val context = new Context()
context.config = config
context.brokerServer = broker
INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context)))
INSTANCE = Some(new ElasticLogManager(ClientFactoryProxy.get(context), new DefaultOpenStreamChecker(broker.metadataCache)))
INSTANCE.foreach(_.startup())
ElasticLogSegment.txnCache = new FileCache(config.logDirs.head + "/" + "txnindex-cache", 100 * 1024 * 1024)
ElasticLogSegment.timeCache = new FileCache(config.logDirs.head + "/" + "timeindex-cache", 100 * 1024 * 1024)
@@ -176,7 +177,19 @@ object ElasticLogManager {
logDirFailureChannel: LogDirFailureChannel,
topicId: Option[Uuid],
leaderEpoch: Long = 0): ElasticUnifiedLog = {
INSTANCE.get.getOrCreateLog(dir, config, scheduler, time, maxTransactionTimeoutMs, producerStateManagerConfig, brokerTopicStats, producerIdExpirationCheckIntervalMs, logDirFailureChannel, topicId, leaderEpoch)
INSTANCE.get.getOrCreateLog(
dir,
config,
scheduler,
time,
maxTransactionTimeoutMs,
producerStateManagerConfig,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
logDirFailureChannel,
topicId,
leaderEpoch
)
}

def shutdown(): Unit = {
@@ -262,13 +262,14 @@ object ElasticUnifiedLog extends Logging {

client: Client,
namespace: String,
openStreamChecker: OpenStreamChecker,
): ElasticUnifiedLog = {
val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
val partitionLogDirFailureChannel = new PartitionLogDirFailureChannel(logDirFailureChannel, dir.getPath);
LocalLog.maybeHandleIOException(partitionLogDirFailureChannel, dir.getPath, s"failed to open ElasticUnifiedLog $topicPartition in dir $dir") {
val start = System.currentTimeMillis()
val localLog = ElasticLog(client, namespace, dir, config, scheduler, time, topicPartition, partitionLogDirFailureChannel,
new ConcurrentHashMap[String, Int](), maxTransactionTimeoutMs, producerStateManagerConfig, topicId, leaderEpoch)
new ConcurrentHashMap[String, Int](), maxTransactionTimeoutMs, producerStateManagerConfig, topicId, leaderEpoch, openStreamChecker)
val leaderEpochFileCache = ElasticUnifiedLog.maybeCreateLeaderEpochCache(topicPartition, config.recordVersion, new ElasticLeaderEpochCheckpoint(localLog.leaderEpochCheckpointMeta, localLog.saveLeaderEpochCheckpoint))
// The real logStartOffset should be set by loaded offsets from ElasticLogLoader.
// Since the real value has been passed to localLog, we just pass it to ElasticUnifiedLog.
27 changes: 27 additions & 0 deletions core/src/main/scala/kafka/log/streamaspect/OpenStreamChecker.java
@@ -0,0 +1,27 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.log.streamaspect;

import org.apache.kafka.common.errors.s3.StreamFencedException;

/**
* Check whether a stream is ready for open.
*/
public interface OpenStreamChecker {
    OpenStreamChecker NOOP = (streamId, epoch) -> true;

    /**
     * Check whether a stream is ready for open.
     *
     * @return true if the stream can be opened with the given epoch, false if it is not ready yet
     * @throws StreamFencedException if the stream cannot be found or is fenced by a newer epoch
     */
    boolean check(long streamId, long epoch) throws StreamFencedException;

}
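For comparison, a bounded variant of the awaitStreamReadyForOpen loop added to ElasticLog above — purely illustrative and not part of this commit — that gives up after a fixed number of rounds instead of waiting indefinitely; a StreamFencedException thrown by check() still propagates to the caller:

import kafka.log.streamaspect.OpenStreamChecker

object BoundedOpenWaitSketch {
  // Poll the checker every 100 ms, up to maxRounds times.
  // Returns true if the stream became ready, false if we gave up.
  def await(checker: OpenStreamChecker, streamId: Long, epoch: Long, maxRounds: Int = 100): Boolean = {
    var round = 0
    while (round < maxRounds) {
      if (checker.check(streamId, epoch)) return true
      round += 1
      Thread.sleep(100)
    }
    false
  }
}

// The no-op checker used by the tests below reports ready immediately:
// BoundedOpenWaitSketch.await(OpenStreamChecker.NOOP, streamId = 1L, epoch = 0L) // true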
@@ -407,6 +407,7 @@ class DefaultAlterPartitionManager(
controllerChannelManager.sendRequest(request, new ControllerRequestCompletionHandler {
override def onTimeout(): Unit = {
inflightElectLeadersRequest.set(false)
unsentElectLeaders.addAll(topicPartitions)
tryElectLeader(null)
throw new IllegalStateException("Encountered unexpected timeout when sending AlterPartition to the controller")
}
22 changes: 12 additions & 10 deletions core/src/main/scala/kafka/server/BrokerFeatures.scala
@@ -20,6 +20,7 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.automq.AutoMQVersion

import java.util
import scala.jdk.CollectionConverters._
@@ -75,16 +76,17 @@ object BrokerFeatures extends Logging {
}

def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = {
Features.supportedFeatures(
java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableMetadataVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
}
)))
val features = new util.HashMap[String, SupportedVersionRange]()
features.put(MetadataVersion.FEATURE_NAME, new SupportedVersionRange(
MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
if (unstableMetadataVersionsEnabled) {
MetadataVersion.latestTesting.featureLevel
} else {
MetadataVersion.latestProduction.featureLevel
}
))
features.put(AutoMQVersion.FEATURE_NAME, new SupportedVersionRange(AutoMQVersion.V0.featureLevel(), AutoMQVersion.LATEST.featureLevel()))
Features.supportedFeatures(features)
}

def createEmpty(): BrokerFeatures = {
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/MetadataCache.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
import org.apache.kafka.server.common.automq.AutoMQVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}

import java.util
@@ -110,6 +111,8 @@ trait MetadataCache {
def getRandomAliveBrokerId: Option[Int]

def features(): Features

def autoMQVersion(): AutoMQVersion
}

object MetadataCache {
@@ -33,6 +33,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.MetadataResponse
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.metadata.{BrokerRegistration, PartitionRegistration, Replicas}
import org.apache.kafka.server.common.automq.AutoMQVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}

import java.util
@@ -546,5 +547,12 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
image.highestOffsetAndEpoch().offset,
true)
}

// AutoMQ inject start
override def autoMQVersion(): AutoMQVersion = {
_currentImage.features().autoMQVersion()
}
// AutoMQ inject end

}

@@ -40,6 +40,7 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{AbstractControlRequest, ApiVersionsResponse, MetadataResponse, UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.common.automq.AutoMQVersion
import org.apache.kafka.server.common.{Features, MetadataVersion}

import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
@@ -719,4 +720,11 @@ class ZkMetadataCache(
}

override def getFeatureOption: Option[Features] = _features

// AutoMQ inject start
override def autoMQVersion(): AutoMQVersion = {
throw new UnsupportedOperationException("AutoMQVersion is not supported in ZkMetadataCache")
}
// AutoMQ inject end

}
@@ -192,8 +192,11 @@ class ElasticReplicaManager(
// instead of delete the partition.
val start = System.currentTimeMillis()
hostedPartition.partition.close().get()
info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms, trigger leader election")
alterPartitionManager.tryElectLeader(topicPartition)
info(s"partition $topicPartition is closed, cost ${System.currentTimeMillis() - start} ms")
if (!metadataCache.autoMQVersion().isReassignmentV1Supported) {
// TODO: https://github.com/AutoMQ/automq/issues/1153 add a scheduled check for when the leader isn't successfully set
alterPartitionManager.tryElectLeader(topicPartition)
}
} else {
// Logs are not deleted here. They are deleted in a single batch later on.
// This is done to avoid having to checkpoint for every deletions.
@@ -900,6 +903,7 @@
throw new IllegalStateException(s"Topic $tp exists, but its ID is " +
s"${partition.topicId.get}, not $topicId as expected")
}
createHook.accept(partition)
Some(partition, false)

case HostedPartition.None =>
@@ -1019,8 +1023,8 @@

def doPartitionLeadingOrFollowing(onlyLeaderChange: Boolean): Unit = {
stateChangeLogger.info(s"Transitioning partition(s) info: $localChanges")
if (!localChanges.leaders.isEmpty) {
localChanges.leaders.forEach((tp, info) => {
for (replicas <- Seq(localChanges.leaders, localChanges.followers)) {
replicas.forEach((tp, info) => {
val prevOp = partitionOpMap.getOrDefault(tp, CompletableFuture.completedFuture(null))
val opCf = new CompletableFuture[Void]()
opCfList.add(opCf)
@@ -1032,6 +1036,10 @@
val leader = mutable.Map[TopicPartition, LocalReplicaChanges.PartitionInfo]()
leader += (tp -> info)
applyLocalLeadersDelta(leaderChangedPartitions, newImage, delta, lazyOffsetCheckpoints, leader, directoryIds)
// Elect the leader so that clients can find the partition via metadata.
if (info.partition().leader < 0) {
alterPartitionManager.tryElectLeader(tp)
}
} catch {
case t: Throwable => stateChangeLogger.error(s"Transitioning partition(s) fail: $localChanges", t)
} finally {
@@ -1042,7 +1050,6 @@
})
})
})

}
// skip becoming follower or adding log dir
if (!onlyLeaderChange) {
@@ -1075,7 +1082,7 @@

if (!opCfList.isEmpty) {
val elapsedMs = System.currentTimeMillis() - start
info(s"open ${localChanges.leaders.size()} / close ${localChanges.deletes.size()} partitions cost ${elapsedMs}ms")
info(s"open ${localChanges.followers().size()} / make leader ${localChanges.leaders.size()} / close ${localChanges.deletes.size()} partitions cost ${elapsedMs}ms")
}
})
}
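The two election triggers added in this file are conditional; the sketch below restates the resulting policy in isolation (object and method names are illustrative, not the actual ElasticReplicaManager code):

object ElectionPolicySketch {
  // After closing a partition during reassignment: only clusters whose AutoMQ feature
  // level does not yet support reassignment v1 still trigger an explicit election.
  def electAfterClose(isReassignmentV1Supported: Boolean): Boolean =
    !isReassignmentV1Supported

  // After applying a local leader change: elect only when the partition registration
  // carries no leader yet (leader id < 0), so clients can find the partition by metadata.
  def electAfterLeaderChange(leaderId: Int): Boolean =
    leaderId < 0
}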
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/tools/StorageTool.scala
@@ -35,6 +35,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.security.scram.internals.ScramFormatter
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.automq.AutoMQVersion

import java.util
import java.util.Base64
@@ -397,6 +398,14 @@ object StorageTool extends Logging {
for (record <- metadataArguments) metadataRecords.add(record)
}

// AutoMQ inject start
metadataRecords.add(new ApiMessageAndVersion(
new FeatureLevelRecord().setName(AutoMQVersion.FEATURE_NAME).setFeatureLevel(AutoMQVersion.LATEST.featureLevel()),
0.toShort
))
// AutoMQ inject end


BootstrapMetadata.fromRecords(metadataRecords, source)
}

@@ -64,6 +64,7 @@ class ElasticLogCleanerTest extends LogCleanerTest {
logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
client,
"test_namespace",
OpenStreamChecker.NOOP,
)
}
}
@@ -618,7 +618,8 @@ class ElasticLogTest {
maxTransactionTimeoutMs = 5 * 60 * 1000,
producerStateManagerConfig = producerStateManagerConfig,
topicId = Some(Uuid.ZERO_UUID),
leaderEpoch = 0
leaderEpoch = 0,
openStreamChecker = OpenStreamChecker.NOOP,
)
}
}
@@ -68,6 +68,7 @@ class ElasticUnifiedLogTest extends UnifiedLogTest {

client,
"test_namespace",
OpenStreamChecker.NOOP,
)
}
