From 7591868aead54fff7d5e8a44c5e06746ed34866b Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 11 Dec 2024 10:56:47 +0100 Subject: [PATCH] KAFKA-18179: Move AsyncOffsetReadFutureHolder to storage module (#18095) Reviewers: Christo Lolov --- .../kafka/log/remote/RemoteLogManager.java | 2 +- .../scala/kafka/log/OffsetResultHolder.scala | 12 +------- .../server/ListOffsetsPartitionStatus.scala | 2 +- .../log/remote/RemoteLogOffsetReaderTest.java | 2 +- .../server/DelayedRemoteListOffsetsTest.scala | 2 +- .../log/AsyncOffsetReadFutureHolder.java | 30 +++++++++++++++++++ 6 files changed, 35 insertions(+), 15 deletions(-) create mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 6e95e36020b8d..b2b1ab856c04f 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -17,7 +17,6 @@ package kafka.log.remote; import kafka.cluster.Partition; -import kafka.log.AsyncOffsetReadFutureHolder; import kafka.log.UnifiedLog; import kafka.server.DelayedRemoteListOffsets; @@ -74,6 +73,7 @@ import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.AbortedTxn; +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.EpochEntry; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; diff --git a/core/src/main/scala/kafka/log/OffsetResultHolder.scala b/core/src/main/scala/kafka/log/OffsetResultHolder.scala index 64b78c6cee912..89951dbb96f2b 100644 --- a/core/src/main/scala/kafka/log/OffsetResultHolder.scala +++ b/core/src/main/scala/kafka/log/OffsetResultHolder.scala @@ -18,8 +18,7 @@ package kafka.log import org.apache.kafka.common.errors.ApiException import org.apache.kafka.common.record.FileRecords.TimestampAndOffset - -import java.util.concurrent.{CompletableFuture, Future} +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset], futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]] = None) { @@ -27,12 +26,3 @@ case class OffsetResultHolder(timestampAndOffsetOpt: Option[TimestampAndOffset], var maybeOffsetsError: Option[ApiException] = None var lastFetchableOffset: Option[Long] = None } - -/** - * A remote log offset read task future holder. It contains two futures: - * 1. JobFuture - Use this future to cancel the running job. - * 2. TaskFuture - Use this future to get the result of the job/computation. - */ -case class AsyncOffsetReadFutureHolder[T](jobFuture: Future[Void], taskFuture: CompletableFuture[T]) { - -} diff --git a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala index 702d0a4ccb8ef..d9fb9e6d059db 100644 --- a/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala +++ b/core/src/main/scala/kafka/server/ListOffsetsPartitionStatus.scala @@ -16,10 +16,10 @@ */ package kafka.server -import kafka.log.AsyncOffsetReadFutureHolder import org.apache.kafka.common.errors.ApiException import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse import org.apache.kafka.common.record.FileRecords.TimestampAndOffset +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder class ListOffsetsPartitionStatus(val futureHolderOpt: Option[AsyncOffsetReadFutureHolder[Either[Exception, Option[TimestampAndOffset]]]], val lastFetchableOffset: Option[Long], diff --git a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java index ce027f8f91510..1313c8e2898ed 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogOffsetReaderTest.java @@ -16,7 +16,6 @@ */ package kafka.log.remote; -import kafka.log.AsyncOffsetReadFutureHolder; import kafka.utils.TestUtils; import org.apache.kafka.common.TopicPartition; @@ -28,6 +27,7 @@ import org.apache.kafka.server.util.MockTime; import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala index 4061e6aaaf8ff..eaa4589595954 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteListOffsetsTest.scala @@ -16,7 +16,6 @@ */ package kafka.server -import kafka.log.AsyncOffsetReadFutureHolder import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.NotLeaderOrFollowerException import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse @@ -25,6 +24,7 @@ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.requests.ListOffsetsResponse import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey} import org.apache.kafka.server.util.timer.MockTimer +import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder import org.junit.jupiter.api.{AfterEach, Test} import org.junit.jupiter.api.Assertions.assertEquals import org.mockito.ArgumentMatchers.anyBoolean diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java new file mode 100644 index 0000000000000..990a5ef67ddf0 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/AsyncOffsetReadFutureHolder.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; + +/** + * A remote log offset read task future holder. It contains two futures: + *
    + *
  1. JobFuture - Use this future to cancel the running job. + *
  2. TaskFuture - Use this future to get the result of the job/computation. + *
+ */ +public record AsyncOffsetReadFutureHolder(Future jobFuture, CompletableFuture taskFuture) { +}