From 7d702e17604c6cc95e4e8bfdea58b04be3f5ffc3 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Fri, 21 Jun 2024 15:54:21 +0100 Subject: [PATCH] KAFKA-16749: Implemented share fetch messages (KIP-932) (#16377) Share group consumers use the ShareFetch API to retrieve messages they've claimed (acquired records) from the leader brokers of share partitions. The replica manager provides an API to retrieve messages directly from the underlying topic partition. The implementation of the fetch messages uses replica manager to fetch messages from specific offset known by share partition leader. The requests are sent to a queue and processed asynchronously. Reviewers: Andrew Schofield , Manikumar Reddy --- .../kafka/server/share/SharePartition.java | 25 +- .../server/share/SharePartitionManager.java | 288 ++++++++++++- .../share/SharePartitionManagerTest.java | 380 +++++++++++++++++- .../server/share/SharePartitionTest.java | 17 +- 4 files changed, 671 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java index dc21dfcb7b5ce..024f65fb7043b 100644 --- a/core/src/main/java/kafka/server/share/SharePartition.java +++ b/core/src/main/java/kafka/server/share/SharePartition.java @@ -16,8 +16,6 @@ */ package kafka.server.share; -import kafka.server.ReplicaManager; - import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.InvalidRecordStateException; @@ -230,11 +228,6 @@ public static RecordState forId(byte id) { */ private int stateEpoch; - /** - * The replica manager is used to get the earliest offset of the share partition, so we can adjust the start offset. - */ - private final ReplicaManager replicaManager; - SharePartition( String groupId, TopicIdPartition topicIdPartition, @@ -243,8 +236,7 @@ public static RecordState forId(byte id) { int recordLockDurationMs, Timer timer, Time time, - Persister persister, - ReplicaManager replicaManager + Persister persister ) { this.groupId = groupId; this.topicIdPartition = topicIdPartition; @@ -258,7 +250,6 @@ public static RecordState forId(byte id) { this.timer = timer; this.time = time; this.persister = persister; - this.replicaManager = replicaManager; // Initialize the partition. initialize(); } @@ -575,12 +566,18 @@ void updateCacheAndOffsets(long logStartOffset) { } /** - * Checks if the number of records between startOffset and endOffset exceeds the record max - * in-flight limit. + * Checks if the records can be acquired for the share partition. The records can be acquired if + * the number of records in-flight is less than the max in-flight messages. Or if the fetch is + * to happen somewhere in between the record states cached in the share partition i.e. re-acquire + * the records that are already fetched before. * - * @return A boolean which indicates whether additional messages can be fetched for share partition. + * @return A boolean which indicates whether more records can be acquired or not. 
*/ - boolean canFetchRecords() { + boolean canAcquireRecords() { + if (nextFetchOffset() != endOffset() + 1) { + return true; + } + lock.readLock().lock(); long numRecords; try { diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 7aa16cce63cd4..59bbfc3ed68a9 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -17,6 +17,7 @@ package kafka.server.share; import kafka.server.FetchSession; +import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; @@ -25,7 +26,11 @@ import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.message.ShareAcknowledgeResponseData; import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.FileRecords.TimestampAndOffset; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ShareFetchMetadata; import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; @@ -36,21 +41,35 @@ import org.apache.kafka.server.share.ShareSession; import org.apache.kafka.server.share.ShareSessionCache; import org.apache.kafka.server.share.ShareSessionKey; +import org.apache.kafka.server.util.timer.SystemTimer; +import org.apache.kafka.server.util.timer.SystemTimerReaper; +import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.storage.internals.log.FetchParams; +import org.apache.kafka.storage.internals.log.FetchPartitionData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import scala.Option; +import scala.Tuple2; +import scala.jdk.javaapi.CollectionConverters; +import scala.runtime.BoxedUnit; /** * The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions. @@ -95,6 +114,11 @@ public class SharePartitionManager implements AutoCloseable { */ private final int recordLockDurationMs; + /** + * The timer is used to schedule the records lock timeout. + */ + private final Timer timer; + /** * The max in flight messages is the maximum number of messages that can be in flight at any one time per share-partition. 
*/ @@ -119,7 +143,16 @@ public SharePartitionManager( int maxInFlightMessages, Persister persister ) { - this(replicaManager, time, cache, new ConcurrentHashMap<>(), recordLockDurationMs, maxDeliveryCount, maxInFlightMessages, persister); + this( + replicaManager, + time, + cache, + new ConcurrentHashMap<>(), + recordLockDurationMs, + maxDeliveryCount, + maxInFlightMessages, + persister + ); } SharePartitionManager( @@ -139,6 +172,8 @@ public SharePartitionManager( this.fetchQueue = new ConcurrentLinkedQueue<>(); this.processFetchQueueLock = new AtomicBoolean(false); this.recordLockDurationMs = recordLockDurationMs; + this.timer = new SystemTimerReaper("share-group-lock-timeout-reaper", + new SystemTimer("share-group-lock-timeout")); this.maxDeliveryCount = maxDeliveryCount; this.maxInFlightMessages = maxInFlightMessages; this.persister = persister; @@ -167,7 +202,10 @@ public CompletableFuture> future = new CompletableFuture<>(); - future.completeExceptionally(new UnsupportedOperationException("Not implemented yet")); + ShareFetchPartitionData shareFetchPartitionData = new ShareFetchPartitionData(fetchParams, groupId, memberId, + topicIdPartitions, future, partitionMaxBytes); + fetchQueue.add(shareFetchPartitionData); + maybeProcessFetchQueue(); return future; } @@ -302,14 +340,14 @@ public ShareFetchContext newContext(String groupId, Map partitions) { - return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled()); - } - + /** + * The cachedTopicIdPartitionsInShareSession method is used to get the cached topic-partitions in the share session. + * + * @param groupId The group id in the share fetch request. + * @param memberId The member id in the share fetch request. + * + * @return The list of cached topic-partitions in the share session. + */ public List cachedTopicIdPartitionsInShareSession(String groupId, Uuid memberId) { ShareSessionKey key = shareSessionKey(groupId, memberId); ShareSession shareSession = cache.get(key); @@ -318,8 +356,8 @@ public List cachedTopicIdPartitionsInShareSession(String group } List cachedTopicIdPartitions = new ArrayList<>(); shareSession.partitionMap().forEach(cachedSharePartition -> cachedTopicIdPartitions.add( - new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition() - )))); + new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition() + )))); return cachedTopicIdPartitions; } @@ -328,6 +366,202 @@ public void close() throws Exception { // TODO: Provide Implementation } + private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) { + return new ShareSessionKey(groupId, memberId); + } + + private static String partitionsToLogString(Collection partitions) { + return FetchSession.partitionsToLogString(partitions, log.isTraceEnabled()); + } + + /** + * Recursive function to process all the fetch requests present inside the fetch queue + */ + private void maybeProcessFetchQueue() { + if (!processFetchQueueLock.compareAndSet(false, true)) { + // The queue is already being processed hence avoid re-triggering. + return; + } + + // Initialize the topic partitions for which the fetch should be attempted. + Map topicPartitionData = new LinkedHashMap<>(); + ShareFetchPartitionData shareFetchPartitionData = fetchQueue.poll(); + if (shareFetchPartitionData == null) { + // No more requests to process, so release the lock. 
Though we should not reach here as the lock + // is acquired only when there are requests in the queue. But still, it's safe to release the lock. + releaseProcessFetchQueueLock(); + return; + } + + try { + shareFetchPartitionData.topicIdPartitions.forEach(topicIdPartition -> { + SharePartitionKey sharePartitionKey = sharePartitionKey( + shareFetchPartitionData.groupId, + topicIdPartition + ); + SharePartition sharePartition = partitionCacheMap.computeIfAbsent(sharePartitionKey, + k -> new SharePartition(shareFetchPartitionData.groupId, topicIdPartition, maxInFlightMessages, maxDeliveryCount, + recordLockDurationMs, timer, time, persister)); + int partitionMaxBytes = shareFetchPartitionData.partitionMaxBytes.getOrDefault(topicIdPartition, 0); + // Add the share partition to the list of partitions to be fetched only if we can + // acquire the fetch lock on it. + if (sharePartition.maybeAcquireFetchLock()) { + // If the share partition is already at capacity, we should not attempt to fetch. + if (sharePartition.canAcquireRecords()) { + topicPartitionData.put( + topicIdPartition, + new FetchRequest.PartitionData( + topicIdPartition.topicId(), + sharePartition.nextFetchOffset(), + 0, + partitionMaxBytes, + Optional.empty() + ) + ); + } else { + sharePartition.releaseFetchLock(); + log.info("Record lock partition limit exceeded for SharePartition with key {}, " + + "cannot acquire more records", sharePartitionKey); + } + } + }); + + if (topicPartitionData.isEmpty()) { + // No locks for share partitions could be acquired, so we complete the request and + // will re-fetch for the client in next poll. + shareFetchPartitionData.future.complete(Collections.emptyMap()); + // Though if no partitions can be locked then there must be some other request which + // is in-flight and should release the lock. But it's safe to release the lock as + // the lock on share partition already exists which facilitates correct behaviour + // with multiple requests from queue being processed. + releaseProcessFetchQueueLock(); + return; + } + + log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", + topicPartitionData, shareFetchPartitionData.groupId, shareFetchPartitionData.fetchParams); + + replicaManager.fetchMessages( + shareFetchPartitionData.fetchParams, + CollectionConverters.asScala( + topicPartitionData.entrySet().stream().map(entry -> + new Tuple2<>(entry.getKey(), entry.getValue())).collect(Collectors.toList()) + ), + QuotaFactory.UnboundedQuota$.MODULE$, + responsePartitionData -> { + log.trace("Data successfully retrieved by replica manager: {}", responsePartitionData); + List> responseData = CollectionConverters.asJava( + responsePartitionData); + processFetchResponse(shareFetchPartitionData, responseData).whenComplete( + (result, throwable) -> { + if (throwable != null) { + log.error("Error processing fetch response for share partitions", throwable); + shareFetchPartitionData.future.completeExceptionally(throwable); + } else { + shareFetchPartitionData.future.complete(result); + } + // Releasing the lock to move ahead with the next request in queue. + releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet()); + }); + return BoxedUnit.UNIT; + }); + + // If there are more requests in the queue, then process them. + if (!fetchQueue.isEmpty()) + maybeProcessFetchQueue(); + + } catch (Exception e) { + // In case exception occurs then release the locks so queue can be further processed. 
+ log.error("Error processing fetch queue for share partitions", e); + releaseFetchQueueAndPartitionsLock(shareFetchPartitionData.groupId, topicPartitionData.keySet()); + } + } + + // Visible for testing. + CompletableFuture> processFetchResponse( + ShareFetchPartitionData shareFetchPartitionData, + List> responseData + ) { + Map> futures = new HashMap<>(); + responseData.forEach(data -> { + TopicIdPartition topicIdPartition = data._1; + FetchPartitionData fetchPartitionData = data._2; + + SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey(shareFetchPartitionData.groupId, topicIdPartition)); + futures.put(topicIdPartition, sharePartition.acquire(shareFetchPartitionData.memberId, fetchPartitionData) + .handle((acquiredRecords, throwable) -> { + log.trace("Acquired records for topicIdPartition: {} with share fetch data: {}, records: {}", + topicIdPartition, shareFetchPartitionData, acquiredRecords); + ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(topicIdPartition.partition()); + + if (throwable != null) { + partitionData.setErrorCode(Errors.forException(throwable).code()); + return partitionData; + } + + if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { + // In case we get OFFSET_OUT_OF_RANGE error, that's because the LSO is later than the fetch offset. + // So, we would update the start and end offset of the share partition and still return an empty + // response and let the client retry the fetch. This way we do not lose out on the data that + // would be returned for other share partitions in the fetch request. + sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition)); + partitionData + .setPartitionIndex(topicIdPartition.partition()) + .setRecords(null) + .setErrorCode(Errors.NONE.code()) + .setAcquiredRecords(Collections.emptyList()) + .setAcknowledgeErrorCode(Errors.NONE.code()); + return partitionData; + } + + // Maybe, in the future, check if no records are acquired, and we want to retry + // replica manager fetch. Depends on the share partition manager implementation, + // if we want parallel requests for the same share partition or not. + partitionData + .setPartitionIndex(topicIdPartition.partition()) + .setRecords(fetchPartitionData.records) + .setErrorCode(fetchPartitionData.error.code()) + .setAcquiredRecords(acquiredRecords) + .setAcknowledgeErrorCode(Errors.NONE.code()); + return partitionData; + })); + }); + + return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).thenApply(v -> { + Map processedResult = new HashMap<>(); + futures.forEach((topicIdPartition, future) -> processedResult.put(topicIdPartition, future.join())); + return processedResult; + }); + } + + // Visible for testing. + void releaseFetchQueueAndPartitionsLock(String groupId, Set topicIdPartitions) { + topicIdPartitions.forEach(tp -> partitionCacheMap.get(sharePartitionKey(groupId, tp)).releaseFetchLock()); + releaseProcessFetchQueueLock(); + } + + private void releaseProcessFetchQueueLock() { + processFetchQueueLock.set(false); + } + + private SharePartitionKey sharePartitionKey(String groupId, TopicIdPartition topicIdPartition) { + return new SharePartitionKey(groupId, topicIdPartition); + } + + /** + * The method is used to get the offset for the earliest timestamp for the topic-partition. + * + * @return The offset for the earliest timestamp. 
+ */ + private long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition) { + // TODO: We need to know the isolation level from group configs, for now we are passing Option.empty() for isolationLevel + Option timestampAndOffset = replicaManager.fetchOffsetForTimestamp( + topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(), + Optional.empty(), true); + return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset; + } + /** * The SharePartitionKey is used to uniquely identify a share partition. The key is made up of the * share group id, the topic id and the partition id. The key is used to store the SharePartition @@ -359,12 +593,38 @@ else if (obj == null || getClass() != obj.getClass()) return groupId.equals(that.groupId) && Objects.equals(topicIdPartition, that.topicIdPartition); } } + + @Override + public String toString() { + return "SharePartitionKey{" + + "groupId='" + groupId + + ", topicIdPartition=" + topicIdPartition + + '}'; + } } /** * The ShareFetchPartitionData class is used to store the fetch parameters for a share fetch request. */ - private static class ShareFetchPartitionData { - // TODO: Provide Implementation + // Visible for testing + static class ShareFetchPartitionData { + private final FetchParams fetchParams; + private final String groupId; + private final String memberId; + private final List topicIdPartitions; + private final CompletableFuture> future; + private final Map partitionMaxBytes; + + public ShareFetchPartitionData(FetchParams fetchParams, String groupId, String memberId, + List topicIdPartitions, + CompletableFuture> future, + Map partitionMaxBytes) { + this.fetchParams = fetchParams; + this.groupId = groupId; + this.memberId = memberId; + this.topicIdPartitions = topicIdPartitions; + this.future = future; + this.partitionMaxBytes = partitionMaxBytes; + } } } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 6bb20c51bdc10..68459cf09b7e9 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -17,17 +17,23 @@ package kafka.server.share; import kafka.server.ReplicaManager; +import kafka.server.ReplicaQuota; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidShareSessionEpochException; import org.apache.kafka.common.errors.ShareSessionNotFoundException; import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.message.ShareFetchResponseData.PartitionData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ShareFetchMetadata; import org.apache.kafka.common.requests.ShareFetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; @@ -37,9 +43,18 @@ import org.apache.kafka.server.group.share.Persister; import org.apache.kafka.server.share.ShareSessionCache; import 
org.apache.kafka.server.share.ShareSessionKey; - +import org.apache.kafka.server.util.timer.SystemTimer; +import org.apache.kafka.server.util.timer.SystemTimerReaper; +import org.apache.kafka.server.util.timer.Timer; +import org.apache.kafka.storage.internals.log.FetchIsolation; +import org.apache.kafka.storage.internals.log.FetchParams; +import org.apache.kafka.storage.internals.log.FetchPartitionData; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; import java.util.ArrayList; import java.util.Arrays; @@ -49,7 +64,17 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import scala.Tuple2; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -58,7 +83,13 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; @Timeout(120) public class SharePartitionManagerTest { @@ -66,9 +97,23 @@ public class SharePartitionManagerTest { private static final int RECORD_LOCK_DURATION_MS = 30000; private static final int MAX_DELIVERY_COUNT = 5; private static final short MAX_IN_FLIGHT_MESSAGES = 200; + private static final int PARTITION_MAX_BYTES = 40000; + + private static Timer mockTimer; private static final List EMPTY_PART_LIST = Collections.unmodifiableList(new ArrayList<>()); + @BeforeEach + public void setUp() { + mockTimer = new SystemTimerReaper("sharePartitionManagerTestReaper", + new SystemTimer("sharePartitionManagerTestTimer")); + } + + @AfterEach + public void tearDown() throws Exception { + mockTimer.close(); + } + @Test public void testNewContextReturnsFinalContext() { SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().build(); @@ -843,6 +888,339 @@ public void testSharePartitionKey() { assertNotEquals(sharePartitionKey1, null); } + @Test + public void testMultipleSequentialShareFetches() { + String groupId = "grp"; + Uuid memberId1 = Uuid.randomUuid(); + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + Uuid fooId = Uuid.randomUuid(); + Uuid barId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0)); + TopicIdPartition tp3 = new TopicIdPartition(barId, new TopicPartition("bar", 1)); + TopicIdPartition tp4 = new TopicIdPartition(fooId, new TopicPartition("foo", 2)); + TopicIdPartition tp5 = new TopicIdPartition(barId, 
new TopicPartition("bar", 2)); + TopicIdPartition tp6 = new TopicIdPartition(fooId, new TopicPartition("foo", 3)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp4, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp5, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp6, PARTITION_MAX_BYTES); + + ReplicaManager replicaManager = mock(ReplicaManager.class); + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withReplicaManager(replicaManager) + .build(); + + doAnswer(invocation -> { + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); + return null; + }).when(replicaManager).fetchMessages(any(), any(), any(ReplicaQuota.class), any()); + + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, Arrays.asList(tp0, tp1, tp2, tp3), partitionMaxBytes); + Mockito.verify(replicaManager, times(1)).fetchMessages( + any(), any(), any(ReplicaQuota.class), any()); + + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, Collections.singletonList(tp4), partitionMaxBytes); + Mockito.verify(replicaManager, times(2)).fetchMessages( + any(), any(), any(ReplicaQuota.class), any()); + + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, Arrays.asList(tp5, tp6), partitionMaxBytes); + Mockito.verify(replicaManager, times(3)).fetchMessages( + any(), any(), any(ReplicaQuota.class), any()); + } + + @Test + public void testProcessFetchResponse() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + Uuid topicId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp0), + k -> new SharePartition(groupId, tp0, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, + RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp1), + k -> new SharePartition(groupId, tp1, MAX_IN_FLIGHT_MESSAGES, MAX_DELIVERY_COUNT, + RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + CompletableFuture> future = new CompletableFuture<>(); + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, + Arrays.asList(tp0, tp1), future, partitionMaxBytes); + + MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + 
new SimpleRecord(null, "value".getBytes())); + + MemoryRecords records1 = MemoryRecords.withRecords(100L, Compression.NONE, + new SimpleRecord("0".getBytes(), "v".getBytes()), + new SimpleRecord("1".getBytes(), "v".getBytes()), + new SimpleRecord("2".getBytes(), "v".getBytes()), + new SimpleRecord(null, "value".getBytes())); + + List> responseData = new ArrayList<>(); + responseData.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, + records, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false))); + responseData.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 100L, + records1, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false))); + CompletableFuture> result = + sharePartitionManager.processFetchResponse(shareFetchPartitionData, responseData); + + assertTrue(result.isDone()); + Map resultData = result.join(); + assertEquals(2, resultData.size()); + assertTrue(resultData.containsKey(tp0)); + assertTrue(resultData.containsKey(tp1)); + assertEquals(0, resultData.get(tp0).partitionIndex()); + assertEquals(1, resultData.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode()); + assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)), + resultData.get(tp0).acquiredRecords()); + assertEquals(Collections.singletonList(new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(100).setLastOffset(103).setDeliveryCount((short) 1)), + resultData.get(tp1).acquiredRecords()); + } + + @Test + public void testProcessFetchResponseWithEmptyRecords() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + Uuid topicId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp0), + k -> new SharePartition(groupId, tp0, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, + RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp1), + k -> new SharePartition(groupId, tp1, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, + RECORD_LOCK_DURATION_MS, mockTimer, new MockTime(), NoOpShareStatePersister.getInstance())); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).build(); + + CompletableFuture> future = new CompletableFuture<>(); + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, memberId, + Arrays.asList(tp0, tp1), future, partitionMaxBytes); + + List> responseData = new ArrayList<>(); + responseData.add(new Tuple2<>(tp0, new FetchPartitionData(Errors.NONE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), 
Optional.empty(), + OptionalInt.empty(), false))); + responseData.add(new Tuple2<>(tp1, new FetchPartitionData(Errors.NONE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false))); + CompletableFuture> result = + sharePartitionManager.processFetchResponse(shareFetchPartitionData, responseData); + + assertTrue(result.isDone()); + Map resultData = result.join(); + assertEquals(2, resultData.size()); + assertTrue(resultData.containsKey(tp0)); + assertTrue(resultData.containsKey(tp1)); + assertEquals(0, resultData.get(tp0).partitionIndex()); + assertEquals(1, resultData.get(tp1).partitionIndex()); + assertEquals(Errors.NONE.code(), resultData.get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), resultData.get(tp1).errorCode()); + assertEquals(Collections.emptyList(), resultData.get(tp0).acquiredRecords()); + assertEquals(Collections.emptyList(), resultData.get(tp1).acquiredRecords()); + } + + @Test + public void testMultipleConcurrentShareFetches() throws InterruptedException { + + String groupId = "grp"; + Uuid memberId1 = Uuid.randomUuid(); + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + Uuid fooId = Uuid.randomUuid(); + Uuid barId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0)); + TopicIdPartition tp3 = new TopicIdPartition(barId, new TopicPartition("bar", 1)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp3, PARTITION_MAX_BYTES); + + final Time time = new MockTime(0, System.currentTimeMillis(), 0); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + Map partitionCacheMap = new ConcurrentHashMap<>(); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp0), + k -> new SharePartition(groupId, tp0, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, + RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp1), + k -> new SharePartition(groupId, tp1, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, + RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp2), + k -> new SharePartition(groupId, tp2, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, + RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); + partitionCacheMap.computeIfAbsent(new SharePartitionManager.SharePartitionKey(groupId, tp3), + k -> new SharePartition(groupId, tp3, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, + RECORD_LOCK_DURATION_MS, mockTimer, time, NoOpShareStatePersister.getInstance())); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).withTime(time).withReplicaManager(replicaManager).build(); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = 
mock(SharePartition.class); + SharePartition sp3 = mock(SharePartition.class); + + when(sp0.nextFetchOffset()).thenReturn((long) 1, (long) 15, (long) 6, (long) 30, (long) 25); + when(sp1.nextFetchOffset()).thenReturn((long) 4, (long) 1, (long) 18, (long) 5); + when(sp2.nextFetchOffset()).thenReturn((long) 10, (long) 25, (long) 26); + when(sp3.nextFetchOffset()).thenReturn((long) 20, (long) 15, (long) 23, (long) 16); + + doAnswer(invocation -> { + assertEquals(1, sp0.nextFetchOffset()); + assertEquals(4, sp1.nextFetchOffset()); + assertEquals(10, sp2.nextFetchOffset()); + assertEquals(20, sp3.nextFetchOffset()); + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); + return null; + }).doAnswer(invocation -> { + assertEquals(15, sp0.nextFetchOffset()); + assertEquals(1, sp1.nextFetchOffset()); + assertEquals(25, sp2.nextFetchOffset()); + assertEquals(15, sp3.nextFetchOffset()); + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); + return null; + }).doAnswer(invocation -> { + assertEquals(6, sp0.nextFetchOffset()); + assertEquals(18, sp1.nextFetchOffset()); + assertEquals(26, sp2.nextFetchOffset()); + assertEquals(23, sp3.nextFetchOffset()); + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); + return null; + }).doAnswer(invocation -> { + assertEquals(30, sp0.nextFetchOffset()); + assertEquals(5, sp1.nextFetchOffset()); + assertEquals(26, sp2.nextFetchOffset()); + assertEquals(16, sp3.nextFetchOffset()); + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); + return null; + }).doAnswer(invocation -> { + assertEquals(25, sp0.nextFetchOffset()); + assertEquals(5, sp1.nextFetchOffset()); + assertEquals(26, sp2.nextFetchOffset()); + assertEquals(16, sp3.nextFetchOffset()); + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); + return null; + }).when(replicaManager).fetchMessages(any(), any(), any(ReplicaQuota.class), any()); + + int threadCount = 100; + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + + try { + for (int i = 0; i != threadCount; ++i) { + executorService.submit(() -> { + sharePartitionManager.fetchMessages(groupId, memberId1.toString(), fetchParams, Arrays.asList(tp0, tp1, tp2, tp3), partitionMaxBytes); + }); + // We are blocking the main thread at an interval of 10 threads so that the currently running executorService threads can complete. 
+ if (i % 10 == 0) + executorService.awaitTermination(50, TimeUnit.MILLISECONDS); + } + } finally { + if (!executorService.awaitTermination(50, TimeUnit.MILLISECONDS)) + executorService.shutdown(); + } + // We are checking the number of replicaManager fetchMessages() calls + Mockito.verify(replicaManager, atMost(100)).fetchMessages( + any(), any(), any(ReplicaQuota.class), any()); + Mockito.verify(replicaManager, atLeast(10)).fetchMessages( + any(), any(), any(ReplicaQuota.class), any()); + } + + @Test + public void testReplicaManagerFetchShouldNotProceed() { + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + Uuid fooId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + + ReplicaManager replicaManager = mock(ReplicaManager.class); + + SharePartition sp0 = mock(SharePartition.class); + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(false); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build(); + + CompletableFuture> future = + sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, Collections.singletonList(tp0), partitionMaxBytes); + Mockito.verify(replicaManager, times(0)).fetchMessages( + any(), any(), any(ReplicaQuota.class), any()); + Map result = future.join(); + assertEquals(0, result.size()); + } + + @Test + public void testReplicaManagerFetchShouldProceed() { + String groupId = "grp"; + Uuid memberId = Uuid.randomUuid(); + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, 0, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + Uuid fooId = Uuid.randomUuid(); + TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); + Map partitionMaxBytes = new HashMap<>(); + partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + + ReplicaManager replicaManager = mock(ReplicaManager.class); + + SharePartition sp0 = mock(SharePartition.class); + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + Map partitionCacheMap = new HashMap<>(); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build(); + + sharePartitionManager.fetchMessages(groupId, memberId.toString(), fetchParams, Collections.singletonList(tp0), partitionMaxBytes); + // Since the nextFetchOffset does not point to endOffset + 1, i.e. 
some of the records in the cachedState are AVAILABLE, + // even though the maxInFlightMessages limit is exceeded, replicaManager.fetchMessages should be called + Mockito.verify(replicaManager, times(1)).fetchMessages( + any(), any(), any(ReplicaQuota.class), any()); + } + private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() { return new ShareFetchResponseData.PartitionData().setPartitionIndex(0); } diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 6fbd01e7c08d6..874b08d7dc94a 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -16,7 +16,6 @@ */ package kafka.server.share; -import kafka.server.ReplicaManager; import kafka.server.share.SharePartition.InFlightState; import kafka.server.share.SharePartition.RecordState; @@ -81,7 +80,6 @@ public class SharePartitionTest { private static Timer mockTimer; private static final Time MOCK_TIME = new MockTime(); private static final short MAX_IN_FLIGHT_MESSAGES = 200; - private static final ReplicaManager REPLICA_MANAGER = Mockito.mock(ReplicaManager.class); @BeforeEach public void setUp() { @@ -434,31 +432,31 @@ public void testNextFetchOffsetWithFindAndCachedState() { } @Test - public void testCanFetchRecordsWithEmptyCache() { + public void testCanAcquireRecordsWithEmptyCache() { SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(1).build(); - assertTrue(sharePartition.canFetchRecords()); + assertTrue(sharePartition.canAcquireRecords()); } @Test - public void testCanFetchRecordsWithCachedDataAndLimitNotReached() { + public void testCanAcquireRecordsWithCachedDataAndLimitNotReached() { SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(6).build(); sharePartition.acquire( MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); // Limit not reached as only 6 in-flight messages is the limit. - assertTrue(sharePartition.canFetchRecords()); + assertTrue(sharePartition.canAcquireRecords()); } @Test - public void testCanFetchRecordsWithCachedDataAndLimitReached() { + public void testCanAcquireRecordsWithCachedDataAndLimitReached() { SharePartition sharePartition = SharePartitionBuilder.builder().withMaxInflightMessages(1).build(); sharePartition.acquire( MEMBER_ID, new FetchPartitionData(Errors.NONE, 20, 3, memoryRecords(5), Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false)); // Limit reached as only one in-flight message is the limit. 
- assertFalse(sharePartition.canFetchRecords()); + assertFalse(sharePartition.canAcquireRecords()); } @Test @@ -1108,7 +1106,6 @@ private static class SharePartitionBuilder { private int maxDeliveryCount = MAX_DELIVERY_COUNT; private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES; private Persister persister = NoOpShareStatePersister.getInstance(); - private ReplicaManager replicaManager = REPLICA_MANAGER; private SharePartitionBuilder withMaxInflightMessages(int maxInflightMessages) { this.maxInflightMessages = maxInflightMessages; @@ -1126,7 +1123,7 @@ public static SharePartitionBuilder builder() { public SharePartition build() { return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, maxInflightMessages, maxDeliveryCount, - acquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister, replicaManager); + acquisitionLockTimeoutMs, mockTimer, MOCK_TIME, persister); } } }
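
Editor's note: the commit message above describes the core mechanism — share fetch requests are appended to a queue and drained asynchronously, with a single processing lock so only one caller works through the queue at a time, and each request's CompletableFuture completed once the replica manager returns data. The following is a minimal, self-contained sketch of that queue-plus-lock pattern, assuming nothing beyond standard Java; it is not the Kafka implementation, and the names FetchQueueProcessor, FetchTask, and readFromLog are hypothetical stand-ins. Per-partition fetch locks, record acquisition, and the in-flight record limit handled by the real maybeProcessFetchQueue are reduced to comments.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical, simplified illustration of the pattern used by
// SharePartitionManager.maybeProcessFetchQueue. Not the actual Kafka code.
public class FetchQueueProcessor {

    // A queued share fetch request: the offset to read and a future to complete with the result.
    static final class FetchTask {
        final long fetchOffset;
        final CompletableFuture<String> future = new CompletableFuture<>();
        FetchTask(long fetchOffset) { this.fetchOffset = fetchOffset; }
    }

    private final ConcurrentLinkedQueue<FetchTask> fetchQueue = new ConcurrentLinkedQueue<>();
    // Plays the role of processFetchQueueLock: only one thread drains the queue at a time.
    private final AtomicBoolean processing = new AtomicBoolean(false);
    // Stand-in for the asynchronous log read; in the real code this is replicaManager.fetchMessages.
    private final Function<Long, CompletableFuture<String>> readFromLog;

    FetchQueueProcessor(Function<Long, CompletableFuture<String>> readFromLog) {
        this.readFromLog = readFromLog;
    }

    // Mirrors fetchMessages(): enqueue the request and attempt to start processing.
    CompletableFuture<String> fetch(long offset) {
        FetchTask task = new FetchTask(offset);
        fetchQueue.add(task);
        maybeProcessFetchQueue();
        return task.future;
    }

    private void maybeProcessFetchQueue() {
        // Another caller is already draining the queue; it will pick up our task.
        if (!processing.compareAndSet(false, true)) {
            return;
        }
        FetchTask task = fetchQueue.poll();
        if (task == null) {
            processing.set(false);   // Nothing to do; release the processing lock.
            return;
        }
        try {
            // Asynchronous read; the real code also acquires a per-partition fetch lock here
            // and skips partitions whose in-flight record limit has been reached.
            readFromLog.apply(task.fetchOffset).whenComplete((data, error) -> {
                if (error != null) {
                    task.future.completeExceptionally(error);
                } else {
                    task.future.complete(data);
                }
                // Release the lock, then continue with any requests queued in the meantime.
                processing.set(false);
                if (!fetchQueue.isEmpty()) {
                    maybeProcessFetchQueue();
                }
            });
        } catch (Exception e) {
            processing.set(false);   // On failure, release the lock so the queue is not stuck.
            task.future.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        FetchQueueProcessor processor = new FetchQueueProcessor(
            offset -> CompletableFuture.completedFuture("records from offset " + offset));
        System.out.println(processor.fetch(42L).join());
    }
}

The design choice this sketch highlights is the same one the tests above exercise: callers never block on the queue, the compare-and-set guard guarantees a single drain loop, and releasing the guard in the completion callback (releaseFetchQueueAndPartitionsLock in the real code) is what allows the next queued request to proceed.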