Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into feature-httpsink-refresht…
Browse files Browse the repository at this point in the history
…oken
  • Loading branch information
mallikagogoi7 authored Sep 8, 2023
2 parents 8456dee + cd194c1 commit 7000f31
Show file tree
Hide file tree
Showing 54 changed files with 941 additions and 245 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.opensearch.dataprepper.model.configuration.DataPrepperVersion;
import org.opensearch.dataprepper.model.configuration.PipelineModel;
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -14,9 +15,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -35,18 +35,8 @@ public PipelinesDataflowModelParser(final String pipelineConfigurationFileLocati
}

public PipelinesDataFlowModel parseConfiguration() {
try (final InputStream mergedPipelineConfigurationFiles = mergePipelineConfigurationFiles()) {
final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(mergedPipelineConfigurationFiles,
PipelinesDataFlowModel.class);

final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion();
validateDataPrepperVersion(version);

return pipelinesDataFlowModel;
} catch (IOException e) {
LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation);
throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e);
}
final List<PipelinesDataFlowModel> pipelinesDataFlowModels = parsePipelineConfigurationFiles();
return mergePipelinesDataModels(pipelinesDataFlowModels);
}

private void validateDataPrepperVersion(final DataPrepperVersion version) {
Expand All @@ -57,38 +47,61 @@ private void validateDataPrepperVersion(final DataPrepperVersion version) {
}
}

