Skip to content

Commit

Permalink
BUG: Stop S3 source on InterruptedException (#3331)
Browse files Browse the repository at this point in the history
Stop S3 source on InterruptedException

Signed-off-by: Asif Sohail Mohammed <[email protected]>
(cherry picked from commit d55fb73)
  • Loading branch information
asifsmohammed authored and github-actions[bot] committed Sep 15, 2023
1 parent 3cf8f40 commit 0a34644
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class S3ScanService {
private final AcknowledgementSetManager acknowledgementSetManager;
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
private final PluginMetrics pluginMetrics;
private ScanObjectWorker scanObjectWorker;

public S3ScanService(final S3SourceConfig s3SourceConfig,
final S3ClientBuilderFactory s3ClientBuilderFactory,
Expand All @@ -60,13 +61,14 @@ public S3ScanService(final S3SourceConfig s3SourceConfig,
}

public void start() {
scanObjectWorkerThread = new Thread(new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics));
scanObjectWorker = new ScanObjectWorker(s3ClientBuilderFactory.getS3Client(),
getScanOptions(),s3ObjectHandler,bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, s3ObjectDeleteWorker, pluginMetrics);
scanObjectWorkerThread = new Thread(scanObjectWorker);
scanObjectWorkerThread.start();
}

public void stop() {
scanObjectWorkerThread.interrupt();
scanObjectWorker.stop();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class ScanObjectWorker implements Runnable{
private final AcknowledgementSetManager acknowledgementSetManager;

// Should there be a duration or time that is configured in the source to stop processing? Otherwise will only stop when data prepper is stopped
private boolean shouldStopProcessing = false;
private volatile boolean isStopped = false;
private final boolean deleteS3ObjectsOnRead;
private final S3ObjectDeleteWorker s3ObjectDeleteWorker;
private final PluginMetrics pluginMetrics;
Expand Down Expand Up @@ -100,15 +100,14 @@ public ScanObjectWorker(final S3Client s3Client,

@Override
public void run() {
while (!shouldStopProcessing) {

while (!isStopped) {
try {
startProcessingObject(STANDARD_BACKOFF_MILLIS);
} catch (final Exception e) {
LOG.error("Received an exception while processing S3 objects, backing off and retrying", e);
try {
Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS);
} catch (InterruptedException ex) {
} catch (final InterruptedException ex) {
LOG.error("S3 Scan worker thread interrupted while backing off.", ex);
return;
}
Expand All @@ -131,7 +130,7 @@ private void startProcessingObject(final int waitTimeMillis) {
try {
Thread.sleep(waitTimeMillis);
} catch (InterruptedException e) {
shouldStopProcessing = true;
LOG.error("S3 Scan worker thread interrupted while backing off.", e);
}
return;
}
Expand Down Expand Up @@ -174,8 +173,10 @@ private void startProcessingObject(final int waitTimeMillis) {
} catch (final PartitionNotOwnedException | PartitionNotFoundException | PartitionUpdateException e) {
LOG.warn("S3 scan object worker received an exception from the source coordinator. There is a potential for duplicate data from {}, giving up partition and getting next partition: {}", objectKey, e.getMessage());
sourceCoordinator.giveUpPartitions();
} catch (final ExecutionException | TimeoutException | InterruptedException e) {
} catch (final ExecutionException | TimeoutException e) {
LOG.error("Exception occurred while waiting for acknowledgement.", e);
} catch (final InterruptedException e) {
LOG.error("S3 Scan worker thread interrupted while processing S3 object.", e);
}
}

Expand All @@ -194,4 +195,9 @@ private Optional<DeleteObjectRequest> processS3Object(final S3ObjectReference s3
}
return Optional.empty();
}

void stop() {
isStopped = true;
Thread.currentThread().interrupt();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class SqsService {
private final AcknowledgementSetManager acknowledgementSetManager;

private Thread sqsWorkerThread;
private SqsWorker sqsWorker;

public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
final S3SourceConfig s3SourceConfig,
Expand All @@ -46,7 +47,8 @@ public SqsService(final AcknowledgementSetManager acknowledgementSetManager,
public void start() {
final Backoff backoff = Backoff.exponential(INITIAL_DELAY, MAXIMUM_DELAY).withJitter(JITTER_RATE)
.withMaxAttempts(Integer.MAX_VALUE);
sqsWorkerThread = new Thread(new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff));
sqsWorker = new SqsWorker(acknowledgementSetManager, sqsClient, s3Accessor, s3SourceConfig, pluginMetrics, backoff);
sqsWorkerThread = new Thread(sqsWorker);
sqsWorkerThread.start();
}

Expand All @@ -62,6 +64,6 @@ SqsClient createSqsClient(final AwsCredentialsProvider credentialsProvider) {
}

public void stop() {
sqsWorkerThread.interrupt();
sqsWorker.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ public class SqsWorker implements Runnable {
private final Timer sqsMessageDelayTimer;
private final Backoff standardBackoff;
private int failedAttemptCount;
private boolean endToEndAcknowledgementsEnabled;
private final boolean endToEndAcknowledgementsEnabled;
private final AcknowledgementSetManager acknowledgementSetManager;

private final ObjectMapper objectMapper = new ObjectMapper();
private volatile boolean isStopped = false;

public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,
final SqsClient sqsClient,
Expand Down Expand Up @@ -99,8 +100,7 @@ public SqsWorker(final AcknowledgementSetManager acknowledgementSetManager,

@Override
public void run() {

while (!Thread.currentThread().isInterrupted()) {
while (!isStopped) {
int messagesProcessed = 0;
try {
messagesProcessed = processSqsMessages();
Expand Down Expand Up @@ -336,4 +336,8 @@ private S3ObjectReference populateS3Reference(final String bucketName, final Str
.build();
}

void stop() {
isStopped = true;
Thread.currentThread().interrupt();
}
}

0 comments on commit 0a34644

Please sign in to comment.