diff --git a/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java
index b378d4ec11..ebba44e0cc 100644
--- a/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java
+++ b/core/src/main/scala/kafka/log/stream/s3/streams/ControllerStreamManager.java
@@ -216,8 +216,10 @@ public String toString() {
             case NODE_EPOCH_NOT_EXIST:
                 LOGGER.error("Node epoch expired or not exist, stream {}, epoch {}, code: {}", streamId, epoch, code);
                 throw code.exception();
-            case STREAM_NOT_EXIST:
             case STREAM_FENCED:
+                LOGGER.warn("[STREAM_FENCED] open stream failed streamId={}, epoch {}, code: {}", streamId, epoch, code);
+                throw code.exception();
+            case STREAM_NOT_EXIST:
             case STREAM_INNER_ERROR:
                 LOGGER.error("Unexpected error while opening stream: {}, epoch {}, code: {}", streamId, epoch, code);
                 throw code.exception();
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
index 790a80a78f..ea3efac4f7 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala
@@ -19,6 +19,7 @@ import kafka.log._
 import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
 import kafka.metrics.KafkaMetricsUtil
 import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.errors.s3.StreamFencedException
 import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.MemoryRecords
@@ -734,11 +735,18 @@ object ElasticLog extends Logging {
         if (metaStream != null) {
           metaStream.close().get
         }
+      }, this)
+      CoreUtils.swallow({
         if (logStreamManager != null) {
           logStreamManager.close().get()
         }
       }, this)
-      error(s"${logIdent}failed to open elastic log, trying to close streams. Error msg: ${e.getMessage}")
+      val cause = FutureUtil.cause(e)
+      if (cause.isInstanceOf[StreamFencedException]) {
+        warn(s"${logIdent}failed to open elastic log, trying to close streams.", e)
+      } else {
+        error(s"${logIdent}failed to open elastic log, trying to close streams.", e)
+      }
       throw e
     }
 
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
index ac0f8d14d7..6a8be4e01a 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala
@@ -17,7 +17,6 @@ import kafka.log.UnifiedLog
 import kafka.log.streamaspect.ElasticLogManager.NAMESPACE
 import kafka.log.streamaspect.cache.FileCache
 import kafka.log.streamaspect.client.{ClientFactoryProxy, Context}
-import kafka.log.streamaspect.utils.ExceptionUtil
 import kafka.server.{BrokerServer, BrokerTopicStats, KafkaConfig}
 import kafka.utils.Logging
 import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -53,29 +52,25 @@ class ElasticLogManager(val client: Client, val openStreamChecker: OpenStreamChecker
     var elasticLog: ElasticUnifiedLog = null
     // Only Partition#makeLeader will create a new log, the ReplicaManager#asyncApplyDelta will ensure the same partition
     // operate sequentially. So it's safe without lock
-    ExceptionUtil.maybeRecordThrowableAndRethrow(new Runnable {
-      override def run(): Unit = {
-        // ElasticLog new is a time cost operation.
-        elasticLog = ElasticUnifiedLog(
-          dir,
-          config,
-          scheduler,
-          time,
-          maxTransactionTimeoutMs,
-          producerStateManagerConfig,
-          brokerTopicStats,
-          producerIdExpirationCheckIntervalMs,
-          logDirFailureChannel,
-          topicId,
-          leaderEpoch,
-          logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
-          client,
-          NAMESPACE,
-          openStreamChecker
-        )
-      }
-    }, s"Failed to create elastic log for $topicPartition", this)
-    elasticLogs.putIfAbsent(topicPartition, elasticLog)
+    // ElasticLog new is a time cost operation.
+    elasticLog = ElasticUnifiedLog(
+      dir,
+      config,
+      scheduler,
+      time,
+      maxTransactionTimeoutMs,
+      producerStateManagerConfig,
+      brokerTopicStats,
+      producerIdExpirationCheckIntervalMs,
+      logDirFailureChannel,
+      topicId,
+      leaderEpoch,
+      logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
+      client,
+      NAMESPACE,
+      openStreamChecker
+    )
+    elasticLogs.put(topicPartition, elasticLog)
     elasticLog
   }
 
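
Note for reviewers: all three hunks converge on one pattern: unwrap the real cause from future wrappers, treat STREAM_FENCED as an expected condition (another node has taken over the stream with a higher epoch) and log it at warn, and keep every other failure at error. Below is a minimal self-contained sketch of that shape, assuming SLF4J; rootCause, closeQuietly, and openLog are illustrative stand-ins, not names from the patch (the patch uses FutureUtil.cause and CoreUtils.swallow for the same purposes).

import java.util.concurrent.{CompletionException, ExecutionException}

import org.apache.kafka.common.errors.s3.StreamFencedException
import org.slf4j.LoggerFactory

object FencedAwareOpen {
  private val logger = LoggerFactory.getLogger(getClass)

  // Strip the CompletionException/ExecutionException wrappers that futures
  // add, so callers can match on the real failure.
  @scala.annotation.tailrec
  def rootCause(t: Throwable): Throwable = t match {
    case e @ (_: CompletionException | _: ExecutionException) if e.getCause != null =>
      rootCause(e.getCause)
    case other => other
  }

  // Run each cleanup action independently, so a failure in one close does
  // not prevent the remaining closes from running.
  def closeQuietly(actions: (() => Unit)*): Unit =
    actions.foreach { a =>
      try a() catch { case e: Throwable => logger.warn("close failed", e) }
    }

  // Fencing is an expected hand-off rather than an internal error, so it is
  // logged at warn; any other failure still surfaces at error level. The
  // original exception is rethrown either way.
  def openLog(doOpen: () => Unit, logIdent: String): Unit =
    try doOpen()
    catch {
      case e: Throwable =>
        rootCause(e) match {
          case _: StreamFencedException =>
            logger.warn(s"${logIdent}failed to open elastic log", e)
          case _ =>
            logger.error(s"${logIdent}failed to open elastic log", e)
        }
        throw e
    }
}

Splitting the single CoreUtils.swallow into two in ElasticLog.scala serves the same goal as closeQuietly above: if metaStream.close() throws, logStreamManager.close() still runs instead of being skipped.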