Skip to content

Commit

Permalink
Update StreamAcknowledgementManagerTest
Browse files Browse the repository at this point in the history
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed Apr 1, 2024
1 parent d16feb4 commit 9c7e6b8
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,21 @@ private void monitorCheckpoints(final ExecutorService executorService, final Thr
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime > checkPointIntervalInMs) {
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
}
} else {
LOG.info("Checkpoint not complete");
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint after twice ack wait duration
// Acknowledgement not received for the checkpoint after twice ack wait time
if (ackWaitDuration.getSeconds() > partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up partition and should interrupt parent thread to stop processing stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class StreamAcknowledgementManagerTest {

@BeforeEach
public void setup() {
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 100, 100);
streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0);
}

@Test
Expand Down Expand Up @@ -72,7 +72,7 @@ public void createAcknowledgementSet_enabled_ackSetWithAck() {
final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken);
assertThat(ackCheckpointStatus.isAcknowledged(), is(true));
await()
.atMost(Duration.ofSeconds(4)).untilAsserted(() ->
.atMost(Duration.ofSeconds(10)).untilAsserted(() ->
verify(partitionCheckpoint).checkpoint(resumeToken, recordCount));
assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue()));
}
Expand Down Expand Up @@ -106,7 +106,7 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAck() {
CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken2);
assertThat(ackCheckpointStatus.isAcknowledged(), is(true));
await()
.atMost(Duration.ofSeconds(4)).untilAsserted(() ->
.atMost(Duration.ofSeconds(10)).untilAsserted(() ->
verify(partitionCheckpoint).checkpoint(resumeToken2, recordCount2));
assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue()));
}
Expand Down Expand Up @@ -140,7 +140,7 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() {
CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken2);
assertThat(ackCheckpointStatus.isAcknowledged(), is(true));
await()
.atMost(Duration.ofSeconds(4)).untilAsserted(() ->
.atMost(Duration.ofSeconds(10)).untilAsserted(() ->
verifyNoInteractions(partitionCheckpoint));
assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1));
assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1));
Expand Down

0 comments on commit 9c7e6b8

Please sign in to comment.