Skip to content

Commit

Permalink
Add support to skip remote peer forwarding based on configuration (#5127
Browse files Browse the repository at this point in the history
)

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Moved exclude_identification_keys to under peer_forwarder config

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka authored Oct 30, 2024
1 parent 894fe9e commit 25a3224
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ private void buildPipelineFromConfiguration(
final List<Processor> processors = processorComponentList.stream().map(IdentifiedComponent::getComponent).collect(Collectors.toList());
if (!processors.isEmpty() && processors.get(0) instanceof RequiresPeerForwarding) {
return PeerForwardingProcessorDecorator.decorateProcessors(
processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName(), pipelineConfiguration.getWorkers()
processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName(),
dataPrepperConfiguration.getPeerForwarderConfiguration() != null ?
dataPrepperConfiguration.getPeerForwarderConfiguration().getExcludeIdentificationKeys() : null,
pipelineConfiguration.getWorkers()
);
}
return processors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Class to hold configuration for Core Peer Forwarder in {@link DataPrepperConfiguration},
Expand Down Expand Up @@ -65,6 +67,7 @@ public class PeerForwarderConfiguration {
private Integer forwardingBatchQueueDepth = 1;
private Duration forwardingBatchTimeout = DEFAULT_FORWARDING_BATCH_TIMEOUT;
private boolean binaryCodec = true;
private List<Set<String>> excludeIdentificationKeys;

public PeerForwarderConfiguration() {}

Expand All @@ -76,6 +79,7 @@ public PeerForwarderConfiguration (
@JsonProperty("server_thread_count") final Integer serverThreadCount,
@JsonProperty("max_connection_count") final Integer maxConnectionCount,
@JsonProperty("max_pending_requests") final Integer maxPendingRequests,
@JsonProperty("exclude_identification_keys") final List<Set<String>> excludeIdentificationKeys,
@JsonProperty("ssl") final Boolean ssl,
@JsonProperty("ssl_certificate_file") final String sslCertificateFile,
@JsonProperty("ssl_key_file") final String sslKeyFile,
Expand Down Expand Up @@ -139,6 +143,7 @@ public PeerForwarderConfiguration (
setBinaryCodec(binaryCodec == null || binaryCodec);
checkForCertAndKeyFileInS3();
validateSslAndAuthentication();
this.excludeIdentificationKeys = excludeIdentificationKeys;
}

public int getServerPort() {
Expand Down Expand Up @@ -169,6 +174,13 @@ public boolean isSsl() {
return ssl;
}

public Set<Set<String>> getExcludeIdentificationKeys() {
if (excludeIdentificationKeys == null) {
return null;
}
return excludeIdentificationKeys.stream().collect(Collectors.toSet());
}

public String getSslCertificateFile() {
return sslCertificateFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
public class PeerForwardingProcessorDecorator implements Processor<Record<Event>, Record<Event>> {
private final PeerForwarder peerForwarder;
private final Processor innerProcessor;
private final boolean peerForwardingDisabled;

public static List<Processor> decorateProcessors(
final List<Processor> processors,
final PeerForwarderProvider peerForwarderProvider,
final String pipelineName,
final String pluginId,
final Set<Set<String>> excludeIdentificationKeys,
final Integer pipelineWorkerThreads) {

Set<String> identificationKeys;
Expand Down Expand Up @@ -69,13 +71,27 @@ public static List<Processor> decorateProcessors(

final PeerForwarder peerForwarder = peerForwarderProvider.register(pipelineName, firstInnerProcessor, pluginId, identificationKeys, pipelineWorkerThreads);

return processors.stream().map(processor -> new PeerForwardingProcessorDecorator(peerForwarder, processor))
.collect(Collectors.toList());
return processors.stream().map(processor ->
new PeerForwardingProcessorDecorator(peerForwarder, processor, isPeerForwardingDisabled(processor, excludeIdentificationKeys))
).collect(Collectors.toList());
}

private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor) {
private static boolean isPeerForwardingDisabled(Processor processor, Set<Set<String>> excludeIdentificationKeysSet) {
if (processor instanceof RequiresPeerForwarding && excludeIdentificationKeysSet != null && excludeIdentificationKeysSet.size() > 0) {
Set<String> identificationKeys = new HashSet<String>(((RequiresPeerForwarding) processor).getIdentificationKeys());
return excludeIdentificationKeysSet.contains(identificationKeys);
}
return false;
}

private PeerForwardingProcessorDecorator(final PeerForwarder peerForwarder, final Processor innerProcessor, final boolean peerForwardingDisabled) {
this.peerForwarder = peerForwarder;
this.innerProcessor = innerProcessor;
this.peerForwardingDisabled = peerForwardingDisabled;
}

boolean isPeerForwardingDisabled() {
return peerForwardingDisabled;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Set;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -123,6 +124,14 @@ void testValidPeerForwarderConfig_with_InsecureTls() throws IOException {

assertThat(peerForwarderConfiguration.isSsl(), equalTo(true));
assertThat(peerForwarderConfiguration.isSslDisableVerification(), equalTo(true));
assertThat(peerForwarderConfiguration.getExcludeIdentificationKeys(), equalTo(null));
}

@Test
public void testExcludeIdentificationKeys() throws IOException {
final PeerForwarderConfiguration peerForwarderConfiguration =
makeConfig("src/test/resources/valid_peer_forwarder_config.yml");
assertThat(Set.of(Set.of("key1"), Set.of("key2", "key3")), equalTo(peerForwarderConfiguration.getExcludeIdentificationKeys()));
}

@Test
Expand All @@ -131,6 +140,7 @@ void testValidPeerForwarderConfig_with_FingerprintTls() throws IOException {

assertThat(peerForwarderConfiguration.isSsl(), equalTo(true));
assertThat(peerForwarderConfiguration.isSslFingerprintVerificationOnly(), equalTo(true));
assertThat(peerForwarderConfiguration.getExcludeIdentificationKeys(), equalTo(null));
}

@Test
Expand Down Expand Up @@ -191,4 +201,4 @@ void test_cert_paths_with_ssl() throws IOException {
void invalid_InvalidPeerForwarderConfig_test(final String filePath) {
assertThrows(ValueInstantiationException.class, () -> makeConfig(filePath));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ private PeerForwarderConfiguration createConfiguration(
200,
500,
1024,
null,
ssl,
sslCertificateFile,
sslKeyFile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.Mockito.lenient;
Expand Down Expand Up @@ -72,7 +73,11 @@ record = mock(Record.class);
}

private List<Processor> createObjectUnderTestDecoratedProcessors(final List<Processor> processors) {
return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, PIPELINE_WORKER_THREADS);
return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, null, PIPELINE_WORKER_THREADS);
}

private List<Processor> createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(final List<Processor> processors, Set<Set<String>> excludeIdentificationKeys) {
return PeerForwardingProcessorDecorator.decorateProcessors(processors, peerForwarderProvider, pipelineName, pluginId, excludeIdentificationKeys, PIPELINE_WORKER_THREADS);
}

@Test
Expand Down Expand Up @@ -101,6 +106,29 @@ void decorateProcessors_with_different_identification_key_should_throw() {
assertThrows(RuntimeException.class, () -> createObjectUnderTestDecoratedProcessors(List.of(((Processor) requiresPeerForwarding), (Processor) requiresPeerForwardingCopy)));
}

@Test
void decorateProcessors_with_excludeIdentificationKeys() {
Set<String> identificationKeys = Set.of("key1", "key2");
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
final List<Processor> processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(Collections.singletonList((Processor) requiresPeerForwarding), Set.of(identificationKeys));
assertThat(processors.size(), equalTo(1));
for (final Processor processor: processors) {
assertTrue(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());
}
}

@Test
void decorateProcessors_with_notmatching_excludeIdentificationKeys() {
Set<String> identificationKeys = Set.of("key1", "key2");
Set<String> notMatchingIdentificationKeys = Set.of("key1", "key3");
when(requiresPeerForwarding.getIdentificationKeys()).thenReturn(identificationKeys);
final List<Processor> processors = createObjectUnderTestDecoratedProcessorsWithExcludeIdentificationKeys(Collections.singletonList((Processor) requiresPeerForwarding), Set.of(notMatchingIdentificationKeys));
assertThat(processors.size(), equalTo(1));
for (final Processor processor: processors) {
assertFalse(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());
}
}

@Test
void decorateProcessors_with_empty_processors_should_return_empty_list_of_processors() {
final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.emptyList());
Expand Down Expand Up @@ -170,6 +198,9 @@ void PeerForwardingProcessingDecorator_with_localProcessingOnly() {

final List<Processor> processors = createObjectUnderTestDecoratedProcessors(processorList);
assertThat(processors.size(), equalTo(2));
for (final Processor processor: processors) {
assertFalse(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());
}
verify(peerForwarderProvider, times(1)).register(pipelineName, processor, pluginId, identificationKeys, PIPELINE_WORKER_THREADS);
verifyNoMoreInteractions(peerForwarderProvider);
Collection<Record<Event>> result = processors.get(0).execute(testData);
Expand Down Expand Up @@ -219,6 +250,9 @@ void PeerForwardingProcessingDecorator_execute_should_receiveRecords() {

final List<Processor> processors = createObjectUnderTestDecoratedProcessors(Collections.singletonList((Processor) requiresPeerForwarding));
assertThat(processors.size(), equalTo(1));
for (final Processor processor: processors) {
assertFalse(((PeerForwardingProcessorDecorator)processor).isPeerForwardingDisabled());
}
final Collection<Record<Event>> records = processors.get(0).execute(forwardTestData);

verify(requiresPeerForwarding, times(2)).getIdentificationKeys();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ client_timeout: 50
server_thread_count: 100
max_connection_count: 100
max_pending_requests: 512
exclude_identification_keys: [ ["key1"], ["key2", "key3"] ]
ssl: false
use_acm_certificate_for_ssl: false
discovery_mode: static
Expand All @@ -15,4 +16,4 @@ failed_forwarding_requests_local_write_timeout: 15
forwarding_batch_size: 2500
forwarding_batch_queue_depth: 3
forwarding_batch_timeout: 5s
binary_codec: false
binary_codec: false

0 comments on commit 25a3224

Please sign in to comment.