diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java index 0cfd8fa257..c451d46c49 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParser.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import org.opensearch.dataprepper.model.configuration.DataPrepperVersion; +import org.opensearch.dataprepper.model.configuration.PipelineModel; import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,9 +15,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.SequenceInputStream; -import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -35,18 +35,8 @@ public PipelinesDataflowModelParser(final String pipelineConfigurationFileLocati } public PipelinesDataFlowModel parseConfiguration() { - try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) { - final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles, - PipelinesDataFlowModel.class); - - final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion(); - validateDataPrepperVersion(version); - - return pipelinesDataFlowModel; - } catch (IOException e) { - LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation); - throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e); - } + final List pipelinesDataFlowModels = parsePipelineConfigurationFiles(); + return mergePipelinesDataModels(pipelinesDataFlowModels); } private void validateDataPrepperVersion(final DataPrepperVersion version) { @@ -57,38 +47,61 @@ private void validateDataPrepperVersion(final DataPrepperVersion version) { } } - private InputStream mergePipelineConfigurationFiles() throws IOException { + private List parsePipelineConfigurationFiles() { final File configurationLocation = new File(pipelineConfigurationFileLocation); if (configurationLocation.isFile()) { - return new FileInputStream(configurationLocation); + return Stream.of(configurationLocation).map(this::parsePipelineConfigurationFile) + .filter(Objects::nonNull).collect(Collectors.toList()); } else if (configurationLocation.isDirectory()) { FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml")); - List configurationFiles = Stream.of(configurationLocation.listFiles(yamlFilter)) - .map(file -> { - InputStream inputStream; - try { - inputStream = new FileInputStream(file); - LOG.info("Reading pipeline configuration from {}", file.getName()); - } catch (FileNotFoundException e) { - inputStream = null; - LOG.warn("Pipeline configuration file {} not found", file.getName()); - } - return inputStream; - }) + List pipelinesDataFlowModels = Stream.of(configurationLocation.listFiles(yamlFilter)) + .map(this::parsePipelineConfigurationFile) .filter(Objects::nonNull) .collect(Collectors.toList()); - if (configurationFiles.isEmpty()) { + if (pipelinesDataFlowModels.isEmpty()) { LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); throw new ParseException( format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); } - return new SequenceInputStream(Collections.enumeration(configurationFiles)); + return pipelinesDataFlowModels; } else { LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation); throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation)); } } + + private PipelinesDataFlowModel parsePipelineConfigurationFile(final File pipelineConfigurationFile) { + try (final InputStream pipelineConfigurationInputStream = new FileInputStream(pipelineConfigurationFile)) { + LOG.info("Reading pipeline configuration from {}", pipelineConfigurationFile.getName()); + final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(pipelineConfigurationInputStream, + PipelinesDataFlowModel.class); + + final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion(); + validateDataPrepperVersion(version); + + return pipelinesDataFlowModel; + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + LOG.warn("Pipeline configuration file {} not found", pipelineConfigurationFile.getName()); + return null; + } + LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation); + throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e); + } + } + + private PipelinesDataFlowModel mergePipelinesDataModels( + final List pipelinesDataFlowModels) { + final Map pipelinesDataFlowModelMap = pipelinesDataFlowModels.stream() + .map(PipelinesDataFlowModel::getPipelines) + .flatMap(pipelines -> pipelines.entrySet().stream()) + .collect(Collectors.toUnmodifiableMap( + Map.Entry::getKey, + Map.Entry::getValue + )); + return new PipelinesDataFlowModel(pipelinesDataFlowModelMap); + } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java index aaa79eba34..a5a13c0753 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/parser/PipelinesDataflowModelParserTest.java @@ -6,6 +6,7 @@ import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -46,6 +47,7 @@ void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_m final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration(); assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(), equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES)); + assertThat(actualPipelinesDataFlowModel.getDataPrepperVersion(), nullValue()); } @Test diff --git a/data-prepper-main/build.gradle b/data-prepper-main/build.gradle index 4cccbe14d5..63b6c57b99 100644 --- a/data-prepper-main/build.gradle +++ b/data-prepper-main/build.gradle @@ -27,3 +27,21 @@ jar { 'Main-Class': 'org.opensearch.dataprepper.DataPrepperExecute') } } + +configurations { + allDependencyJars { + canBeConsumed = true + canBeResolved = true + extendsFrom runtimeClasspath + } +} + +artifacts { + /** + * Shares a configuration which has all the necessary dependencies and can be used + * as part of the release to load all dependencies. + * https://docs.gradle.org/current/userguide/cross_project_publications.html + */ + allDependencyJars(jar) +} + diff --git a/data-prepper-plugins/aws-plugin-api/build.gradle b/data-prepper-plugins/aws-plugin-api/build.gradle index 53876284be..10c360f9da 100644 --- a/data-prepper-plugins/aws-plugin-api/build.gradle +++ b/data-prepper-plugins/aws-plugin-api/build.gradle @@ -1,6 +1,7 @@ dependencies { implementation 'software.amazon.awssdk:auth' + implementation 'software.amazon.awssdk:apache-client' } test { @@ -12,7 +13,7 @@ jacocoTestCoverageVerification { violationRules { rule { limit { - minimum = 1.0 + minimum = 0.99 } } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/AwsRequestSigningApacheInterceptor.java b/data-prepper-plugins/aws-plugin-api/src/main/java/org/opensearch/dataprepper/aws/api/AwsRequestSigningApache4Interceptor.java similarity index 85% rename from data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/AwsRequestSigningApacheInterceptor.java rename to data-prepper-plugins/aws-plugin-api/src/main/java/org/opensearch/dataprepper/aws/api/AwsRequestSigningApache4Interceptor.java index 5d10779a3e..0f5e3ab3f3 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/AwsRequestSigningApacheInterceptor.java +++ b/data-prepper-plugins/aws-plugin-api/src/main/java/org/opensearch/dataprepper/aws/api/AwsRequestSigningApache4Interceptor.java @@ -1,16 +1,8 @@ /* - * Copyright OpenSearch Contributors. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with - * the License. A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.sink.opensearch; +package org.opensearch.dataprepper.aws.api; import org.apache.http.Header; import org.apache.http.HttpEntityEnclosingRequest; @@ -48,7 +40,7 @@ * An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer} * and {@link AwsCredentialsProvider}. */ -final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor { +public final class AwsRequestSigningApache4Interceptor implements HttpRequestInterceptor { /** * Constant to check content-length @@ -90,10 +82,10 @@ final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor * @param awsCredentialsProvider source of AWS credentials for signing * @param region signing region */ - public AwsRequestSigningApacheInterceptor(final String service, - final Signer signer, - final AwsCredentialsProvider awsCredentialsProvider, - final Region region) { + public AwsRequestSigningApache4Interceptor(final String service, + final Signer signer, + final AwsCredentialsProvider awsCredentialsProvider, + final Region region) { this.service = Objects.requireNonNull(service); this.signer = Objects.requireNonNull(signer); this.awsCredentialsProvider = Objects.requireNonNull(awsCredentialsProvider); @@ -107,10 +99,10 @@ public AwsRequestSigningApacheInterceptor(final String service, * @param awsCredentialsProvider source of AWS credentials for signing * @param region signing region */ - public AwsRequestSigningApacheInterceptor(final String service, - final Signer signer, - final AwsCredentialsProvider awsCredentialsProvider, - final String region) { + public AwsRequestSigningApache4Interceptor(final String service, + final Signer signer, + final AwsCredentialsProvider awsCredentialsProvider, + final String region) { this(service, signer, awsCredentialsProvider, Region.of(region)); } @@ -177,7 +169,7 @@ private URI buildUri(final HttpContext context, URIBuilder uriBuilder) throws IO } return uriBuilder.build(); - } catch (URISyntaxException e) { + } catch (final Exception e) { throw new IOException("Invalid URI", e); } } diff --git a/data-prepper-plugins/aws-plugin-api/src/test/java/org/opensearch/dataprepper/aws/api/AwsRequestSigningApache4InterceptorTest.java b/data-prepper-plugins/aws-plugin-api/src/test/java/org/opensearch/dataprepper/aws/api/AwsRequestSigningApache4InterceptorTest.java new file mode 100644 index 0000000000..0ed6d3f769 --- /dev/null +++ b/data-prepper-plugins/aws-plugin-api/src/test/java/org/opensearch/dataprepper/aws/api/AwsRequestSigningApache4InterceptorTest.java @@ -0,0 +1,144 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.aws.api; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpHost; +import org.apache.http.RequestLine; +import org.apache.http.message.BasicHeader; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpCoreContext; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.signer.Signer; +import software.amazon.awssdk.http.ContentStreamProvider; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.regions.Region; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class AwsRequestSigningApache4InterceptorTest { + + @Mock + private Signer signer; + + @Mock + private AwsCredentialsProvider awsCredentialsProvider; + + @Mock + private HttpEntityEnclosingRequest httpRequest; + + @Mock + private HttpContext httpContext; + + private AwsRequestSigningApache4Interceptor createObjectUnderTest() { + return new AwsRequestSigningApache4Interceptor("es", signer, awsCredentialsProvider, Region.US_EAST_1); + } + + @Test + void invalidURI_throws_IOException() { + + final RequestLine requestLine = mock(RequestLine.class); + when(requestLine.getUri()).thenReturn("http://invalid-uri.com/file[/].html\n"); + + when(httpRequest.getRequestLine()).thenReturn(requestLine); + + final AwsRequestSigningApache4Interceptor objectUnderTest = new AwsRequestSigningApache4Interceptor("es", signer, awsCredentialsProvider, "us-east-1"); + + assertThrows(IOException.class, () -> objectUnderTest.process(httpRequest, httpContext)); + } + + @Test + void IOException_is_thrown_when_buildURI_throws_exception() { + final RequestLine requestLine = mock(RequestLine.class); + when(requestLine.getMethod()).thenReturn("GET"); + when(requestLine.getUri()).thenReturn("http://localhost?param=test"); + when(httpRequest.getRequestLine()).thenReturn(requestLine); + + when(httpContext.getAttribute(HttpCoreContext.HTTP_TARGET_HOST)).thenThrow(RuntimeException.class); + + final AwsRequestSigningApache4Interceptor objectUnderTest = createObjectUnderTest(); + + assertThrows(IOException.class, () -> objectUnderTest.process(httpRequest, httpContext)); + } + + @Test + void empty_contentStreamProvider_throws_IllegalStateException() throws IOException { + final RequestLine requestLine = mock(RequestLine.class); + when(requestLine.getMethod()).thenReturn("GET"); + when(requestLine.getUri()).thenReturn("http://localhost?param=test"); + when(httpRequest.getRequestLine()).thenReturn(requestLine); + when(httpRequest.getAllHeaders()).thenReturn(new BasicHeader[]{ + new BasicHeader("test-name", "test-value"), + new BasicHeader("content-length", "0") + }); + + final HttpEntity httpEntity = mock(HttpEntity.class); + final InputStream inputStream = mock(InputStream.class); + when(httpEntity.getContent()).thenReturn(inputStream); + + when((httpRequest).getEntity()).thenReturn(httpEntity); + + final HttpHost httpHost = HttpHost.create("http://localhost?param=test"); + when(httpContext.getAttribute(HttpCoreContext.HTTP_TARGET_HOST)).thenReturn(httpHost); + + final SdkHttpFullRequest signedRequest = mock(SdkHttpFullRequest.class); + when(signedRequest.headers()).thenReturn(Map.of("test-name", List.of("test-value"))); + when(signedRequest.contentStreamProvider()).thenReturn(Optional.empty()); + when(signer.sign(any(SdkHttpFullRequest.class), any(ExecutionAttributes.class))) + .thenReturn(signedRequest); + + final AwsRequestSigningApache4Interceptor objectUnderTest = createObjectUnderTest(); + + assertThrows(IllegalStateException.class, () -> objectUnderTest.process(httpRequest, httpContext)); + } + + @Test + void testHappyPath() throws IOException { + final RequestLine requestLine = mock(RequestLine.class); + when(requestLine.getMethod()).thenReturn("GET"); + when(requestLine.getUri()).thenReturn("http://localhost?param=test"); + when(httpRequest.getRequestLine()).thenReturn(requestLine); + when(httpRequest.getAllHeaders()).thenReturn(new BasicHeader[]{ + new BasicHeader("test-name", "test-value"), + new BasicHeader("content-length", "0") + }); + + final HttpEntity httpEntity = mock(HttpEntity.class); + final InputStream inputStream = mock(InputStream.class); + when(httpEntity.getContent()).thenReturn(inputStream); + + when((httpRequest).getEntity()).thenReturn(httpEntity); + + final HttpHost httpHost = HttpHost.create("http://localhost?param=test"); + when(httpContext.getAttribute(HttpCoreContext.HTTP_TARGET_HOST)).thenReturn(httpHost); + + final SdkHttpFullRequest signedRequest = mock(SdkHttpFullRequest.class); + when(signedRequest.headers()).thenReturn(Map.of("test-name", List.of("test-value"))); + final ContentStreamProvider contentStreamProvider = mock(ContentStreamProvider.class); + final InputStream contentInputStream = mock(InputStream.class); + when(contentStreamProvider.newStream()).thenReturn(contentInputStream); + when(signedRequest.contentStreamProvider()).thenReturn(Optional.of(contentStreamProvider)); + when(signer.sign(any(SdkHttpFullRequest.class), any(ExecutionAttributes.class))) + .thenReturn(signedRequest); + createObjectUnderTest().process(httpRequest, httpContext); + } +} diff --git a/data-prepper-plugins/common/build.gradle b/data-prepper-plugins/common/build.gradle index 6ec3272bd4..1d64b0c66d 100644 --- a/data-prepper-plugins/common/build.gradle +++ b/data-prepper-plugins/common/build.gradle @@ -15,8 +15,8 @@ dependencies { implementation 'software.amazon.awssdk:acm' implementation 'org.apache.commons:commons-compress:1.23.0' implementation libs.commons.lang3 - implementation "org.bouncycastle:bcprov-jdk15on:1.70" - implementation "org.bouncycastle:bcpkix-jdk15on:1.70" + implementation libs.bouncycastle.bcprov + implementation libs.bouncycastle.bcpkix implementation 'org.reflections:reflections:0.10.2' implementation 'io.micrometer:micrometer-core' testImplementation testLibs.junit.vintage diff --git a/data-prepper-plugins/opensearch-source/build.gradle b/data-prepper-plugins/opensearch-source/build.gradle index eacbd762c4..1ffb49541b 100644 --- a/data-prepper-plugins/opensearch-source/build.gradle +++ b/data-prepper-plugins/opensearch-source/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation project(':data-prepper-plugins:buffer-common') implementation project(':data-prepper-plugins:aws-plugin-api') implementation 'software.amazon.awssdk:apache-client' + implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2' implementation 'software.amazon.awssdk:s3' diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java index 63f6a48f28..4c124afd92 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchService.java @@ -10,6 +10,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.NoSearchContextWorker; import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; @@ -42,6 +43,7 @@ public class OpenSearchService { private final ScheduledExecutorService scheduledExecutorService; private final BufferAccumulator> bufferAccumulator; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; private SearchWorker searchWorker; private ScheduledFuture searchWorkerFuture; @@ -50,11 +52,12 @@ public static OpenSearchService createOpenSearchService(final SearchAccessor sea final SourceCoordinator sourceCoordinator, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer> buffer, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { return new OpenSearchService( searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor(), BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT), - acknowledgementSetManager); + acknowledgementSetManager, openSearchSourcePluginMetrics); } private OpenSearchService(final SearchAccessor searchAccessor, @@ -63,7 +66,8 @@ private OpenSearchService(final SearchAccessor searchAccessor, final Buffer> buffer, final ScheduledExecutorService scheduledExecutorService, final BufferAccumulator> bufferAccumulator, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.buffer = buffer; @@ -73,18 +77,19 @@ private OpenSearchService(final SearchAccessor searchAccessor, this.scheduledExecutorService = scheduledExecutorService; this.bufferAccumulator = bufferAccumulator; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } public void start() { switch(searchAccessor.getSearchContextType()) { case POINT_IN_TIME: - searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; case SCROLL: - searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; case NONE: - searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); break; default: throw new IllegalArgumentException( diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java index 430f5aa618..681e22b075 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java @@ -4,6 +4,7 @@ */ package org.opensearch.dataprepper.plugins.source.opensearch; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; @@ -14,6 +15,7 @@ import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchClientFactory; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy; @@ -24,6 +26,7 @@ public class OpenSearchSource implements Source>, UsesSourceCoordi private final AwsCredentialsSupplier awsCredentialsSupplier; private final OpenSearchSourceConfiguration openSearchSourceConfiguration; private final AcknowledgementSetManager acknowledgementSetManager; + private final PluginMetrics pluginMetrics; private SourceCoordinator sourceCoordinator; private OpenSearchService openSearchService; @@ -31,10 +34,12 @@ public class OpenSearchSource implements Source>, UsesSourceCoordi @DataPrepperPluginConstructor public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final AwsCredentialsSupplier awsCredentialsSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final PluginMetrics pluginMetrics) { this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.awsCredentialsSupplier = awsCredentialsSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.pluginMetrics = pluginMetrics; openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword(); } @@ -47,14 +52,16 @@ public void start(final Buffer> buffer) { startProcess(openSearchSourceConfiguration, buffer); } - private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer> buffer) { + private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, + final Buffer> buffer) { final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier); + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics = OpenSearchSourcePluginMetrics.create(pluginMetrics); final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory); final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor(); - openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager); + openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics); openSearchService.start(); } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java new file mode 100644 index 0000000000..80bc2d22a8 --- /dev/null +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/metrics/OpenSearchSourcePluginMetrics.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.opensearch.metrics; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.metrics.PluginMetrics; + +public class OpenSearchSourcePluginMetrics { + + static final String DOCUMENTS_PROCESSED = "documentsProcessed"; + static final String INDICES_PROCESSED = "indicesProcessed"; + static final String INDEX_PROCESSING_TIME_ELAPSED = "indexProcessingTime"; + static final String PROCESSING_ERRORS = "processingErrors"; + + private final Counter documentsProcessedCounter; + private final Counter indicesProcessedCounter; + private final Counter processingErrorsCounter; + private final Timer indexProcessingTimeTimer; + + public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) { + return new OpenSearchSourcePluginMetrics(pluginMetrics); + } + + private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) { + documentsProcessedCounter = pluginMetrics.counter(DOCUMENTS_PROCESSED); + indicesProcessedCounter = pluginMetrics.counter(INDICES_PROCESSED); + processingErrorsCounter = pluginMetrics.counter(PROCESSING_ERRORS); + indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED); + } + + public Counter getDocumentsProcessedCounter() { + return documentsProcessedCounter; + } + + public Counter getIndicesProcessedCounter() { + return indicesProcessedCounter; + } + + public Counter getProcessingErrorsCounter() { + return processingErrorsCounter; + } + + public Timer getIndexProcessingTimeTimer() { + return indexProcessingTimeTimer; + } +} diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java index cacd49c921..8b33732a57 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java @@ -19,6 +19,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest; @@ -48,19 +49,22 @@ public class NoSearchContextWorker implements SearchWorker, Runnable { private final BufferAccumulator> bufferAccumulator; private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; public NoSearchContextWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.sourceCoordinator = sourceCoordinator; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.bufferAccumulator = bufferAccumulator; this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } @Override @@ -88,11 +92,13 @@ public void run() { sourceCoordinator, indexPartition.get()); - processIndex(indexPartition.get(), acknowledgementSet.getLeft()); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), indexPartition.get(), sourceCoordinator); + openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); + LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey()); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("The search_after worker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -102,8 +108,10 @@ public void run() { } catch (final Exception e) { LOG.error("Unknown exception while processing index '{}', moving on to another index:", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); } } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Received an exception while trying to get index to process with search_after, backing off and retrying", e); try { Thread.sleep(STANDARD_BACKOFF_MILLIS); @@ -118,6 +126,8 @@ public void run() { private void processIndex(final SourcePartition openSearchIndexPartition, final AcknowledgementSet acknowledgementSet) { final String indexName = openSearchIndexPartition.getPartitionKey(); + LOG.info("Started processing for index: '{}'", indexName); + Optional openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState(); if (openSearchIndexProgressStateOptional.isEmpty()) { @@ -143,7 +153,9 @@ private void processIndex(final SourcePartition op acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); + openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); } catch (Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME), record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage()); @@ -157,6 +169,7 @@ private void processIndex(final SourcePartition op try { bufferAccumulator.flush(); } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java index 4162fedd01..38b4e06dd2 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -63,19 +64,22 @@ public class PitWorker implements SearchWorker, Runnable { private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; public PitWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.sourceCoordinator = sourceCoordinator; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.bufferAccumulator = bufferAccumulator; this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } @Override @@ -102,10 +106,13 @@ public void run() { sourceCoordinator, indexPartition.get()); - processIndex(indexPartition.get(), acknowledgementSet.getLeft()); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), indexPartition.get(), sourceCoordinator); + + openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); + LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey()); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -113,6 +120,7 @@ public void run() { LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}", indexPartition.get().getPartitionKey(), BACKOFF_ON_PIT_LIMIT_REACHED.getSeconds(), e.getMessage()); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(BACKOFF_ON_PIT_LIMIT_REACHED.toMillis()); } catch (final InterruptedException ex) { @@ -124,9 +132,11 @@ public void run() { } catch (final RuntimeException e) { LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); } } catch (final Exception e) { LOG.error("Received an exception while trying to get index to process with PIT, backing off and retrying", e); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(STANDARD_BACKOFF_MILLIS); } catch (final InterruptedException ex) { @@ -140,6 +150,8 @@ public void run() { private void processIndex(final SourcePartition openSearchIndexPartition, final AcknowledgementSet acknowledgementSet) { final String indexName = openSearchIndexPartition.getPartitionKey(); + + LOG.info("Starting processing for index: '{}'", indexName); Optional openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState(); if (openSearchIndexProgressStateOptional.isEmpty()) { @@ -180,7 +192,9 @@ private void processIndex(final SourcePartition op acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); + openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); } catch (Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME), record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage()); @@ -195,6 +209,7 @@ private void processIndex(final SourcePartition op try { bufferAccumulator.flush(); } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java index 0b53b7e006..213e8dede5 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.source.coordinator.exceptions.PartitionUpdateException; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -55,19 +56,22 @@ public class ScrollWorker implements SearchWorker { private final BufferAccumulator> bufferAccumulator; private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier; private final AcknowledgementSetManager acknowledgementSetManager; + private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; public ScrollWorker(final SearchAccessor searchAccessor, final OpenSearchSourceConfiguration openSearchSourceConfiguration, final SourceCoordinator sourceCoordinator, final BufferAccumulator> bufferAccumulator, final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier, - final AcknowledgementSetManager acknowledgementSetManager) { + final AcknowledgementSetManager acknowledgementSetManager, + final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) { this.searchAccessor = searchAccessor; this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.sourceCoordinator = sourceCoordinator; this.bufferAccumulator = bufferAccumulator; this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier; this.acknowledgementSetManager = acknowledgementSetManager; + this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics; } @Override @@ -95,10 +99,13 @@ public void run() { sourceCoordinator, indexPartition.get()); - processIndex(indexPartition.get(), acknowledgementSet.getLeft()); + openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft())); completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(), indexPartition.get(), sourceCoordinator); + + openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment(); + LOG.info("Completed processing for index: '{}'", indexPartition.get().getPartitionKey()); } catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) { LOG.warn("ScrollWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage()); sourceCoordinator.giveUpPartitions(); @@ -106,6 +113,7 @@ public void run() { LOG.warn("Received SearchContextLimitExceeded exception for index {}. Giving up index and waiting {} seconds before retrying: {}", indexPartition.get().getPartitionKey(), BACKOFF_ON_SCROLL_LIMIT_REACHED.getSeconds(), e.getMessage()); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(BACKOFF_ON_SCROLL_LIMIT_REACHED.toMillis()); } catch (final InterruptedException ex) { @@ -117,9 +125,11 @@ public void run() { } catch (final RuntimeException e) { LOG.error("Unknown exception while processing index '{}':", indexPartition.get().getPartitionKey(), e); sourceCoordinator.giveUpPartitions(); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); } } catch (final Exception e) { LOG.error("Received an exception while trying to get index to process with scroll, backing off and retrying", e); + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); try { Thread.sleep(STANDARD_BACKOFF_MILLIS); } catch (final InterruptedException ex) { @@ -133,6 +143,7 @@ public void run() { private void processIndex(final SourcePartition openSearchIndexPartition, final AcknowledgementSet acknowledgementSet) { final String indexName = openSearchIndexPartition.getPartitionKey(); + LOG.info("Started processing for index: '{}'", indexName); final Integer batchSize = openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(); @@ -168,6 +179,7 @@ private void processIndex(final SourcePartition op try { bufferAccumulator.flush(); } catch (final Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed flushing remaining OpenSearch documents to buffer due to: {}", e.getMessage()); } } @@ -180,7 +192,9 @@ private void writeDocumentsToBuffer(final List documents, acknowledgementSet.add(record.getData()); } bufferAccumulator.add(record); + openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment(); } catch (Exception e) { + openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment(); LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}", record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME), record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage()); diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java index 2bb193286f..f240f7bb24 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java @@ -9,6 +9,7 @@ import co.elastic.clients.transport.ElasticsearchTransport; import org.apache.http.Header; import org.apache.http.HttpHost; +import org.apache.http.HttpRequestInterceptor; import org.apache.http.HttpResponseInterceptor; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; @@ -31,11 +32,13 @@ import org.opensearch.client.transport.rest_client.RestClientTransport; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor; import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.signer.Aws4Signer; import software.amazon.awssdk.http.SdkHttpClient; import software.amazon.awssdk.http.apache.ApacheHttpClient; @@ -165,12 +168,38 @@ private org.elasticsearch.client.RestClient createElasticSearchRestClient(final new BasicHeader("Content-type", "application/json") }); - attachBasicAuth(restClientBuilder, openSearchSourceConfiguration); + if (Objects.nonNull(openSearchSourceConfiguration.getAwsAuthenticationOptions())) { + attachSigV4ForElasticsearchClient(restClientBuilder, openSearchSourceConfiguration); + } else { + attachBasicAuth(restClientBuilder, openSearchSourceConfiguration); + } setConnectAndSocketTimeout(restClientBuilder, openSearchSourceConfiguration); return restClientBuilder.build(); } + private void attachSigV4ForElasticsearchClient(final org.elasticsearch.client.RestClientBuilder restClientBuilder, + final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(AwsCredentialsOptions.builder() + .withRegion(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()) + .withStsRoleArn(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsRoleArn()) + .withStsExternalId(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsExternalId()) + .withStsHeaderOverrides(openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsStsHeaderOverrides()) + .build()); + final Aws4Signer aws4Signer = Aws4Signer.create(); + final HttpRequestInterceptor httpRequestInterceptor = new AwsRequestSigningApache4Interceptor(AOS_SERVICE_NAME, aws4Signer, + awsCredentialsProvider, openSearchSourceConfiguration.getAwsAuthenticationOptions().getAwsRegion()); + restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { + httpClientBuilder.addInterceptorLast(httpRequestInterceptor); + attachSSLContext(httpClientBuilder, openSearchSourceConfiguration); + httpClientBuilder.addInterceptorLast( + (HttpResponseInterceptor) + (response, context) -> + response.addHeader("X-Elastic-Product", "Elasticsearch")); + return httpClientBuilder; + }); + } + private void attachBasicAuth(final RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java index 0e0566e2ba..731ee07e50 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchServiceTest.java @@ -22,6 +22,7 @@ import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.NoSearchContextWorker; import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier; import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker; @@ -65,6 +66,9 @@ public class OpenSearchServiceTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + @Mock private SourceCoordinator sourceCoordinator; @@ -93,7 +97,7 @@ private OpenSearchService createObjectUnderTest() { })) { executorsMockedStatic.when(Executors::newSingleThreadScheduledExecutor).thenReturn(scheduledExecutorService); bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - return OpenSearchService.createOpenSearchService(openSearchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager); + return OpenSearchService.createOpenSearchService(openSearchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java index c48f552bf7..4ed020a173 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceTest.java @@ -11,11 +11,13 @@ import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchClientFactory; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy; @@ -52,11 +54,17 @@ public class OpenSearchSourceTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + @Mock private SourceCoordinator sourceCoordinator; private OpenSearchSource createObjectUnderTest() { - return new OpenSearchSource(openSearchSourceConfiguration, awsCredentialsSupplier, acknowledgementSetManager); + return new OpenSearchSource(openSearchSourceConfiguration, awsCredentialsSupplier, acknowledgementSetManager, pluginMetrics); } @Test @@ -75,11 +83,13 @@ void start_with_non_null_buffer_does_not_throw() { try (final MockedStatic searchAccessorStrategyMockedStatic = mockStatic(SearchAccessorStrategy.class); final MockedStatic openSearchClientFactoryMockedStatic = mockStatic(OpenSearchClientFactory.class); + final MockedStatic openSearchSourcePluginMetricsMockedStatic = mockStatic(OpenSearchSourcePluginMetrics.class); final MockedStatic openSearchServiceMockedStatic = mockStatic(OpenSearchService.class)) { openSearchClientFactoryMockedStatic.when(() -> OpenSearchClientFactory.create(awsCredentialsSupplier)).thenReturn(openSearchClientFactory); searchAccessorStrategyMockedStatic.when(() -> SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory)).thenReturn(searchAccessorStrategy); + openSearchSourcePluginMetricsMockedStatic.when(() -> OpenSearchSourcePluginMetrics.create(pluginMetrics)).thenReturn(openSearchSourcePluginMetrics); - openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager)) + openSearchServiceMockedStatic.when(() -> OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics)) .thenReturn(openSearchService); objectUnderTest.start(buffer); diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java index 0b51ae2e0f..12eb0223d2 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorkerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest; @@ -53,6 +56,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.ACKNOWLEDGEMENT_SET_TIMEOUT_SECONDS; @@ -77,16 +81,35 @@ public class NoSearchContextWorkerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock(lenient = true) + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + + @Mock + private Counter documentsProcessedCounter; + + @Mock + private Counter indicesProcessedCounter; + + @Mock + private Counter processingErrorsCounter; + + @Mock + private Timer indexProcessingTimeTimer; + private ExecutorService executorService; @BeforeEach void setup() { executorService = Executors.newSingleThreadExecutor(); lenient().when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourcePluginMetrics.getDocumentsProcessedCounter()).thenReturn(documentsProcessedCounter); + when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); + when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); + when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); } private NoSearchContextWorker createObjectUnderTest() { - return new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + return new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -104,7 +127,8 @@ void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrup } @Test - void run_when_search_without_search_context_throws_index_not_found_exception_completes_the_partition() throws InterruptedException { + void run_when_search_without_search_context_throws_index_not_found_exception_completes_the_partition() throws Exception { + mockTimerCallable(); final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); @@ -131,10 +155,16 @@ void run_when_search_without_search_context_throws_index_not_found_exception_com verify(sourceCoordinator).completePartition(partitionKey); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -188,10 +218,16 @@ void run_with_getNextPartition_with_non_empty_partition_processes_and_closes_tha assertThat(noSearchContextSearchRequests.get(1).getIndex(), equalTo(partitionKey)); assertThat(noSearchContextSearchRequests.get(1).getPaginationSize(), equalTo(2)); assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); AtomicReference numEventsAdded = new AtomicReference<>(0); doAnswer(a -> { @@ -261,5 +297,16 @@ void run_with_getNextPartition_with_acknowledgments_processes_and_closes_that_pa assertThat(noSearchContextSearchRequests.get(1).getSearchAfter(), equalTo(searchWithSearchAfterResults.getNextSearchAfter())); verify(acknowledgementSet).complete(); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); + } + + private void mockTimerCallable() { + doAnswer(a -> { + a.getArgument(0).run(); + return null; + }).when(indexProcessingTimeTimer).record(any(Runnable.class)); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java index eac270e3d2..718dec6bb5 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorkerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -84,16 +87,35 @@ public class PitWorkerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock(lenient = true) + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + + @Mock + private Counter documentsProcessedCounter; + + @Mock + private Counter indicesProcessedCounter; + + @Mock + private Counter processingErrorsCounter; + + @Mock + private Timer indexProcessingTimeTimer; + private ExecutorService executorService; @BeforeEach void setup() { executorService = Executors.newSingleThreadExecutor(); lenient().when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourcePluginMetrics.getDocumentsProcessedCounter()).thenReturn(documentsProcessedCounter); + when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); + when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); + when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); } private PitWorker createObjectUnderTest() { - return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + return new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -112,6 +134,8 @@ void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrup @Test void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -188,10 +212,15 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_pit_ assertThat(deletePointInTimeRequest.getPitId(), equalTo(pitId)); verifyNoInteractions(acknowledgementSetManager); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_partition() throws Exception { + mockTimerCallable(); final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); AtomicReference numEventsAdded = new AtomicReference<>(0); @@ -283,10 +312,16 @@ void run_with_acknowledgments_enabled_creates_and_deletes_pit_and_closes_that_pa assertThat(deletePointInTimeRequest.getPitId(), equalTo(pitId)); verify(acknowledgementSet).complete(); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create_another_point_in_time() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -342,10 +377,16 @@ void run_with_getNextPartition_with_valid_existing_point_in_time_does_not_create verify(searchAccessor, never()).createPit(any(CreatePointInTimeRequest.class)); verify(searchAccessor, times(2)).searchWithPit(any(SearchPointInTimeRequest.class)); verify(sourceCoordinator, times(2)).saveProgressStateForPartition(eq(partitionKey), eq(openSearchIndexProgressState)); + + verify(documentsProcessedCounter, times(3)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test - void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitException() throws InterruptedException { + void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -353,7 +394,7 @@ void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitE when(searchAccessor.createPit(any(CreatePointInTimeRequest.class))).thenThrow(SearchContextLimitException.class); - when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)); + when(sourceCoordinator.getNextPartition(openSearchIndexPartitionCreationSupplier)).thenReturn(Optional.of(sourcePartition)).thenReturn(Optional.empty()); final Future future = executorService.submit(() -> createObjectUnderTest().run()); @@ -367,10 +408,16 @@ void run_gives_up_partitions_and_waits_when_createPit_throws_SearchContextLimitE verify(searchAccessor, never()).deletePit(any(DeletePointInTimeRequest.class)); verify(sourceCoordinator).giveUpPartitions(); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verify(processingErrorsCounter).increment(); } @Test - void run_completes_partitions_when_createPit_throws_IndexNotFoundException() throws InterruptedException { + void run_completes_partitions_when_createPit_throws_IndexNotFoundException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -393,5 +440,16 @@ void run_completes_partitions_when_createPit_throws_IndexNotFoundException() thr verify(searchAccessor, never()).deletePit(any(DeletePointInTimeRequest.class)); verify(sourceCoordinator).completePartition(partitionKey); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verifyNoInteractions(processingErrorsCounter); + } + + private void mockTimerCallable() { + doAnswer(a -> { + a.getArgument(0).run(); + return null; + }).when(indexProcessingTimeTimer).record(any(Runnable.class)); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java index b28c140af6..f97838b187 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorkerTest.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.source.opensearch.worker; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -22,6 +24,7 @@ import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SchedulingParameterConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration; +import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException; import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.SearchContextLimitException; @@ -56,6 +59,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.opensearch.worker.ScrollWorker.SCROLL_TIME_PER_BATCH; @@ -82,16 +86,35 @@ public class ScrollWorkerTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock(lenient = true) + private OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics; + + @Mock + private Counter documentsProcessedCounter; + + @Mock + private Counter indicesProcessedCounter; + + @Mock + private Counter processingErrorsCounter; + + @Mock + private Timer indexProcessingTimeTimer; + private ExecutorService executorService; @BeforeEach void setup() { executorService = Executors.newSingleThreadExecutor(); lenient().when(openSearchSourceConfiguration.isAcknowledgmentsEnabled()).thenReturn(false); + when(openSearchSourcePluginMetrics.getDocumentsProcessedCounter()).thenReturn(documentsProcessedCounter); + when(openSearchSourcePluginMetrics.getIndicesProcessedCounter()).thenReturn(indicesProcessedCounter); + when(openSearchSourcePluginMetrics.getProcessingErrorsCounter()).thenReturn(processingErrorsCounter); + when(openSearchSourcePluginMetrics.getIndexProcessingTimeTimer()).thenReturn(indexProcessingTimeTimer); } private ScrollWorker createObjectUnderTest() { - return new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager); + return new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics); } @Test @@ -110,6 +133,8 @@ void run_with_getNextPartition_returning_empty_will_sleep_and_exit_when_interrup @Test void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scroll_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -181,10 +206,16 @@ void run_with_getNextPartition_with_non_empty_partition_creates_and_deletes_scro final DeleteScrollRequest deleteScrollRequest = deleteRequestArgumentCaptor.getValue(); assertThat(deleteScrollRequest, notNullValue()); assertThat(deleteScrollRequest.getScrollId(), equalTo(scrollId)); + + verify(documentsProcessedCounter, times(5)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_and_closes_that_partition() throws Exception { + mockTimerCallable(); + final AcknowledgementSet acknowledgementSet = mock(AcknowledgementSet.class); AtomicReference numEventsAdded = new AtomicReference<>(0); doAnswer(a -> { @@ -272,10 +303,16 @@ void run_with_getNextPartition_with_acknowledgments_creates_and_deletes_scroll_a assertThat(deleteScrollRequest.getScrollId(), equalTo(scrollId)); verify(acknowledgementSet).complete(); + + verify(documentsProcessedCounter, times(5)).increment(); + verify(indicesProcessedCounter).increment(); + verifyNoInteractions(processingErrorsCounter); } @Test - void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLimitException() throws InterruptedException { + void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLimitException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -300,11 +337,17 @@ void run_gives_up_partitions_and_waits_when_createScroll_throws_SearchContextLim verifyNoMoreInteractions(searchAccessor); verify(sourceCoordinator).giveUpPartitions(); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verify(processingErrorsCounter).increment(); } @Test - void run_completes_partitions_createScroll_throws_IndexNotFoundException() throws InterruptedException { + void run_completes_partitions_createScroll_throws_IndexNotFoundException() throws Exception { + mockTimerCallable(); + final SourcePartition sourcePartition = mock(SourcePartition.class); final String partitionKey = UUID.randomUUID().toString(); when(sourcePartition.getPartitionKey()).thenReturn(partitionKey); @@ -330,6 +373,17 @@ void run_completes_partitions_createScroll_throws_IndexNotFoundException() throw verifyNoMoreInteractions(searchAccessor); verify(sourceCoordinator).completePartition(partitionKey); verify(sourceCoordinator, never()).closePartition(anyString(), any(Duration.class), anyInt()); + + verifyNoInteractions(documentsProcessedCounter); + verifyNoInteractions(indicesProcessedCounter); + verifyNoInteractions(processingErrorsCounter); + } + + private void mockTimerCallable() { + doAnswer(a -> { + a.getArgument(0).run(); + return null; + }).when(indexProcessingTimeTimer).record(any(Runnable.class)); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java index b3a191a4e7..54497b2ce7 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java @@ -95,6 +95,33 @@ void provideElasticSearchClient_with_username_and_password() { verifyNoInteractions(awsCredentialsSupplier); } + @Test + void provideElasticSearchClient_with_aws_auth() { + when(connectionConfiguration.getCertPath()).thenReturn(null); + when(connectionConfiguration.getSocketTimeout()).thenReturn(null); + when(connectionConfiguration.getConnectTimeout()).thenReturn(null); + + final AwsAuthenticationConfiguration awsAuthenticationConfiguration = mock(AwsAuthenticationConfiguration.class); + when(awsAuthenticationConfiguration.getAwsRegion()).thenReturn(Region.US_EAST_1); + final String stsRoleArn = "arn:aws:iam::123456789012:role/my-role"; + when(awsAuthenticationConfiguration.getAwsStsRoleArn()).thenReturn(stsRoleArn); + when(awsAuthenticationConfiguration.getAwsStsHeaderOverrides()).thenReturn(Collections.emptyMap()); + when(openSearchSourceConfiguration.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationConfiguration); + + final ArgumentCaptor awsCredentialsOptionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + final AwsCredentialsProvider awsCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(awsCredentialsOptionsArgumentCaptor.capture())).thenReturn(awsCredentialsProvider); + + final ElasticsearchClient elasticsearchClient = createObjectUnderTest().provideElasticSearchClient(openSearchSourceConfiguration); + assertThat(elasticsearchClient, notNullValue()); + + final AwsCredentialsOptions awsCredentialsOptions = awsCredentialsOptionsArgumentCaptor.getValue(); + assertThat(awsCredentialsOptions, notNullValue()); + assertThat(awsCredentialsOptions.getRegion(), equalTo(Region.US_EAST_1)); + assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap())); + assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); + } + @Test void provideOpenSearchClient_with_aws_auth() { when(connectionConfiguration.getCertPath()).thenReturn(null); diff --git a/data-prepper-plugins/opensearch/build.gradle b/data-prepper-plugins/opensearch/build.gradle index 74e863f3f5..8a87f1c926 100644 --- a/data-prepper-plugins/opensearch/build.gradle +++ b/data-prepper-plugins/opensearch/build.gradle @@ -32,8 +32,8 @@ dependencies { implementation 'software.amazon.awssdk:apache-client' testImplementation testLibs.junit.vintage testImplementation 'commons-io:commons-io:2.12.0' - testImplementation 'net.bytebuddy:byte-buddy:1.14.4' - testImplementation 'net.bytebuddy:byte-buddy-agent:1.14.4' + testImplementation 'net.bytebuddy:byte-buddy:1.14.7' + testImplementation 'net.bytebuddy:byte-buddy-agent:1.14.7' testImplementation testLibs.slf4j.simple } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java index 0eb173b7e5..87c94a5b91 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/ConnectionConfiguration.java @@ -27,6 +27,7 @@ import org.opensearch.client.transport.rest_client.RestClientTransport; import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.aws.api.AwsRequestSigningApache4Interceptor; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.PreSerializedJsonpMapper; import org.slf4j.Logger; @@ -286,7 +287,7 @@ private void attachSigV4(final RestClientBuilder restClientBuilder, AwsCredentia final Aws4Signer aws4Signer = Aws4Signer.create(); final AwsCredentialsOptions awsCredentialsOptions = createAwsCredentialsOptions(); final AwsCredentialsProvider credentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); - final HttpRequestInterceptor httpRequestInterceptor = new AwsRequestSigningApacheInterceptor(AOS_SERVICE_NAME, aws4Signer, + final HttpRequestInterceptor httpRequestInterceptor = new AwsRequestSigningApache4Interceptor(AOS_SERVICE_NAME, aws4Signer, credentialsProvider, awsRegion); restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.addInterceptorLast(httpRequestInterceptor); diff --git a/data-prepper-plugins/otel-logs-source/build.gradle b/data-prepper-plugins/otel-logs-source/build.gradle index de1900539d..45e67098e9 100644 --- a/data-prepper-plugins/otel-logs-source/build.gradle +++ b/data-prepper-plugins/otel-logs-source/build.gradle @@ -27,8 +27,8 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation libs.commons.lang3 - implementation "org.bouncycastle:bcprov-jdk15on:1.69" - implementation "org.bouncycastle:bcpkix-jdk15on:1.69" + implementation libs.bouncycastle.bcprov + implementation libs.bouncycastle.bcpkix testImplementation 'org.assertj:assertj-core:3.24.2' testImplementation testLibs.mockito.inline testImplementation("commons-io:commons-io:2.10.0") diff --git a/data-prepper-plugins/otel-metrics-source/build.gradle b/data-prepper-plugins/otel-metrics-source/build.gradle index d22e42968e..59591e66d4 100644 --- a/data-prepper-plugins/otel-metrics-source/build.gradle +++ b/data-prepper-plugins/otel-metrics-source/build.gradle @@ -26,8 +26,8 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation libs.commons.lang3 - implementation "org.bouncycastle:bcprov-jdk15on:1.70" - implementation "org.bouncycastle:bcpkix-jdk15on:1.70" + implementation libs.bouncycastle.bcprov + implementation libs.bouncycastle.bcpkix testImplementation testLibs.junit.vintage testImplementation 'org.assertj:assertj-core:3.24.2' testImplementation testLibs.mockito.inline diff --git a/data-prepper-plugins/otel-trace-source/build.gradle b/data-prepper-plugins/otel-trace-source/build.gradle index a037076e27..ca39952cca 100644 --- a/data-prepper-plugins/otel-trace-source/build.gradle +++ b/data-prepper-plugins/otel-trace-source/build.gradle @@ -25,8 +25,8 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' implementation libs.commons.lang3 - implementation "org.bouncycastle:bcprov-jdk15on:1.70" - implementation "org.bouncycastle:bcpkix-jdk15on:1.70" + implementation libs.bouncycastle.bcprov + implementation libs.bouncycastle.bcpkix testImplementation testLibs.junit.vintage testImplementation 'org.assertj:assertj-core:3.24.2' testImplementation testLibs.mockito.inline diff --git a/e2e-test/build.gradle b/e2e-test/build.gradle index 95831fc8ed..d20d91f168 100644 --- a/e2e-test/build.gradle +++ b/e2e-test/build.gradle @@ -10,10 +10,17 @@ subprojects { gradlePluginPortal() } dependencies { - classpath 'com.bmuschko:gradle-docker-plugin:7.0.0' + classpath 'com.bmuschko:gradle-docker-plugin:9.3.2' } } + ext { + dataPrepperJarImageFilepath = 'bin/data-prepper/' + targetJavaVersion = project.hasProperty('endToEndJavaVersion') ? project.getProperty('endToEndJavaVersion') : '11' + targetOpenTelemetryVersion = project.hasProperty('openTelemetryVersion') ? project.getProperty('openTelemetryVersion') : "${libs.versions.opentelemetry.get()}" + dataPrepperBaseImage = "eclipse-temurin:${targetJavaVersion}-jre" + } + sourceSets { integrationTest { java { @@ -40,17 +47,10 @@ subprojects { duplicatesStrategy = DuplicatesStrategy.EXCLUDE from project(':data-prepper-main').jar.archivePath from project(':data-prepper-main').configurations.runtimeClasspath - into("${project.buildDir}/bin/data-prepper") + into("${project.buildDir}/docker/${dataPrepperJarImageFilepath}") } dependencies { testImplementation testLibs.junit.vintage } - - ext { - dataPrepperJarFilepath = "${project.buildDir.name}/bin/data-prepper/" - targetJavaVersion = project.hasProperty('endToEndJavaVersion') ? project.getProperty('endToEndJavaVersion') : '11' - targetOpenTelemetryVersion = project.hasProperty('openTelemetryVersion') ? project.getProperty('openTelemetryVersion') : "${libs.versions.opentelemetry.get()}" - dataPrepperBaseImage = "eclipse-temurin:${targetJavaVersion}-jre" - } } diff --git a/e2e-test/log/build.gradle b/e2e-test/log/build.gradle index f3516d406e..382dcc5100 100644 --- a/e2e-test/log/build.gradle +++ b/e2e-test/log/build.gradle @@ -41,16 +41,15 @@ task createDataPrepperDockerFile(type: Dockerfile) { dependsOn copyDataPrepperJar destFile = project.file('build/docker/Dockerfile') from(dataPrepperBaseImage) - workingDir("/app/data-prepper") - copyFile("${dataPrepperJarFilepath}", "/app/data-prepper/lib") + workingDir('/app/data-prepper') + copyFile("${dataPrepperJarImageFilepath}", '/app/data-prepper/lib') defaultCommand('java', '-Ddata-prepper.dir=/app/data-prepper', '-cp', '/app/data-prepper/lib/*', 'org.opensearch.dataprepper.DataPrepperExecute') } task buildDataPrepperDockerImage(type: DockerBuildImage) { dependsOn createDataPrepperDockerFile - inputDir = file(".") - dockerFile = file("build/docker/Dockerfile") - images.add("e2e-test-log-pipeline-image") + dockerFile = file('build/docker/Dockerfile') + images.add('e2e-test-log-pipeline-image') } def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int sourcePort, diff --git a/e2e-test/peerforwarder/build.gradle b/e2e-test/peerforwarder/build.gradle index a93dc88100..4a0a0aad0f 100644 --- a/e2e-test/peerforwarder/build.gradle +++ b/e2e-test/peerforwarder/build.gradle @@ -36,23 +36,29 @@ def DATA_PREPPER_CONFIG_STATIC = "data_prepper_static.yml" /** * DataPrepper Docker tasks */ + +task copyTestResources(type: Copy) { + from 'src/integrationTest/resources/' + into "${project.buildDir}/docker/test-resources" +} + task createDataPrepperDockerFile(type: Dockerfile) { dependsOn copyDataPrepperJar + dependsOn copyTestResources destFile = project.file('build/docker/Dockerfile') from(dataPrepperBaseImage) exposePort(2021) - workingDir("/app/data-prepper") - copyFile("${dataPrepperJarFilepath}", "/app/data-prepper/lib") - copyFile("src/integrationTest/resources/default_certificate.pem", "/app/data-prepper/config/default_certificate.pem") - copyFile("src/integrationTest/resources/default_private_key.pem", "/app/data-prepper/config/default_private_key.pem") + workingDir('/app/data-prepper') + copyFile("${dataPrepperJarImageFilepath}", '/app/data-prepper/lib') + copyFile('test-resources/default_certificate.pem', '/app/data-prepper/config/default_certificate.pem') + copyFile('test-resources/default_private_key.pem', '/app/data-prepper/config/default_private_key.pem') defaultCommand('java', '-Ddata-prepper.dir=/app/data-prepper', '-cp', '/app/data-prepper/lib/*', 'org.opensearch.dataprepper.DataPrepperExecute') } task buildDataPrepperDockerImage(type: DockerBuildImage) { dependsOn createDataPrepperDockerFile - inputDir = file(".") - dockerFile = file("build/docker/Dockerfile") - images.add("integ-test-pipeline-image") + dockerFile = file('build/docker/Dockerfile') + images.add('integ-test-pipeline-image') } def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int sourcePort, diff --git a/e2e-test/trace/build.gradle b/e2e-test/trace/build.gradle index bc97baf9ce..0f811a26a7 100644 --- a/e2e-test/trace/build.gradle +++ b/e2e-test/trace/build.gradle @@ -45,16 +45,15 @@ task createDataPrepperDockerFile(type: Dockerfile) { destFile = project.file('build/docker/Dockerfile') from(dataPrepperBaseImage) exposePort(21890) - workingDir("/app/data-prepper") - copyFile("${dataPrepperJarFilepath}", "/app/data-prepper/lib") + workingDir('/app/data-prepper') + copyFile("${dataPrepperJarImageFilepath}", '/app/data-prepper/lib') defaultCommand('java', '-Ddata-prepper.dir=/app/data-prepper', '-cp', '/app/data-prepper/lib/*', 'org.opensearch.dataprepper.DataPrepperExecute') } task buildDataPrepperDockerImage(type: DockerBuildImage) { dependsOn createDataPrepperDockerFile - inputDir = file(".") - dockerFile = file("build/docker/Dockerfile") - images.add("integ-test-pipeline-image") + dockerFile = file('build/docker/Dockerfile') + images.add('integ-test-pipeline-image') } def createDataPrepperDockerContainer(final String taskBaseName, final String dataPrepperName, final int grpcPort, diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 249e5832f0..7f93135c49 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ae04661ee7..ac72c34e8a 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip +networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index a69d9cb6c2..0adc8e1a53 100755 --- a/gradlew +++ b/gradlew @@ -55,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -80,13 +80,11 @@ do esac done -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -APP_NAME="Gradle" +# This is normally unused +# shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,22 +131,29 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -193,6 +198,10 @@ if "$cygwin" || "$msys" ; then done fi + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + # Collect all arguments for the java command; # * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of # shell script including quotes and variable substitutions, so put them in diff --git a/gradlew.bat b/gradlew.bat index f127cfd49d..93e3f59f13 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -26,6 +26,7 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% diff --git a/performance-test/build.gradle b/performance-test/build.gradle index 6e9a8a5902..b5ffad15f1 100644 --- a/performance-test/build.gradle +++ b/performance-test/build.gradle @@ -5,7 +5,7 @@ plugins { id 'java' - id 'io.gatling.gradle' version '3.9.0' + id 'io.gatling.gradle' version '3.9.5.5' } configurations.all { @@ -19,6 +19,7 @@ repositories { } dependencies { + gatlingImplementation 'software.amazon.awssdk:auth:2.20.67' implementation 'com.fasterxml.jackson.core:jackson-core' testRuntimeOnly testLibs.junit.engine } diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/FixedClientSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/FixedClientSimulation.java index b5d8988eb2..ae03274110 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/FixedClientSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/FixedClientSimulation.java @@ -22,6 +22,7 @@ public class FixedClientSimulation extends Simulation { .during(duration) .on(Chain.sendApacheCommonLogPostRequest("Post logs with large batch", largeBatchSize)); + public FixedClientSimulation() { setUp(fixedScenario.injectOpen(CoreDsl.atOnceUsers(users))) .protocols(Protocol.httpProtocol()) diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpStaticLoadSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpStaticLoadSimulation.java index e8078b560f..ca59af336e 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpStaticLoadSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpStaticLoadSimulation.java @@ -20,6 +20,7 @@ public class HttpStaticLoadSimulation extends Simulation { .during(testDuration) .on(Chain.sendApacheCommonLogPostRequest("Average log post request", 20)); + public HttpStaticLoadSimulation() { setUp(httpStaticLoad.injectOpen( CoreDsl.rampUsers(10).during(Duration.ofSeconds(10)), diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpsStaticLoadSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpsStaticLoadSimulation.java index 3cc36f52d5..69764979a9 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpsStaticLoadSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/HttpsStaticLoadSimulation.java @@ -20,6 +20,7 @@ public class HttpsStaticLoadSimulation extends Simulation { .during(testDuration) .on(Chain.sendApacheCommonLogPostRequest("Average log post request", 20)); + public HttpsStaticLoadSimulation() { setUp(httpStaticLoad.injectOpen( CoreDsl.rampUsers(10).during(Duration.ofSeconds(10)), diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/RampUpSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/RampUpSimulation.java index 6a942cc848..f7a5fdf18e 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/RampUpSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/RampUpSimulation.java @@ -23,6 +23,7 @@ public class RampUpSimulation extends Simulation { .during(peakLoadTime) .on(Chain.sendApacheCommonLogPostRequest("Post logs with large batch", largeBatchSize)); + public RampUpSimulation() { setUp( rampUpScenario.injectOpen( diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SingleRequestSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SingleRequestSimulation.java index 07160336bb..6815f055d2 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SingleRequestSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SingleRequestSimulation.java @@ -10,21 +10,23 @@ import io.gatling.javaapi.core.ScenarioBuilder; import io.gatling.javaapi.core.Simulation; import io.gatling.javaapi.http.HttpDsl; +import org.opensearch.dataprepper.test.performance.tools.PathTarget; import org.opensearch.dataprepper.test.performance.tools.Protocol; public class SingleRequestSimulation extends Simulation { ChainBuilder sendSingleLogFile = CoreDsl.exec( HttpDsl.http("Post log") - .post("/log/ingest") + .post(PathTarget.getPath()) .body(CoreDsl.ElFileBody("bodies/singleLog.json")) .asJson() - .check(HttpDsl.status().is(200), CoreDsl.responseTimeInMillis().lt(200)) - ); + .check(HttpDsl.status().is(200), CoreDsl.responseTimeInMillis().lt(500)) + ); ScenarioBuilder basicScenario = CoreDsl.scenario("Post static json log file") .exec(sendSingleLogFile); + public SingleRequestSimulation() { setUp( @@ -32,7 +34,7 @@ public class SingleRequestSimulation extends Simulation { ).protocols( Protocol.httpProtocol() ).assertions( - CoreDsl.global().responseTime().mean().lt(100), + CoreDsl.global().responseTime().max().lt(1000), CoreDsl.global().successfulRequests().percent().is(100.0) ); } diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SlowBurnSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SlowBurnSimulation.java index 9fd06673c3..10611565d4 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SlowBurnSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/SlowBurnSimulation.java @@ -23,6 +23,7 @@ public class SlowBurnSimulation extends Simulation { .forever() .on(Chain.sendApacheCommonLogPostRequest("Post logs with large batch", largeBatchSize)); + public SlowBurnSimulation() { setUp( rampUpScenario.injectOpen( diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/TargetRpsSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/TargetRpsSimulation.java index 73f9790d52..ea4fcea203 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/TargetRpsSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/TargetRpsSimulation.java @@ -43,6 +43,7 @@ private static PopulationBuilder runScenarioWithTargetRps(final ScenarioBuilder ).protocols(Protocol.httpProtocol()); } + public TargetRpsSimulation() { setUp( runScenarioWithTargetRps(smallBatchScenario, 400), diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/VariousGrokPatternsSimulation.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/VariousGrokPatternsSimulation.java index 36677de41c..a94662b290 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/VariousGrokPatternsSimulation.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/VariousGrokPatternsSimulation.java @@ -13,6 +13,7 @@ import io.gatling.javaapi.core.Session; import io.gatling.javaapi.core.Simulation; import io.gatling.javaapi.http.HttpDsl; +import org.opensearch.dataprepper.test.performance.tools.PathTarget; import org.opensearch.dataprepper.test.performance.tools.Protocol; import org.opensearch.dataprepper.test.performance.tools.Templates; @@ -52,7 +53,7 @@ private static Map logObject(final String logValue) { ChainBuilder sendMultipleGrokPatterns = CoreDsl.exec( HttpDsl.http("Http multiple grok pattern request") - .post("/log/ingest") + .post(PathTarget.getPath()) .asJson() .body(CoreDsl.StringBody(VariousGrokPatternsSimulation.multipleGrokPatterns))); @@ -60,6 +61,7 @@ private static Map logObject(final String logValue) { .during(testDuration) .on(sendMultipleGrokPatterns); + public VariousGrokPatternsSimulation() { setUp(sendMultipleGrokPatternsScenario.injectOpen( CoreDsl.rampUsers(rampUsers).during(rampUpTime) diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/AwsRequestSigner.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/AwsRequestSigner.java new file mode 100644 index 0000000000..377f58f589 --- /dev/null +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/AwsRequestSigner.java @@ -0,0 +1,134 @@ +package org.opensearch.dataprepper.test.performance.tools; + +import io.gatling.http.client.Request; +import io.netty.handler.codec.http.HttpHeaders; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.signer.Aws4Signer; +import software.amazon.awssdk.auth.signer.AwsSignerExecutionAttribute; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.http.SdkHttpFullRequest; +import software.amazon.awssdk.http.SdkHttpMethod; +import software.amazon.awssdk.regions.Region; + +import java.io.ByteArrayInputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Function; + +public class AwsRequestSigner implements Consumer { + static final String SIGNER_NAME = "aws_sigv4"; + + /** + * Constant to check content-length + */ + private static final String CONTENT_LENGTH = "content-length"; + /** + * Constant to check Zero content length + */ + private static final String ZERO_CONTENT_LENGTH = "0"; + /** + * Constant to check if host is the endpoint + */ + private static final String HOST = "host"; + + private static final String REGION_PROPERTY = "aws_region"; + private static final String SERVICE_NAME_PROPERTY = "aws_service"; + + + private final Aws4Signer awsSigner; + private final AwsCredentialsProvider credentialsProvider; + private final Region region; + private final String service; + + public AwsRequestSigner() { + region = getRequiredProperty(REGION_PROPERTY, Region::of); + service = getRequiredProperty(SERVICE_NAME_PROPERTY, Function.identity()); + + awsSigner = Aws4Signer.create(); + credentialsProvider = DefaultCredentialsProvider.create(); + } + + private static T getRequiredProperty(String propertyName, Function transform) { + String inputString = System.getProperty(propertyName); + if(inputString == null) { + throw new RuntimeException("Using " + SIGNER_NAME + " authentication requires providing the " + propertyName + " system property."); + } + + try { + return transform.apply(inputString); + } catch (Exception ex) { + throw new RuntimeException("Unable to process property " + propertyName + " with error: " + ex.getMessage()); + } + } + + @Override + public void accept(Request request) { + ExecutionAttributes attributes = new ExecutionAttributes(); + attributes.putAttribute(AwsSignerExecutionAttribute.AWS_CREDENTIALS, credentialsProvider.resolveCredentials()); + attributes.putAttribute(AwsSignerExecutionAttribute.SERVICE_SIGNING_NAME, service); + attributes.putAttribute(AwsSignerExecutionAttribute.SIGNING_REGION, region); + + SdkHttpFullRequest incomingSdkRequest = convertIncomingRequest(request); + + SdkHttpFullRequest signedRequest = awsSigner.sign(incomingSdkRequest, attributes); + + modifyOutgoingRequest(request, signedRequest); + } + + private SdkHttpFullRequest convertIncomingRequest(Request request) { + URI uri; + try { + uri = request.getUri().toJavaNetURI(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + SdkHttpFullRequest.Builder requestBuilder = SdkHttpFullRequest.builder() + .method(SdkHttpMethod.fromValue(request.getMethod().name())) + .uri(uri); + + requestBuilder.contentStreamProvider(() -> new ByteArrayInputStream(request.getBody().getBytes())); + requestBuilder.headers(headerArrayToMap(request.getHeaders())); + + return requestBuilder.build(); + } + + private static Map> headerArrayToMap(final HttpHeaders headers) { + Map> headersMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + for (Map.Entry header : headers) { + if (!skipHeader(header)) { + headersMap.put(header.getKey(), headersMap + .getOrDefault(header.getKey(), + new LinkedList<>(Collections.singletonList(header.getValue())))); + } + } + return headersMap; + } + + private static boolean skipHeader(final Map.Entry header) { + return (CONTENT_LENGTH.equalsIgnoreCase(header.getKey()) + && ZERO_CONTENT_LENGTH.equals(header.getValue())) // Strip Content-Length: 0 + || HOST.equalsIgnoreCase(header.getKey()); // Host comes from endpoint + } + + private void modifyOutgoingRequest(Request request, SdkHttpFullRequest signedRequest) { + resetHeaders(request, signedRequest); + } + + private void resetHeaders(Request request, SdkHttpFullRequest signedRequest) { + request.getHeaders().clear(); + + for (Map.Entry> headerEntry : signedRequest.headers().entrySet()) { + for (String value : headerEntry.getValue()) { + request.getHeaders().add(headerEntry.getKey(), value); + } + } + } +} diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Chain.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Chain.java index f99a24b4b4..1f98c0031a 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Chain.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Chain.java @@ -20,7 +20,7 @@ public static ChainBuilder sendApacheCommonLogPostRequest(final int batchSize) { public static ChainBuilder sendApacheCommonLogPostRequest(final String name, final int batchSize) { return CoreDsl.exec( HttpDsl.http(name) - .post("/log/ingest") + .post(PathTarget.getPath()) .body(CoreDsl.StringBody(Templates.apacheCommonLogTemplate(batchSize)))); } } diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/PathTarget.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/PathTarget.java new file mode 100644 index 0000000000..835f71707b --- /dev/null +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/PathTarget.java @@ -0,0 +1,10 @@ +package org.opensearch.dataprepper.test.performance.tools; + +public class PathTarget { + private static final String PATH_PROPERTY_NAME = "path"; + private static final String DEFAULT_PATH = "/log/ingest"; + + public static String getPath() { + return System.getProperty(PATH_PROPERTY_NAME, DEFAULT_PATH); + } +} diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Protocol.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Protocol.java index 9b1439b986..43f33c2646 100644 --- a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Protocol.java +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/Protocol.java @@ -12,11 +12,13 @@ public final class Protocol { private Protocol() { } - private static final String http = "http"; - private static final String https = "https"; + private static final String HTTP = "http"; + private static final String HTTPS = "https"; - public static final String localhost = "localhost"; - private static final String host = System.getProperty("host", localhost); + private static final String HTTP_PROTOCOL = System.getProperty("protocol", HTTP); + + public static final String LOCALHOST = "localhost"; + private static final String HOST = System.getProperty("host", LOCALHOST); private static final Integer defaultPort = 2021; private static final Integer port = Integer.getInteger("port", defaultPort); @@ -26,36 +28,17 @@ private static String asUrl(final String protocol, final String host, final Inte } public static HttpProtocolBuilder httpProtocol() { - return httpProtocol(http, host, port); - } - - public static HttpProtocolBuilder httpProtocol(final String host) { - return httpProtocol(http, host, port); - } - - public static HttpProtocolBuilder httpsProtocol() { - return httpProtocol(https, host, port); - } - - public static HttpProtocolBuilder httpsProtocol(final String host) { - return httpProtocol(https, host, port); + return httpProtocol(HTTP_PROTOCOL, HOST, port); } public static HttpProtocolBuilder httpsProtocol(final Integer port) { - return httpProtocol(https, host, port); - } - - public static HttpProtocolBuilder httpsProtocol(final String host, final Integer port) { - return httpProtocol(https, host, port); - } - - public static HttpProtocolBuilder httpProtocol(final String protocol, final String host) { - return httpProtocol(protocol, host, port); + return httpProtocol(HTTPS, HOST, port); } - public static HttpProtocolBuilder httpProtocol(final String protocol, final String host, final Integer port) { + private static HttpProtocolBuilder httpProtocol(final String protocol, final String host, final Integer port) { return HttpDsl.http .baseUrl(asUrl(protocol, host, port)) + .sign(SignerProvider.getSigner()) .acceptHeader("application/json") .header("Content-Type", "application/json"); } diff --git a/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/SignerProvider.java b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/SignerProvider.java new file mode 100644 index 0000000000..87c85fe86c --- /dev/null +++ b/performance-test/src/gatling/java/org/opensearch/dataprepper/test/performance/tools/SignerProvider.java @@ -0,0 +1,19 @@ +package org.opensearch.dataprepper.test.performance.tools; + +import io.gatling.http.client.Request; + +import java.util.function.Consumer; + +public class SignerProvider { + private static final Consumer NO_OP_SIGNER = r -> { }; + + public static Consumer getSigner() { + String authentication = System.getProperty("authentication"); + + if(AwsRequestSigner.SIGNER_NAME.equals(authentication)) { + return new AwsRequestSigner(); + } + + return NO_OP_SIGNER; + } +} diff --git a/release/archives/build.gradle b/release/archives/build.gradle index dcb19752f8..327a84ec16 100644 --- a/release/archives/build.gradle +++ b/release/archives/build.gradle @@ -13,11 +13,24 @@ subprojects { apply plugin: 'de.undercouch.download' apply plugin: 'distribution' + configurations { + allDependencyJarsFromMain { + canBeConsumed = false + canBeResolved = true + } + } + dependencies { /* required to resolve below issue with aws sdk JAXB is unavailable. Will fallback to SDK implementation which may be less performant */ implementation 'javax.xml.bind:jaxb-api:2.3.1' + + /** + * Creates a configuration on the data-prepper-main runtimeClasspath. + * https://docs.gradle.org/current/userguide/cross_project_publications.html + */ + allDependencyJarsFromMain(project(path: ':data-prepper-main', configuration: 'allDependencyJars')) } ext { @@ -111,14 +124,14 @@ subprojects { dependsOn tarTask.name file = tarTask.archiveFile.get().asFile.absolutePath bucket = awsS3Bucket - key = "${destinationKeyPath}/${tarTask.archiveName}" + key = "${destinationKeyPath}/${tarTask.archiveFileName.get()}" } tasks.create(name: "upload${platformWithArchitecture}TarWithJDKToS3", type: S3Upload) { dependsOn tarWithJDKTask.name file = tarWithJDKTask.archiveFile.get().asFile.absolutePath bucket = awsS3Bucket - key = "${destinationKeyPath}/${tarWithJDKTask.archiveName}" + key = "${destinationKeyPath}/${tarWithJDKTask.archiveFileName.get()}" } } @@ -135,7 +148,9 @@ CopySpec archiveToTar() { duplicatesStrategy = DuplicatesStrategy.EXCLUDE into('lib') { from project(':data-prepper-main').jar.archivePath - from project(':data-prepper-main').configurations.runtimeClasspath + from configurations.runtimeClasspath + //from allDependencyJarsFromMain(project(path: ':data-prepper-main', configuration: 'allDependencyJars')).runtimeClasspath + //from project(':data-prepper-main').configurations.allDependencyJars fileMode 0755 } into('examples') { diff --git a/release/docker/build.gradle b/release/docker/build.gradle index faf10b0b44..be96705aaf 100644 --- a/release/docker/build.gradle +++ b/release/docker/build.gradle @@ -12,8 +12,8 @@ docker { tag "${project.rootProject.name}", "${project.version}" files project(':release:archives:linux').tasks.getByName('linuxx64DistTar').archivePath files "${project.projectDir}/config/default-data-prepper-config.yaml", "${project.projectDir}/config/default-keystore.p12" - buildArgs(['ARCHIVE_FILE' : project(':release:archives:linux').tasks.getByName('linuxx64DistTar').archiveName, - 'ARCHIVE_FILE_UNPACKED' : project(':release:archives:linux').tasks.getByName('linuxx64DistTar').archiveName.replace('.tar.gz', ''), + buildArgs(['ARCHIVE_FILE' : project(':release:archives:linux').tasks.getByName('linuxx64DistTar').archiveFileName.get(), + 'ARCHIVE_FILE_UNPACKED' : project(':release:archives:linux').tasks.getByName('linuxx64DistTar').archiveFileName.get().replace('.tar.gz', ''), 'CONFIG_FILEPATH' : '/usr/share/data-prepper/config/data-prepper-config.yaml', 'PIPELINE_FILEPATH' : '/usr/share/data-prepper/pipelines/pipelines.yaml']) dockerfile file('Dockerfile') diff --git a/release/smoke-tests/otel-span-exporter/requirements.txt b/release/smoke-tests/otel-span-exporter/requirements.txt index 631dcdfab3..6b6caec0f0 100644 --- a/release/smoke-tests/otel-span-exporter/requirements.txt +++ b/release/smoke-tests/otel-span-exporter/requirements.txt @@ -1,9 +1,9 @@ backoff==1.10.0 -certifi==2022.12.7 +certifi==2023.7.22 charset-normalizer==2.0.9 Deprecated==1.2.13 googleapis-common-protos==1.53.0 -grpcio==1.50.0 +grpcio==1.53.0 idna==3.3 opentelemetry-api==1.7.1 opentelemetry-exporter-otlp==1.7.1 diff --git a/release/staging-resources-cdk/package-lock.json b/release/staging-resources-cdk/package-lock.json index 2475d21ab5..83b8d7a0bd 100644 --- a/release/staging-resources-cdk/package-lock.json +++ b/release/staging-resources-cdk/package-lock.json @@ -8,7 +8,7 @@ "name": "staging-resources-cdk", "version": "0.1.0", "dependencies": { - "aws-cdk-lib": "2.80.0", + "aws-cdk-lib": "2.88.0", "constructs": "^10.0.0", "source-map-support": "^0.5.16" }, @@ -42,9 +42,9 @@ } }, "node_modules/@aws-cdk/asset-awscli-v1": { - "version": "2.2.199", - "resolved": "https://registry.npmjs.org/@aws-cdk/asset-awscli-v1/-/asset-awscli-v1-2.2.199.tgz", - "integrity": "sha512-zNdD2OxALdsdQaRZBpTfMTuudxV+4jLMznJIvVj6O+OqCru4m5UtgVQmyApW1z2H9s4/06ovVt20aXe2G8Ta+w==" + "version": "2.2.200", + "resolved": "https://registry.npmjs.org/@aws-cdk/asset-awscli-v1/-/asset-awscli-v1-2.2.200.tgz", + "integrity": "sha512-Kf5J8DfJK4wZFWT2Myca0lhwke7LwHcHBo+4TvWOGJrFVVKVuuiLCkzPPRBQQVDj0Vtn2NBokZAz8pfMpAqAKg==" }, "node_modules/@aws-cdk/asset-kubectl-v20": { "version": "2.1.2", @@ -108,9 +108,9 @@ } }, "node_modules/@babel/core/node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -165,9 +165,9 @@ } }, "node_modules/@babel/helper-compilation-targets/node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -1718,9 +1718,9 @@ } }, "node_modules/aws-cdk-lib": { - "version": "2.80.0", - "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.80.0.tgz", - "integrity": "sha512-PoqD3Yms5I0ajuTi071nTW/hpkH3XsdyZzn5gYsPv0qD7mqP3h6Qr+6RiGx+yQ1KcVFyxWdX15uK+DsC0KwvcQ==", + "version": "2.88.0", + "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.88.0.tgz", + "integrity": "sha512-bmhokh30HVeqlotWaoEmK7mKB9SJbJwpbsiVgmYe3JcMu8DposHQqaIPI7LnC+dg015tZaxUsExxOYBEw+vntQ==", "bundleDependencies": [ "@balena/dockerignore", "case", @@ -1734,9 +1734,9 @@ "yaml" ], "dependencies": { - "@aws-cdk/asset-awscli-v1": "^2.2.177", - "@aws-cdk/asset-kubectl-v20": "^2.1.1", - "@aws-cdk/asset-node-proxy-agent-v5": "^2.0.148", + "@aws-cdk/asset-awscli-v1": "^2.2.200", + "@aws-cdk/asset-kubectl-v20": "^2.1.2", + "@aws-cdk/asset-node-proxy-agent-v5": "^2.0.165", "@balena/dockerignore": "^1.0.2", "case": "1.6.3", "fs-extra": "^11.1.1", @@ -1744,7 +1744,7 @@ "jsonschema": "^1.4.1", "minimatch": "^3.1.2", "punycode": "^2.3.0", - "semver": "^7.5.1", + "semver": "^7.5.4", "table": "^6.8.1", "yaml": "1.10.2" }, @@ -1960,7 +1960,7 @@ } }, "node_modules/aws-cdk-lib/node_modules/semver": { - "version": "7.5.1", + "version": "7.5.4", "inBundle": true, "license": "ISC", "dependencies": { @@ -2112,9 +2112,9 @@ } }, "node_modules/babel-plugin-istanbul/node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -4235,9 +4235,9 @@ } }, "node_modules/istanbul-lib-instrument/node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -5046,9 +5046,9 @@ } }, "node_modules/make-dir/node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -5280,9 +5280,9 @@ } }, "node_modules/normalize-package-data/node_modules/semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", "dev": true, "bin": { "semver": "bin/semver" @@ -6353,9 +6353,9 @@ } }, "node_modules/sane/node_modules/semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", "dev": true, "bin": { "semver": "bin/semver" @@ -6420,9 +6420,9 @@ } }, "node_modules/semver": { - "version": "7.5.3", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.3.tgz", - "integrity": "sha512-QBlUtyVk/5EeHbi7X0fw6liDZc7BBmEaSYn01fMU1OUYbf6GPsbTtd8WmnqbI20SeycoHSeiybkE/q1Q+qlThQ==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dev": true, "dependencies": { "lru-cache": "^6.0.0" @@ -7175,9 +7175,9 @@ } }, "node_modules/tough-cookie": { - "version": "4.1.2", - "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-4.1.2.tgz", - "integrity": "sha512-G9fqXWoYFZgTc2z8Q5zaHy/vJMjm+WV0AkAeHxVCQiEB1b+dGvWzFW6QV07cY5jQ5gRkeid2qIkzkxUnmoQZUQ==", + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-4.1.3.tgz", + "integrity": "sha512-aX/y5pVRkfRnfmuX+OdbSdXvPe6ieKX/G2s7e98f4poJHnqH3281gDPm/metm6E/WRamfx7WC4HUqkWHfQHprw==", "dev": true, "dependencies": { "psl": "^1.1.33", @@ -7822,9 +7822,9 @@ } }, "@aws-cdk/asset-awscli-v1": { - "version": "2.2.199", - "resolved": "https://registry.npmjs.org/@aws-cdk/asset-awscli-v1/-/asset-awscli-v1-2.2.199.tgz", - "integrity": "sha512-zNdD2OxALdsdQaRZBpTfMTuudxV+4jLMznJIvVj6O+OqCru4m5UtgVQmyApW1z2H9s4/06ovVt20aXe2G8Ta+w==" + "version": "2.2.200", + "resolved": "https://registry.npmjs.org/@aws-cdk/asset-awscli-v1/-/asset-awscli-v1-2.2.200.tgz", + "integrity": "sha512-Kf5J8DfJK4wZFWT2Myca0lhwke7LwHcHBo+4TvWOGJrFVVKVuuiLCkzPPRBQQVDj0Vtn2NBokZAz8pfMpAqAKg==" }, "@aws-cdk/asset-kubectl-v20": { "version": "2.1.2", @@ -7875,9 +7875,9 @@ }, "dependencies": { "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true } } @@ -7921,9 +7921,9 @@ }, "dependencies": { "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true } } @@ -9097,13 +9097,13 @@ } }, "aws-cdk-lib": { - "version": "2.80.0", - "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.80.0.tgz", - "integrity": "sha512-PoqD3Yms5I0ajuTi071nTW/hpkH3XsdyZzn5gYsPv0qD7mqP3h6Qr+6RiGx+yQ1KcVFyxWdX15uK+DsC0KwvcQ==", + "version": "2.88.0", + "resolved": "https://registry.npmjs.org/aws-cdk-lib/-/aws-cdk-lib-2.88.0.tgz", + "integrity": "sha512-bmhokh30HVeqlotWaoEmK7mKB9SJbJwpbsiVgmYe3JcMu8DposHQqaIPI7LnC+dg015tZaxUsExxOYBEw+vntQ==", "requires": { - "@aws-cdk/asset-awscli-v1": "^2.2.177", - "@aws-cdk/asset-kubectl-v20": "^2.1.1", - "@aws-cdk/asset-node-proxy-agent-v5": "^2.0.148", + "@aws-cdk/asset-awscli-v1": "^2.2.200", + "@aws-cdk/asset-kubectl-v20": "^2.1.2", + "@aws-cdk/asset-node-proxy-agent-v5": "^2.0.165", "@balena/dockerignore": "^1.0.2", "case": "1.6.3", "fs-extra": "^11.1.1", @@ -9111,7 +9111,7 @@ "jsonschema": "^1.4.1", "minimatch": "^3.1.2", "punycode": "^2.3.0", - "semver": "^7.5.1", + "semver": "^7.5.4", "table": "^6.8.1", "yaml": "1.10.2" }, @@ -9248,7 +9248,7 @@ "bundled": true }, "semver": { - "version": "7.5.1", + "version": "7.5.4", "bundled": true, "requires": { "lru-cache": "^6.0.0" @@ -9354,9 +9354,9 @@ } }, "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true } } @@ -10983,9 +10983,9 @@ }, "dependencies": { "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true } } @@ -11616,9 +11616,9 @@ }, "dependencies": { "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true } } @@ -11810,9 +11810,9 @@ }, "dependencies": { "semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", "dev": true } } @@ -12608,9 +12608,9 @@ "dev": true }, "semver": { - "version": "5.7.1", - "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.1.tgz", - "integrity": "sha512-sauaDf/PZdVgrLTNYHRtpXa1iRiKcaebiKQ1BJdpQlWH2lCvexQdX55snPFyK7QzpudqbCI0qXFfOasHdyNDGQ==", + "version": "5.7.2", + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.2.tgz", + "integrity": "sha512-cBznnQ9KjJqU67B52RMC65CMarK2600WFnbkcaiwWq3xy/5haFJlshgnpjovMVJ+Hff49d8GEn0b87C5pDQ10g==", "dev": true }, "shebang-command": { @@ -12659,9 +12659,9 @@ } }, "semver": { - "version": "7.5.3", - "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.3.tgz", - "integrity": "sha512-QBlUtyVk/5EeHbi7X0fw6liDZc7BBmEaSYn01fMU1OUYbf6GPsbTtd8WmnqbI20SeycoHSeiybkE/q1Q+qlThQ==", + "version": "7.5.4", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.5.4.tgz", + "integrity": "sha512-1bCSESV6Pv+i21Hvpxp3Dx+pSD8lIPt8uVjRrxAUt/nbswYc+tK6Y2btiULjd4+fnq15PX+nqQDC7Oft7WkwcA==", "dev": true, "requires": { "lru-cache": "^6.0.0" @@ -13268,9 +13268,9 @@ } }, "tough-cookie": { - "version": "4.1.2", - "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-4.1.2.tgz", - "integrity": "sha512-G9fqXWoYFZgTc2z8Q5zaHy/vJMjm+WV0AkAeHxVCQiEB1b+dGvWzFW6QV07cY5jQ5gRkeid2qIkzkxUnmoQZUQ==", + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-4.1.3.tgz", + "integrity": "sha512-aX/y5pVRkfRnfmuX+OdbSdXvPe6ieKX/G2s7e98f4poJHnqH3281gDPm/metm6E/WRamfx7WC4HUqkWHfQHprw==", "dev": true, "requires": { "psl": "^1.1.33", diff --git a/release/staging-resources-cdk/package.json b/release/staging-resources-cdk/package.json index b293856d63..d9e237b71e 100644 --- a/release/staging-resources-cdk/package.json +++ b/release/staging-resources-cdk/package.json @@ -25,7 +25,7 @@ "typescript": "~3.9.7" }, "dependencies": { - "aws-cdk-lib": "2.80.0", + "aws-cdk-lib": "2.88.0", "constructs": "^10.0.0", "source-map-support": "^0.5.16" } diff --git a/settings.gradle b/settings.gradle index bbbcfb471c..43a1c5c7e6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -34,6 +34,9 @@ dependencyResolutionManagement { version('spring', '5.3.28') library('spring-core', 'org.springframework', 'spring-core').versionRef('spring') library('spring-context', 'org.springframework', 'spring-context').versionRef('spring') + version('bouncycastle', '1.76') + library('bouncycastle-bcprov', 'org.bouncycastle', 'bcprov-jdk18on').versionRef('bouncycastle') + library('bouncycastle-bcpkix', 'org.bouncycastle', 'bcpkix-jdk18on').versionRef('bouncycastle') version('guava', '32.0.1-jre') library('guava-core', 'com.google.guava', 'guava').versionRef('guava') library('commons-lang3', 'org.apache.commons', 'commons-lang3').version('3.13.0')