diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java index 374b9629c9..77cf649535 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/parser/PipelineTransformer.java @@ -203,7 +203,10 @@ private void buildPipelineFromConfiguration( final List processors = processorComponentList.stream().map(IdentifiedComponent::getComponent).collect(Collectors.toList()); if (!processors.isEmpty() && processors.get(0) instanceof RequiresPeerForwarding) { return PeerForwardingProcessorDecorator.decorateProcessors( - processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName(), pipelineConfiguration.getWorkers() + processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName(), + dataPrepperConfiguration.getPeerForwarderConfiguration() != null ? + dataPrepperConfiguration.getPeerForwarderConfiguration().getExcludeIdentificationKeys() : null, + pipelineConfiguration.getWorkers() ); } return processors; diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfiguration.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfiguration.java index 27a0e3c83a..068847ebe1 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfiguration.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfiguration.java @@ -14,8 +14,10 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; +import java.util.stream.Collectors; import java.util.List; import java.util.Map; +import java.util.Set; /** * Class to hold configuration for Core Peer Forwarder in {@link DataPrepperConfiguration}, @@ -65,6 +67,7 @@ public class PeerForwarderConfiguration { private Integer forwardingBatchQueueDepth = 1; private Duration forwardingBatchTimeout = DEFAULT_FORWARDING_BATCH_TIMEOUT; private boolean binaryCodec = true; + private List> excludeIdentificationKeys; public PeerForwarderConfiguration() {} @@ -76,6 +79,7 @@ public PeerForwarderConfiguration ( @JsonProperty("server_thread_count") final Integer serverThreadCount, @JsonProperty("max_connection_count") final Integer maxConnectionCount, @JsonProperty("max_pending_requests") final Integer maxPendingRequests, + @JsonProperty("exclude_identification_keys") final List> excludeIdentificationKeys, @JsonProperty("ssl") final Boolean ssl, @JsonProperty("ssl_certificate_file") final String sslCertificateFile, @JsonProperty("ssl_key_file") final String sslKeyFile, @@ -139,6 +143,7 @@ public PeerForwarderConfiguration ( setBinaryCodec(binaryCodec == null || binaryCodec); checkForCertAndKeyFileInS3(); validateSslAndAuthentication(); + this.excludeIdentificationKeys = excludeIdentificationKeys; } public int getServerPort() { @@ -169,6 +174,13 @@ public boolean isSsl() { return ssl; } + public Set> getExcludeIdentificationKeys() { + if (excludeIdentificationKeys == null) { + return null; + } + return excludeIdentificationKeys.stream().collect(Collectors.toSet()); + } + public String getSslCertificateFile() { return sslCertificateFile; } diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java index e81c2a9730..a49aed7b03 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessorDecorator.java @@ -24,12 +24,14 @@ public class PeerForwardingProcessorDecorator implements Processor, Record> { private final PeerForwarder peerForwarder; private final Processor innerProcessor; + private final boolean peerForwardingDisabled; public static List decorateProcessors( final List processors, final PeerForwarderProvider peerForwarderProvider, final String pipelineName, final String pluginId, + final Set> excludeIdentificationKeys, final Integer pipelineWorkerThreads) { Set identificationKeys; @@ -69,13 +71,27 @@ public static List decorateProcessors( final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, firstInnerProcessor, pluginId, identificationKeys, pipelineWorkerThreads); - return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor)) - .collect(Collectors.toList()); + return processors.stream().map(processor -> + new PeerForwardingProcessorDecorator(peerForwarder, processor, isPeerForwardingDisabled(processor, excludeIdentificationKeys)) + ).collect(Collectors.toList()); } - private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) { + private static boolean isPeerForwardingDisabled(Processor processor, Set> excludeIdentificationKeysSet) { + if (processor instanceof RequiresPeerForwarding && excludeIdentificationKeysSet != null && excludeIdentificationKeysSet.size() > 0) { + Set identificationKeys = new HashSet(((RequiresPeerForwarding) processor).getIdentificationKeys()); + return excludeIdentificationKeysSet.contains(identificationKeys); + } + return false; + } + + private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor, final boolean peerForwardingDisabled) { this.peerForwarder = peerForwarder; this.innerProcessor = innerProcessor; + this.peerForwardingDisabled = peerForwardingDisabled; + } + + boolean isPeerForwardingDisabled() { + return peerForwardingDisabled; } @Override diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfigurationTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfigurationTest.java index 1d469a0975..c6ee2d28f5 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfigurationTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarderConfigurationTest.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Set; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; @@ -123,6 +124,14 @@ void testValidPeerForwarderConfig_with_InsecureTls() throws IOException { assertThat(peerForwarderConfiguration.isSsl(), equalTo(true)); assertThat(peerForwarderConfiguration.isSslDisableVerification(), equalTo(true)); + assertThat(peerForwarderConfiguration.getExcludeIdentificationKeys(), equalTo(null)); + } + + @Test + public void testExcludeIdentificationKeys() throws IOException { + final PeerForwarderConfiguration peerForwarderConfiguration = + makeConfig("src/test/resources/valid_peer_forwarder_config.yml"); + assertThat(Set.of(Set.of("key1"), Set.of("key2", "key3")), equalTo(peerForwarderConfiguration.getExcludeIdentificationKeys())); } @Test @@ -131,6 +140,7 @@ void testValidPeerForwarderConfig_with_FingerprintTls() throws IOException { assertThat(peerForwarderConfiguration.isSsl(), equalTo(true)); assertThat(peerForwarderConfiguration.isSslFingerprintVerificationOnly(), equalTo(true)); + assertThat(peerForwarderConfiguration.getExcludeIdentificationKeys(), equalTo(null)); } @Test @@ -191,4 +201,4 @@ void test_cert_paths_with_ssl() throws IOException { void invalid_InvalidPeerForwarderConfig_test(final String filePath) { assertThrows(ValueInstantiationException.class, () -> makeConfig(filePath)); } -} \ No newline at end of file +} diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarder_ClientServerIT.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarder_ClientServerIT.java index 929b98f0dc..b9dd594926 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarder_ClientServerIT.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwarder_ClientServerIT.java @@ -519,6 +519,7 @@ private PeerForwarderConfiguration createConfiguration( 200, 500, 1024, + null, ssl, sslCertificateFile, sslKeyFile, diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java index d33563bac9..8ab9dd04f7 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/peerforwarder/PeerForwardingProcessingDecoratorTest.java @@ -32,6 +32,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.Mockito.lenient; @@ -72,7 +73,11 @@ record = mock(Record.class); } private List createObjectUnderTestDecoratedProcessors(final List processors) { - return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, PIPELINE_WORKER_THREADS); + return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, null, PIPELINE_WORKER_THREADS); + } + + private List createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(final List processors, Set> excludeIdentificationKeys) { + return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, excludeIdentificationKeys, PIPELINE_WORKER_THREADS); } @Test @@ -101,6 +106,29 @@ void decorateProcessors_with_different_identification_key_should_throw() { assertThrows(RuntimeException.class, () -> createObjectUnderTestDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy))); } + @Test + void decorateProcessors_with_excludeIdentificationKeys() { + Set identificationKeys = Set.of("key1", "key2"); + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys); + final List processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(Collections.singletonList((Processor) requiresPeerForwarding), Set.of(identificationKeys)); + assertThat(processors.size(), equalTo(1)); + for (final Processor processor: processors) { + assertTrue(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled()); + } + } + + @Test + void decorateProcessors_with_notmatching_excludeIdentificationKeys() { + Set identificationKeys = Set.of("key1", "key2"); + Set notMatchingIdentificationKeys = Set.of("key1", "key3"); + when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys); + final List processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(Collections.singletonList((Processor) requiresPeerForwarding), Set.of(notMatchingIdentificationKeys)); + assertThat(processors.size(), equalTo(1)); + for (final Processor processor: processors) { + assertFalse(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled()); + } + } + @Test void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() { final List processors = createObjectUnderTestDecoratedProcessors(Collections.emptyList()); @@ -170,6 +198,9 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() { final List processors = createObjectUnderTestDecoratedProcessors(processorList); assertThat(processors.size(), equalTo(2)); + for (final Processor processor: processors) { + assertFalse(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled()); + } verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS); verifyNoMoreInteractions(peerForwarderProvider); Collection> result = processors.get(0).execute(testData); @@ -219,6 +250,9 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() { final List processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding)); assertThat(processors.size(), equalTo(1)); + for (final Processor processor: processors) { + assertFalse(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled()); + } final Collection> records = processors.get(0).execute(forwardTestData); verify(requiresPeerForwarding, times(2)).getIdentificationKeys(); diff --git a/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml b/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml index 2457bacfda..3e700dbc6c 100644 --- a/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml +++ b/data-prepper-core/src/test/resources/valid_peer_forwarder_config.yml @@ -4,6 +4,7 @@ client_timeout: 50 server_thread_count: 100 max_connection_count: 100 max_pending_requests: 512 +exclude_identification_keys: [ ["key1"], ["key2", "key3"] ] ssl: false use_acm_certificate_for_ssl: false discovery_mode: static @@ -15,4 +16,4 @@ failed_forwarding_requests_local_write_timeout: 15 forwarding_batch_size: 2500 forwarding_batch_queue_depth: 3 forwarding_batch_timeout: 5s -binary_codec: false \ No newline at end of file +binary_codec: false