Skip to content

Commit

Permalink
Pass the pipeline identifier as part of the data prepper config. (ope…
Browse files Browse the repository at this point in the history
…nsearch-project#5131)

* Pass the pipeline identifier as part of the data prepper config.

Signed-off-by: Souvik Bose <[email protected]>

* Address review comments and modify the tests

Signed-off-by: Souvik Bose <[email protected]>

---------

Signed-off-by: Souvik Bose <[email protected]>
Co-authored-by: Souvik Bose <[email protected]>
  • Loading branch information
sb2k16 and sbose2k21 authored Nov 7, 2024
1 parent cb159de commit 8fd743d
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@
public class KinesisLeaseConfig {
// Lease coordination table settings, bound from the "lease_coordination" key.
@JsonProperty("lease_coordination")
private KinesisLeaseCoordinationTableConfig leaseCoordinationTable;

// Pipeline identifier, bound from the "pipeline_identifier" key.
// KinesisService uses this value as its application name when it is non-null and
// non-empty; otherwise it falls back to the pipeline name.
// NOTE(review): no accessors are declared in this class body — presumably they are
// generated by annotations above the class declaration that are outside this view; confirm.
@JsonProperty("pipeline_identifier")
private String pipelineIdentifier;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package org.opensearch.dataprepper.plugins.kinesis.source;

import com.amazonaws.SdkClientException;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
Expand Down Expand Up @@ -41,6 +42,7 @@
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -57,10 +59,11 @@ public class KinesisService {
private final PluginMetrics pluginMetrics;
private final PluginFactory pluginFactory;

@Getter
private final String applicationName;

private final String tableName;
private final String kclMetricsNamespaceName;
private final String pipelineName;
private final AcknowledgementSetManager acknowledgementSetManager;
private final KinesisSourceConfig kinesisSourceConfig;
private final KinesisAsyncClient kinesisClient;
Expand Down Expand Up @@ -96,8 +99,12 @@ public KinesisService(final KinesisSourceConfig kinesisSourceConfig,
this.dynamoDbClient = kinesisClientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.kinesisClient = kinesisClientFactory.buildKinesisAsyncClient(kinesisSourceConfig.getAwsAuthenticationConfig().getAwsRegion());
this.cloudWatchClient = kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
final String pipelineIdentifier = kinesisLeaseConfig.getPipelineIdentifier();
if (Objects.isNull(pipelineIdentifier) || pipelineIdentifier.isEmpty()) {
this.applicationName = pipelineDescription.getPipelineName();
} else {
this.applicationName = kinesisLeaseConfig.getPipelineIdentifier();
}
this.workerIdentifierGenerator = workerIdentifierGenerator;
this.executorService = Executors.newFixedThreadPool(1);
final PluginModel codecConfiguration = kinesisSourceConfig.getCodec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -24,6 +25,7 @@
@ExtendWith(MockitoExtension.class)
public class KinesisLeaseConfigSupplierTest {
private static final String LEASE_COORDINATION_TABLE = "lease-table";

@Mock
KinesisLeaseConfig kinesisLeaseConfig;

Expand All @@ -36,12 +38,15 @@ private KinesisLeaseConfigSupplier createObjectUnderTest() {

/**
 * Checks that the lease config handed out by the supplier exposes the table name, the region
 * and the pipeline identifier that were stubbed on the underlying mocks.
 */
@Test
void testGetters() {
    final String expectedRegion = "us-east-1";
    final String pipelineIdentifier = UUID.randomUUID().toString();

    // Stub the coordination-table mock first, then wire it into the lease-config mock.
    when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn(LEASE_COORDINATION_TABLE);
    when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn(expectedRegion);
    when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
    when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(pipelineIdentifier);

    final KinesisLeaseConfigSupplier supplier = createObjectUnderTest();
    final KinesisLeaseConfig suppliedConfig = supplier.getKinesisExtensionLeaseConfig().get();

    assertThat(suppliedConfig.getLeaseCoordinationTable().getTableName(), equalTo(LEASE_COORDINATION_TABLE));
    assertThat(suppliedConfig.getLeaseCoordinationTable().getRegion(), equalTo(expectedRegion));
    assertThat(suppliedConfig.getPipelineIdentifier(), equalTo(pipelineIdentifier));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ void testConfigWithTestExtension() throws IOException {
final KinesisLeaseConfig kinesisLeaseConfig = makeConfig(
"src/test/resources/simple_pipeline_with_extensions.yaml");

assertNotNull(kinesisLeaseConfig.getPipelineIdentifier());
assertEquals(kinesisLeaseConfig.getPipelineIdentifier(), "sample-kinesis-pipeline-0123");
assertNotNull(kinesisLeaseConfig.getLeaseCoordinationTable());
assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getTableName(), "kinesis-pipeline-kcl");
assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getRegion(), "us-east-1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -67,7 +69,6 @@
import static org.mockito.Mockito.when;

public class KinesisServiceTest {
private final String PIPELINE_NAME = "kinesis-pipeline-test";
private final String streamId = "stream-1";
private static final String codec_plugin_name = "json";

Expand Down Expand Up @@ -133,8 +134,11 @@ public class KinesisServiceTest {
@Mock
WorkerIdentifierGenerator workerIdentifierGenerator;

private String pipelineName;

@BeforeEach
void setup() {
pipelineName = UUID.randomUUID().toString();
awsAuthenticationConfig = mock(AwsAuthenticationConfig.class);
kinesisSourceConfig = mock(KinesisSourceConfig.class);
kinesisStreamConfig = mock(KinesisStreamConfig.class);
Expand Down Expand Up @@ -206,7 +210,7 @@ void setup() {
when(kinesisClientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(cloudWatchClient);
when(kinesisClient.serviceClientConfiguration()).thenReturn(KinesisServiceClientConfiguration.builder().region(Region.US_EAST_1).build());
when(scheduler.startGracefulShutdown()).thenReturn(CompletableFuture.completedFuture(true));
when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME);
when(pipelineDescription.getPipelineName()).thenReturn(pipelineName);
when(workerIdentifierGenerator.generate()).thenReturn(UUID.randomUUID().toString());
}

Expand All @@ -220,6 +224,7 @@ void testServiceStart() {
KinesisService kinesisService = createObjectUnderTest();
kinesisService.start(buffer);
assertNotNull(kinesisService.getScheduler(buffer));
assertEquals(kinesisService.getApplicationName(), pipelineName);
verify(workerIdentifierGenerator, times(1)).generate();
}

Expand All @@ -245,6 +250,8 @@ void testCreateScheduler() {
assertSame(schedulerObjectUnderTest.metricsConfig().metricsLevel(), MetricsLevel.DETAILED);
assertNotNull(schedulerObjectUnderTest.processorConfig());
assertNotNull(schedulerObjectUnderTest.retrievalConfig());
assertNotNull(kinesisService.getApplicationName());
assertEquals(kinesisService.getApplicationName(), pipelineName);
verify(workerIdentifierGenerator, times(1)).generate();
}

Expand All @@ -255,6 +262,7 @@ void testCreateSchedulerWithPollingStrategy() {
pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator);
Scheduler schedulerObjectUnderTest = kinesisService.createScheduler(buffer);

assertEquals(kinesisService.getApplicationName(), pipelineName);
assertNotNull(schedulerObjectUnderTest);
assertNotNull(schedulerObjectUnderTest.checkpointConfig());
assertNotNull(schedulerObjectUnderTest.leaseManagementConfig());
Expand All @@ -267,12 +275,12 @@ void testCreateSchedulerWithPollingStrategy() {
verify(workerIdentifierGenerator, times(1)).generate();
}


/**
 * Starting the service with a {@code null} buffer must throw {@link IllegalStateException}
 * and must never run the scheduler.
 */
@Test
void testServiceStartNullBufferThrows() {
    final KinesisService kinesisService = createObjectUnderTest();

    assertThrows(IllegalStateException.class, () -> kinesisService.start(null));

    // JUnit's contract is assertEquals(expected, actual); the expected value is the pipeline
    // name returned by the mocked PipelineDescription.
    assertEquals(pipelineName, kinesisService.getApplicationName());
    verify(scheduler, times(0)).run();
}

Expand All @@ -283,6 +291,7 @@ void testServiceStartNullStreams() {
KinesisService kinesisService = createObjectUnderTest();
assertThrows(InvalidPluginConfigurationException.class, () -> kinesisService.start(buffer));

assertEquals(kinesisService.getApplicationName(), pipelineName);
verify(scheduler, times(0)).run();
}

Expand All @@ -293,6 +302,7 @@ void testServiceStartEmptyStreams() {
KinesisService kinesisService = createObjectUnderTest();
assertThrows(InvalidPluginConfigurationException.class, () -> kinesisService.start(buffer));

assertEquals(kinesisService.getApplicationName(), pipelineName);
verify(scheduler, times(0)).run();
}

Expand Down Expand Up @@ -352,4 +362,67 @@ public void testShutdownExecutorServiceInterruptedException() {
verify(scheduler).shutdown();
}

/**
 * When the lease config supplies a non-empty pipeline identifier, the service must use that
 * identifier, and not the pipeline name, as its application name, while still building a
 * fully configured scheduler.
 */
@Test
void testCreateSchedulerWithValidPipelineId() {
    final String pipelineIdentifier = UUID.randomUUID().toString();
    when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(pipelineIdentifier);

    final KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory,
            pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator);
    final Scheduler createdScheduler = kinesisService.createScheduler(buffer);

    // Every scheduler sub-configuration must be populated.
    assertNotNull(createdScheduler);
    assertNotNull(createdScheduler.checkpointConfig());
    assertNotNull(createdScheduler.leaseManagementConfig());
    assertNotNull(createdScheduler.lifecycleConfig());
    assertNotNull(createdScheduler.metricsConfig());
    assertNotNull(createdScheduler.processorConfig());
    assertNotNull(createdScheduler.retrievalConfig());

    // JUnit assertions take (expected, actual); keeping that order makes failure messages accurate.
    assertSame(InitialPositionInStream.TRIM_HORIZON,
            createdScheduler.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream());
    assertSame(MetricsLevel.DETAILED, createdScheduler.metricsConfig().metricsLevel());

    // The configured identifier wins over the pipeline name.
    assertNotNull(kinesisService.getApplicationName());
    assertEquals(pipelineIdentifier, kinesisService.getApplicationName());
    assertNotEquals(pipelineName, kinesisService.getApplicationName());
    verify(workerIdentifierGenerator, times(1)).generate();
}

/**
 * When the lease config returns a {@code null} pipeline identifier, the service must fall back
 * to the pipeline name as its application name, while still building a fully configured
 * scheduler.
 */
@Test
void testCreateSchedulerWithNullPipelineId() {
    when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(null);

    final KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory,
            pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator);
    final Scheduler createdScheduler = kinesisService.createScheduler(buffer);

    // Every scheduler sub-configuration must be populated.
    assertNotNull(createdScheduler);
    assertNotNull(createdScheduler.checkpointConfig());
    assertNotNull(createdScheduler.leaseManagementConfig());
    assertNotNull(createdScheduler.lifecycleConfig());
    assertNotNull(createdScheduler.metricsConfig());
    assertNotNull(createdScheduler.processorConfig());
    assertNotNull(createdScheduler.retrievalConfig());

    // JUnit assertions take (expected, actual); keeping that order makes failure messages accurate.
    assertSame(InitialPositionInStream.TRIM_HORIZON,
            createdScheduler.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream());
    assertSame(MetricsLevel.DETAILED, createdScheduler.metricsConfig().metricsLevel());

    // With no identifier configured, the pipeline name is the application name.
    assertNotNull(kinesisService.getApplicationName());
    assertEquals(pipelineName, kinesisService.getApplicationName());
    verify(workerIdentifierGenerator, times(1)).generate();
}
/**
 * When the lease config returns an empty pipeline identifier, the service must treat it like a
 * missing one and fall back to the pipeline name as its application name, while still building
 * a fully configured scheduler.
 */
@Test
void testCreateSchedulerWithEmptyPipelineId() {
    when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn("");

    final KinesisService kinesisService = new KinesisService(kinesisSourceConfig, kinesisClientFactory, pluginMetrics, pluginFactory,
            pipelineDescription, acknowledgementSetManager, kinesisLeaseConfigSupplier, workerIdentifierGenerator);
    final Scheduler createdScheduler = kinesisService.createScheduler(buffer);

    // Every scheduler sub-configuration must be populated.
    assertNotNull(createdScheduler);
    assertNotNull(createdScheduler.checkpointConfig());
    assertNotNull(createdScheduler.leaseManagementConfig());
    assertNotNull(createdScheduler.lifecycleConfig());
    assertNotNull(createdScheduler.metricsConfig());
    assertNotNull(createdScheduler.processorConfig());
    assertNotNull(createdScheduler.retrievalConfig());

    // JUnit assertions take (expected, actual); keeping that order makes failure messages accurate.
    assertSame(InitialPositionInStream.TRIM_HORIZON,
            createdScheduler.leaseManagementConfig().initialPositionInStream().getInitialPositionInStream());
    assertSame(MetricsLevel.DETAILED, createdScheduler.metricsConfig().metricsLevel());

    // An empty identifier is ignored, so the pipeline name is the application name.
    assertNotNull(kinesisService.getApplicationName());
    assertEquals(pipelineName, kinesisService.getApplicationName());
    verify(workerIdentifierGenerator, times(1)).generate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class KinesisSourceTest {
private final String PIPELINE_NAME = "kinesis-pipeline-test";
private final String streamId = "stream-1";
private static final String codec_plugin_name = "json";

private String pipelineIdentifier;
@Mock
private PluginMetrics pluginMetrics;

Expand Down Expand Up @@ -90,6 +90,7 @@ public class KinesisSourceTest {

@BeforeEach
void setup() {
pipelineIdentifier = UUID.randomUUID().toString();
pluginMetrics = mock(PluginMetrics.class);
pluginFactory = mock(PluginFactory.class);
kinesisSourceConfig = mock(KinesisSourceConfig.class);
Expand All @@ -110,6 +111,7 @@ void setup() {

kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class);
kinesisLeaseConfig = mock(KinesisLeaseConfig.class);
when(kinesisLeaseConfig.getPipelineIdentifier()).thenReturn(pipelineIdentifier);
kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class);
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("table-name");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Kinesis extension fixture: the config test reads pipeline_identifier plus the
# lease_coordination table name and region from under extensions.kinesis.
extensions:
  kinesis:
    pipeline_identifier: "sample-kinesis-pipeline-0123"
    lease_coordination:
      table_name: "kinesis-pipeline-kcl"
      region: "us-east-1"

0 comments on commit 8fd743d

Please sign in to comment.