Skip to content

Commit

Permalink
expose connector_rebalance_max_delay config to customers. The default…
Browse files Browse the repository at this point in the history
… delay is 5 mins. Also increase the default connector_start_timeout to 6 mins.

Signed-off-by: Haidong <[email protected]>
  • Loading branch information
Haidong committed Sep 15, 2023
1 parent f5b66c9 commit 71ed291
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Properties;

public class KafkaConnectConfig implements KafkaClusterAuthConfig {
private static final long CONNECTOR_TIMEOUT_MS = 30000L; // 30 seconds
private static final long CONNECTOR_TIMEOUT_MS = 360000L; // 360 seconds
private static final long CONNECT_TIMEOUT_MS = 60000L; // 60 seconds

@JsonProperty("worker_properties")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class WorkerProperties {
private static final Integer STATUS_STORAGE_PARTITIONS = 5;
private static final Long HEARTBEAT_INTERVAL_MS = 3000L;
private static final Long SESSION_TIMEOUT_MS = 30000L;
private static final long CONNECTOR_REBALANCE_DELAY_MS = 300000L; // 300 seconds
private static final String DEFAULT_GROUP_ID = "localGroup";
private static final String DEFAULT_CLIENT_ID = "localClient";
private static final String DEFAULT_CONFIG_STORAGE_TOPIC = "config-storage-topic";
Expand Down Expand Up @@ -59,6 +60,8 @@ public class WorkerProperties {
private Duration heartBeatInterval = Duration.ofMillis(HEARTBEAT_INTERVAL_MS);
@JsonProperty("session_timeout")
private Duration sessionTimeout = Duration.ofMillis(SESSION_TIMEOUT_MS);
@JsonProperty("connector_rebalance_max_delay")
private Duration connectorRebalanceDelay = Duration.ofMillis(CONNECTOR_REBALANCE_DELAY_MS);
private String keyConverterSchemaRegistryUrl;
private String valueConverterSchemaRegistryUrl;
private String bootstrapServers;
Expand Down Expand Up @@ -100,6 +103,10 @@ public Long getOffsetFlushTimeout() {
return offsetFlushTimeout.toMillis();
}

public Long getRebalanceMaxDelay() {
return connectorRebalanceDelay.toMillis();
}

public Integer getStatusStoragePartitions() {
return statusStoragePartitions;
}
Expand Down Expand Up @@ -199,6 +206,7 @@ public Map<String, String> buildKafkaConnectPropertyMap() {
workerProps.put("status.storage.partitions", this.getStatusStoragePartitions().toString());
workerProps.put("heartbeat.interval.ms", this.getHeartBeatInterval().toString());
workerProps.put("session.timeout.ms", this.getSessionTimeout().toString());
workerProps.put("scheduled.rebalance.max.delay.ms", this.getRebalanceMaxDelay().toString());
return workerProps;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public void test_config_get_worker_properties() throws IOException {
assertThat(workerProperties.get("status.storage.partitions"), is("1"));
assertThat(workerProperties.get("heartbeat.interval.ms"), is("300"));
assertThat(workerProperties.get("session.timeout.ms"), is("3000"));
assertThat(workerProperties.get("scheduled.rebalance.max.delay.ms"), is("60000"));
assertThat(workerProperties.get("testClass"), is(this.getClass().getName()));
assertThat(workerProperties.get("producer.testClass"), is(this.getClass().getName()));
assertThat(workerProperties.get("testKey"), is(authProperties.getProperty("testKey")));
Expand All @@ -106,7 +107,7 @@ public void test_config_default_worker_properties() throws IOException {
KafkaConnectConfig testConfig = makeConfig("src/test/resources/sample-data-prepper-config-with-default-kafka-connect-config-extension.yaml");
assertThat(testConfig, notNullValue());
assertThat(testConfig.getConnectStartTimeout().getSeconds(), is(60L));
assertThat(testConfig.getConnectorStartTimeout().getSeconds(), is(30L));
assertThat(testConfig.getConnectorStartTimeout().getSeconds(), is(360L));
assertThat(testConfig.getBootStrapServers(), nullValue());
WorkerProperties testWorkerProperties = testConfig.getWorkerProperties();
assertThat(testWorkerProperties, notNullValue());
Expand All @@ -118,5 +119,6 @@ public void test_config_default_worker_properties() throws IOException {
assertThat(workerProperties.get("status.storage.partitions"), is("5"));
assertThat(workerProperties.get("heartbeat.interval.ms"), is("3000"));
assertThat(workerProperties.get("session.timeout.ms"), is("30000"));
assertThat(workerProperties.get("scheduled.rebalance.max.delay.ms"), is("300000"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
public class KafkaConnectTest {
private static final String TEST_PIPELINE_NAME = "test";
private static final WorkerProperties DEFAULT_WORDER_PROPERTY = new WorkerProperties();
private static final long TEST_CONNECTOR_TIMEOUT_MS = 30000L; // 30 seconds
private static final long TEST_CONNECTOR_TIMEOUT_MS = 360000L; // 360 seconds
private static final long TEST_CONNECT_TIMEOUT_MS = 60000L; // 60 seconds
private static final Duration TEST_CONNECTOR_TIMEOUT = Duration.ofMillis(TEST_CONNECTOR_TIMEOUT_MS);
private static final Duration TEST_CONNECT_TIMEOUT = Duration.ofMillis(TEST_CONNECT_TIMEOUT_MS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ extensions:
offset_flush_timeout: 500ms #optional and default is 5000 (5s)
status_storage_partitions: 1 #optional and default is 5
heartbeat_interval: 300ms #optional and default is 3000 (3s)
session_timeout: 3s #optional and default is 30000 (30s)
session_timeout: 3s #optional and default is 30000 (30s)
connector_rebalance_max_delay: 60s

0 comments on commit 71ed291

Please sign in to comment.