Skip to content

Commit

Permalink
Handle exception in Stream Acknowledgement Manager.
Browse files (browse the repository at this point in the history)
Signed-off-by: Dinu John <[email protected]>
  • Loading branch information
dinujoh committed May 9, 2024
1 parent 50b940b commit f261e3c
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -53,30 +53,41 @@ private void monitorAcknowledgment(final ExecutorService executorService, final
long lastCheckpointTime = System.currentTimeMillis();
CheckpointStatus lastCheckpointStatus = null;
while (!Thread.currentThread().isInterrupted()) {
final CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
try {
final CheckpointStatus checkpointStatus = checkpoints.peek();
if (checkpointStatus != null) {
if (checkpointStatus.isAcknowledged()) {
lastCheckpointStatus = checkpoints.poll();
ackStatus.remove(checkpointStatus.getResumeToken());
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount());
lastCheckpointTime = System.currentTimeMillis();
}
} else {
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint after twice ack wait time
if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up partition and should interrupt parent thread to stop processing stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
break;
}
}
} else {
LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken());
final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now());
// Acknowledgement not received for the checkpoint after twice ack wait time
if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) {
// Give up partition and should interrupt parent thread to stop processing stream
if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) {
partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount());
}
LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken());
partitionCheckpoint.giveUpPartition();
break;
if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) {
LOG.debug("No records processed. Checkpoint to extend the lease of the worker");
partitionCheckpoint.checkpoint(null, 0);
lastCheckpointTime = System.currentTimeMillis();
}
}
} catch (Exception e) {
LOG.warn("Exception monitoring acknowledgments. The stream record processing will start from previous checkpoint.", e);
break;
}

try {
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -60,6 +60,7 @@ public class StreamWorker {
static final String BYTES_PROCESSED = "bytesProcessed";
private static final long MILLI_SECOND = 1_000_000L;
private static final String UPDATE_DESCRIPTION = "updateDescription";
private static final int BUFFER_WRITE_TIMEOUT_MILLIS = 15_000;
private final RecordBufferWriter recordBufferWriter;
private final PartitionKeyRecordConverter recordConverter;
private final DataStreamPartitionCheckpoint partitionCheckpoint;
Expand Down Expand Up @@ -332,7 +333,7 @@ private void writeToBuffer() {
private void bufferWriteAndCheckpointStream() {
long lastCheckpointTime = System.currentTimeMillis();
while (!Thread.currentThread().isInterrupted() && !stopWorker) {
if (!records.isEmpty() && lastBufferWriteTime < Instant.now().minusMillis(checkPointIntervalInMs).toEpochMilli()) {
if (!records.isEmpty() && lastBufferWriteTime < Instant.now().minusMillis(BUFFER_WRITE_TIMEOUT_MILLIS).toEpochMilli()) {
lock.lock();
LOG.debug("Writing to buffer due to buffer write delay");
try {
Expand All @@ -354,7 +355,7 @@ private void bufferWriteAndCheckpointStream() {
LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount);
partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount);
} catch (Exception e) {
LOG.warn("Exception checkpointing the current state. New thread should start the stream from previous checkpoint.", e);
LOG.warn("Exception checkpointing the current state. The stream record processing will start from previous checkpoint.", e);
stop();
} finally {
lock.unlock();;
Expand All @@ -364,7 +365,7 @@ private void bufferWriteAndCheckpointStream() {
}

try {
Thread.sleep(checkPointIntervalInMs);
Thread.sleep(BUFFER_WRITE_TIMEOUT_MILLIS);
} catch (InterruptedException ex) {
break;
}
Expand Down

0 comments on commit f261e3c

Please sign in to comment.