From 1e5e0d06b825923ccce47689d3cbf5bccd365a29 Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Thu, 11 Apr 2024 18:48:14 -0500 Subject: [PATCH] Fix bug in s3 sink dynamic bucket and catch invalid bucket message (#4413) Signed-off-by: Taylor Gray --- .../plugins/sink/s3/accumulator/BufferUtilities.java | 4 +++- .../plugins/sink/s3/grouping/S3GroupManager.java | 2 +- .../sink/s3/accumulator/BufferUtilitiesTest.java | 11 ++++++++--- .../plugins/sink/s3/grouping/S3GroupManagerTest.java | 11 ++++++++++- 4 files changed, 22 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java index 228801f065..ae02fd01bc 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilities.java @@ -18,6 +18,7 @@ class BufferUtilities { private static final Logger LOG = LoggerFactory.getLogger(BufferUtilities.class); static final String ACCESS_DENIED = "Access Denied"; + static final String INVALID_BUCKET = "The specified bucket is not valid"; static void putObjectOrSendToDefaultBucket(final S3Client s3Client, final RequestBody requestBody, @@ -29,7 +30,8 @@ static void putObjectOrSendToDefaultBucket(final S3Client s3Client, PutObjectRequest.builder().bucket(targetBucket).key(objectKey).build(), requestBody); } catch (final S3Exception e) { - if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) { + if (defaultBucket != null && + (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED) || e.getMessage().contains(INVALID_BUCKET))) { LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket); s3Client.putObject( PutObjectRequest.builder().bucket(defaultBucket).key(objectKey).build(), diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java index a7af39ed9e..5945b7f648 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManager.java @@ -74,7 +74,7 @@ public S3Group getOrCreateGroupForEvent(final Event event) { if (allGroups.containsKey(s3GroupIdentifier)) { return allGroups.get(s3GroupIdentifier); } else { - final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket()); + final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3GroupIdentifier::getFullBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket()); final OutputCodec outputCodec = codecFactory.provideCodec(); final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec); allGroups.put(s3GroupIdentifier, s3Group); diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java index fd2341636d..23de985b58 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/accumulator/BufferUtilitiesTest.java @@ -33,6 +33,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferUtilities.ACCESS_DENIED; +import static org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferUtilities.INVALID_BUCKET; @ExtendWith(MockitoExtension.class) public class BufferUtilitiesTest { @@ -121,14 +122,18 @@ private static class ExceptionsProvider implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext extensionContext) throws Exception { - final S3Exception s3Exception = mock(S3Exception.class); - when(s3Exception.getMessage()).thenReturn(UUID.randomUUID() + ACCESS_DENIED + UUID.randomUUID()); + final S3Exception accessDeniedException = mock(S3Exception.class); + when(accessDeniedException.getMessage()).thenReturn(UUID.randomUUID() + ACCESS_DENIED + UUID.randomUUID()); + + final S3Exception invalidBucketException = mock(S3Exception.class); + when(invalidBucketException.getMessage()).thenReturn(UUID.randomUUID() + INVALID_BUCKET + UUID.randomUUID()); final NoSuchBucketException noSuchBucketException = mock(NoSuchBucketException.class); return Stream.of( Arguments.arguments(noSuchBucketException), - Arguments.arguments(s3Exception) + Arguments.arguments(accessDeniedException), + Arguments.arguments(invalidBucketException) ); } } diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java index 1cfcb41e21..72afddb1ed 100644 --- a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/grouping/S3GroupManagerTest.java @@ -71,7 +71,13 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { final Buffer buffer = mock(Buffer.class); when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket))) - .thenReturn(buffer); + .thenAnswer(invocation -> { + Supplier bucketSupplier = invocation.getArgument(1); + Supplier objectKeySupplier = invocation.getArgument(2); + bucketSupplier.get(); + objectKeySupplier.get(); + return buffer; + }); final OutputCodec outputCodec = mock(OutputCodec.class); when(codecFactory.provideCodec()).thenReturn(outputCodec); @@ -90,6 +96,9 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() { assertThat(groups.contains(result), equalTo(true)); assertThat(objectUnderTest.hasNoGroups(), equalTo(false)); + + verify(s3GroupIdentifier).getFullBucketName(); + verify(s3GroupIdentifier).getGroupIdentifierFullObjectKey(); } @Test