Skip to content

Commit

Permalink
address PR comments to use Duration
Browse files Browse the repository at this point in the history
Signed-off-by: Haidong <[email protected]>
  • Loading branch information
Haidong committed Sep 12, 2023
1 parent 2c0adf1 commit 8219819
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaClusterAuthConfig;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
Expand All @@ -22,11 +23,11 @@ public class KafkaConnectConfig implements KafkaClusterAuthConfig {
@JsonProperty("worker_properties")
private WorkerProperties workerProperties = new WorkerProperties();

@JsonProperty("connect_timeout_ms")
private Long connectTimeoutMs = CONNECT_TIMEOUT_MS;
@JsonProperty("connect_start_timeout")
private Duration connectStartTimeout = Duration.ofMillis(CONNECT_TIMEOUT_MS);

@JsonProperty("connector_timeout_ms")
private Long connectorTimeoutMs = CONNECTOR_TIMEOUT_MS;
@JsonProperty("connector_start_timeout")
private Duration connectorStartTimeout = Duration.ofMillis(CONNECTOR_TIMEOUT_MS);

@JsonProperty("bootstrap_servers")
private List<String> bootstrapServers;
Expand All @@ -35,12 +36,12 @@ public class KafkaConnectConfig implements KafkaClusterAuthConfig {
private EncryptionConfig encryptionConfig;
private AwsConfig awsConfig;

public Long getConnectTimeoutMs() {
return connectTimeoutMs;
public Duration getConnectStartTimeout() {
return connectStartTimeout;
}

public Long getConnectorTimeoutMs() {
return connectorTimeoutMs;
public Duration getConnectorStartTimeout() {
return connectorStartTimeout;
}

public void setBootstrapServers(final List<String> bootstrapServers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.kafka.connect.runtime.WorkerConfig;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -48,16 +49,16 @@ public class WorkerProperties {
private String clientId = DEFAULT_CLIENT_ID;
@JsonProperty("offset_storage_partitions")
private Integer offsetStoragePartitions = OFFSET_STORAGE_PARTITIONS;
@JsonProperty("offset_flush_interval_ms")
private Long offsetFlushIntervalMs = OFFSET_FLUSH_INTERVAL_MS;
@JsonProperty("offset_flush_timeout_ms")
private Long offsetFlushTimeoutMs = OFFSET_FLUSH_TIMEOUT_MS;
@JsonProperty("offset_flush_interval")
private Duration offsetFlushInterval = Duration.ofMillis(OFFSET_FLUSH_INTERVAL_MS);
@JsonProperty("offset_flush_timeout")
private Duration offsetFlushTimeout = Duration.ofMillis(OFFSET_FLUSH_TIMEOUT_MS);
@JsonProperty("status_storage_partitions")
private Integer statusStoragePartitions = STATUS_STORAGE_PARTITIONS;
@JsonProperty("heartbeat_interval_ms")
private Long heartBeatIntervalMs = HEARTBEAT_INTERVAL_MS;
@JsonProperty("session_timeout_ms")
private Long sessionTimeoutMs = SESSION_TIMEOUT_MS;
@JsonProperty("heartbeat_interval")
private Duration heartBeatInterval = Duration.ofMillis(HEARTBEAT_INTERVAL_MS);
@JsonProperty("session_timeout")
private Duration sessionTimeout = Duration.ofMillis(SESSION_TIMEOUT_MS);
private String keyConverterSchemaRegistryUrl;
private String valueConverterSchemaRegistryUrl;
private String bootstrapServers;
Expand Down Expand Up @@ -91,24 +92,24 @@ public Integer getOffsetStoragePartitions() {
return offsetStoragePartitions;
}

public Long getOffsetFlushIntervalMs() {
return offsetFlushIntervalMs;
public Long getOffsetFlushInterval() {
return offsetFlushInterval.toMillis();
}

public Long getOffsetFlushTimeoutMs() {
return offsetFlushTimeoutMs;
public Long getOffsetFlushTimeout() {
return offsetFlushTimeout.toMillis();
}

public Integer getStatusStoragePartitions() {
return statusStoragePartitions;
}

public Long getHeartBeatIntervalMs() {
return heartBeatIntervalMs;
public Long getHeartBeatInterval() {
return heartBeatInterval.toMillis();
}

public Long getSessionTimeoutMs() {
return sessionTimeoutMs;
public Long getSessionTimeout() {
return sessionTimeout.toMillis();
}

public String getBootstrapServers() {
Expand Down Expand Up @@ -193,11 +194,11 @@ public Map<String, String> buildKafkaConnectPropertyMap() {
workerProps.put("value.converter.schema.registry.url", this.getValueConverterSchemaRegistryUrl());
}
workerProps.put("offset.storage.partitions", this.getOffsetStoragePartitions().toString());
workerProps.put("offset.flush.interval.ms", this.getOffsetFlushIntervalMs().toString());
workerProps.put("offset.flush.timeout.ms", this.getOffsetFlushTimeoutMs().toString());
workerProps.put("offset.flush.interval.ms", this.getOffsetFlushInterval().toString());
workerProps.put("offset.flush.timeout.ms", this.getOffsetFlushTimeout().toString());
workerProps.put("status.storage.partitions", this.getStatusStoragePartitions().toString());
workerProps.put("heartbeat.interval.ms", this.getHeartBeatIntervalMs().toString());
workerProps.put("session.timeout.ms", this.getSessionTimeoutMs().toString());
workerProps.put("heartbeat.interval.ms", this.getHeartBeatInterval().toString());
workerProps.put("session.timeout.ms", this.getSessionTimeout().toString());
return workerProps;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public KafkaConnectSource(final ConnectorConfig connectorConfig,
this.kafkaConnect = KafkaConnect.getPipelineInstance(
pipelineName,
pluginMetrics,
kafkaConnectConfig.getConnectTimeoutMs(),
kafkaConnectConfig.getConnectorTimeoutMs());
kafkaConnectConfig.getConnectStartTimeout(),
kafkaConnectConfig.getConnectorStartTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.net.URI;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -71,12 +72,12 @@ public class KafkaConnect {
private final long connectorTimeoutMs; // 30 seconds

private KafkaConnect(final PluginMetrics pluginMetrics,
final long connectTimeoutMs,
final long connectorTimeoutMs) {
final Duration connectTimeout,
final Duration connectorTimeout) {
this.connectorMap = new HashMap<>();
this.kafkaConnectMetrics = new KafkaConnectMetrics(pluginMetrics);
this.connectTimeoutMs = connectTimeoutMs;
this.connectorTimeoutMs = connectorTimeoutMs;
this.connectTimeoutMs = connectTimeout.toMillis();
this.connectorTimeoutMs = connectorTimeout.toMillis();
}

/**
Expand All @@ -97,12 +98,12 @@ public KafkaConnect(final DistributedHerder herder,

public static KafkaConnect getPipelineInstance(final String pipelineName,
final PluginMetrics pluginMetrics,
final long connectTimeoutMs,
final long connectorTimeoutMs) {
final Duration connectTimeout,
final Duration connectorTimeout) {
KafkaConnect instance = instanceMap.get(pipelineName);
if (instance == null) {
synchronized (KafkaConnect.class) {
instance = new KafkaConnect(pluginMetrics, connectTimeoutMs, connectorTimeoutMs);
instance = new KafkaConnect(pluginMetrics, connectTimeout, connectorTimeout);
instanceMap.put(pipelineName, instance);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public void test_config_setter_getter() throws IOException {
assertThat(testConfig.getAwsConfig(), is(awsConfig));
assertThat(testConfig.getEncryptionConfig(), is(encryptionConfig));
assertThat(testConfig.getBootStrapServers(), is(String.join(",", bootstrapServer)));
assertThat(testConfig.getConnectorTimeoutMs(), is(3000L));
assertThat(testConfig.getConnectTimeoutMs(), is(3000L));
assertThat(testConfig.getConnectorStartTimeout().getSeconds(), is(3L));
assertThat(testConfig.getConnectStartTimeout().getSeconds(), is(3L));
}

@Test
Expand Down Expand Up @@ -105,8 +105,8 @@ public void test_config_get_worker_properties() throws IOException {
public void test_config_default_worker_properties() throws IOException {
KafkaConnectConfig testConfig = makeConfig("src/test/resources/sample-data-prepper-config-with-default-kafka-connect-config-extension.yaml");
assertThat(testConfig, notNullValue());
assertThat(testConfig.getConnectTimeoutMs(), is(60000L));
assertThat(testConfig.getConnectorTimeoutMs(), is(30000L));
assertThat(testConfig.getConnectStartTimeout().getSeconds(), is(60L));
assertThat(testConfig.getConnectorStartTimeout().getSeconds(), is(30L));
assertThat(testConfig.getBootStrapServers(), nullValue());
WorkerProperties testWorkerProperties = testConfig.getWorkerProperties();
assertThat(testWorkerProperties, notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void testStartKafkaConnectSource() {
doNothing().when(kafkaConnect).start();
doNothing().when(kafkaConnect).stop();
// Set up the mock behavior for the static method getInstance()
mockedStatic.when(() -> KafkaConnect.getPipelineInstance(any(), any(), anyLong(), anyLong())).thenReturn(kafkaConnect);
mockedStatic.when(() -> KafkaConnect.getPipelineInstance(any(), any(), any(), any())).thenReturn(kafkaConnect);
kafkaConnectSource = createSourceUnderTest();
kafkaConnectSource.start(buffer);
verify(kafkaConnect).addConnectors(any());
Expand All @@ -125,7 +125,7 @@ void testStartKafkaConnectSourceError() {
mockedSecurityConfigurer.when(() -> KafkaSourceSecurityConfigurer.setAuthProperties(any(), any(), any())).thenAnswer((Answer<Void>) invocation -> null);
kafkaConnect = mock(KafkaConnect.class);
// Set up the mock behavior for the static method getInstance()
mockedStatic.when(() -> KafkaConnect.getPipelineInstance(any(), any(), anyLong(), anyLong())).thenReturn(kafkaConnect);
mockedStatic.when(() -> KafkaConnect.getPipelineInstance(any(), any(), any(), any())).thenReturn(kafkaConnect);
kafkaConnectSource = createSourceUnderTest();
assertThrows(IllegalArgumentException.class, () -> kafkaConnectSource.start(buffer));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.net.URI;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -64,6 +65,8 @@ public class KafkaConnectTest {
private static final WorkerProperties DEFAULT_WORDER_PROPERTY = new WorkerProperties();
private static final long TEST_CONNECTOR_TIMEOUT_MS = 30000L; // 30 seconds
private static final long TEST_CONNECT_TIMEOUT_MS = 60000L; // 60 seconds
private static final Duration TEST_CONNECTOR_TIMEOUT = Duration.ofMillis(TEST_CONNECTOR_TIMEOUT_MS);
private static final Duration TEST_CONNECT_TIMEOUT = Duration.ofMillis(TEST_CONNECT_TIMEOUT_MS);
@Mock
private KafkaConnectMetrics kafkaConnectMetrics;

Expand Down Expand Up @@ -114,11 +117,11 @@ void setUp() throws Exception {

@Test
void testInitializeKafkaConnectWithSingletonForSamePipeline() {
final KafkaConnect kafkaConnect = KafkaConnect.getPipelineInstance(TEST_PIPELINE_NAME, pluginMetrics, TEST_CONNECT_TIMEOUT_MS, TEST_CONNECTOR_TIMEOUT_MS);
final KafkaConnect sameConnect = KafkaConnect.getPipelineInstance(TEST_PIPELINE_NAME, pluginMetrics, TEST_CONNECT_TIMEOUT_MS, TEST_CONNECTOR_TIMEOUT_MS);
final KafkaConnect kafkaConnect = KafkaConnect.getPipelineInstance(TEST_PIPELINE_NAME, pluginMetrics, TEST_CONNECT_TIMEOUT, TEST_CONNECTOR_TIMEOUT);
final KafkaConnect sameConnect = KafkaConnect.getPipelineInstance(TEST_PIPELINE_NAME, pluginMetrics, TEST_CONNECT_TIMEOUT, TEST_CONNECTOR_TIMEOUT);
assertThat(sameConnect, is(kafkaConnect));
final String anotherPipeline = "anotherPipeline";
final KafkaConnect anotherKafkaConnect = KafkaConnect.getPipelineInstance(anotherPipeline, pluginMetrics, TEST_CONNECT_TIMEOUT_MS, TEST_CONNECTOR_TIMEOUT_MS);
final KafkaConnect anotherKafkaConnect = KafkaConnect.getPipelineInstance(anotherPipeline, pluginMetrics, TEST_CONNECT_TIMEOUT, TEST_CONNECTOR_TIMEOUT);
assertThat(anotherKafkaConnect, not(kafkaConnect));
}

Expand Down Expand Up @@ -150,7 +153,7 @@ void testInitializeKafkaConnect() {
doNothing().when(mock).configure(any());
})
) {
final KafkaConnect kafkaConnect = KafkaConnect.getPipelineInstance(TEST_PIPELINE_NAME, pluginMetrics, TEST_CONNECT_TIMEOUT_MS, TEST_CONNECTOR_TIMEOUT_MS);
final KafkaConnect kafkaConnect = KafkaConnect.getPipelineInstance(TEST_PIPELINE_NAME, pluginMetrics, TEST_CONNECT_TIMEOUT, TEST_CONNECTOR_TIMEOUT);
kafkaConnect.initialize(workerProps);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ extensions:
kafka_connect_config:
bootstrap_servers:
- test:123
connect_timeout_ms: 3000
connector_timeout_ms: 3000
connect_start_timeout: 3000ms
connector_start_timeout: 3s
worker_properties:
group_id: test-group
client_id: test-client
config_storage_topic: test-configs
offset_storage_topic: test-offsets
status_storage_topic: test-status
offset_storage_partitions: 2 #optional and default is 25
offset_flush_interval_ms: 6000 #optional and default is 60000 (60s)
offset_flush_timeout_ms: 500 #optional and default is 5000 (5s)
offset_flush_interval: 6s #optional and default is 60000 (60s)
offset_flush_timeout: 500ms #optional and default is 5000 (5s)
status_storage_partitions: 1 #optional and default is 5
heartbeat_interval_ms: 300 #optional and default is 3000 (3s)
session_timeout_ms: 3000 #optional and default is 30000 (30s)
heartbeat_interval: 300ms #optional and default is 3000 (3s)
session_timeout: 3s #optional and default is 30000 (30s)

0 comments on commit 8219819

Please sign in to comment.