From dfa5aa5484a12e18cc763e064a70391103524165 Mon Sep 17 00:00:00 2001 From: Abhinav Dixit Date: Sat, 16 Nov 2024 20:56:30 +0530 Subject: [PATCH] KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in both common/uncommon cases of share fetch (#17825) Reviewers: Jun Rao
---
 .../kafka/server/share/DelayedShareFetch.java | 21 ++++---
 .../kafka/server/share/SharePartition.java | 59 +++++++++++++------
 .../server/share/DelayedShareFetchTest.java | 7 ++-
 .../share/SharePartitionManagerTest.java | 7 ++-
 4 files changed, 60 insertions(+), 34 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 9f1ad3ef651b1..476b309a1eb4e 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -90,7 +90,7 @@ public void onExpiration() { */ @Override public void onComplete() { - // We are utilizing lock so that onComplete doesn't do a dirty read for global variables - + // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables - // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread. lock.lock(); log.trace("Completing the delayed share fetch request for group {}, member {}, "
@@ -165,7 +165,7 @@ public boolean tryComplete() { // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for // those topic partitions. LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData); - maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse); + maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse); if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) { partitionsAcquired = topicPartitionData; partitionsAlreadyFetched = replicaManagerReadResponse;
@@ -239,21 +239,22 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions } private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) { - LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>(); + LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>(); topicPartitionData.forEach((topicIdPartition, partitionData) -> { SharePartition sharePartition = sharePartitions.get(topicIdPartition); - if (sharePartition.fetchOffsetMetadata().isEmpty()) { - partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData); + if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) { + partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData); } }); - if (partitionsMissingFetchOffsetMetadata.isEmpty()) { + if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) { return new LinkedHashMap<>(); } // We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
- return readFromLog(partitionsMissingFetchOffsetMetadata); + return readFromLog(partitionsNotMatchingFetchOffsetMetadata); } private void maybeUpdateFetchOffsetMetadata( + LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData, LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) { for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) { TopicIdPartition topicIdPartition = entry.getKey();
@@ -264,7 +265,9 @@ private void maybeUpdateFetchOffsetMetadata( replicaManagerLogReadResult, topicIdPartition); continue; } - sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata)); + sharePartition.updateFetchOffsetMetadata( + topicPartitionData.get(topicIdPartition).fetchOffset, + replicaManagerLogReadResult.info().fetchOffsetMetadata); } }
@@ -290,7 +293,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) { - Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(); + Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset); if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) continue; LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java b/core/src/main/java/kafka/server/share/SharePartition.java
index 632cb1e316919..ddc023a53151e 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -280,9 +280,9 @@ public static RecordState forId(byte id) { private long endOffset; /** - * We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently. + * We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently. */ - private Optional<LogOffsetMetadata> fetchOffsetMetadata; + private final OffsetMetadata fetchOffsetMetadata; /** * The state epoch is used to track the version of the state of the share partition.
@@ -347,6 +347,7 @@ public static RecordState forId(byte id) { this.partitionState = sharePartitionState; this.replicaManager = replicaManager; this.groupConfigManager = groupConfigManager; + this.fetchOffsetMetadata = new OffsetMetadata(); } /**
@@ -451,12 +452,12 @@ public CompletableFuture<Void> maybeInitialize() { // If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records // in the cached state are not missed findNextFetchOffset.set(true); - updateEndOffsetAndResetFetchOffsetMetadata(cachedState.lastEntry().getValue().lastOffset()); + endOffset = cachedState.lastEntry().getValue().lastOffset(); // In case the persister read state RPC result contains no AVAILABLE records, we can update cached state // and start/end offsets. maybeUpdateCachedStateAndOffsets(); } else { - updateEndOffsetAndResetFetchOffsetMetadata(startOffset); + endOffset = startOffset; } // Set the partition state to Active and complete the future. partitionState = SharePartitionState.ACTIVE;
@@ -943,7 +944,7 @@ void updateCacheAndOffsets(long logStartOffset) { // If the cached state is empty, then the start and end offset will be the new log start offset. // This can occur during the initialization of share partition if LSO has moved. startOffset = logStartOffset; - updateEndOffsetAndResetFetchOffsetMetadata(logStartOffset); + endOffset = logStartOffset; return; }
@@ -961,7 +962,7 @@ void updateCacheAndOffsets(long logStartOffset) { // This case means that the cached state is completely fresh now.
// Example scenario - batch of 0-10 in acquired state in cached state, then LSO moves to 15, // then endOffset should be 15 as well. - updateEndOffsetAndResetFetchOffsetMetadata(startOffset); + endOffset = startOffset; } // Note -
@@ -1192,7 +1193,7 @@ private AcquiredRecords acquireNewBatchRecords( if (cachedState.firstKey() == firstAcquiredOffset) { startOffset = firstAcquiredOffset; } - updateEndOffsetAndResetFetchOffsetMetadata(lastAcquiredOffset); + endOffset = lastAcquiredOffset; return new AcquiredRecords() .setFirstOffset(firstAcquiredOffset) .setLastOffset(lastAcquiredOffset)
@@ -1592,27 +1593,21 @@ private Optional<Throwable> acknowledgeCompleteBatch( return Optional.empty(); } - // The caller of this function is expected to hold lock.writeLock() when calling this method. - protected void updateEndOffsetAndResetFetchOffsetMetadata(long updatedEndOffset) { - endOffset = updatedEndOffset; - fetchOffsetMetadata = Optional.empty(); - } - - protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata) { + protected void updateFetchOffsetMetadata(long nextFetchOffset, LogOffsetMetadata logOffsetMetadata) { lock.writeLock().lock(); try { - this.fetchOffsetMetadata = fetchOffsetMetadata; + fetchOffsetMetadata.updateOffsetMetadata(nextFetchOffset, logOffsetMetadata); } finally { lock.writeLock().unlock(); } } - protected Optional<LogOffsetMetadata> fetchOffsetMetadata() { + protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) { lock.readLock().lock(); try { - if (findNextFetchOffset.get()) + if (fetchOffsetMetadata.offsetMetadata() == null || fetchOffsetMetadata.offset() != nextFetchOffset) return Optional.empty(); - return fetchOffsetMetadata; + return Optional.of(fetchOffsetMetadata.offsetMetadata()); } finally { lock.readLock().unlock(); }
@@ -1696,7 +1691,7 @@ private void maybeUpdateCachedStateAndOffsets() { long lastCachedOffset = cachedState.lastEntry().getValue().lastOffset(); if (lastOffsetAcknowledged == lastCachedOffset) { startOffset = lastCachedOffset + 1; // The next offset that will be fetched and acquired in the share partition - updateEndOffsetAndResetFetchOffsetMetadata(lastCachedOffset + 1); + endOffset = lastCachedOffset + 1; cachedState.clear(); // Nothing further to do. return;
@@ -2426,4 +2421,30 @@ public String toString() { ")"; } } + + /** + * FetchOffsetMetadata class is used to cache offset and its log metadata. + */ + static final class OffsetMetadata { + // This offset could be different from offsetMetadata.messageOffset if it's in the middle of a batch.
+ private long offset; + private LogOffsetMetadata offsetMetadata; + + OffsetMetadata() { + offset = -1; + } + + long offset() { + return offset; + } + + LogOffsetMetadata offsetMetadata() { + return offsetMetadata; + } + + void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) { + this.offset = offset; + this.offsetMetadata = offsetMetadata; + } + } }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 12b89cffd3773..6ef67afb1054c 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -65,6 +65,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn;
@@ -165,7 +166,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() { // We are testing the case when the share partition is getting fetched for the first time, so for the first time // the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be // populated for the share partition, which has 1 as the positional difference, so it doesn't satisfy the minBytes(2). - when(sp0.fetchOffsetMetadata()) + when(sp0.fetchOffsetMetadata(anyLong())) .thenReturn(Optional.empty()) .thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
@@ -223,7 +224,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() { // functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class); when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1); - when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(mock(LogOffsetMetadata.class))); + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class))); mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
@@ -271,7 +272,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() { ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); - when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1); DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() .withShareFetchData(shareFetch)
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 06bb123f55099..eea0a75c56c6e 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -123,6 +123,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atMost;
@@ -1677,7 +1678,7 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() { "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); - when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 2); // Initially you cannot acquire records for both sp1 and sp2.
@@ -1877,7 +1878,7 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() { "TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory); - when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 1); // Initially you cannot acquire records for both sp1 and sp2.
@@ -2363,7 +2364,7 @@ public void testSharePartitionPartialInitializationFailure() throws Exception { "TestShareFetch", mockTimer, replicaManager.localBrokerId(), DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); - when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0))); mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1); doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
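
The following standalone Java sketch is not part of the patch; it only restates the caching idea the change introduces in SharePartition so the diff above is easier to follow. The class name, field names and the local record standing in for Kafka's LogOffsetMetadata are illustrative assumptions: the cached metadata is keyed by the fetch offset it was recorded for, fetchOffsetMetadata(long) returns it only while the caller's next fetch offset still matches, and a miss sends the caller back to the log. The real SharePartition additionally guards these calls with its read/write lock.

import java.util.Optional;

// Illustrative sketch only: mirrors the OffsetMetadata cache added to SharePartition,
// with a local record standing in for org.apache.kafka.storage.internals.log.LogOffsetMetadata.
public class FetchOffsetMetadataCacheSketch {

    record LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) { }

    private long offset = -1;                 // fetch offset the cached metadata belongs to
    private LogOffsetMetadata offsetMetadata; // null until the first read from the log populates it

    void updateFetchOffsetMetadata(long nextFetchOffset, LogOffsetMetadata metadata) {
        this.offset = nextFetchOffset;
        this.offsetMetadata = metadata;
    }

    Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) {
        // A missing or stale entry (the fetch offset has moved on) forces the caller to read from the log again.
        if (offsetMetadata == null || offset != nextFetchOffset)
            return Optional.empty();
        return Optional.of(offsetMetadata);
    }

    public static void main(String[] args) {
        FetchOffsetMetadataCacheSketch cache = new FetchOffsetMetadataCacheSketch();
        System.out.println(cache.fetchOffsetMetadata(10)); // Optional.empty -> must read from log
        cache.updateFetchOffsetMetadata(10, new LogOffsetMetadata(10, 0, 512));
        System.out.println(cache.fetchOffsetMetadata(10)); // cached metadata reused for minBytes estimation
        System.out.println(cache.fetchOffsetMetadata(15)); // fetch offset advanced -> cache miss again
    }
}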
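
A rough sketch of the DelayedShareFetch side follows, folding the patch's maybeReadFromLog and maybeUpdateFetchOffsetMetadata steps into one method for brevity. The String partition keys, the LogRead record and the LogReader interface are placeholders for this sketch, not Kafka types; the point is only that cache-miss partitions are read from the log and their caches are refreshed keyed by the same fetch offset.

import java.util.LinkedHashMap;

// Illustrative sketch only: the shape of DelayedShareFetch's cache-miss handling,
// reusing FetchOffsetMetadataCacheSketch from the previous sketch.
class MaybeReadFromLogSketch {

    record LogRead(FetchOffsetMetadataCacheSketch.LogOffsetMetadata fetchOffsetMetadata) { }

    interface LogReader {
        LinkedHashMap<String, LogRead> readFromLog(LinkedHashMap<String, Long> fetchOffsets);
    }

    static LinkedHashMap<String, LogRead> maybeReadFromLog(
            LinkedHashMap<String, Long> fetchOffsets,                       // partition -> current fetch offset
            LinkedHashMap<String, FetchOffsetMetadataCacheSketch> caches,   // partition -> cached offset metadata
            LogReader logReader) {
        LinkedHashMap<String, Long> cacheMisses = new LinkedHashMap<>();
        fetchOffsets.forEach((partition, fetchOffset) -> {
            if (caches.get(partition).fetchOffsetMetadata(fetchOffset).isEmpty())
                cacheMisses.put(partition, fetchOffset);
        });
        if (cacheMisses.isEmpty())
            return new LinkedHashMap<>();
        // Read only the cache-miss partitions, then refresh their caches keyed by the offset we fetched for.
        LinkedHashMap<String, LogRead> readResult = logReader.readFromLog(cacheMisses);
        readResult.forEach((partition, logRead) ->
            caches.get(partition).updateFetchOffsetMetadata(fetchOffsets.get(partition), logRead.fetchOffsetMetadata()));
        return readResult;
    }
}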