Skip to content

Commit

Permalink
Fix bug in s3 sink dynamic bucket and catch invalid bucket message (o…
Browse files Browse the repository at this point in the history
…pensearch-project#4413)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Apr 11, 2024
1 parent 2d50050 commit 1e5e0d0
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class BufferUtilities {
private static final Logger LOG = LoggerFactory.getLogger(BufferUtilities.class);

static final String ACCESS_DENIED = "Access Denied";
static final String INVALID_BUCKET = "The specified bucket is not valid";

static void putObjectOrSendToDefaultBucket(final S3Client s3Client,
final RequestBody requestBody,
Expand All @@ -29,7 +30,8 @@ static void putObjectOrSendToDefaultBucket(final S3Client s3Client,
PutObjectRequest.builder().bucket(targetBucket).key(objectKey).build(),
requestBody);
} catch (final S3Exception e) {
if (defaultBucket != null && (e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED))) {
if (defaultBucket != null &&
(e instanceof NoSuchBucketException || e.getMessage().contains(ACCESS_DENIED) || e.getMessage().contains(INVALID_BUCKET))) {
LOG.warn("Bucket {} could not be accessed, attempting to send to default_bucket {}", targetBucket, defaultBucket);
s3Client.putObject(
PutObjectRequest.builder().bucket(defaultBucket).key(objectKey).build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public S3Group getOrCreateGroupForEvent(final Event event) {
if (allGroups.containsKey(s3GroupIdentifier)) {
return allGroups.get(s3GroupIdentifier);
} else {
final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3SinkConfig::getBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket());
final Buffer bufferForNewGroup = bufferFactory.getBuffer(s3Client, s3GroupIdentifier::getFullBucketName, s3GroupIdentifier::getGroupIdentifierFullObjectKey, s3SinkConfig.getDefaultBucket());
final OutputCodec outputCodec = codecFactory.provideCodec();
final S3Group s3Group = new S3Group(s3GroupIdentifier, bufferForNewGroup, outputCodec);
allGroups.put(s3GroupIdentifier, s3Group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferUtilities.ACCESS_DENIED;
import static org.opensearch.dataprepper.plugins.sink.s3.accumulator.BufferUtilities.INVALID_BUCKET;

@ExtendWith(MockitoExtension.class)
public class BufferUtilitiesTest {
Expand Down Expand Up @@ -121,14 +122,18 @@ private static class ExceptionsProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) throws Exception {
final S3Exception s3Exception = mock(S3Exception.class);
when(s3Exception.getMessage()).thenReturn(UUID.randomUUID() + ACCESS_DENIED + UUID.randomUUID());
final S3Exception accessDeniedException = mock(S3Exception.class);
when(accessDeniedException.getMessage()).thenReturn(UUID.randomUUID() + ACCESS_DENIED + UUID.randomUUID());

final S3Exception invalidBucketException = mock(S3Exception.class);
when(invalidBucketException.getMessage()).thenReturn(UUID.randomUUID() + INVALID_BUCKET + UUID.randomUUID());

final NoSuchBucketException noSuchBucketException = mock(NoSuchBucketException.class);

return Stream.of(
Arguments.arguments(noSuchBucketException),
Arguments.arguments(s3Exception)
Arguments.arguments(accessDeniedException),
Arguments.arguments(invalidBucketException)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() {

final Buffer buffer = mock(Buffer.class);
when(bufferFactory.getBuffer(eq(s3Client), any(Supplier.class), any(Supplier.class), eq(defaultBucket)))
.thenReturn(buffer);
.thenAnswer(invocation -> {
Supplier<String> bucketSupplier = invocation.getArgument(1);
Supplier<String> objectKeySupplier = invocation.getArgument(2);
bucketSupplier.get();
objectKeySupplier.get();
return buffer;
});
final OutputCodec outputCodec = mock(OutputCodec.class);
when(codecFactory.provideCodec()).thenReturn(outputCodec);

Expand All @@ -90,6 +96,9 @@ void getOrCreateGroupForEvent_creates_expected_group_when_it_does_not_exist() {

assertThat(groups.contains(result), equalTo(true));
assertThat(objectUnderTest.hasNoGroups(), equalTo(false));

verify(s3GroupIdentifier).getFullBucketName();
verify(s3GroupIdentifier).getGroupIdentifierFullObjectKey();
}

@Test
Expand Down

0 comments on commit 1e5e0d0

Please sign in to comment.