From f261e3c0eaae430ec109383a50aed2c40d1a26a9 Mon Sep 17 00:00:00 2001 From: Dinu John <86094133+dinujoh@users.noreply.github.com> Date: Thu, 9 May 2024 18:03:26 -0500 Subject: [PATCH] Handle exception in Stream Acknowledgement Manager. Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com> --- .../stream/StreamAcknowledgementManager.java | 51 +++++++++++-------- .../plugins/mongo/stream/StreamWorker.java | 7 +-- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java index c6d7b0633e..466fd6037d 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java @@ -53,30 +53,41 @@ private void monitorAcknowledgment(final ExecutorService executorService, final long lastCheckpointTime = System.currentTimeMillis(); CheckpointStatus lastCheckpointStatus = null; while (!Thread.currentThread().isInterrupted()) { - final CheckpointStatus checkpointStatus = checkpoints.peek(); - if (checkpointStatus != null) { - if (checkpointStatus.isAcknowledged()) { - lastCheckpointStatus = checkpoints.poll(); - ackStatus.remove(checkpointStatus.getResumeToken()); - if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { - LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); - partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); - lastCheckpointTime = System.currentTimeMillis(); + try { + final CheckpointStatus checkpointStatus = checkpoints.peek(); + if (checkpointStatus != null) { + if (checkpointStatus.isAcknowledged()) { + lastCheckpointStatus = checkpoints.poll(); + ackStatus.remove(checkpointStatus.getResumeToken()); + if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { + LOG.debug("Perform regular checkpointing for resume token {} at record count {}", checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); + partitionCheckpoint.checkpoint(checkpointStatus.getResumeToken(), checkpointStatus.getRecordCount()); + lastCheckpointTime = System.currentTimeMillis(); + } + } else { + LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken()); + final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now()); + // Acknowledgement not received for the checkpoint after twice ack wait time + if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) { + // Give up partition and should interrupt parent thread to stop processing stream + if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) { + partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); + } + LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken()); + partitionCheckpoint.giveUpPartition(); + break; + } } } else { - LOG.debug("Checkpoint not complete for resume token {}", checkpointStatus.getResumeToken()); - final Duration ackWaitDuration = Duration.between(Instant.ofEpochMilli(checkpointStatus.getCreateTimestamp()), Instant.now()); - // Acknowledgement not received for the checkpoint after twice ack wait time - if (ackWaitDuration.getSeconds() >= partitionAcknowledgmentTimeout.getSeconds() * 2) { - // Give up partition and should interrupt parent thread to stop processing stream - if (lastCheckpointStatus != null && lastCheckpointStatus.isAcknowledged()) { - partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); - } - LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken()); - partitionCheckpoint.giveUpPartition(); - break; + if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { + LOG.debug("No records processed. Checkpoint to extend the lease of the worker"); + partitionCheckpoint.checkpoint(null, 0); + lastCheckpointTime = System.currentTimeMillis(); } } + } catch (Exception e) { + LOG.warn("Exception monitoring acknowledgments. The stream record processing will start from previous checkpoint.", e); + break; } try { diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java index b187a94f26..4af4590a71 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamWorker.java @@ -60,6 +60,7 @@ public class StreamWorker { static final String BYTES_PROCESSED = "bytesProcessed"; private static final long MILLI_SECOND = 1_000_000L; private static final String UPDATE_DESCRIPTION = "updateDescription"; + private static final int BUFFER_WRITE_TIMEOUT_MILLIS = 15_000; private final RecordBufferWriter recordBufferWriter; private final PartitionKeyRecordConverter recordConverter; private final DataStreamPartitionCheckpoint partitionCheckpoint; @@ -332,7 +333,7 @@ private void writeToBuffer() { private void bufferWriteAndCheckpointStream() { long lastCheckpointTime = System.currentTimeMillis(); while (!Thread.currentThread().isInterrupted() && !stopWorker) { - if (!records.isEmpty() && lastBufferWriteTime < Instant.now().minusMillis(checkPointIntervalInMs).toEpochMilli()) { + if (!records.isEmpty() && lastBufferWriteTime < Instant.now().minusMillis(BUFFER_WRITE_TIMEOUT_MILLIS).toEpochMilli()) { lock.lock(); LOG.debug("Writing to buffer due to buffer write delay"); try { @@ -354,7 +355,7 @@ private void bufferWriteAndCheckpointStream() { LOG.debug("Perform regular checkpoint for resume token {} at record count {}", lastLocalCheckpoint, lastLocalRecordCount); partitionCheckpoint.checkpoint(lastLocalCheckpoint, lastLocalRecordCount); } catch (Exception e) { - LOG.warn("Exception checkpointing the current state. New thread should start the stream from previous checkpoint.", e); + LOG.warn("Exception checkpointing the current state. The stream record processing will start from previous checkpoint.", e); stop(); } finally { lock.unlock();; @@ -364,7 +365,7 @@ private void bufferWriteAndCheckpointStream() { } try { - Thread.sleep(checkPointIntervalInMs); + Thread.sleep(BUFFER_WRITE_TIMEOUT_MILLIS); } catch (InterruptedException ex) { break; }