chore(log): convert stream fenced log to warn level (#1219)
Signed-off-by: Robin Han <[email protected]>
superhx authored May 7, 2024
1 parent 5336da1 commit 7e7f417
Showing 3 changed files with 31 additions and 26 deletions.
@@ -216,8 +216,10 @@ public String toString() {
case NODE_EPOCH_NOT_EXIST:
LOGGER.error("Node epoch expired or not exist, stream {}, epoch {}, code: {}", streamId, epoch, code);
throw code.exception();
case STREAM_NOT_EXIST:
case STREAM_FENCED:
LOGGER.warn("[STREAM_FENCED] open stream failed streamId={}, epoch {}, code: {}", streamId, epoch, code);
throw code.exception();
case STREAM_NOT_EXIST:
case STREAM_INNER_ERROR:
LOGGER.error("Unexpected error while opening stream: {}, epoch {}, code: {}", streamId, epoch, code);
throw code.exception();
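The hunk above splits STREAM_FENCED out of the generic error branch: a fenced stream now gets its own warn-level log with a [STREAM_FENCED] tag, while the exception is still thrown and the remaining codes stay at error. Below is a minimal Scala sketch of the same pattern; the OpenStreamError ADT and the SLF4J logger are hypothetical stand-ins for the project's own error-code enum and logger, not the actual types touched by this commit.

```scala
import org.slf4j.LoggerFactory

// Hypothetical stand-in for the client's error-code enum; only the cases visible in the diff.
sealed trait OpenStreamError { def exception(): RuntimeException }
case object StreamFenced extends OpenStreamError { def exception() = new RuntimeException("STREAM_FENCED") }
case object StreamNotExist extends OpenStreamError { def exception() = new RuntimeException("STREAM_NOT_EXIST") }
case object StreamInnerError extends OpenStreamError { def exception() = new RuntimeException("STREAM_INNER_ERROR") }

object OpenStreamLogging {
  private val logger = LoggerFactory.getLogger(getClass)

  def fail(code: OpenStreamError, streamId: Long, epoch: Long): Nothing = code match {
    case StreamFenced =>
      // Fenced streams are treated as an expected condition, hence warn rather than error.
      logger.warn(s"[STREAM_FENCED] open stream failed streamId=$streamId, epoch $epoch, code: $code")
      throw code.exception()
    case StreamNotExist | StreamInnerError =>
      // Everything else is still an unexpected failure and stays at error level.
      logger.error(s"Unexpected error while opening stream: $streamId, epoch $epoch, code: $code")
      throw code.exception()
  }
}
```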
core/src/main/scala/kafka/log/streamaspect/ElasticLog.scala (10 changes: 9 additions & 1 deletion)
@@ -19,6 +19,7 @@ import kafka.log._
import kafka.log.streamaspect.ElasticLogFileRecords.{BatchIteratorRecordsAdaptor, PooledMemoryRecords}
import kafka.metrics.KafkaMetricsUtil
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.errors.s3.StreamFencedException
import org.apache.kafka.common.errors.{KafkaStorageException, OffsetOutOfRangeException}
import org.apache.kafka.common.message.FetchResponseData
import org.apache.kafka.common.record.MemoryRecords
@@ -734,11 +735,18 @@ object ElasticLog extends Logging {
if (metaStream != null) {
metaStream.close().get
}
}, this)
CoreUtils.swallow({
if (logStreamManager != null) {
logStreamManager.close().get()
}
}, this)
error(s"${logIdent}failed to open elastic log, trying to close streams. Error msg: ${e.getMessage}")
val cause = FutureUtil.cause(e)
if (cause.isInstanceOf[StreamFencedException]) {
warn(s"${logIdent}failed to open elastic log, trying to close streams.", e)
} else {
error(s"${logIdent}failed to open elastic log, trying to close streams.", e)
}
throw e
}

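The ElasticLog.scala change applies the same idea where the open failure is actually caught: instead of an unconditional error log, the cause of the failed future is unwrapped and, if it is a StreamFencedException, the message is logged at warn; anything else stays at error, and the exception is rethrown either way. A self-contained Scala sketch of that pattern follows; the rootCause helper is a simplified stand-in for FutureUtil.cause, and the local StreamFencedException class stands in for org.apache.kafka.common.errors.s3.StreamFencedException.

```scala
import java.util.concurrent.{CompletionException, ExecutionException}
import org.slf4j.LoggerFactory

// Hypothetical local stand-in for org.apache.kafka.common.errors.s3.StreamFencedException.
class StreamFencedException(msg: String) extends RuntimeException(msg)

object OpenFailureLogging {
  private val logger = LoggerFactory.getLogger(getClass)

  // Simplified stand-in for FutureUtil.cause: peel off future wrappers to reach the root cause.
  @annotation.tailrec
  def rootCause(t: Throwable): Throwable = t match {
    case e: ExecutionException if e.getCause != null  => rootCause(e.getCause)
    case e: CompletionException if e.getCause != null => rootCause(e.getCause)
    case other                                        => other
  }

  def logOpenFailure(logIdent: String, e: Throwable): Unit = {
    val msg = s"${logIdent}failed to open elastic log, trying to close streams."
    rootCause(e) match {
      // A fenced stream is expected (e.g. the partition was handed over), so only warn.
      case _: StreamFencedException => logger.warn(msg, e)
      case _                        => logger.error(msg, e)
    }
  }
}
```

As in the diff itself, only the log level depends on the cause; the caller still rethrows the original exception.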
core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala (43 changes: 19 additions & 24 deletions)
@@ -17,7 +17,6 @@ import kafka.log.UnifiedLog
import kafka.log.streamaspect.ElasticLogManager.NAMESPACE
import kafka.log.streamaspect.cache.FileCache
import kafka.log.streamaspect.client.{ClientFactoryProxy, Context}
import kafka.log.streamaspect.utils.ExceptionUtil
import kafka.server.{BrokerServer, BrokerTopicStats, KafkaConfig}
import kafka.utils.Logging
import org.apache.kafka.common.{TopicPartition, Uuid}
@@ -53,29 +52,25 @@ class ElasticLogManager(val client: Client, val openStreamChecker: OpenStreamChe
var elasticLog: ElasticUnifiedLog = null
// Only Partition#makeLeader will create a new log, the ReplicaManager#asyncApplyDelta will ensure the same partition
// operate sequentially. So it's safe without lock
ExceptionUtil.maybeRecordThrowableAndRethrow(new Runnable {
override def run(): Unit = {
// ElasticLog new is a time cost operation.
elasticLog = ElasticUnifiedLog(
dir,
config,
scheduler,
time,
maxTransactionTimeoutMs,
producerStateManagerConfig,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
logDirFailureChannel,
topicId,
leaderEpoch,
logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
client,
NAMESPACE,
openStreamChecker
)
}
}, s"Failed to create elastic log for $topicPartition", this)
elasticLogs.putIfAbsent(topicPartition, elasticLog)
// ElasticLog new is a time cost operation.
elasticLog = ElasticUnifiedLog(
dir,
config,
scheduler,
time,
maxTransactionTimeoutMs,
producerStateManagerConfig,
brokerTopicStats,
producerIdExpirationCheckIntervalMs,
logDirFailureChannel,
topicId,
leaderEpoch,
logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER,
client,
NAMESPACE,
openStreamChecker
)
elasticLogs.put(topicPartition, elasticLog)
elasticLog
}

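Two things change in the ElasticLogManager hunk: the ExceptionUtil.maybeRecordThrowableAndRethrow wrapper around log creation is removed, so a failure now propagates directly to the ElasticLog open path above (where it is logged at the appropriate level), and elasticLogs.putIfAbsent becomes elasticLogs.put. Per the comment retained in the diff, log creation for a partition is serialized by ReplicaManager#asyncApplyDelta, so the unconditional put is safe and always installs the freshly created log. The tiny sketch below illustrates the difference between the two map operations, using a plain ConcurrentHashMap with string keys and values as a stand-in for the elasticLogs map.

```scala
import java.util.concurrent.ConcurrentHashMap

object PutVsPutIfAbsent extends App {
  // Stand-in for elasticLogs; keys play the role of topic-partitions, values the logs.
  val logs = new ConcurrentHashMap[String, String]()

  logs.putIfAbsent("tp-0", "log-A")
  logs.putIfAbsent("tp-0", "log-B")   // no effect: a mapping for "tp-0" already exists
  println(logs.get("tp-0"))           // prints log-A

  logs.put("tp-0", "log-C")           // unconditional: replaces whatever was there
  println(logs.get("tp-0"))           // prints log-C
}
```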
