Skip to content

Commit

Permalink
Moved exclude_identification_keys to under peer_forwarder config
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Oct 29, 2024
1 parent 286e822 commit 28e4ef7
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,9 @@ private void buildPipelineFromConfiguration(
if (!processors.isEmpty() && processors.get(0) instanceof RequiresPeerForwarding) {
return PeerForwardingProcessorDecorator.decorateProcessors(
processors, peerForwarderProvider, pipelineName, processorComponentList.get(0).getName(),
dataPrepperConfiguration.getExcludeIdentificationKeys(),
pipelineConfiguration.getWorkers()
dataPrepperConfiguration.getPeerForwarderConfiguration() != null ?
dataPrepperConfiguration.getPeerForwarderConfiguration().getExcludeIdentificationKeys() : null,
pipelineConfiguration.getWorkers()
);
}
return processors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@

import java.time.Duration;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Objects;

/**
Expand Down Expand Up @@ -58,7 +56,6 @@ public class DataPrepperConfiguration implements ExtensionsConfiguration, EventC
private Duration processorShutdownTimeout;
private Duration sinkShutdownTimeout;
private PipelineExtensions pipelineExtensions;
private List<Set<String>> excludeIdentificationKeys;

public static final DataPrepperConfiguration DEFAULT_CONFIG = new DataPrepperConfiguration();

Expand All @@ -76,8 +73,6 @@ public DataPrepperConfiguration(
@JsonProperty("private_key_password")
@JsonAlias("privateKeyPassword")
final String privateKeyPassword,
@JsonProperty("exclude_identification_keys")
final List<Set<String>> excludeIdentificationKeys,
@JsonProperty("server_port")
@JsonAlias("serverPort")
final String serverPort,
Expand Down Expand Up @@ -132,7 +127,6 @@ public DataPrepperConfiguration(
throw new IllegalArgumentException("sinkShutdownTimeout must be non-negative.");
}
this.pipelineExtensions = pipelineExtensions;
this.excludeIdentificationKeys = excludeIdentificationKeys;
}

public int getServerPort() {
Expand All @@ -143,13 +137,6 @@ public boolean ssl() {
return ssl;
}

public Set<Set<String>> getExcludeIdentificationKeys() {
if (excludeIdentificationKeys == null) {
return null;
}
return excludeIdentificationKeys.stream().collect(Collectors.toSet());
}

public String getKeyStoreFilePath() {
return keyStoreFilePath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Class to hold configuration for Core Peer Forwarder in {@link DataPrepperConfiguration},
Expand Down Expand Up @@ -65,6 +67,7 @@ public class PeerForwarderConfiguration {
private Integer forwardingBatchQueueDepth = 1;
private Duration forwardingBatchTimeout = DEFAULT_FORWARDING_BATCH_TIMEOUT;
private boolean binaryCodec = true;
private List<Set<String>> excludeIdentificationKeys;

public PeerForwarderConfiguration() {}

Expand All @@ -76,6 +79,7 @@ public PeerForwarderConfiguration (
@JsonProperty("server_thread_count") final Integer serverThreadCount,
@JsonProperty("max_connection_count") final Integer maxConnectionCount,
@JsonProperty("max_pending_requests") final Integer maxPendingRequests,
@JsonProperty("exclude_identification_keys") final List<Set<String>> excludeIdentificationKeys,
@JsonProperty("ssl") final Boolean ssl,
@JsonProperty("ssl_certificate_file") final String sslCertificateFile,
@JsonProperty("ssl_key_file") final String sslKeyFile,
Expand Down Expand Up @@ -139,6 +143,7 @@ public PeerForwarderConfiguration (
setBinaryCodec(binaryCodec == null || binaryCodec);
checkForCertAndKeyFileInS3();
validateSslAndAuthentication();
this.excludeIdentificationKeys = excludeIdentificationKeys;
}

public int getServerPort() {
Expand Down Expand Up @@ -169,6 +174,13 @@ public boolean isSsl() {
return ssl;
}

public Set<Set<String>> getExcludeIdentificationKeys() {
if (excludeIdentificationKeys == null) {
return null;
}
return excludeIdentificationKeys.stream().collect(Collectors.toSet());
}

public String getSslCertificateFile() {
return sslCertificateFile;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Set;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand Down Expand Up @@ -60,15 +59,13 @@ public void testSomeDefaultConfig() throws IOException {
makeConfig(TestDataProvider.VALID_DATA_PREPPER_SOME_DEFAULT_CONFIG_FILE);
Assert.assertEquals(DataPrepperConfiguration.DEFAULT_CONFIG.getServerPort(), dataPrepperConfiguration.getServerPort());
Assert.assertNull(dataPrepperConfiguration.getPipelineExtensions());
Assert.assertNull(dataPrepperConfiguration.getExcludeIdentificationKeys());
}

@Test
public void testDefaultMetricsRegistry() {
final DataPrepperConfiguration dataPrepperConfiguration = DataPrepperConfiguration.DEFAULT_CONFIG;
assertThat(dataPrepperConfiguration.getMetricRegistryTypes().size(), Matchers.equalTo(1));
assertThat(dataPrepperConfiguration.getMetricRegistryTypes(), Matchers.hasItem(MetricRegistryType.Prometheus));
Assert.assertNull(dataPrepperConfiguration.getExcludeIdentificationKeys());
}

@Test
Expand All @@ -88,13 +85,6 @@ public void testMultipleMetricsRegistry() throws IOException {
assertThat(dataPrepperConfiguration.getMetricRegistryTypes(), Matchers.hasItem(MetricRegistryType.CloudWatch));
}

@Test
public void testExcludeIdentificationKeys() throws IOException {
final DataPrepperConfiguration dataPrepperConfiguration =
makeConfig(TestDataProvider.VALID_DATA_PREPPER_CONFIG_FILE);
Assert.assertEquals(Set.of(Set.of("key1"), Set.of("key2", "key3")), dataPrepperConfiguration.getExcludeIdentificationKeys());
}

@Test
void testConfigurationWithHttpBasic() throws IOException {
final DataPrepperConfiguration dataPrepperConfiguration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Set;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -123,6 +124,14 @@ void testValidPeerForwarderConfig_with_InsecureTls() throws IOException {

assertThat(peerForwarderConfiguration.isSsl(), equalTo(true));
assertThat(peerForwarderConfiguration.isSslDisableVerification(), equalTo(true));
assertThat(peerForwarderConfiguration.getExcludeIdentificationKeys(), equalTo(null));
}

@Test
public void testExcludeIdentificationKeys() throws IOException {
final PeerForwarderConfiguration peerForwarderConfiguration =
makeConfig("src/test/resources/valid_peer_forwarder_config.yml");
assertThat(Set.of(Set.of("key1"), Set.of("key2", "key3")), equalTo(peerForwarderConfiguration.getExcludeIdentificationKeys()));
}

@Test
Expand All @@ -131,6 +140,7 @@ void testValidPeerForwarderConfig_with_FingerprintTls() throws IOException {

assertThat(peerForwarderConfiguration.isSsl(), equalTo(true));
assertThat(peerForwarderConfiguration.isSslFingerprintVerificationOnly(), equalTo(true));
assertThat(peerForwarderConfiguration.getExcludeIdentificationKeys(), equalTo(null));
}

@Test
Expand Down Expand Up @@ -191,4 +201,4 @@ void test_cert_paths_with_ssl() throws IOException {
void invalid_InvalidPeerForwarderConfig_test(final String filePath) {
assertThrows(ValueInstantiationException.class, () -> makeConfig(filePath));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ private PeerForwarderConfiguration createConfiguration(
200,
500,
1024,
null,
ssl,
sslCertificateFile,
sslKeyFile,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
server_port: 5678
exclude_identification_keys: [ ["key1"], ["key2", "key3"]]
ssl: false
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ client_timeout: 50
server_thread_count: 100
max_connection_count: 100
max_pending_requests: 512
exclude_identification_keys: [ ["key1"], ["key2", "key3"] ]
ssl: false
use_acm_certificate_for_ssl: false
discovery_mode: static
Expand All @@ -15,4 +16,4 @@ failed_forwarding_requests_local_write_timeout: 15
forwarding_batch_size: 2500
forwarding_batch_queue_depth: 3
forwarding_batch_timeout: 5s
binary_codec: false
binary_codec: false

0 comments on commit 28e4ef7

Please sign in to comment.