KAFKA-18022: fetchOffsetMetadata handling for minBytes estimation in both common/uncommon cases of share fetch (apache#17825)

Reviewers: Jun Rao <[email protected]>
adixitconfluent authored Nov 16, 2024
1 parent a8f84ca commit dfa5aa5
Showing 4 changed files with 60 additions and 34 deletions.
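
In short: SharePartition previously cached a single Optional<LogOffsetMetadata> and reset it whenever the end offset moved (the removed updateEndOffsetAndResetFetchOffsetMetadata helper). With this change the cached metadata is keyed by the fetch offset it was recorded for (the new OffsetMetadata holder below), and callers such as DelayedShareFetch pass the offset they are about to fetch; a lookup for any other offset counts as a cache miss. The following is a minimal standalone sketch of that offset-keyed caching pattern with made-up names (OffsetKeyedMetadataCache, Metadata), not the Kafka classes touched by this commit.

import java.util.Optional;

// Minimal standalone sketch of the offset-keyed cache this commit introduces.
// All names here are illustrative; only the idea mirrors the patch.
final class OffsetKeyedMetadataCache {
    // Stand-in for Kafka's LogOffsetMetadata.
    record Metadata(long segmentBaseOffset, int relativePositionInSegment) { }

    private long cachedForOffset = -1;   // fetch offset the metadata was recorded for
    private Metadata metadata;           // null until first populated

    synchronized void update(long fetchOffset, Metadata newMetadata) {
        cachedForOffset = fetchOffset;
        metadata = newMetadata;
    }

    synchronized Optional<Metadata> get(long fetchOffset) {
        // An entry recorded for a different fetch offset is treated as a miss,
        // so the cache invalidates itself when the fetch position moves on.
        if (metadata == null || cachedForOffset != fetchOffset)
            return Optional.empty();
        return Optional.of(metadata);
    }
}
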
core/src/main/java/kafka/server/share/DelayedShareFetch.java (21 changes: 12 additions & 9 deletions)
@@ -90,7 +90,7 @@ public void onExpiration() {
*/
@Override
public void onComplete() {
- // We are utilizing lock so that onComplete doesn't do a dirty read for global variables -
+ // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables -
// partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
lock.lock();
log.trace("Completing the delayed share fetch request for group {}, member {}, "
@@ -165,7 +165,7 @@ public boolean tryComplete() {
// replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
// those topic partitions.
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
- maybeUpdateFetchOffsetMetadata(replicaManagerReadResponse);
+ maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData)) {
partitionsAcquired = topicPartitionData;
partitionsAlreadyFetched = replicaManagerReadResponse;
@@ -239,21 +239,22 @@ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> acquirablePartitions
}

private LinkedHashMap<TopicIdPartition, LogReadResult> maybeReadFromLog(LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData) {
- LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsMissingFetchOffsetMetadata = new LinkedHashMap<>();
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> partitionsNotMatchingFetchOffsetMetadata = new LinkedHashMap<>();
topicPartitionData.forEach((topicIdPartition, partitionData) -> {
SharePartition sharePartition = sharePartitions.get(topicIdPartition);
- if (sharePartition.fetchOffsetMetadata().isEmpty()) {
-     partitionsMissingFetchOffsetMetadata.put(topicIdPartition, partitionData);
+ if (sharePartition.fetchOffsetMetadata(partitionData.fetchOffset).isEmpty()) {
+     partitionsNotMatchingFetchOffsetMetadata.put(topicIdPartition, partitionData);
}
});
- if (partitionsMissingFetchOffsetMetadata.isEmpty()) {
+ if (partitionsNotMatchingFetchOffsetMetadata.isEmpty()) {
return new LinkedHashMap<>();
}
// We fetch data from replica manager corresponding to the topic partitions that have missing fetch offset metadata.
- return readFromLog(partitionsMissingFetchOffsetMetadata);
+ return readFromLog(partitionsNotMatchingFetchOffsetMetadata);
}

private void maybeUpdateFetchOffsetMetadata(
+ LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData,
LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponseData) {
for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponseData.entrySet()) {
TopicIdPartition topicIdPartition = entry.getKey();
@@ -264,7 +265,9 @@ private void maybeUpdateFetchOffsetMetadata(
replicaManagerLogReadResult, topicIdPartition);
continue;
}
- sharePartition.updateFetchOffsetMetadata(Optional.of(replicaManagerLogReadResult.info().fetchOffsetMetadata));
+ sharePartition.updateFetchOffsetMetadata(
+     topicPartitionData.get(topicIdPartition).fetchOffset,
+     replicaManagerLogReadResult.info().fetchOffsetMetadata);
}
}

@@ -290,7 +293,7 @@ private boolean isMinBytesSatisfied(LinkedHashMap<TopicIdPartition, FetchRequest

SharePartition sharePartition = sharePartitions.get(topicIdPartition);

- Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata();
+ Optional<LogOffsetMetadata> optionalFetchOffsetMetadata = sharePartition.fetchOffsetMetadata(partitionData.fetchOffset);
if (optionalFetchOffsetMetadata.isEmpty() || optionalFetchOffsetMetadata.get() == LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)
continue;
LogOffsetMetadata fetchOffsetMetadata = optionalFetchOffsetMetadata.get();
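
The hunks above exist because isMinBytesSatisfied estimates how many bytes are available from the cached fetch-offset metadata and the high-watermark metadata (via their position difference) instead of re-reading the log, so the cached entry must correspond to the offset actually being fetched. A rough standalone sketch of that estimation follows; the types and the single-partition shape are simplified assumptions, not Kafka's implementation.

import java.util.Optional;

final class MinBytesEstimationSketch {
    // Stand-in for LogOffsetMetadata: a message offset plus a physical position in a log segment.
    record OffsetPosition(long messageOffset, long segmentBaseOffset, int relativePosition) {
        // Byte distance to another position in the same segment.
        int positionDiff(OffsetPosition other) {
            return relativePosition - other.relativePosition;
        }
    }

    // True if the bytes estimated to sit between the cached fetch position and the
    // high watermark already satisfy the request's minBytes.
    static boolean minBytesSatisfied(Optional<OffsetPosition> cachedFetchPosition,
                                     OffsetPosition highWatermark,
                                     int minBytes) {
        if (cachedFetchPosition.isEmpty())
            return false;                 // no usable estimate for this fetch offset
        OffsetPosition fetchPosition = cachedFetchPosition.get();
        if (highWatermark.segmentBaseOffset() != fetchPosition.segmentBaseOffset())
            return true;                  // data spans segments: assume plenty of bytes
        return highWatermark.positionDiff(fetchPosition) >= minBytes;
    }
}

With this shape, the DelayedShareFetchTest changes below read naturally: a position difference of 1 byte against minBytes of 2 keeps the request in purgatory, while a difference at least as large as minBytes lets tryComplete succeed.
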
core/src/main/java/kafka/server/share/SharePartition.java (59 changes: 40 additions & 19 deletions)
@@ -280,9 +280,9 @@ public static RecordState forId(byte id) {
private long endOffset;

/**
-  * We maintain the latest fetch offset metadata to estimate the minBytes requirement more efficiently.
+  * We maintain the latest fetch offset and its metadata to estimate the minBytes requirement more efficiently.
*/
- private Optional<LogOffsetMetadata> fetchOffsetMetadata;
+ private final OffsetMetadata fetchOffsetMetadata;

/**
* The state epoch is used to track the version of the state of the share partition.
@@ -347,6 +347,7 @@ public static RecordState forId(byte id) {
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
+ this.fetchOffsetMetadata = new OffsetMetadata();
}

/**
@@ -451,12 +452,12 @@ public CompletableFuture<Void> maybeInitialize() {
// If the cachedState is not empty, findNextFetchOffset flag is set to true so that any AVAILABLE records
// in the cached state are not missed
findNextFetchOffset.set(true);
- updateEndOffsetAndResetFetchOffsetMetadata(cachedState.lastEntry().getValue().lastOffset());
+ endOffset = cachedState.lastEntry().getValue().lastOffset();
// In case the persister read state RPC result contains no AVAILABLE records, we can update cached state
// and start/end offsets.
maybeUpdateCachedStateAndOffsets();
} else {
- updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
+ endOffset = startOffset;
}
// Set the partition state to Active and complete the future.
partitionState = SharePartitionState.ACTIVE;
@@ -943,7 +944,7 @@ void updateCacheAndOffsets(long logStartOffset) {
// If the cached state is empty, then the start and end offset will be the new log start offset.
// This can occur during the initialization of share partition if LSO has moved.
startOffset = logStartOffset;
- updateEndOffsetAndResetFetchOffsetMetadata(logStartOffset);
+ endOffset = logStartOffset;
return;
}

@@ -961,7 +962,7 @@ void updateCacheAndOffsets(long logStartOffset) {
// This case means that the cached state is completely fresh now.
// Example scenario - batch of 0-10 in acquired state in cached state, then LSO moves to 15,
// then endOffset should be 15 as well.
- updateEndOffsetAndResetFetchOffsetMetadata(startOffset);
+ endOffset = startOffset;
}

// Note -
@@ -1192,7 +1193,7 @@ private AcquiredRecords acquireNewBatchRecords(
if (cachedState.firstKey() == firstAcquiredOffset) {
startOffset = firstAcquiredOffset;
}
- updateEndOffsetAndResetFetchOffsetMetadata(lastAcquiredOffset);
+ endOffset = lastAcquiredOffset;
return new AcquiredRecords()
.setFirstOffset(firstAcquiredOffset)
.setLastOffset(lastAcquiredOffset)
@@ -1592,27 +1593,21 @@ private Optional<Throwable> acknowledgeCompleteBatch(
return Optional.empty();
}

- // The caller of this function is expected to hold lock.writeLock() when calling this method.
- protected void updateEndOffsetAndResetFetchOffsetMetadata(long updatedEndOffset) {
-     endOffset = updatedEndOffset;
-     fetchOffsetMetadata = Optional.empty();
- }
-
- protected void updateFetchOffsetMetadata(Optional<LogOffsetMetadata> fetchOffsetMetadata) {
+ protected void updateFetchOffsetMetadata(long nextFetchOffset, LogOffsetMetadata logOffsetMetadata) {
lock.writeLock().lock();
try {
-     this.fetchOffsetMetadata = fetchOffsetMetadata;
+     fetchOffsetMetadata.updateOffsetMetadata(nextFetchOffset, logOffsetMetadata);
} finally {
lock.writeLock().unlock();
}
}

- protected Optional<LogOffsetMetadata> fetchOffsetMetadata() {
+ protected Optional<LogOffsetMetadata> fetchOffsetMetadata(long nextFetchOffset) {
lock.readLock().lock();
try {
-     if (findNextFetchOffset.get())
+     if (fetchOffsetMetadata.offsetMetadata() == null || fetchOffsetMetadata.offset() != nextFetchOffset)
return Optional.empty();
-     return fetchOffsetMetadata;
+     return Optional.of(fetchOffsetMetadata.offsetMetadata());
} finally {
lock.readLock().unlock();
}
@@ -1696,7 +1691,7 @@ private void maybeUpdateCachedStateAndOffsets() {
long lastCachedOffset = cachedState.lastEntry().getValue().lastOffset();
if (lastOffsetAcknowledged == lastCachedOffset) {
startOffset = lastCachedOffset + 1; // The next offset that will be fetched and acquired in the share partition
- updateEndOffsetAndResetFetchOffsetMetadata(lastCachedOffset + 1);
+ endOffset = lastCachedOffset + 1;
cachedState.clear();
// Nothing further to do.
return;
@@ -2426,4 +2421,30 @@ public String toString() {
")";
}
}

+ /**
+  * FetchOffsetMetadata class is used to cache offset and its log metadata.
+  */
+ static final class OffsetMetadata {
+     // This offset could be different from offsetMetadata.messageOffset if it's in the middle of a batch.
+     private long offset;
+     private LogOffsetMetadata offsetMetadata;
+
+     OffsetMetadata() {
+         offset = -1;
+     }
+
+     long offset() {
+         return offset;
+     }
+
+     LogOffsetMetadata offsetMetadata() {
+         return offsetMetadata;
+     }
+
+     void updateOffsetMetadata(long offset, LogOffsetMetadata offsetMetadata) {
+         this.offset = offset;
+         this.offsetMetadata = offsetMetadata;
+     }
+ }
}
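
Taken together with the accessor changes above, the contract of the new pair of methods is: updateFetchOffsetMetadata(offset, metadata) records the metadata under that offset, and fetchOffsetMetadata(offset) only returns it for the exact same offset. Below is a small walk-through against the OffsetMetadata holder just added; the surrounding class and the LogOffsetMetadata import path are assumptions, and since the holder is package-private this would have to live in kafka.server.share.

package kafka.server.share;

import org.apache.kafka.storage.internals.log.LogOffsetMetadata;  // import path assumed

final class OffsetMetadataContractSketch {
    public static void main(String[] args) {
        SharePartition.OffsetMetadata cache = new SharePartition.OffsetMetadata();

        // Fresh holder: nothing recorded yet, so fetchOffsetMetadata(anyOffset) reports empty.
        assert cache.offsetMetadata() == null && cache.offset() == -1;

        // What maybeUpdateFetchOffsetMetadata does after a log read at fetch offset 10.
        LogOffsetMetadata readMetadata = new LogOffsetMetadata(10L, 0L, 4096);
        cache.updateOffsetMetadata(10L, readMetadata);
        assert cache.offset() == 10L && cache.offsetMetadata() == readMetadata;

        // A later lookup for fetch offset 25 finds offset() != 25 and yields Optional.empty(),
        // so the stale entry is ignored without any explicit reset when the offset moves on.
    }
}

The remaining two changed files are test updates along the same lines: every stub of fetchOffsetMetadata() becomes fetchOffsetMetadata(anyLong()), and the anyLong matcher import is added.
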
@@ -65,6 +65,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -165,7 +166,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
// We are testing the case when the share partition is getting fetched for the first time, so for the first time
// the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
// populated for the share partition, which has 1 as the positional difference, so it doesn't satisfy the minBytes(2).
- when(sp0.fetchOffsetMetadata())
+ when(sp0.fetchOffsetMetadata(anyLong()))
.thenReturn(Optional.empty())
.thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
@@ -223,7 +224,7 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
// functionality to give the file position difference as 1 byte, so it doesn't satisfy the minBytes(2).
LogOffsetMetadata hwmOffsetMetadata = mock(LogOffsetMetadata.class);
when(hwmOffsetMetadata.positionDiff(any())).thenReturn(1);
- when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(mock(LogOffsetMetadata.class)));
mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);

DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
@@ -271,7 +272,7 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
ShareAcquiredRecords.fromAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());

- when(sp0.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
.withShareFetchData(shareFetch)
@@ -123,6 +123,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
+ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
@@ -1677,7 +1678,7 @@ public void testAcknowledgeCompletesDelayedShareFetchRequest() {
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
- when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+ when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 2);

// Initially you cannot acquire records for both sp1 and sp2.
@@ -1877,7 +1878,7 @@ public void testReleaseSessionCompletesDelayedShareFetchRequest() {
"TestShareFetch", mockTimer, mockReplicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(mockReplicaManager, delayedShareFetchPurgatory);
- when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+ when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(mockReplicaManager, tp1, 1);

// Initially you cannot acquire records for both sp1 and sp2.
@@ -2363,7 +2364,7 @@ public void testSharePartitionPartialInitializationFailure() throws Exception {
"TestShareFetch", mockTimer, replicaManager.localBrokerId(),
DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory);
- when(sp1.fetchOffsetMetadata()).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
+ when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1);

doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
