KAFKA-18084 Added usage for rollback state while SharePartition acquires records (apache#17965)

Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal <[email protected]>, Chia-Ping Tsai <[email protected]>
adixitconfluent authored Dec 6, 2024
1 parent 755adf8 commit 15206d5
Showing 3 changed files with 146 additions and 45 deletions.
60 changes: 35 additions & 25 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -690,7 +690,7 @@ public ShareAcquiredRecords acquire(
// acquire subset of offsets from the in-flight batch but only if the
// complete batch is available yet. Hence, do a pre-check to avoid exploding
// the in-flight offset tracking unnecessarily.
if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
if (inFlightBatch.batchState() != RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}"
+ " skipping offset tracking for batch as well.", groupId,
topicIdPartition, inFlightBatch);
@@ -709,7 +709,7 @@ public ShareAcquiredRecords acquire(
}

// The in-flight batch is a full match hence change the state of the complete batch.
if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
if (inFlightBatch.batchState() != RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
log.trace("The batch is not available to acquire in share partition: {}-{}, skipping: {}",
groupId, topicIdPartition, inFlightBatch);
continue;
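
For context, the guard added in these two hunks means a batch is acquirable only when it is AVAILABLE and has no uncommitted state transition pending (i.e. the persister write for an earlier transition is still in flight). A minimal sketch of that rule, using hypothetical simplified types rather than the real SharePartition internals:

```java
// Hypothetical, simplified stand-ins for InFlightBatch/RecordState, just to
// illustrate the acquirability rule added by this change.
enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

final class InFlightBatchSketch {
    RecordState batchState = RecordState.AVAILABLE;
    // Non-null while a transition has been applied locally but the persister
    // write that commits it has not completed yet.
    RecordState rollbackState;

    boolean batchHasOngoingStateTransition() {
        return rollbackState != null;
    }

    // A batch may be acquired only if it is AVAILABLE and no uncommitted
    // transition is pending on it; otherwise the acquire loop skips it.
    boolean acquirable() {
        return batchState == RecordState.AVAILABLE && !batchHasOngoingStateTransition();
    }
}
```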
@@ -1280,10 +1280,9 @@ private int acquireSubsetBatchRecords(
break;
}

if (offsetState.getValue().state != RecordState.AVAILABLE) {
log.trace("The offset is not available skipping, offset: {} batch: {}"
+ " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch,
groupId, topicIdPartition);
if (offsetState.getValue().state != RecordState.AVAILABLE || offsetState.getValue().hasOngoingStateTransition()) {
log.trace("The offset {} is not available in share partition: {}-{}, skipping: {}",
offsetState.getKey(), groupId, topicIdPartition, inFlightBatch);
continue;
}
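
The same rule is applied per offset when only a subset of an in-flight batch can be acquired. A rough sketch of that loop shape, assuming a simplified per-offset state map rather than the real acquireSubsetBatchRecords signature:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical per-offset acquisition loop: offsets that are unavailable or
// mid-transition are skipped, everything else is marked ACQUIRED.
final class OffsetAcquireSketch {
    enum RecordState { AVAILABLE, ACQUIRED }

    static final class OffsetState {
        RecordState state = RecordState.AVAILABLE;
        boolean ongoingStateTransition;
    }

    static int acquireSubset(NavigableMap<Long, OffsetState> offsetStates, long firstOffset, long lastOffset) {
        int acquired = 0;
        for (Map.Entry<Long, OffsetState> entry : offsetStates.subMap(firstOffset, true, lastOffset, true).entrySet()) {
            OffsetState offsetState = entry.getValue();
            if (offsetState.state != RecordState.AVAILABLE || offsetState.ongoingStateTransition) {
                continue; // skip: either not available or its last transition is not committed yet
            }
            offsetState.state = RecordState.ACQUIRED;
            acquired++;
        }
        return acquired;
    }

    public static void main(String[] args) {
        NavigableMap<Long, OffsetState> offsets = new TreeMap<>();
        for (long o = 0; o < 5; o++) offsets.put(o, new OffsetState());
        offsets.get(2L).ongoingStateTransition = true; // offset 2 is still transitioning
        System.out.println(acquireSubset(offsets, 0, 4)); // prints 4
    }
}
```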

@@ -2066,8 +2065,8 @@ private void releaseAcquisitionLockOnTimeoutForCompleteBatch(InFlightBatch inFli
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state.id, (short) updateResult.deliveryCount));

// Update acquisition lock timeout task for the batch to null since it is completed now.
updateResult.updateAcquisitionLockTimeoutTask(null);
// Cancel the acquisition lock timeout task for the batch since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
Expand Down Expand Up @@ -2113,8 +2112,8 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
updateResult.state.id, (short) updateResult.deliveryCount));

// Update acquisition lock timeout task for the offset to null since it is completed now.
updateResult.updateAcquisitionLockTimeoutTask(null);
// Cancel the acquisition lock timeout task for the offset since it is completed now.
updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
@@ -2251,10 +2250,7 @@ long lastOffset() {

// Visible for testing.
RecordState batchState() {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
return batchState.state;
return inFlightState().state;
}

// Visible for testing.
@@ -2275,22 +2271,27 @@ int batchDeliveryCount() {

// Visible for testing.
AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
return batchState.acquisitionLockTimeoutTask;
return inFlightState().acquisitionLockTimeoutTask;
}

// Visible for testing.
NavigableMap<Long, InFlightState> offsetState() {
return offsetState;
}

private void archiveBatch(String newMemberId) {
private InFlightState inFlightState() {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
batchState.archive(newMemberId);
return batchState;
}

private boolean batchHasOngoingStateTransition() {
return inFlightState().hasOngoingStateTransition();
}

private void archiveBatch(String newMemberId) {
inFlightState().archive(newMemberId);
}
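
The surrounding hunks fold the repeated `batchState == null` checks into a single private `inFlightState()` accessor that batchState(), batchAcquisitionLockTimeoutTask(), batchHasOngoingStateTransition() and archiveBatch() all go through. A condensed sketch of that shape, with hypothetical names standing in for the real inner classes:

```java
// Hypothetical condensed shape of the accessor consolidation: every batch-level
// getter funnels through one null check instead of repeating it.
final class BatchSketch {
    enum RecordState { AVAILABLE, ACQUIRED, ARCHIVED }

    static final class InFlightState {
        RecordState state = RecordState.AVAILABLE;
        boolean ongoingStateTransition;

        void archive() { state = RecordState.ARCHIVED; }
    }

    // Null once the batch falls back to per-offset state tracking.
    private InFlightState batchState = new InFlightState();

    private InFlightState inFlightState() {
        if (batchState == null) {
            throw new IllegalStateException("The batch state is not available as the offset state is maintained");
        }
        return batchState;
    }

    RecordState batchState() { return inFlightState().state; }

    boolean batchHasOngoingStateTransition() { return inFlightState().ongoingStateTransition; }

    void archiveBatch() { inFlightState().archive(); }
}
```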

private InFlightState tryUpdateBatchState(RecordState newState, boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
@@ -2335,10 +2336,7 @@ private void maybeInitializeOffsetStateUpdate() {
}

private void updateAcquisitionLockTimeout(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
if (batchState == null) {
throw new IllegalStateException("The batch state is not available as the offset state is maintained");
}
batchState.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
inFlightState().acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}

@Override
@@ -2397,7 +2395,10 @@ TimerTask acquisitionLockTimeoutTask() {
return acquisitionLockTimeoutTask;
}

void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) {
void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask acquisitionLockTimeoutTask) throws IllegalArgumentException {
if (this.acquisitionLockTimeoutTask != null) {
throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
}
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
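
Two related changes to the timeout handling show up in these hunks: callers that previously reset the task with `updateAcquisitionLockTimeoutTask(null)` now call `cancelAndClearAcquisitionLockTimeoutTask()`, which cancels the timer before dropping the reference, and the update method itself refuses to overwrite an existing task. A minimal sketch of that set-once / cancel-and-clear lifecycle, using `java.util.TimerTask` as a stand-in for the real AcquisitionLockTimerTask:

```java
import java.util.TimerTask;

// Hypothetical holder mirroring the set-once / cancel-and-clear lifecycle of
// the acquisition lock timeout task.
final class AcquisitionLockTaskHolder {
    private TimerTask acquisitionLockTimeoutTask;

    // Set-once: silently overwriting a live task would leave a stale timeout scheduled.
    void updateAcquisitionLockTimeoutTask(TimerTask task) {
        if (this.acquisitionLockTimeoutTask != null) {
            throw new IllegalArgumentException("Existing acquisition lock timeout exists, cannot override.");
        }
        this.acquisitionLockTimeoutTask = task;
    }

    // Cancel the scheduled timeout before clearing the reference so it cannot
    // fire after the batch/offset has already completed.
    void cancelAndClearAcquisitionLockTimeoutTask() {
        if (acquisitionLockTimeoutTask != null) {
            acquisitionLockTimeoutTask.cancel();
            acquisitionLockTimeoutTask = null;
        }
    }
}
```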

@@ -2406,6 +2407,15 @@ void cancelAndClearAcquisitionLockTimeoutTask() {
acquisitionLockTimeoutTask = null;
}

private boolean hasOngoingStateTransition() {
if (rollbackState == null) {
// This case could occur when the batch/offset hasn't transitioned even once or the state transitions have
// been committed.
return false;
}
return rollbackState.state != null;
}
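
`hasOngoingStateTransition()` piggybacks on the existing rollback bookkeeping: starting a transition snapshots the prior state into `rollbackState`, a successful persister write commits and clears it, and a failed write rolls the state back. A sketch of that protocol under those assumptions (field and method names here are illustrative, not the exact SharePartition API):

```java
// Hypothetical sketch of the transition/commit/rollback bookkeeping that
// hasOngoingStateTransition() relies on.
final class InFlightStateSketch {
    enum RecordState { AVAILABLE, ACQUIRED, ACKNOWLEDGED }

    private RecordState state = RecordState.ACQUIRED;
    // Snapshot of the pre-transition value; null when nothing is pending.
    private RecordState rollbackState;

    void startStateTransition(RecordState newState) {
        rollbackState = state;   // remember what to restore if the write fails
        state = newState;        // apply the new state optimistically
    }

    void completeStateTransition(boolean commit) {
        if (!commit) {
            state = rollbackState; // persister write failed: restore the old state
        }
        rollbackState = null;      // either way, nothing is pending any more
    }

    boolean hasOngoingStateTransition() {
        // Null means the batch/offset either never transitioned or the last
        // transition has already been committed or rolled back.
        return rollbackState != null;
    }
}
```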

/**
* Try to update the state of the records. The state of the records can only be updated if the
* new state is allowed to be transitioned from old state. The delivery count is not incremented
72 changes: 72 additions & 0 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -5421,6 +5421,78 @@ public void testMaybeInitializeWhenReadStateRpcReturnsZeroAvailableRecords() {
assertEquals(734, sharePartition.endOffset());
}

@Test
public void testAcquireWithWriteShareGroupStateDelay() {
Persister persister = Mockito.mock(Persister.class);
mockPersisterReadStateMethod(persister);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withState(SharePartitionState.ACTIVE)
.build();

// Mock persister writeState method so that sharePartition.isWriteShareGroupStateSuccessful() returns true with a delay of 5 sec.
WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class);
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
new TopicData<>(TOPIC_ID_PARTITION.topicId(), Collections.singletonList(
PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message())))));

CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>();
// The persister.writeState RPC will not complete immediately, so the commit for the acknowledged offsets won't happen yet.
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);

sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 0),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));

sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));

List<ShareAcknowledgementBatch> acknowledgementBatches = new ArrayList<>();
acknowledgementBatches.add(new ShareAcknowledgementBatch(2, 3, Collections.singletonList((byte) 2)));
acknowledgementBatches.add(new ShareAcknowledgementBatch(5, 9, Collections.singletonList((byte) 2)));
// Acknowledge 2-3, 5-9 offsets with RELEASE acknowledge type.
sharePartition.acknowledge(MEMBER_ID, acknowledgementBatches);

assertEquals(2, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(0L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(1L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(2L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(3L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(4L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());

// Even though offsets 2-3 and 5-9 are in AVAILABLE state, they won't be acquired because they are still transitioning from ACQUIRED
// to AVAILABLE: the write state RPC has not completed yet, so the transition has not been committed.
sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));

assertEquals(3, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(0L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(1L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(2L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(0L).offsetState().get(3L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(4L).state());
assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(5L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());

// The persister.writeState RPC completes now, committing all the acknowledged batches. Hence, their
// rollback state becomes null and they become available to be acquired again.
future.complete(writeShareGroupStateResult);
sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0),
Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false));
assertEquals(3, sharePartition.cachedState().size());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(0L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(1L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(2L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(3L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(4L).state());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState());
assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState());
}

private List<AcquiredRecords> fetchAcquiredRecords(ShareAcquiredRecords shareAcquiredRecords, int expectedOffsetCount) {
assertNotNull(shareAcquiredRecords);
assertEquals(expectedOffsetCount, shareAcquiredRecords.count());
@@ -835,38 +835,57 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), Collections.singletonList(9), Collections.singletonList(1)))

var fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
val fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)

// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)

// Send a third Share Fetch request with piggybacked acknowledgements
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
.setFirstOffset(0)
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)

shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
assertEquals(1, shareFetchResponseData.responses().get(0).partitions().size())

expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(List(0L, 10L).asJava, List(9L, 19L).asJava, List(2, 1).asJava))

val acquiredRecords : util.List[AcquiredRecords] = new util.ArrayList[AcquiredRecords]()
var releaseAcknowledgementSent = false

TestUtils.waitUntilTrue(() => {
shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
if (releaseAcknowledgementSent) {
// For fourth share fetch request onwards
acknowledgementsMapForFetch = Map.empty
} else {
// Send a third Share Fetch request with piggybacked acknowledgements
acknowledgementsMapForFetch = Map(topicIdPartition -> List(new ShareFetchRequestData.AcknowledgementBatch()
.setFirstOffset(0)
.setLastOffset(9)
.setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // Release the records
releaseAcknowledgementSent = true
}
shareFetchRequest = createShareFetchRequest(groupId, metadata, MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
shareFetchResponse = connectAndReceive[ShareFetchResponse](shareFetchRequest)

shareFetchResponseData = shareFetchResponse.data()
assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
assertEquals(1, shareFetchResponseData.responses().size())
assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
val responseSize = shareFetchResponseData.responses().get(0).partitions().size()
if (responseSize > 0) {
acquiredRecords.addAll(shareFetchResponseData.responses().get(0).partitions().get(0).acquiredRecords())
}
// There should be 2 acquired record batches finally -
// 1. batch containing 0-9 offsets which were initially acknowledged as RELEASED.
// 2. batch containing 10-19 offsets which were produced in the second produceData function call.
acquiredRecords.size() == 2

}, "Share fetch request failed", 5000)

// All the records from offsets 0 to 19 will be fetched. Records from 0 to 9 will have a delivery count of 2 because
// they are redelivered, and records from 10 to 19 will have a delivery count of 1 because they are newly acquired.

fetchPartitionData = shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData, fetchPartitionData)
assertTrue(expectedFetchPartitionData.acquiredRecords().containsAll(acquiredRecords) &&
acquiredRecords.containsAll(expectedFetchPartitionData.acquiredRecords()))
}
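
The rewritten Scala test above polls with TestUtils.waitUntilTrue instead of asserting on a single response, because the released offsets are only re-acquirable once the in-flight write-state RPC commits, which may take a few fetches. For reference, a generic poll-until-condition helper in the same spirit (an illustrative re-implementation, not Kafka's TestUtils):

```java
import java.util.function.BooleanSupplier;

// Hypothetical minimal wait-until-true helper for tests that poll an
// eventually-consistent condition, such as "both acquired-record batches observed".
final class WaitUtil {
    static void waitUntilTrue(BooleanSupplier condition, String failureMessage, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return; // condition met
            }
            Thread.sleep(100L); // brief back-off before retrying (e.g. re-sending the share fetch)
        }
        throw new AssertionError(failureMessage);
    }
}
```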

@ClusterTests(