diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java index 731803f7cf..e1ebea9fa0 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/S3ScanPartitionCreationSupplier.java @@ -116,10 +116,14 @@ private List listFilteredS3ObjectsForBucket(final List PartitionIdentifier.builder().withPartitionKey(String.format(BUCKET_OBJECT_PARTITION_KEY_FORMAT, bucket, objectKey)).build()) .collect(Collectors.toList())); + LOG.info("Found page of {} objects from bucket {}", listObjectsV2Response.keyCount(), bucket); + mostRecentLastModifiedTimestamp = getMostRecentLastModifiedTimestamp(listObjectsV2Response, mostRecentLastModifiedTimestamp); } while (listObjectsV2Response.isTruncated()); globalStateMap.put(bucket, Objects.nonNull(mostRecentLastModifiedTimestamp) ? 
mostRecentLastModifiedTimestamp.toString() : null); + + LOG.info("Returning partitions for {} S3 objects from bucket {}", allPartitionIdentifiers.size(), bucket); return allPartitionIdentifiers; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java index 1257efeaf4..39a251cddc 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/ScanObjectWorker.java @@ -42,6 +42,7 @@ public class ScanObjectWorker implements Runnable{ private static final Logger LOG = LoggerFactory.getLogger(ScanObjectWorker.class); private static final int STANDARD_BACKOFF_MILLIS = 30_000; + private static final int RETRY_BACKOFF_ON_EXCEPTION_MILLIS = 5_000; static final int ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS = Integer.MAX_VALUE; static final String ACKNOWLEDGEMENT_SET_CALLBACK_METRIC_NAME = "acknowledgementSetCallbackCounter"; @@ -99,7 +100,21 @@ public ScanObjectWorker(final S3Client s3Client, @Override public void run() { while (!shouldStopProcessing) { - startProcessingObject(STANDARD_BACKOFF_MILLIS); + + try { + startProcessingObject(STANDARD_BACKOFF_MILLIS); + } catch (final Exception e) { + LOG.error("Received an exception while processing S3 objects, backing off and retrying", e); + try { + Thread.sleep(RETRY_BACKOFF_ON_EXCEPTION_MILLIS); + } catch (InterruptedException ex) { + LOG.error("S3 Scan worker thread interrupted while backing off.", ex); + /* Restore the interrupt status and leave the loop so the worker stops instead of swallowing the interruption. */ + Thread.currentThread().interrupt(); + break; + } + } + } }