From 51c6765cb126ed413b1b3320f2da63982605e1d1 Mon Sep 17 00:00:00 2001 From: Ivan Tse Date: Thu, 1 Aug 2024 13:59:15 -0700 Subject: [PATCH 1/5] PersonalizeSink: add client and configuration classes Signed-off-by: Ivan Tse --- .../personalize-sink/build.gradle | 66 ++++++ .../sink/personalize/ClientFactory.java | 45 ++++ .../sink/personalize/PersonalizeSink.java | 102 +++++++++ .../personalize/PersonalizeSinkService.java | 69 ++++++ .../AwsAuthenticationOptions.java | 76 +++++++ .../PersonalizeSinkConfiguration.java | 140 ++++++++++++ .../dataset/DatasetTypeOptions.java | 33 +++ .../sink/personalize/ClientFactoryTest.java | 98 +++++++++ .../sink/personalize/PersonalizeSinkTest.java | 85 ++++++++ .../AwsAuthenticationOptionsTest.java | 121 +++++++++++ .../PersonalizeSinkConfigurationTest.java | 205 ++++++++++++++++++ .../dataset/DatasetTypeOptionsTest.java | 38 ++++ settings.gradle | 1 + 13 files changed, 1079 insertions(+) create mode 100644 data-prepper-plugins/personalize-sink/build.gradle create mode 100644 data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java create mode 100644 data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java create mode 100644 data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java create mode 100644 data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java create mode 100644 data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java create mode 100644 data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptions.java create mode 100644 data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java create mode 100644 data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java create mode 100644 data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java create mode 100644 data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfigurationTest.java create mode 100644 data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptionsTest.java diff --git a/data-prepper-plugins/personalize-sink/build.gradle b/data-prepper-plugins/personalize-sink/build.gradle new file mode 100644 index 0000000000..1c4e408a5f --- /dev/null +++ b/data-prepper-plugins/personalize-sink/build.gradle @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +dependencies { + implementation project(':data-prepper-api') + implementation project(path: ':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:aws-plugin-api') + implementation 'io.micrometer:micrometer-core' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation libs.commons.compress + implementation 'joda-time:joda-time:2.12.7' + implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' + implementation 'software.amazon.awssdk:personalizeevents' + implementation 'software.amazon.awssdk:sts' + implementation 'software.amazon.awssdk:arns' + implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.9.22' + implementation libs.avro.core + implementation(libs.hadoop.common) { + exclude group: 'org.eclipse.jetty' + exclude group: 'org.apache.hadoop', module: 'hadoop-auth' + exclude group: 'org.apache.zookeeper', module: 'zookeeper' + } + implementation libs.parquet.avro + implementation 'software.amazon.awssdk:apache-client' + implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.9.22' + implementation libs.commons.lang3 + testImplementation project(':data-prepper-test-common') + testImplementation testLibs.slf4j.simple +} + +test { + useJUnitPlatform() +} + +sourceSets { + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + srcDir file('src/integrationTest/java') + } + resources.srcDir file('src/integrationTest/resources') + } +} + +configurations { + integrationTestImplementation.extendsFrom testImplementation + integrationTestRuntime.extendsFrom testRuntime +} + +task integrationTest(type: Test) { + group = 'verification' + testClassesDirs = sourceSets.integrationTest.output.classesDirs + + useJUnitPlatform() + + classpath = sourceSets.integrationTest.runtimeClasspath + + filter { + includeTestsMatching '*IT' + } +} diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java new file mode 100644 index 0000000000..44575bb491 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.personalize; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClient; + +public final class ClientFactory { + private ClientFactory() { } + + static PersonalizeEventsClient createPersonalizeEventsClient(final PersonalizeSinkConfiguration personalizeSinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsOptions awsCredentialsOptions = convertToCredentialsOptions(personalizeSinkConfig.getAwsAuthenticationOptions()); + final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); + + return PersonalizeEventsClient.builder() + .region(personalizeSinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .credentialsProvider(awsCredentialsProvider) + .overrideConfiguration(createOverrideConfiguration(personalizeSinkConfig)).build(); + } + + private static ClientOverrideConfiguration createOverrideConfiguration(final PersonalizeSinkConfiguration personalizeSinkConfig) { + final RetryPolicy retryPolicy = RetryPolicy.builder().numRetries(personalizeSinkConfig.getMaxRetries()).build(); + return ClientOverrideConfiguration.builder() + .retryPolicy(retryPolicy) + .build(); + } + + private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) { + return AwsCredentialsOptions.builder() + .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) + .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) + .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) + .build(); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java new file mode 100644 index 0000000000..b55c944b25 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.personalize; + +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.AbstractSink; +import org.opensearch.dataprepper.model.sink.Sink; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; +import org.opensearch.dataprepper.plugins.sink.personalize.dataset.DatasetTypeOptions; +import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; + +/** + * Implementation class of personalize-sink plugin. It is responsible for receiving the collection of + * {@link Event} and uploading to amazon personalize. + */ +@DataPrepperPlugin(name = "personalize", pluginType = Sink.class, pluginConfigurationType = PersonalizeSinkConfiguration.class) +public class PersonalizeSink extends AbstractSink> { + + private static final Logger LOG = LoggerFactory.getLogger(PersonalizeSink.class); + + private final PersonalizeSinkConfiguration personalizeSinkConfig; + private volatile boolean sinkInitialized; + private final PersonalizeSinkService personalizeSinkService; + private final SinkContext sinkContext; + + /** + * @param pluginSetting dp plugin settings. + * @param personalizeSinkConfig personalize sink configurations. + * @param sinkContext sink context + * @param awsCredentialsSupplier aws credentials + * @param pluginFactory dp plugin factory. + */ + @DataPrepperPluginConstructor + public PersonalizeSink(final PluginSetting pluginSetting, + final PersonalizeSinkConfiguration personalizeSinkConfig, + final PluginFactory pluginFactory, + final SinkContext sinkContext, + final AwsCredentialsSupplier awsCredentialsSupplier) { + super(pluginSetting); + this.personalizeSinkConfig = personalizeSinkConfig; + this.sinkContext = sinkContext; + + sinkInitialized = Boolean.FALSE; + + final PersonalizeEventsClient personalizeEventsClient = ClientFactory.createPersonalizeEventsClient(personalizeSinkConfig, awsCredentialsSupplier); + + personalizeSinkService = new PersonalizeSinkService(personalizeSinkConfig, pluginMetrics); + } + + @Override + public boolean isReady() { + return sinkInitialized; + } + + @Override + public void doInitialize() { + try { + doInitializeInternal(); + } catch (InvalidPluginConfigurationException e) { + LOG.error("Invalid plugin configuration, Hence failed to initialize personalize-sink plugin."); + this.shutdown(); + throw e; + } catch (Exception e) { + LOG.error("Failed to initialize personalize-sink plugin."); + this.shutdown(); + throw e; + } + } + + /** + * Initialize {@link PersonalizeSinkService} + */ + private void doInitializeInternal() { + sinkInitialized = Boolean.TRUE; + } + + /** + * @param records Records to be output + */ + @Override + public void doOutput(final Collection> records) { + personalizeSinkService.output(records); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java new file mode 100644 index 0000000000..027efbd669 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.personalize; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Timer; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Collection; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Class responsible for creating PersonalizeEventsClient object, check thresholds, + * get new buffer and write records into buffer. + */ +public class PersonalizeSinkService { + + private static final Logger LOG = LoggerFactory.getLogger(PersonalizeSinkService.class); + public static final String RECORDS_SUCCEEDED = "personalizeRecordsSucceeded"; + public static final String RECORDS_FAILED = "personalizeRecordsFailed"; + public static final String RECORDS_INVALID = "personalizeRecordsInvalid"; + public static final String REQUESTS_THROTTLED = "personalizeRequestsThrottled"; + public static final String REQUEST_LATENCY = "personalizeRequestLatency"; + + private final PersonalizeSinkConfiguration personalizeSinkConfig; + private final Lock reentrantLock; + private final int maxRetries; + private final Counter recordsSucceededCounter; + private final Counter recordsFailedCounter; + private final Counter recordsInvalidCounter; + private final Counter requestsThrottledCounter; + private final Timer requestLatencyTimer; + + /** + * @param personalizeSinkConfig personalize sink related configuration. + * @param pluginMetrics metrics. + */ + public PersonalizeSinkService(final PersonalizeSinkConfiguration personalizeSinkConfig, + final PluginMetrics pluginMetrics) { + this.personalizeSinkConfig = personalizeSinkConfig; + reentrantLock = new ReentrantLock(); + + maxRetries = personalizeSinkConfig.getMaxRetries(); + + recordsSucceededCounter = pluginMetrics.counter(RECORDS_SUCCEEDED); + recordsFailedCounter = pluginMetrics.counter(RECORDS_FAILED); + recordsInvalidCounter = pluginMetrics.counter(RECORDS_INVALID); + requestsThrottledCounter = pluginMetrics.counter(REQUESTS_THROTTLED); + requestLatencyTimer = pluginMetrics.timer(REQUEST_LATENCY); + } + + /** + * @param records received records and add into buffer. + */ + void output(Collection> records) { + LOG.info("{} records received", records.size()); + return; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java new file mode 100644 index 0000000000..e7abc5c980 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.personalize.configuration; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.GroupSequence; +import jakarta.validation.constraints.AssertTrue; +import jakarta.validation.constraints.Size; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.arns.Arn; + +import java.util.Map; +import java.util.Optional; + +@GroupSequence({AwsAuthenticationOptions.class, PersonalizeAdvancedValidation.class}) +public class AwsAuthenticationOptions { + private static final String AWS_IAM_ROLE = "role"; + private static final String AWS_IAM = "iam"; + + @JsonProperty("region") + @Size(min = 1, message = "Region cannot be empty string") + private String awsRegion; + + @JsonProperty("sts_role_arn") + @Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters") + private String awsStsRoleArn; + + @JsonProperty("sts_external_id") + @Size(min = 2, max = 1224, message = "awsStsExternalId length should be between 2 and 1224 characters") + private String awsStsExternalId; + + @JsonProperty("sts_header_overrides") + @Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override") + private Map awsStsHeaderOverrides; + + @AssertTrue(message = "sts_role_arn must be an IAM Role", groups = PersonalizeAdvancedValidation.class) + boolean isValidStsRoleArn() { + final Arn arn = getArn(); + boolean status = true; + if (!AWS_IAM.equals(arn.service())) { + status = false; + } + final Optional resourceType = arn.resource().resourceType(); + if (resourceType.isEmpty() || !resourceType.get().equals(AWS_IAM_ROLE)) { + status = false; + } + return status; + } + + private Arn getArn() { + try { + return Arn.fromString(awsStsRoleArn); + } catch (final Exception e) { + throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn)); + } + } + + public Region getAwsRegion() { + return awsRegion != null ? Region.of(awsRegion) : null; + } + + public String getAwsStsRoleArn() { + return awsStsRoleArn; + } + + public String getAwsStsExternalId() { + return awsStsExternalId; + } + + public Map getAwsStsHeaderOverrides() { + return awsStsHeaderOverrides; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java new file mode 100644 index 0000000000..2ceec177fe --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java @@ -0,0 +1,140 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.dataprepper.plugins.sink.personalize.configuration; + +import jakarta.validation.GroupSequence; +import org.opensearch.dataprepper.plugins.sink.personalize.dataset.DatasetTypeOptions; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import jakarta.validation.constraints.AssertTrue; +import software.amazon.awssdk.arns.Arn; + +import java.util.List; +import java.util.Optional; + +interface PersonalizeAdvancedValidation {} + +/** + * personalize sink configuration class contains properties, used to read yaml configuration. + */ +@GroupSequence({PersonalizeSinkConfiguration.class, PersonalizeAdvancedValidation.class}) +public class PersonalizeSinkConfiguration { + private static final int DEFAULT_RETRIES = 10; + private static final String AWS_PERSONALIZE = "personalize"; + private static final String AWS_PERSONALIZE_DATASET = "dataset"; + private static final List DATASET_ARN_REQUIRED_LIST = List.of(DatasetTypeOptions.USERS, DatasetTypeOptions.ITEMS); + + @JsonProperty("aws") + @NotNull + @Valid + private AwsAuthenticationOptions awsAuthenticationOptions; + + @JsonProperty("dataset_type") + @NotNull + @Valid + private DatasetTypeOptions datasetType; + + @JsonProperty("dataset_arn") + private String datasetArn; + + @JsonProperty("tracking_id") + private String trackingId; + + @JsonProperty("document_root_key") + private String documentRootKey; + + @JsonProperty("max_retries") + private int maxRetries = DEFAULT_RETRIES; + + @AssertTrue(message = "A dataset arn is required for items and users datasets.", groups = PersonalizeAdvancedValidation.class) + boolean isDatasetArnProvidedWhenNeeded() { + if (DATASET_ARN_REQUIRED_LIST.contains(datasetType)) { + return datasetArn != null; + } + return true; + } + + @AssertTrue(message = "dataset_arn must be a Personalize Dataset arn", groups = PersonalizeAdvancedValidation.class) + boolean isValidDatasetArn() { + if (datasetArn == null) { + return true; + } + final Arn arn = getArn(); + boolean status = true; + if (!AWS_PERSONALIZE.equals(arn.service())) { + status = false; + } + final Optional resourceType = arn.resource().resourceType(); + if (resourceType.isEmpty() || !resourceType.get().equals(AWS_PERSONALIZE_DATASET)) { + status = false; + } + return status; + } + + private Arn getArn() { + try { + return Arn.fromString(datasetArn); + } catch (final Exception e) { + throw new IllegalArgumentException(String.format("Invalid ARN format for datasetArn. Check the format of %s", datasetArn), e); + } + } + + @AssertTrue(message = "A tracking id is required for interactions dataset.", groups = PersonalizeAdvancedValidation.class) + boolean isTrackingIdProvidedWhenNeeded() { + if (DatasetTypeOptions.INTERACTIONS.equals(datasetType)) { + return trackingId != null; + } + return true; + } + + /** + * Aws Authentication configuration Options. + * @return aws authentication options. + */ + public AwsAuthenticationOptions getAwsAuthenticationOptions() { + return awsAuthenticationOptions; + } + + /** + * Dataset type configuration Options. + * @return dataset type option object. + */ + public DatasetTypeOptions getDatasetType() { + return datasetType; + } + + /** + * Dataset arn for Personalize Dataset. + * @return dataset arn string. + */ + public String getDatasetArn() { + return datasetArn; + } + + /** + * Tracking id for Personalize Event Tracker. + * @return tracking id string. + */ + public String getTrackingId() { + return trackingId; + } + + /** + * Tracking id for Personalize Event Tracker. + * @return document root key string. + */ + public String getDocumentRootKey() { + return documentRootKey; + } + + /** + * Personalize client retries configuration Options. + * @return maximum retries value. + */ + public int getMaxRetries() { + return maxRetries; + } +} \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptions.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptions.java new file mode 100644 index 0000000000..cc6791f0a6 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptions.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.personalize.dataset; + +import com.fasterxml.jackson.annotation.JsonCreator; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Defines all the dataset types enumerations. + */ +public enum DatasetTypeOptions { + USERS("users"), + ITEMS("items"), + INTERACTIONS("interactions"); + + private final String option; + private static final Map OPTIONS_MAP = Arrays.stream(DatasetTypeOptions.values()) + .collect(Collectors.toMap(value -> value.option, value -> value)); + + DatasetTypeOptions(final String option) { + this.option = option.toLowerCase(); + } + + @JsonCreator + static DatasetTypeOptions fromOptionValue(final String option) { + return OPTIONS_MAP.get(option); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java new file mode 100644 index 0000000000..7b2b0f5b33 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java @@ -0,0 +1,98 @@ +package org.opensearch.dataprepper.plugins.sink.personalize; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.AwsAuthenticationOptions; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClient; +import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClientBuilder; + +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ClientFactoryTest { + @Mock + private PersonalizeSinkConfiguration personalizeSinkConfig; + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private AwsAuthenticationOptions awsAuthenticationOptions; + + @BeforeEach + void setUp() { + when(personalizeSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + } + + @Test + void createPersonalizeEventsClient_with_real_PersonalizeEventsClient() { + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + final PersonalizeEventsClient personalizeEventsClient = ClientFactory.createPersonalizeEventsClient(personalizeSinkConfig, awsCredentialsSupplier); + + assertThat(personalizeEventsClient, notNullValue()); + } + + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) + void createPersonalizeEventsClient_provides_correct_inputs(final String regionString) { + final Region region = Region.of(regionString); + final String stsRoleArn = UUID.randomUUID().toString(); + final String externalId = UUID.randomUUID().toString(); + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); + when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); + when(awsAuthenticationOptions.getAwsStsExternalId()).thenReturn(externalId); + when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); + + final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider); + + final PersonalizeEventsClientBuilder personalizeEventsClientBuilder = mock(PersonalizeEventsClientBuilder.class); + when(personalizeEventsClientBuilder.region(region)).thenReturn(personalizeEventsClientBuilder); + when(personalizeEventsClientBuilder.credentialsProvider(any())).thenReturn(personalizeEventsClientBuilder); + when(personalizeEventsClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(personalizeEventsClientBuilder); + try(final MockedStatic personalizeEventsClientMockedStatic = mockStatic(PersonalizeEventsClient.class)) { + personalizeEventsClientMockedStatic.when(PersonalizeEventsClient::builder) + .thenReturn(personalizeEventsClientBuilder); + ClientFactory.createPersonalizeEventsClient(personalizeSinkConfig, awsCredentialsSupplier); + } + + final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class); + verify(personalizeEventsClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture()); + + final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue(); + + assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider)); + + final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture()); + + final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue(); + assertThat(actualCredentialsOptions.getRegion(), equalTo(region)); + assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); + assertThat(actualCredentialsOptions.getStsExternalId(), equalTo(externalId)); + assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(stsHeaderOverrides)); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java new file mode 100644 index 0000000000..556efbb81a --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java @@ -0,0 +1,85 @@ +package org.opensearch.dataprepper.plugins.sink.personalize; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.sink.SinkContext; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.AwsAuthenticationOptions; +import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; +import org.opensearch.dataprepper.plugins.sink.personalize.dataset.DatasetTypeOptions; +import software.amazon.awssdk.regions.Region; + +import java.util.ArrayList; +import java.util.Collection; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class PersonalizeSinkTest { + public static final int MAX_RETRIES = 10; + public static final String REGION = "us-east-1"; + public static final String SINK_PLUGIN_NAME = "personalize"; + public static final String SINK_PIPELINE_NAME = "personalize-sink-pipeline"; + public static final String DATASET_ARN = "arn:aws:iam::123456789012:dataset/test"; + public static final String TRACKING_ID = "1233513241"; + private PersonalizeSinkConfiguration personalizeSinkConfig; + private PersonalizeSink personalizeSink; + private PluginSetting pluginSetting; + private PluginFactory pluginFactory; + private AwsCredentialsSupplier awsCredentialsSupplier; + private SinkContext sinkContext; + + @BeforeEach + void setup() { + personalizeSinkConfig = mock(PersonalizeSinkConfiguration.class); + sinkContext = mock(SinkContext.class); + AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class); + pluginSetting = mock(PluginSetting.class); + pluginFactory = mock(PluginFactory.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + + when(personalizeSinkConfig.getMaxRetries()).thenReturn(MAX_RETRIES); + when(personalizeSinkConfig.getDatasetArn()).thenReturn(DATASET_ARN); + when(personalizeSinkConfig.getDatasetType()).thenReturn(DatasetTypeOptions.USERS); + when(personalizeSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(REGION)); + when(pluginSetting.getName()).thenReturn(SINK_PLUGIN_NAME); + when(pluginSetting.getPipelineName()).thenReturn(SINK_PIPELINE_NAME); + } + + private PersonalizeSink createObjectUnderTest() { + return new PersonalizeSink(pluginSetting, personalizeSinkConfig, pluginFactory, sinkContext, awsCredentialsSupplier); + } + + @Test + void test_personalize_sink_plugin_isReady_positive() { + personalizeSink = createObjectUnderTest(); + Assertions.assertNotNull(personalizeSink); + personalizeSink.doInitialize(); + assertTrue(personalizeSink.isReady(), "personalize sink is not initialized and not ready to work"); + } + + @Test + void test_personalize_Sink_plugin_isReady_negative() { + personalizeSink = createObjectUnderTest(); + Assertions.assertNotNull(personalizeSink); + assertFalse(personalizeSink.isReady(), "personalize sink is initialized and ready to work"); + } + + @Test + void test_doOutput_with_empty_records() { + personalizeSink = createObjectUnderTest(); + Assertions.assertNotNull(personalizeSink); + personalizeSink.doInitialize(); + Collection> records = new ArrayList<>(); + personalizeSink.doOutput(records); + } +} diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java new file mode 100644 index 0000000000..33b9757375 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java @@ -0,0 +1,121 @@ +package org.opensearch.dataprepper.plugins.sink.personalize.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.regions.Region; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; + +class AwsAuthenticationOptionsTest { + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + } + + @ParameterizedTest + @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) + void getAwsRegion_returns_Region_of(final String regionString) { + final Region expectedRegionObject = Region.of(regionString); + final Map jsonMap = Map.of("region", regionString); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsRegion(), equalTo(expectedRegionObject)); + } + + @Test + void getAwsRegion_returns_null_when_region_is_null() { + final Map jsonMap = Collections.emptyMap(); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsRegion(), nullValue()); + } + + @Test + void getAwsStsRoleArn_returns_value_from_deserialized_JSON() { + final String stsRoleArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("sts_role_arn", stsRoleArn); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsRoleArn(), equalTo(stsRoleArn)); + } + + @Test + void getAwsStsRoleArn_returns_null_if_not_in_JSON() { + final Map jsonMap = Collections.emptyMap(); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsRoleArn(), nullValue()); + } + + @Test + void isValidStsRoleArn_returns_true_for_valid_IAM_role() { + final String stsRoleArn = "arn:aws:iam::123456789012:role/test"; + final Map jsonMap = Map.of("sts_role_arn", stsRoleArn); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertTrue(objectUnderTest.isValidStsRoleArn()); + } + + @Test + void isValidStsRoleArn_returns_false_when_arn_service_is_not_IAM() { + final String stsRoleArn = "arn:aws:personalize::123456789012:role/test"; + final Map jsonMap = Map.of("sts_role_arn", stsRoleArn); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertFalse(objectUnderTest.isValidStsRoleArn()); + } + + @Test + void isValidStsRoleArn_returns_false_when_arn_resource_is_not_role() { + final String stsRoleArn = "arn:aws:iam::123456789012:dataset/test"; + final Map jsonMap = Map.of("sts_role_arn", stsRoleArn); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertFalse(objectUnderTest.isValidStsRoleArn()); + } + + @Test + void isValidStsRoleArn_invalid_arn_throws_IllegalArgumentException() { + final String stsRoleArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("sts_role_arn", stsRoleArn); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.isValidStsRoleArn()); + } + + @Test + void getAwsStsExternalId_returns_value_from_deserialized_JSON() { + final String stsExternalId = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("sts_external_id", stsExternalId); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsExternalId(), equalTo(stsExternalId)); + } + + @Test + void getAwsStsExternalId_returns_null_if_not_in_JSON() { + final Map jsonMap = Collections.emptyMap(); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsExternalId(), nullValue()); + } + + @Test + void getAwsStsHeaderOverrides_returns_value_from_deserialized_JSON() { + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + final Map jsonMap = Map.of("sts_header_overrides", stsHeaderOverrides); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsHeaderOverrides(), equalTo(stsHeaderOverrides)); + } + + @Test + void getAwsStsHeaderOverrides_returns_null_if_not_in_JSON() { + final Map jsonMap = Collections.emptyMap(); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertThat(objectUnderTest.getAwsStsHeaderOverrides(), nullValue()); + } +} diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfigurationTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfigurationTest.java new file mode 100644 index 0000000000..67bc690623 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfigurationTest.java @@ -0,0 +1,205 @@ +package org.opensearch.dataprepper.plugins.sink.personalize.configuration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.sink.personalize.dataset.DatasetTypeOptions; + +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class PersonalizeSinkConfigurationTest { + private static final int DEFAULT_RETRIES = 10; + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + objectMapper = new ObjectMapper(); + } + + @Test + void getDatasetType_returns_value_from_deserialized_JSON() { + final String datasetType = "users"; + final Map jsonMap = Map.of("dataset_type", datasetType); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getDatasetType(), equalTo(DatasetTypeOptions.USERS)); + } + + @Test + void getDatasetArn_returns_null_when_datasetArn_is_null() { + final Map jsonMap = Collections.emptyMap(); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getDatasetArn(), nullValue()); + } + + @Test + void getDatasetArn_returns_value_from_deserialized_JSON() { + final String datasetArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("dataset_arn", datasetArn); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getDatasetArn(), equalTo(datasetArn)); + } + + @Test + void isDatasetArnProvidedWhenNeeded_returns_true_when_datasetType_is_interactions_and_datasetArn_is_null() { + final String datasetType = "interactions"; + final Map jsonMap = Map.of("dataset_type", datasetType); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertTrue(objectUnderTest.isDatasetArnProvidedWhenNeeded()); + } + + @Test + void isDatasetArnProvidedWhenNeeded_returns_true_when_datasetType_is_users_and_datasetArn_is_provided() { + final String datasetType = "users"; + final String datasetArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("dataset_type", datasetType, "dataset_arn", datasetArn); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertTrue(objectUnderTest.isDatasetArnProvidedWhenNeeded()); + } + + @Test + void isDatasetArnProvidedWhenNeeded_returns_false_when_datasetType_is_users_and_datasetArn_is_not_provided() { + final String datasetType = "users"; + final Map jsonMap = Map.of("dataset_type", datasetType); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertFalse(objectUnderTest.isDatasetArnProvidedWhenNeeded()); + } + + @Test + void isDatasetArnProvidedWhenNeeded_returns_true_when_datasetType_is_items_and_datasetArn_is_provided() { + final String datasetType = "items"; + final String datasetArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("dataset_type", datasetType, "dataset_arn", datasetArn); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertTrue(objectUnderTest.isDatasetArnProvidedWhenNeeded()); + } + + @Test + void isDatasetArnProvidedWhenNeeded_returns_false_when_datasetType_is_items_and_datasetArn_is_not_provided() { + final String datasetType = "items"; + final Map jsonMap = Map.of("dataset_type", datasetType); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertFalse(objectUnderTest.isDatasetArnProvidedWhenNeeded()); + } + + @Test + void isValidDatasetArn_returns_true_for_valid_dataset_arn() { + final String datasetArn = "arn:aws:personalize::123456789012:dataset/test"; + final Map jsonMap = Map.of("dataset_arn", datasetArn); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertTrue(objectUnderTest.isValidDatasetArn()); + } + + @Test + void isValidDatasetArn_returns_false_when_arn_service_is_not_personalize() { + final String datasetArn = "arn:aws:iam::123456789012:dataset/test"; + final Map jsonMap = Map.of("dataset_arn", datasetArn); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertFalse(objectUnderTest.isValidDatasetArn()); + } + + @Test + void isValidDatasetArn_returns_false_when_arn_resource_is_not_dataset() { + final String datasetArn = "arn:aws:personalize::123456789012:role/test"; + final Map jsonMap = Map.of("dataset_arn", datasetArn); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertFalse(objectUnderTest.isValidDatasetArn()); + } + + @Test + void isValidStsRoleArn_invalid_arn_throws_IllegalArgumentException() { + final String datasetArn = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("dataset_arn", datasetArn); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThrows(IllegalArgumentException.class, () -> objectUnderTest.isValidDatasetArn()); + } + + + + @Test + void getTrackingId_returns_null_when_trackingId_is_null() { + final Map jsonMap = Collections.emptyMap(); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getTrackingId(), nullValue()); + } + + @Test + void getTrackingId_returns_value_from_deserialized_JSON() { + final String trackingId = UUID.randomUUID().toString();; + final Map jsonMap = Map.of("tracking_id", trackingId); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getTrackingId(), equalTo(trackingId)); + } + + @Test + void isTrackingIdProvidedWhenNeeded_returns_false_when_datasetType_is_interactions_and_trackingId_is_not_provided() { + final String datasetType = "interactions"; + final Map jsonMap = Map.of("dataset_type", datasetType); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertFalse(objectUnderTest.isTrackingIdProvidedWhenNeeded()); + } + + @Test + void isTrackingIdProvidedWhenNeeded_returns_true_when_datasetType_is_interactions_and_trackingId_is_provided() { + final String datasetType = "interactions"; + final String trackingId = UUID.randomUUID().toString(); + final Map jsonMap = Map.of("dataset_type", datasetType, "tracking_id", trackingId); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertTrue(objectUnderTest.isTrackingIdProvidedWhenNeeded()); + } + + @Test + void isTrackingIdProvidedWhenNeeded_returns_true_when_datasetType_is_users_and_trackingId_is_not_provided() { + final String datasetType = "users"; + final Map jsonMap = Map.of("dataset_type", datasetType); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertTrue(objectUnderTest.isTrackingIdProvidedWhenNeeded()); + } + + @Test + void isTrackingIdProvidedWhenNeeded_returns_true_when_datasetType_is_items_and_trackingId_is_not_provided() { + final String datasetType = "items"; + final Map jsonMap = Map.of("dataset_type", datasetType); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertTrue(objectUnderTest.isTrackingIdProvidedWhenNeeded()); + } + + + @Test + void getDocumentRootKey_returns_null_when_documentRootKey_is_null() { + final Map jsonMap = Collections.emptyMap(); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getDocumentRootKey(), nullValue()); + } + + @Test + void getDocumentRootKey_returns_value_from_deserialized_JSON() { + final String documentRootKey = UUID.randomUUID().toString();; + final Map jsonMap = Map.of("document_root_key", documentRootKey); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getDocumentRootKey(), equalTo(documentRootKey)); + } + + @Test + void getMaxRetries_returns_default_when_maxRetries_is_null() { + final Map jsonMap = Collections.emptyMap(); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getMaxRetries(), equalTo(DEFAULT_RETRIES)); + } + + @Test + void getMaxRetries_returns_value_from_deserialized_JSON() { + final int maxRetries = 3; + final Map jsonMap = Map.of("max_retries", maxRetries); + final PersonalizeSinkConfiguration objectUnderTest = objectMapper.convertValue(jsonMap, PersonalizeSinkConfiguration.class); + assertThat(objectUnderTest.getMaxRetries(), equalTo(maxRetries)); + } +} diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptionsTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptionsTest.java new file mode 100644 index 0000000000..40b1821d0a --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/dataset/DatasetTypeOptionsTest.java @@ -0,0 +1,38 @@ +package org.opensearch.dataprepper.plugins.sink.personalize.dataset; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@ExtendWith(MockitoExtension.class) +class DatasetTypeOptionsTest { + @Test + void notNull_test() { + assertNotNull(DatasetTypeOptions.ITEMS); + } + + @Test + void fromOptionValue_users_test() { + DatasetTypeOptions datasetTypeOptions = DatasetTypeOptions.fromOptionValue("users"); + assertNotNull(datasetTypeOptions); + assertThat(datasetTypeOptions.toString(), equalTo("USERS")); + } + + @Test + void fromOptionValue_items_test() { + DatasetTypeOptions datasetTypeOptions = DatasetTypeOptions.fromOptionValue("items"); + assertNotNull(datasetTypeOptions); + assertThat(datasetTypeOptions.toString(), equalTo("ITEMS")); + } + + @Test + void fromOptionValue_interactions_test() { + DatasetTypeOptions datasetTypeOptions = DatasetTypeOptions.fromOptionValue("interactions"); + assertNotNull(datasetTypeOptions); + assertThat(datasetTypeOptions.toString(), equalTo("INTERACTIONS")); + } +} diff --git a/settings.gradle b/settings.gradle index cb7e888c53..618ebc5f38 100644 --- a/settings.gradle +++ b/settings.gradle @@ -169,6 +169,7 @@ include 'data-prepper-plugins:buffer-common' //include 'data-prepper-plugins:http-sink' //include 'data-prepper-plugins:sns-sink' //include 'data-prepper-plugins:prometheus-sink' +//include 'data-prepper-plugins:personalize-sink' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' include 'data-prepper-plugins:decompress-processor' From b928bf0dd4a77e5b73c5090834f727a52e4a5d8a Mon Sep 17 00:00:00 2001 From: Ivan Tse Date: Fri, 2 Aug 2024 14:02:41 -0700 Subject: [PATCH 2/5] Addressed review comments and cleaned up build.gradle file Signed-off-by: Ivan Tse --- .../personalize-sink/build.gradle | 18 ------------------ .../sink/personalize/ClientFactory.java | 2 +- .../sink/personalize/PersonalizeSink.java | 10 +++++----- .../personalize/PersonalizeSinkService.java | 4 ++-- .../sink/personalize/PersonalizeSinkTest.java | 4 ++-- settings.gradle | 2 +- 6 files changed, 11 insertions(+), 29 deletions(-) diff --git a/data-prepper-plugins/personalize-sink/build.gradle b/data-prepper-plugins/personalize-sink/build.gradle index 1c4e408a5f..bf408a04b8 100644 --- a/data-prepper-plugins/personalize-sink/build.gradle +++ b/data-prepper-plugins/personalize-sink/build.gradle @@ -10,32 +10,14 @@ dependencies { implementation 'io.micrometer:micrometer-core' implementation 'com.fasterxml.jackson.core:jackson-core' implementation 'com.fasterxml.jackson.core:jackson-databind' - implementation libs.commons.compress - implementation 'joda-time:joda-time:2.12.7' implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final' - implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' implementation 'software.amazon.awssdk:personalizeevents' implementation 'software.amazon.awssdk:sts' implementation 'software.amazon.awssdk:arns' - implementation 'org.jetbrains.kotlin:kotlin-stdlib:1.9.22' - implementation libs.avro.core - implementation(libs.hadoop.common) { - exclude group: 'org.eclipse.jetty' - exclude group: 'org.apache.hadoop', module: 'hadoop-auth' - exclude group: 'org.apache.zookeeper', module: 'zookeeper' - } - implementation libs.parquet.avro - implementation 'software.amazon.awssdk:apache-client' - implementation 'org.jetbrains.kotlin:kotlin-stdlib-common:1.9.22' - implementation libs.commons.lang3 testImplementation project(':data-prepper-test-common') testImplementation testLibs.slf4j.simple } -test { - useJUnitPlatform() -} - sourceSets { integrationTest { java { diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java index 44575bb491..97e5ed05be 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java @@ -14,7 +14,7 @@ import software.amazon.awssdk.core.retry.RetryPolicy; import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClient; -public final class ClientFactory { +final class ClientFactory { private ClientFactory() { } static PersonalizeEventsClient createPersonalizeEventsClient(final PersonalizeSinkConfiguration personalizeSinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java index b55c944b25..e7fb522041 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java @@ -31,7 +31,7 @@ * Implementation class of personalize-sink plugin. It is responsible for receiving the collection of * {@link Event} and uploading to amazon personalize. */ -@DataPrepperPlugin(name = "personalize", pluginType = Sink.class, pluginConfigurationType = PersonalizeSinkConfiguration.class) +@DataPrepperPlugin(name = "aws_personalize", pluginType = Sink.class, pluginConfigurationType = PersonalizeSinkConfiguration.class) public class PersonalizeSink extends AbstractSink> { private static final Logger LOG = LoggerFactory.getLogger(PersonalizeSink.class); @@ -58,7 +58,7 @@ public PersonalizeSink(final PluginSetting pluginSetting, this.personalizeSinkConfig = personalizeSinkConfig; this.sinkContext = sinkContext; - sinkInitialized = Boolean.FALSE; + sinkInitialized = false; final PersonalizeEventsClient personalizeEventsClient = ClientFactory.createPersonalizeEventsClient(personalizeSinkConfig, awsCredentialsSupplier); @@ -75,11 +75,11 @@ public void doInitialize() { try { doInitializeInternal(); } catch (InvalidPluginConfigurationException e) { - LOG.error("Invalid plugin configuration, Hence failed to initialize personalize-sink plugin."); + LOG.error("The personalize sink has an invalid configuration and cannot initialize."); this.shutdown(); throw e; } catch (Exception e) { - LOG.error("Failed to initialize personalize-sink plugin."); + LOG.error("Failed to initialize personalize sink."); this.shutdown(); throw e; } @@ -89,7 +89,7 @@ public void doInitialize() { * Initialize {@link PersonalizeSinkService} */ private void doInitializeInternal() { - sinkInitialized = Boolean.TRUE; + sinkInitialized = true; } /** diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java index 027efbd669..dbb69914f4 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java @@ -23,7 +23,7 @@ * Class responsible for creating PersonalizeEventsClient object, check thresholds, * get new buffer and write records into buffer. */ -public class PersonalizeSinkService { +class PersonalizeSinkService { private static final Logger LOG = LoggerFactory.getLogger(PersonalizeSinkService.class); public static final String RECORDS_SUCCEEDED = "personalizeRecordsSucceeded"; @@ -63,7 +63,7 @@ public PersonalizeSinkService(final PersonalizeSinkConfiguration personalizeSink * @param records received records and add into buffer. */ void output(Collection> records) { - LOG.info("{} records received", records.size()); + LOG.trace("{} records received", records.size()); return; } } \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java index 556efbb81a..3a112bb70b 100644 --- a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java @@ -64,14 +64,14 @@ void test_personalize_sink_plugin_isReady_positive() { personalizeSink = createObjectUnderTest(); Assertions.assertNotNull(personalizeSink); personalizeSink.doInitialize(); - assertTrue(personalizeSink.isReady(), "personalize sink is not initialized and not ready to work"); + assertTrue(personalizeSink.isReady(), "Expected the personalize sink to be ready, but it is reporting it is not ready."); } @Test void test_personalize_Sink_plugin_isReady_negative() { personalizeSink = createObjectUnderTest(); Assertions.assertNotNull(personalizeSink); - assertFalse(personalizeSink.isReady(), "personalize sink is initialized and ready to work"); + assertFalse(personalizeSink.isReady(), "Expected the personalize sink to report that it is not ready, but it is reporting it is ready."); } @Test diff --git a/settings.gradle b/settings.gradle index 618ebc5f38..12d19c11cb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -169,7 +169,7 @@ include 'data-prepper-plugins:buffer-common' //include 'data-prepper-plugins:http-sink' //include 'data-prepper-plugins:sns-sink' //include 'data-prepper-plugins:prometheus-sink' -//include 'data-prepper-plugins:personalize-sink' +include 'data-prepper-plugins:personalize-sink' include 'data-prepper-plugins:dissect-processor' include 'data-prepper-plugins:dynamodb-source' include 'data-prepper-plugins:decompress-processor' From 78c3d338047cea57e7ba7b5d2a92b9616dbd9c33 Mon Sep 17 00:00:00 2001 From: Ivan Tse Date: Wed, 7 Aug 2024 09:44:05 -0700 Subject: [PATCH 3/5] Fix checkstyle Signed-off-by: Ivan Tse --- .../plugins/sink/personalize/PersonalizeSink.java | 13 +------------ .../sink/personalize/PersonalizeSinkService.java | 1 - .../PersonalizeAdvancedValidation.java | 4 ++++ .../configuration/PersonalizeSinkConfiguration.java | 2 -- .../sink/personalize/PersonalizeSinkTest.java | 1 - 5 files changed, 5 insertions(+), 16 deletions(-) create mode 100644 data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeAdvancedValidation.java diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java index e7fb522041..6e362cd20a 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java @@ -6,7 +6,6 @@ package org.opensearch.dataprepper.plugins.sink.personalize; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; @@ -18,14 +17,11 @@ import org.opensearch.dataprepper.model.sink.Sink; import org.opensearch.dataprepper.model.sink.SinkContext; import org.opensearch.dataprepper.plugins.sink.personalize.configuration.PersonalizeSinkConfiguration; -import org.opensearch.dataprepper.plugins.sink.personalize.dataset.DatasetTypeOptions; import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.Collection; -import java.util.List; /** * Implementation class of personalize-sink plugin. It is responsible for receiving the collection of @@ -73,7 +69,7 @@ public boolean isReady() { @Override public void doInitialize() { try { - doInitializeInternal(); + sinkInitialized = true; } catch (InvalidPluginConfigurationException e) { LOG.error("The personalize sink has an invalid configuration and cannot initialize."); this.shutdown(); @@ -85,13 +81,6 @@ public void doInitialize() { } } - /** - * Initialize {@link PersonalizeSinkService} - */ - private void doInitializeInternal() { - sinkInitialized = true; - } - /** * @param records Records to be output */ diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java index dbb69914f4..80ea94bcf1 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkService.java @@ -14,7 +14,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; import java.util.Collection; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeAdvancedValidation.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeAdvancedValidation.java new file mode 100644 index 0000000000..f48c1d9466 --- /dev/null +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeAdvancedValidation.java @@ -0,0 +1,4 @@ +package org.opensearch.dataprepper.plugins.sink.personalize.configuration; + +interface PersonalizeAdvancedValidation { +} diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java index 2ceec177fe..6c698daff8 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java @@ -15,8 +15,6 @@ import java.util.List; import java.util.Optional; -interface PersonalizeAdvancedValidation {} - /** * personalize sink configuration class contains properties, used to read yaml configuration. */ diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java index 3a112bb70b..09098bc9ca 100644 --- a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java @@ -4,7 +4,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; -import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginFactory; From 3656651d4de251e9803cfb7de17b246d3d1cba27 Mon Sep 17 00:00:00 2001 From: Ivan Tse Date: Wed, 7 Aug 2024 15:36:56 -0700 Subject: [PATCH 4/5] Remove unnecessary try-catch Signed-off-by: Ivan Tse --- .../plugins/sink/personalize/PersonalizeSink.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java index 6e362cd20a..a93e58875c 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSink.java @@ -10,7 +10,6 @@ import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; @@ -68,17 +67,7 @@ public boolean isReady() { @Override public void doInitialize() { - try { - sinkInitialized = true; - } catch (InvalidPluginConfigurationException e) { - LOG.error("The personalize sink has an invalid configuration and cannot initialize."); - this.shutdown(); - throw e; - } catch (Exception e) { - LOG.error("Failed to initialize personalize sink."); - this.shutdown(); - throw e; - } + sinkInitialized = true; } /** From 09892e0766d79dcc0c6280eb2ba87afc70a8d759 Mon Sep 17 00:00:00 2001 From: Ivan Tse Date: Thu, 8 Aug 2024 19:51:13 -0700 Subject: [PATCH 5/5] Change AwsAuthenticationOptions to be optional Signed-off-by: Ivan Tse --- .../sink/personalize/ClientFactory.java | 17 +++++++- .../AwsAuthenticationOptions.java | 8 +++- .../PersonalizeSinkConfiguration.java | 1 - .../sink/personalize/ClientFactoryTest.java | 41 ++++++++++++++++++- .../sink/personalize/PersonalizeSinkTest.java | 3 +- .../AwsAuthenticationOptionsTest.java | 12 +++++- 6 files changed, 72 insertions(+), 10 deletions(-) diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java index 97e5ed05be..2c93fc991b 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactory.java @@ -12,6 +12,7 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClient; final class ClientFactory { @@ -22,7 +23,7 @@ static PersonalizeEventsClient createPersonalizeEventsClient(final PersonalizeSi final AwsCredentialsProvider awsCredentialsProvider = awsCredentialsSupplier.getProvider(awsCredentialsOptions); return PersonalizeEventsClient.builder() - .region(personalizeSinkConfig.getAwsAuthenticationOptions().getAwsRegion()) + .region(getRegion(personalizeSinkConfig, awsCredentialsSupplier)) .credentialsProvider(awsCredentialsProvider) .overrideConfiguration(createOverrideConfiguration(personalizeSinkConfig)).build(); } @@ -35,11 +36,23 @@ private static ClientOverrideConfiguration createOverrideConfiguration(final Per } private static AwsCredentialsOptions convertToCredentialsOptions(final AwsAuthenticationOptions awsAuthenticationOptions) { + if (awsAuthenticationOptions == null) { + return AwsCredentialsOptions.builder().build(); + } return AwsCredentialsOptions.builder() - .withRegion(awsAuthenticationOptions.getAwsRegion()) + .withRegion(awsAuthenticationOptions.getAwsRegion().orElse(null)) .withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn()) .withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId()) .withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides()) .build(); } + + private static Region getRegion(final PersonalizeSinkConfiguration personalizeSinkConfig, final AwsCredentialsSupplier awsCredentialsSupplier) { + Region defaultRegion = awsCredentialsSupplier.getDefaultRegion().orElse(null); + if (personalizeSinkConfig.getAwsAuthenticationOptions() == null) { + return defaultRegion; + } else { + return personalizeSinkConfig.getAwsAuthenticationOptions().getAwsRegion().orElse(defaultRegion); + } + } } \ No newline at end of file diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java index e7abc5c980..ba7e96d43d 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptions.java @@ -38,6 +38,9 @@ public class AwsAuthenticationOptions { @AssertTrue(message = "sts_role_arn must be an IAM Role", groups = PersonalizeAdvancedValidation.class) boolean isValidStsRoleArn() { + if (awsStsRoleArn == null) { + return true; + } final Arn arn = getArn(); boolean status = true; if (!AWS_IAM.equals(arn.service())) { @@ -58,8 +61,9 @@ private Arn getArn() { } } - public Region getAwsRegion() { - return awsRegion != null ? Region.of(awsRegion) : null; + public Optional getAwsRegion() { + Region region = awsRegion != null ? Region.of(awsRegion) : null; + return Optional.ofNullable(region); } public String getAwsStsRoleArn() { diff --git a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java index 6c698daff8..95c9f1d5c9 100644 --- a/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java +++ b/data-prepper-plugins/personalize-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/PersonalizeSinkConfiguration.java @@ -26,7 +26,6 @@ public class PersonalizeSinkConfiguration { private static final List DATASET_ARN_REQUIRED_LIST = List.of(DatasetTypeOptions.USERS, DatasetTypeOptions.ITEMS); @JsonProperty("aws") - @NotNull @Valid private AwsAuthenticationOptions awsAuthenticationOptions; diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java index 7b2b0f5b33..6b1ad7f80a 100644 --- a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/ClientFactoryTest.java @@ -19,11 +19,14 @@ import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClient; import software.amazon.awssdk.services.personalizeevents.PersonalizeEventsClientBuilder; +import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -48,12 +51,46 @@ void setUp() { @Test void createPersonalizeEventsClient_with_real_PersonalizeEventsClient() { - when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.US_EAST_1); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Optional.of(Region.US_EAST_1)); final PersonalizeEventsClient personalizeEventsClient = ClientFactory.createPersonalizeEventsClient(personalizeSinkConfig, awsCredentialsSupplier); assertThat(personalizeEventsClient, notNullValue()); } + @Test + void createPersonalizeEventsClient_provides_correct_inputs_for_null_awsAuthenticationOptions() { + when(personalizeSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); + final AwsCredentialsProvider expectedCredentialsProvider = mock(AwsCredentialsProvider.class); + when(awsCredentialsSupplier.getProvider(any())).thenReturn(expectedCredentialsProvider); + + final PersonalizeEventsClientBuilder personalizeEventsClientBuilder = mock(PersonalizeEventsClientBuilder.class); + when(personalizeEventsClientBuilder.region(any())).thenReturn(personalizeEventsClientBuilder); + when(personalizeEventsClientBuilder.credentialsProvider(any())).thenReturn(personalizeEventsClientBuilder); + when(personalizeEventsClientBuilder.overrideConfiguration(any(ClientOverrideConfiguration.class))).thenReturn(personalizeEventsClientBuilder); + try(final MockedStatic personalizeEventsClientMockedStatic = mockStatic(PersonalizeEventsClient.class)) { + personalizeEventsClientMockedStatic.when(PersonalizeEventsClient::builder) + .thenReturn(personalizeEventsClientBuilder); + ClientFactory.createPersonalizeEventsClient(personalizeSinkConfig, awsCredentialsSupplier); + } + + final ArgumentCaptor credentialsProviderArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsProvider.class); + verify(personalizeEventsClientBuilder).credentialsProvider(credentialsProviderArgumentCaptor.capture()); + + final AwsCredentialsProvider actualCredentialsProvider = credentialsProviderArgumentCaptor.getValue(); + + assertThat(actualCredentialsProvider, equalTo(expectedCredentialsProvider)); + + final ArgumentCaptor optionsArgumentCaptor = ArgumentCaptor.forClass(AwsCredentialsOptions.class); + verify(awsCredentialsSupplier).getProvider(optionsArgumentCaptor.capture()); + + final AwsCredentialsOptions actualCredentialsOptions = optionsArgumentCaptor.getValue(); + assertThat(actualCredentialsOptions, is(notNullValue())); + assertThat(actualCredentialsOptions.getRegion(), equalTo(null)); + assertThat(actualCredentialsOptions.getStsRoleArn(), equalTo(null)); + assertThat(actualCredentialsOptions.getStsExternalId(), equalTo(null)); + assertThat(actualCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap())); + } + @ParameterizedTest @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) void createPersonalizeEventsClient_provides_correct_inputs(final String regionString) { @@ -61,7 +98,7 @@ void createPersonalizeEventsClient_provides_correct_inputs(final String regionSt final String stsRoleArn = UUID.randomUUID().toString(); final String externalId = UUID.randomUUID().toString(); final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); - when(awsAuthenticationOptions.getAwsRegion()).thenReturn(region); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Optional.of(region)); when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn); when(awsAuthenticationOptions.getAwsStsExternalId()).thenReturn(externalId); when(awsAuthenticationOptions.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java index 09098bc9ca..852e75630f 100644 --- a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/PersonalizeSinkTest.java @@ -16,6 +16,7 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -49,7 +50,7 @@ void setup() { when(personalizeSinkConfig.getDatasetArn()).thenReturn(DATASET_ARN); when(personalizeSinkConfig.getDatasetType()).thenReturn(DatasetTypeOptions.USERS); when(personalizeSinkConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions); - when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Region.of(REGION)); + when(awsAuthenticationOptions.getAwsRegion()).thenReturn(Optional.of(Region.of(REGION))); when(pluginSetting.getName()).thenReturn(SINK_PLUGIN_NAME); when(pluginSetting.getPipelineName()).thenReturn(SINK_PIPELINE_NAME); } diff --git a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java index 33b9757375..29be309622 100644 --- a/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java +++ b/data-prepper-plugins/personalize-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/personalize/configuration/AwsAuthenticationOptionsTest.java @@ -9,6 +9,7 @@ import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; @@ -29,7 +30,7 @@ void setUp() { @ParameterizedTest @ValueSource(strings = {"us-east-1", "us-west-2", "eu-central-1"}) void getAwsRegion_returns_Region_of(final String regionString) { - final Region expectedRegionObject = Region.of(regionString); + final Optional expectedRegionObject = Optional.of(Region.of(regionString)); final Map jsonMap = Map.of("region", regionString); final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); assertThat(objectUnderTest.getAwsRegion(), equalTo(expectedRegionObject)); @@ -39,7 +40,7 @@ void getAwsRegion_returns_Region_of(final String regionString) { void getAwsRegion_returns_null_when_region_is_null() { final Map jsonMap = Collections.emptyMap(); final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); - assertThat(objectUnderTest.getAwsRegion(), nullValue()); + assertThat(objectUnderTest.getAwsRegion(), equalTo(Optional.empty())); } @Test @@ -65,6 +66,13 @@ void isValidStsRoleArn_returns_true_for_valid_IAM_role() { assertTrue(objectUnderTest.isValidStsRoleArn()); } + @Test + void isValidStsRoleArn_returns_true_for_null() { + final Map jsonMap = Collections.emptyMap(); + final AwsAuthenticationOptions objectUnderTest = objectMapper.convertValue(jsonMap, AwsAuthenticationOptions.class); + assertTrue(objectUnderTest.isValidStsRoleArn()); + } + @Test void isValidStsRoleArn_returns_false_when_arn_service_is_not_IAM() { final String stsRoleArn = "arn:aws:personalize::123456789012:role/test";