private InputStream mergePipelineConfigurationFiles() throws IOException {
private List<PipelinesDataFlowModel> parsePipelineConfigurationFiles() {
final File configurationLocation = new File(pipelineConfigurationFileLocation);

if (configurationLocation.isFile()) {
return new FileInputStream(configurationLocation);
return Stream.of(configurationLocation).map(this::parsePipelineConfigurationFile)
.filter(Objects::nonNull).collect(Collectors.toList());
} else if (configurationLocation.isDirectory()) {
FileFilter yamlFilter = pathname -> (pathname.getName().endsWith(".yaml") || pathname.getName().endsWith(".yml"));
List<InputStream> configurationFiles = Stream.of(configurationLocation.listFiles(yamlFilter))
.map(file -> {
InputStream inputStream;
try {
inputStream = new FileInputStream(file);
LOG.info("Reading pipeline configuration from {}", file.getName());
} catch (FileNotFoundException e) {
inputStream = null;
LOG.warn("Pipeline configuration file {} not found", file.getName());
}
return inputStream;
})
List<PipelinesDataFlowModel> pipelinesDataFlowModels = Stream.of(configurationLocation.listFiles(yamlFilter))
.map(this::parsePipelineConfigurationFile)
.filter(Objects::nonNull)
.collect(Collectors.toList());

if (configurationFiles.isEmpty()) {
if (pipelinesDataFlowModels.isEmpty()) {
LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
throw new ParseException(
format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
}

return new SequenceInputStream(Collections.enumeration(configurationFiles));
return pipelinesDataFlowModels;
} else {
LOG.error("Pipelines configuration file not found at {}", pipelineConfigurationFileLocation);
throw new ParseException(format("Pipelines configuration file not found at %s", pipelineConfigurationFileLocation));
}
}

private PipelinesDataFlowModel parsePipelineConfigurationFile(final File pipelineConfigurationFile) {
try (final InputStream pipelineConfigurationInputStream = new FileInputStream(pipelineConfigurationFile)) {
LOG.info("Reading pipeline configuration from {}", pipelineConfigurationFile.getName());
final PipelinesDataFlowModel pipelinesDataFlowModel = OBJECT_MAPPER.readValue(pipelineConfigurationInputStream,
PipelinesDataFlowModel.class);

final DataPrepperVersion version = pipelinesDataFlowModel.getDataPrepperVersion();
validateDataPrepperVersion(version);

return pipelinesDataFlowModel;
} catch (IOException e) {
if (e instanceof FileNotFoundException) {
LOG.warn("Pipeline configuration file {} not found", pipelineConfigurationFile.getName());
return null;
}
LOG.error("Failed to parse the configuration file {}", pipelineConfigurationFileLocation);
throw new ParseException(format("Failed to parse the configuration file %s", pipelineConfigurationFileLocation), e);
}
}

private PipelinesDataFlowModel mergePipelinesDataModels(
final List<PipelinesDataFlowModel> pipelinesDataFlowModels) {
final Map<String, PipelineModel> pipelinesDataFlowModelMap = pipelinesDataFlowModels.stream()
.map(PipelinesDataFlowModel::getPipelines)
.flatMap(pipelines -> pipelines.entrySet().stream())
.collect(Collectors.toUnmodifiableMap(
Map.Entry::getKey,
Map.Entry::getValue
));
return new PipelinesDataFlowModel(pipelinesDataFlowModelMap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.opensearch.dataprepper.model.configuration.PipelinesDataFlowModel;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -46,6 +47,7 @@ void parseConfiguration_from_directory_with_multiple_files_creates_the_correct_m
final PipelinesDataFlowModel actualPipelinesDataFlowModel = pipelinesDataflowModelParser.parseConfiguration();
assertThat(actualPipelinesDataFlowModel.getPipelines().keySet(),
equalTo(TestDataProvider.VALID_MULTIPLE_PIPELINE_NAMES));
assertThat(actualPipelinesDataFlowModel.getDataPrepperVersion(), nullValue());
}

@Test
Expand Down
18 changes: 18 additions & 0 deletions data-prepper-main/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,21 @@ jar {
'Main-Class': 'org.opensearch.dataprepper.DataPrepperExecute')
}
}

configurations {
allDependencyJars {
canBeConsumed = true
canBeResolved = true
extendsFrom runtimeClasspath
}
}

artifacts {
/**
* Shares a configuration which has all the necessary dependencies and can be used
* as part of the release to load all dependencies.
* https://docs.gradle.org/current/userguide/cross_project_publications.html
*/
allDependencyJars(jar)
}

3 changes: 2 additions & 1 deletion data-prepper-plugins/aws-plugin-api/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

dependencies {
implementation 'software.amazon.awssdk:auth'
implementation 'software.amazon.awssdk:apache-client'
}

test {
Expand All @@ -12,7 +13,7 @@ jacocoTestCoverageVerification {
violationRules {
rule {
limit {
minimum = 1.0
minimum = 0.99
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
/*
* Copyright OpenSearch Contributors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.dataprepper.plugins.sink.opensearch;
package org.opensearch.dataprepper.aws.api;

import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
Expand Down Expand Up @@ -48,7 +40,7 @@
* An {@link HttpRequestInterceptor} that signs requests using any AWS {@link Signer}
* and {@link AwsCredentialsProvider}.
*/
final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor {
public final class AwsRequestSigningApache4Interceptor implements HttpRequestInterceptor {

/**
* Constant to check content-length
Expand Down Expand Up @@ -90,10 +82,10 @@ final class AwsRequestSigningApacheInterceptor implements HttpRequestInterceptor
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final Region region) {
public AwsRequestSigningApache4Interceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final Region region) {
this.service = Objects.requireNonNull(service);
this.signer = Objects.requireNonNull(signer);
this.awsCredentialsProvider = Objects.requireNonNull(awsCredentialsProvider);
Expand All @@ -107,10 +99,10 @@ public AwsRequestSigningApacheInterceptor(final String service,
* @param awsCredentialsProvider source of AWS credentials for signing
* @param region signing region
*/
public AwsRequestSigningApacheInterceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final String region) {
public AwsRequestSigningApache4Interceptor(final String service,
final Signer signer,
final AwsCredentialsProvider awsCredentialsProvider,
final String region) {
this(service, signer, awsCredentialsProvider, Region.of(region));
}

Expand Down Expand Up @@ -177,7 +169,7 @@ private URI buildUri(final HttpContext context, URIBuilder uriBuilder) throws IO
}

return uriBuilder.build();
} catch (URISyntaxException e) {
} catch (final Exception e) {
throw new IOException("Invalid URI", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.aws.api;

import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpHost;
import org.apache.http.RequestLine;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpCoreContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.signer.Signer;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.regions.Region;

import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class AwsRequestSigningApache4InterceptorTest {

@Mock
private Signer signer;

@Mock
private AwsCredentialsProvider awsCredentialsProvider;

@Mock
private HttpEntityEnclosingRequest httpRequest;

@Mock
private HttpContext httpContext;

private AwsRequestSigningApache4Interceptor createObjectUnderTest() {
return new AwsRequestSigningApache4Interceptor("es", signer, awsCredentialsProvider, Region.US_EAST_1);
}

@Test
void invalidURI_throws_IOException() {

final RequestLine requestLine = mock(RequestLine.class);
when(requestLine.getUri()).thenReturn("http://invalid-uri.com/file[/].html\n");

when(httpRequest.getRequestLine()).thenReturn(requestLine);

final AwsRequestSigningApache4Interceptor objectUnderTest = new AwsRequestSigningApache4Interceptor("es", signer, awsCredentialsProvider, "us-east-1");

assertThrows(IOException.class, () -> objectUnderTest.process(httpRequest, httpContext));
}

@Test
void IOException_is_thrown_when_buildURI_throws_exception() {
final RequestLine requestLine = mock(RequestLine.class);
when(requestLine.getMethod()).thenReturn("GET");
when(requestLine.getUri()).thenReturn("http://localhost?param=test");
when(httpRequest.getRequestLine()).thenReturn(requestLine);

when(httpContext.getAttribute(HttpCoreContext.HTTP_TARGET_HOST)).thenThrow(RuntimeException.class);

final AwsRequestSigningApache4Interceptor objectUnderTest = createObjectUnderTest();

assertThrows(IOException.class, () -> objectUnderTest.process(httpRequest, httpContext));
}

@Test
void empty_contentStreamProvider_throws_IllegalStateException() throws IOException {
final RequestLine requestLine = mock(RequestLine.class);
when(requestLine.getMethod()).thenReturn("GET");
when(requestLine.getUri()).thenReturn("http://localhost?param=test");
when(httpRequest.getRequestLine()).thenReturn(requestLine);
when(httpRequest.getAllHeaders()).thenReturn(new BasicHeader[]{
new BasicHeader("test-name", "test-value"),
new BasicHeader("content-length", "0")
});

final HttpEntity httpEntity = mock(HttpEntity.class);
final InputStream inputStream = mock(InputStream.class);
when(httpEntity.getContent()).thenReturn(inputStream);

when((httpRequest).getEntity()).thenReturn(httpEntity);

final HttpHost httpHost = HttpHost.create("http://localhost?param=test");
when(httpContext.getAttribute(HttpCoreContext.HTTP_TARGET_HOST)).thenReturn(httpHost);

final SdkHttpFullRequest signedRequest = mock(SdkHttpFullRequest.class);
when(signedRequest.headers()).thenReturn(Map.of("test-name", List.of("test-value")));
when(signedRequest.contentStreamProvider()).thenReturn(Optional.empty());
when(signer.sign(any(SdkHttpFullRequest.class), any(ExecutionAttributes.class)))
.thenReturn(signedRequest);

final AwsRequestSigningApache4Interceptor objectUnderTest = createObjectUnderTest();

assertThrows(IllegalStateException.class, () -> objectUnderTest.process(httpRequest, httpContext));
}

@Test
void testHappyPath() throws IOException {
final RequestLine requestLine = mock(RequestLine.class);
when(requestLine.getMethod()).thenReturn("GET");
when(requestLine.getUri()).thenReturn("http://localhost?param=test");
when(httpRequest.getRequestLine()).thenReturn(requestLine);
when(httpRequest.getAllHeaders()).thenReturn(new BasicHeader[]{
new BasicHeader("test-name", "test-value"),
new BasicHeader("content-length", "0")
});

final HttpEntity httpEntity = mock(HttpEntity.class);
final InputStream inputStream = mock(InputStream.class);
when(httpEntity.getContent()).thenReturn(inputStream);

when((httpRequest).getEntity()).thenReturn(httpEntity);

final HttpHost httpHost = HttpHost.create("http://localhost?param=test");
when(httpContext.getAttribute(HttpCoreContext.HTTP_TARGET_HOST)).thenReturn(httpHost);

final SdkHttpFullRequest signedRequest = mock(SdkHttpFullRequest.class);
when(signedRequest.headers()).thenReturn(Map.of("test-name", List.of("test-value")));
final ContentStreamProvider contentStreamProvider = mock(ContentStreamProvider.class);
final InputStream contentInputStream = mock(InputStream.class);
when(contentStreamProvider.newStream()).thenReturn(contentInputStream);
when(signedRequest.contentStreamProvider()).thenReturn(Optional.of(contentStreamProvider));
when(signer.sign(any(SdkHttpFullRequest.class), any(ExecutionAttributes.class)))
.thenReturn(signedRequest);
createObjectUnderTest().process(httpRequest, httpContext);
}
}
4 changes: 2 additions & 2 deletions data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ dependencies {
implementation 'software.amazon.awssdk:acm'
implementation 'org.apache.commons:commons-compress:1.23.0'
implementation libs.commons.lang3
implementation "org.bouncycastle:bcprov-jdk15on:1.70"
implementation "org.bouncycastle:bcpkix-jdk15on:1.70"
implementation libs.bouncycastle.bcprov
implementation libs.bouncycastle.bcpkix
implementation 'org.reflections:reflections:0.10.2'
implementation 'io.micrometer:micrometer-core'
testImplementation testLibs.junit.vintage
Expand Down
Loading

0 comments on commit 7000f31

Please sign in to comment.