From 8fd743d8748ab51e8a22606197a66df502f82a57 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Wed, 6 Nov 2024 16:58:33 -0800 Subject: [PATCH] Pass the pipeline identifier as part of the data prepper config. (#5131) * Pass the pipeline identifier as part of the data prepper config. Signed-off-by: Souvik Bose * Address review comments and modify the tests Signed-off-by: Souvik Bose --------- Signed-off-by: Souvik Bose Co-authored-by: Souvik Bose --- .../kinesis/extension/KinesisLeaseConfig.java | 3 + .../kinesis/source/KinesisService.java | 13 ++- .../KinesisLeaseConfigSupplierTest.java | 5 ++ .../extension/KinesisLeaseConfigTest.java | 2 + .../kinesis/source/KinesisServiceTest.java | 79 ++++++++++++++++++- .../kinesis/source/KinesisSourceTest.java | 4 +- .../simple_pipeline_with_extensions.yaml | 1 + 7 files changed, 100 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java index 68981c5cba..e0f37d3597 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfig.java @@ -17,4 +17,7 @@ public class KinesisLeaseConfig { @JsonProperty("lease_coordination") private KinesisLeaseCoordinationTableConfig leaseCoordinationTable; + + @JsonProperty("pipeline_identifier") + private String pipelineIdentifier; } diff --git a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java index 1b4d90d895..10a33b071d 100644 --- a/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java +++ b/data-prepper-plugins/kinesis-source/src/main/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisService.java @@ -11,6 +11,7 @@ package org.opensearch.dataprepper.plugins.kinesis.source; import com.amazonaws.SdkClientException; +import lombok.Getter; import lombok.Setter; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -41,6 +42,7 @@ import software.amazon.kinesis.processor.ShardRecordProcessorFactory; import software.amazon.kinesis.retrieval.polling.PollingConfig; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,10 +59,11 @@ public class KinesisService { private final PluginMetrics pluginMetrics; private final PluginFactory pluginFactory; + @Getter private final String applicationName; + private final String tableName; private final String kclMetricsNamespaceName; - private final String pipelineName; private final AcknowledgementSetManager acknowledgementSetManager; private final KinesisSourceConfig kinesisSourceConfig; private final KinesisAsyncClient kinesisClient; @@ -96,8 +99,12 @@ public KinesisService(final KinesisSourceConfig kinesisSourceConfig, this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion()); this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion()); - this.pipelineName = pipelineDescription.getPipelineName(); - this.applicationName = pipelineName; + final String pipelineIdentifier = kinesisLeaseConfig.getPipelineIdentifier(); + if (Objects.isNull(pipelineIdentifier) || pipelineIdentifier.isEmpty()) { + this.applicationName = pipelineDescription.getPipelineName(); + } else { + this.applicationName = kinesisLeaseConfig.getPipelineIdentifier(); + } this.workerIdentifierGenerator = workerIdentifierGenerator; this.executorService = Executors.newFixedThreadPool(1); final PluginModel codecConfiguration = kinesisSourceConfig.getCodec(); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplierTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplierTest.java index 4cfc323ed5..382c48366d 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplierTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigSupplierTest.java @@ -16,6 +16,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.Optional; +import java.util.UUID; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -24,6 +25,7 @@ @ExtendWith(MockitoExtension.class) public class KinesisLeaseConfigSupplierTest { private static final String LEASE_COORDINATION_TABLE = "lease-table"; + @Mock KinesisLeaseConfig kinesisLeaseConfig; @@ -36,12 +38,15 @@ private KinesisLeaseConfigSupplier createObjectUnderTest() { @Test void testGetters() { + final String pipelineIdentifier = UUID.randomUUID().toString(); + when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(pipelineIdentifier); when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig); when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn(LEASE_COORDINATION_TABLE); when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn("us-east-1"); KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier = createObjectUnderTest(); assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable().getTableName(), equalTo(LEASE_COORDINATION_TABLE)); assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable().getRegion(), equalTo("us-east-1")); + assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getPipelineIdentifier(), equalTo(pipelineIdentifier)); } @Test diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java index 311e33cc9c..afab150bfc 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/extension/KinesisLeaseConfigTest.java @@ -56,6 +56,8 @@ void testConfigWithTestExtension() throws IOException { final KinesisLeaseConfig kinesisLeaseConfig = makeConfig( "src/test/resources/simple_pipeline_with_extensions.yaml"); + assertNotNull(kinesisLeaseConfig.getPipelineIdentifier()); + assertEquals(kinesisLeaseConfig.getPipelineIdentifier(), "sample-kinesis-pipeline-0123"); assertNotNull(kinesisLeaseConfig.getLeaseCoordinationTable()); assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getTableName(), "kinesis-pipeline-kcl"); assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getRegion(), "us-east-1"); diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java index 2a95cba8c7..0bd7dfb217 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisServiceTest.java @@ -56,6 +56,8 @@ import java.util.concurrent.TimeoutException; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -67,7 +69,6 @@ import static org.mockito.Mockito.when; public class KinesisServiceTest { - private final String PIPELINE_NAME = "kinesis-pipeline-test"; private final String streamId = "stream-1"; private static final String codec_plugin_name = "json"; @@ -133,8 +134,11 @@ public class KinesisServiceTest { @Mock WorkerIdentifierGenerator workerIdentifierGenerator; + private String pipelineName; + @BeforeEach void setup() { + pipelineName = UUID.randomUUID().toString(); awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); kinesisSourceConfig = mock(KinesisSourceConfig.class); kinesisStreamConfig = mock(KinesisStreamConfig.class); @@ -206,7 +210,7 @@ void setup() { when(kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(cloudWatchClient); when(kinesisClient.serviceClientConfiguration()).thenReturn(KinesisServiceClientConfiguration.builder().region(Region.US_EAST_1).build()); when(scheduler.startGracefulShutdown()).thenReturn(CompletableFuture.completedFuture(true)); - when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME); + when(pipelineDescription.getPipelineName()).thenReturn(pipelineName); when(workerIdentifierGenerator.generate()).thenReturn(UUID.randomUUID().toString()); } @@ -220,6 +224,7 @@ void testServiceStart() { KinesisService kinesisService = createObjectUnderTest(); kinesisService.start(buffer); assertNotNull(kinesisService.getScheduler(buffer)); + assertEquals(kinesisService.getApplicationName(), pipelineName); verify(workerIdentifierGenerator, times(1)).generate(); } @@ -245,6 +250,8 @@ void testCreateScheduler() { assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); assertNotNull(schedulerObjectUnderTest.processorConfig()); assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + assertNotNull(kinesisService.getApplicationName()); + assertEquals(kinesisService.getApplicationName(), pipelineName); verify(workerIdentifierGenerator, times(1)).generate(); } @@ -255,6 +262,7 @@ void testCreateSchedulerWithPollingStrategy() { pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + assertEquals(kinesisService.getApplicationName(), pipelineName); assertNotNull(schedulerObjectUnderTest); assertNotNull(schedulerObjectUnderTest.checkpointConfig()); assertNotNull(schedulerObjectUnderTest.leaseManagementConfig()); @@ -267,12 +275,12 @@ void testCreateSchedulerWithPollingStrategy() { verify(workerIdentifierGenerator, times(1)).generate(); } - @Test void testServiceStartNullBufferThrows() { KinesisService kinesisService = createObjectUnderTest(); assertThrows(IllegalStateException.class, () -> kinesisService.start(null)); + assertEquals(kinesisService.getApplicationName(), pipelineName); verify(scheduler, times(0)).run(); } @@ -283,6 +291,7 @@ void testServiceStartNullStreams() { KinesisService kinesisService = createObjectUnderTest(); assertThrows(InvalidPluginConfigurationException.class, () -> kinesisService.start(buffer)); + assertEquals(kinesisService.getApplicationName(), pipelineName); verify(scheduler, times(0)).run(); } @@ -293,6 +302,7 @@ void testServiceStartEmptyStreams() { KinesisService kinesisService = createObjectUnderTest(); assertThrows(InvalidPluginConfigurationException.class, () -> kinesisService.start(buffer)); + assertEquals(kinesisService.getApplicationName(), pipelineName); verify(scheduler, times(0)).run(); } @@ -352,4 +362,67 @@ public void testShutdownExecutorServiceInterruptedException() { verify(scheduler).shutdown(); } + @Test + void testCreateSchedulerWithValidPipelineId() { + final String pipelineIdentifier = UUID.randomUUID().toString(); + when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(pipelineIdentifier); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.checkpointConfig()); + assertNotNull(schedulerObjectUnderTest.leaseManagementConfig()); + assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + assertNotNull(schedulerObjectUnderTest.lifecycleConfig()); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); + assertNotNull(schedulerObjectUnderTest.processorConfig()); + assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + assertNotNull(kinesisService.getApplicationName()); + assertEquals(kinesisService.getApplicationName(), pipelineIdentifier); + assertNotEquals(kinesisService.getApplicationName(), pipelineName); + verify(workerIdentifierGenerator, times(1)).generate(); + } + + @Test + void testCreateSchedulerWithNullPipelineId() { + when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(null); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.checkpointConfig()); + assertNotNull(schedulerObjectUnderTest.leaseManagementConfig()); + assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + assertNotNull(schedulerObjectUnderTest.lifecycleConfig()); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); + assertNotNull(schedulerObjectUnderTest.processorConfig()); + assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + assertNotNull(kinesisService.getApplicationName()); + assertEquals(kinesisService.getApplicationName(), pipelineName); + verify(workerIdentifierGenerator, times(1)).generate(); + } + @Test + void testCreateSchedulerWithEmptyPipelineId() { + when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(""); + KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory, + pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator); + Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer); + + assertNotNull(schedulerObjectUnderTest); + assertNotNull(schedulerObjectUnderTest.checkpointConfig()); + assertNotNull(schedulerObjectUnderTest.leaseManagementConfig()); + assertSame(schedulerObjectUnderTest.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream(), InitialPositionInStream.TRIM_HORIZON); + assertNotNull(schedulerObjectUnderTest.lifecycleConfig()); + assertNotNull(schedulerObjectUnderTest.metricsConfig()); + assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED); + assertNotNull(schedulerObjectUnderTest.processorConfig()); + assertNotNull(schedulerObjectUnderTest.retrievalConfig()); + assertNotNull(kinesisService.getApplicationName()); + assertEquals(kinesisService.getApplicationName(), pipelineName); + verify(workerIdentifierGenerator, times(1)).generate(); + } } diff --git a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java index fad335dd63..2b19c31481 100644 --- a/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java +++ b/data-prepper-plugins/kinesis-source/src/test/java/org/opensearch/dataprepper/plugins/kinesis/source/KinesisSourceTest.java @@ -53,7 +53,7 @@ public class KinesisSourceTest { private final String PIPELINE_NAME = "kinesis-pipeline-test"; private final String streamId = "stream-1"; private static final String codec_plugin_name = "json"; - + private String pipelineIdentifier; @Mock private PluginMetrics pluginMetrics; @@ -90,6 +90,7 @@ public class KinesisSourceTest { @BeforeEach void setup() { + pipelineIdentifier = UUID.randomUUID().toString(); pluginMetrics = mock(PluginMetrics.class); pluginFactory = mock(PluginFactory.class); kinesisSourceConfig = mock(KinesisSourceConfig.class); @@ -110,6 +111,7 @@ void setup() { kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class); kinesisLeaseConfig = mock(KinesisLeaseConfig.class); + when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(pipelineIdentifier); kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class); when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig); when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("table-name"); diff --git a/data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml b/data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml index 4f964cae7f..022e705f37 100644 --- a/data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml +++ b/data-prepper-plugins/kinesis-source/src/test/resources/simple_pipeline_with_extensions.yaml @@ -1,5 +1,6 @@ extensions: kinesis: + pipeline_identifier: "sample-kinesis-pipeline-0123" lease_coordination: table_name: "kinesis-pipeline-kcl" region: "us-east-1" \ No newline at end of file