From 276490488f53f9ebeb217cd77375781a6ace26d4 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 25 Nov 2024 17:10:51 +0100 Subject: [PATCH 01/15] Add support for starting Kafka Connect Signed-off-by: Mickael Maison --- pom.xml | 4 +- .../test/container/StrimziConnectCluster.java | 238 ++++++++++++++++++ .../container/StrimziConnectContainer.java | 70 ++++++ .../test/container/StrimziKafkaCluster.java | 25 +- .../test/container/StrimziKafkaContainer.java | 9 + .../container/StrimziConnectClusterIT.java | 232 +++++++++++++++++ .../container/StrimziConnectClusterTest.java | 119 +++++++++ .../container/StrimziKafkaContainerTest.java | 10 +- 8 files changed, 693 insertions(+), 14 deletions(-) create mode 100644 src/main/java/io/strimzi/test/container/StrimziConnectCluster.java create mode 100644 src/main/java/io/strimzi/test/container/StrimziConnectContainer.java create mode 100644 src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java create mode 100644 src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java diff --git a/pom.xml b/pom.xml index 6069350..66301af 100644 --- a/pom.xml +++ b/pom.xml @@ -454,8 +454,8 @@ org.slf4j org.apache.commons.logging - 91 - 71 + 88 + 74 true diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java new file mode 100644 index 0000000..3a3578d --- /dev/null +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -0,0 +1,238 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import com.groupcdg.pitest.annotations.DoNotMutate; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * A Kafka Connect cluster using the latest image from quay.io/strimzi/kafka with the given version. + * Kafka Connect is started in distributed mode. Users must use the exposed REST API to start, stop and manage connectors. + */ +public class StrimziConnectCluster { + + private static final String NETWORK_ALIAS_PREFIX = "connect-"; + private static final int CONNECT_PORT = 8083; + private static final int INTER_WORKER_PORT = 8084; + + private final StrimziKafkaCluster kafkaCluster; + private final Map additionalConnectConfiguration; + private final String kafkaVersion; + private final boolean includeFileConnectors; + private final String groupId; + private final List workers; + + public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { + kafkaCluster = builder.kafkaCluster; + additionalConnectConfiguration = builder.additionalConnectConfiguration; + kafkaVersion = builder.kafkaVersion == null + ? KafkaVersionService.getInstance().latestRelease().getVersion() + : builder.kafkaVersion; + includeFileConnectors = builder.includeFileConnectors; + groupId = builder.groupId; + + String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion); + + workers = new ArrayList<>(); + for (int i = 0; i < builder.workersNum; i++) { + String host = NETWORK_ALIAS_PREFIX + i; + Properties configs = buildConfigs(host); + StrimziConnectContainer worker = new StrimziConnectContainer(imageName, kafkaCluster, configs) + .withNetwork(kafkaCluster.getNetwork()) + .withNetworkAliases(host) + .withExposedPorts(CONNECT_PORT) + .withEnv("LOG_DIR", "/tmp") + .waitForRunning() + .waitingFor(Wait.forHttp("/").forStatusCode(200)); + workers.add(worker); + } + } + + private Properties buildConfigs(String host) { + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", kafkaCluster.getNetworkBootstrapServers()); + properties.setProperty("group.id", groupId); + properties.setProperty("key.converter", "org.apache.kafka.connect.storage.StringConverter"); + properties.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter"); + properties.setProperty("offset.storage.topic", "connect-offsets"); + properties.setProperty("offset.storage.replication.factor", "-1"); + properties.setProperty("config.storage.topic", "connect-configs"); + properties.setProperty("config.storage.replication.factor", "-1"); + properties.setProperty("status.storage.topic", "connect-status"); + properties.setProperty("status.storage.replication.factor", "-1"); + properties.setProperty("listeners", "http://:" + CONNECT_PORT + ",http://" + host + ":" + INTER_WORKER_PORT); + properties.putAll(additionalConnectConfiguration); + if (includeFileConnectors) { + String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar"; + if (properties.containsKey("plugin.path")) { + String pluginPath = properties.getProperty("plugin.path"); + properties.setProperty("plugin.path", pluginPath + "," + connectFileJar); + } else { + properties.setProperty("plugin.path", connectFileJar); + } + } + return properties; + } + + /** + * Get the workers of this Kafka Connect cluster. + * + * @return collection of GenericContainer containers + */ + @DoNotMutate + public Collection> getWorkers() { + return new ArrayList<>(workers); + } + + /** + * Start the Kafka Connect cluster. + * This starts all the workers and waits for them to all be healthy and ready to be used. + */ + @DoNotMutate + public void start() { + for (StrimziConnectContainer worker : workers) { + worker.start(); + } + } + + /** + * Stop the Kafka Connect cluster. + */ + @DoNotMutate + public void stop() { + workers.forEach(StrimziConnectContainer::stop); + } + + /** + * Return the REST API endpoint of one of the available workers. + * + * @return the REST API endpoint + */ + @DoNotMutate + public String getRestEndpoint() { + for (StrimziConnectContainer worker : workers) { + if (worker.isRunning()) { + return "http://" + worker.getHost() + ":" + worker.getMappedPort(CONNECT_PORT); + } + } + throw new IllegalStateException("No workers are running and healthy"); + } + + /** + * Builder class for {@code StrimziConnectCluster}. + *

+ * Use this builder to create instances of {@code StrimziConnectCluster}. + * You must at least call {@link #withKafkaCluster(StrimziKafkaCluster)}, and + * {@link #withGroupId(String)} before calling {@link #build()}. + *

+ */ + public static class StrimziConnectClusterBuilder { + + private Map additionalConnectConfiguration = new HashMap<>(); + private boolean includeFileConnectors = true; + private int workersNum = 1; + private String kafkaVersion; + private StrimziKafkaCluster kafkaCluster; + private String groupId; + + /** + * Set the Kafka cluster the Kafka Connect cluster will use to. + * + * @param kafkaCluster the {@link StrimziKafkaCluster} instance + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withKafkaCluster(StrimziKafkaCluster kafkaCluster) { + this.kafkaCluster = kafkaCluster; + return this; + } + + /** + * Set the number of Kafka Connect workers in the cluster. + * If not called, the cluster has a single worker. + * + * @param workersNum the number of Kafka Connect workers + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withNumberOfWorkers(int workersNum) { + this.workersNum = workersNum; + return this; + } + + /** + * Add additional Kafka Connect configuration parameters. + * These configurations are applied to all workers in the cluster. + * + * @param additionalConnectConfiguration a map of additional Kafka Connect configuration options + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withAdditionalConnectConfiguration(Map additionalConnectConfiguration) { + this.additionalConnectConfiguration = additionalConnectConfiguration; + return this; + } + + /** + * Specify the Kafka version to be used for the Connect workers in the cluster. + * If not called, the latest Kafka version available from {@link KafkaVersionService} will be used. + * + * @param kafkaVersion the desired Kafka version for the Connect cluster + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withKafkaVersion(String kafkaVersion) { + this.kafkaVersion = kafkaVersion; + return this; + } + + /** + * Disable the FileStreams connectors. + * If not called, the FileSteams connectors are added to plugin.path. + * + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withoutFileConnectors() { + this.includeFileConnectors = false; + return this; + } + + /** + * Specify the group.id of the Connect cluster. + * + * @param groupId the group id + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining + */ + public StrimziConnectClusterBuilder withGroupId(String groupId) { + this.groupId = groupId; + return this; + } + + /** + * Build and return a {@code StrimziConnectCluster} instance based on the provided configurations. + * + * @return a new instance of {@code StrimziConnectCluster} + */ + public StrimziConnectCluster build() { + if (kafkaCluster == null) { + throw new IllegalArgumentException("A Kafka cluster must be specified"); + } + if (groupId == null) { + throw new IllegalArgumentException("The Connect cluster group.id configuration must be specified"); + } + if (workersNum <= 0) { + throw new IllegalArgumentException("The number of workers in the Connect cluster must be greater than 0"); + } + if (additionalConnectConfiguration == null) { + throw new IllegalArgumentException("The additional configuration must be specified"); + } + return new StrimziConnectCluster(this); + } + } +} diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java new file mode 100644 index 0000000..7802c07 --- /dev/null +++ b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java @@ -0,0 +1,70 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import com.groupcdg.pitest.annotations.DoNotMutate; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.images.builder.Transferable; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +class StrimziConnectContainer extends GenericContainer { + + private static final String STARTER_SCRIPT = "/start_connect.sh"; + private static final String CONFIG_FILE = "/opt/kafka/config/connect.properties"; + + private final StrimziKafkaCluster kafkaCluster; + private final Properties configs; + + public StrimziConnectContainer(String imageName, StrimziKafkaCluster kafkaCluster, Properties configs) { + super(imageName); + this.kafkaCluster = kafkaCluster; + this.configs = configs; + } + + @Override + @DoNotMutate + protected void doStart() { + super.setNetwork(kafkaCluster.getNetwork()); + super.setCommand("sh", "-c", "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT); + super.doStart(); + } + + @Override + @DoNotMutate + protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) { + super.containerIsStarting(containerInfo, reused); + + // Write configs to a file in the container + StringWriter writer = new StringWriter(); + try { + configs.store(writer, null); + } catch (IOException e) { + throw new UncheckedIOException("Failed to build configuration file", e); + } + copyFileToContainer( + Transferable.of(writer.toString().getBytes(StandardCharsets.UTF_8)), + CONFIG_FILE); + + // Write starter script to a file in the container + String command = "/opt/kafka/bin/connect-distributed.sh " + CONFIG_FILE; + copyFileToContainer( + Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), + STARTER_SCRIPT + ); + } + + @DoNotMutate + public StrimziConnectContainer waitForRunning() { + super.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1)); + return this; + } +} diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java index 4a73bc4..d52f36e 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaCluster.java @@ -162,7 +162,7 @@ public static class StrimziKafkaClusterBuilder { * Sets the number of Kafka brokers in the cluster. * * @param brokersNum the number of Kafka brokers - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) { this.brokersNum = brokersNum; @@ -174,7 +174,7 @@ public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) { * If not provided, it defaults to the number of brokers. * * @param internalTopicReplicationFactor the replication factor for internal topics - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) { this.internalTopicReplicationFactor = internalTopicReplicationFactor; @@ -186,7 +186,7 @@ public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int interna * These configurations are applied to all brokers in the cluster. * * @param additionalKafkaConfiguration a map of additional Kafka configuration options - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map additionalKafkaConfiguration) { if (additionalKafkaConfiguration != null) { @@ -199,7 +199,7 @@ public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map * - * @return the current instance of {@code StrimziKafkaClusterBuilder} for method chaining + * @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining */ public StrimziKafkaClusterBuilder withKraft() { this.enableKRaft = true; @@ -263,6 +263,17 @@ public Collection getBrokers() { return this.brokers; } + /** + * Get the bootstrap servers that containers on the same network should use to connect + * @return a comma separated list of Kafka bootstrap servers + */ + @DoNotMutate + public String getNetworkBootstrapServers() { + return brokers.stream() + .map(broker -> ((StrimziKafkaContainer) broker).getNetworkBootstrapServers()) + .collect(Collectors.joining(",")); + } + @Override @DoNotMutate public boolean hasKraftOrExternalZooKeeperConfigured() { diff --git a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java index 8d96cef..f4852b5 100644 --- a/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziKafkaContainer.java @@ -618,6 +618,15 @@ public String getBootstrapServers() { return bootstrapServersProvider.apply(this); } + /** + * Get the bootstrap servers that containers on the same network should use to connect + * @return a comma separated list of Kafka bootstrap servers + */ + @DoNotMutate + public String getNetworkBootstrapServers() { + return NETWORK_ALIAS_PREFIX + brokerId + ":" + INTER_BROKER_LISTENER_PORT; + } + /** * Get the cluster id. This is only supported for KRaft containers. * @return The cluster id. diff --git a/src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java b/src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java new file mode 100644 index 0000000..f0280b3 --- /dev/null +++ b/src/test/java/io/strimzi/test/container/StrimziConnectClusterIT.java @@ -0,0 +1,232 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; + +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.hasItem; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; + +public class StrimziConnectClusterIT { + + public static final Set MIRROR_MAKER_CONNECTORS = Set.of( + "MirrorSourceConnector", + "MirrorCheckpointConnector", + "MirrorHeartbeatConnector"); + + public static final Set FILE_CONNECTORS = Set.of( + "FileStreamSinkConnector", + "FileStreamSourceConnector"); + + private StrimziKafkaCluster kafkaCluster; + private StrimziConnectCluster connectCluster; + + @BeforeEach + public void setUp() { + kafkaCluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withSharedNetwork() + .withNumberOfBrokers(1) + .withKraft() + .build(); + kafkaCluster.start(); + } + + @AfterEach + public void tearDown() { + kafkaCluster.stop(); + connectCluster.stop(); + } + + @Test + public void testBasicCluster() throws Exception { + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + String connectors = httpGet("/connector-plugins"); + for (String connector : MIRROR_MAKER_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + for (String connector : FILE_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + } + + @Test + public void testDisableFileConnectors() throws Exception { + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .withoutFileConnectors() + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + String connectors = httpGet("/connector-plugins"); + for (String connector : MIRROR_MAKER_CONNECTORS) { + assertThat(connectors, containsString(connector)); + } + for (String connector : FILE_CONNECTORS) { + assertThat(connectors, not(containsString(connector))); + } + } + + @Test + public void testKafkaVersion() throws Exception { + String version = "3.8.1"; + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .withKafkaVersion(version) + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + assertThat(root, containsString(version)); + } + + @Test + public void testOverrideConfigs() throws Exception { + String offsetTopic = "custom-offset-topic"; + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .withAdditionalConnectConfiguration(Map.of("offset.storage.topic", offsetTopic)) + .build(); + connectCluster.start(); + + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()))) { + Set topics = admin.listTopics().names().get(); + assertThat(topics, hasItem(offsetTopic)); + } + } + + @Test + public void testRunConnector() throws Exception { + String topic = "topic-to-export"; + String file = "/tmp/sink.out"; + String connectorName = "file-sink"; + List records = new ArrayList<>(); + try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()))) { + admin.createTopics(List.of(new NewTopic(topic, 1, (short) -1))); + } + try (KafkaProducer producer = new KafkaProducer<>(Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName() + ))) { + for (int i = 0; i < 5; i++) { + String value = "record" + i; + producer.send(new ProducerRecord<>(topic, value)); + records.add(value); + } + } + connectCluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("my-cluster") + .withKafkaCluster(kafkaCluster) + .build(); + connectCluster.start(); + assertThat(connectCluster.getWorkers().size(), is(1)); + String root = httpGet("/"); + assertThat(root, containsString(getClusterId())); + + String connectorConfig = + "{\n" + + " \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSinkConnector\",\n" + + " \"topics\": \"" + topic + "\",\n" + + " \"file\": \"" + file + "\"\n" + + "}"; + String config = httpPut("/connectors/" + connectorName + "/config", connectorConfig); + assertThat(config, containsString("\"name\":\"" + connectorName + "\"")); + assertThat(config, containsString("\"type\":\"sink\"")); + String connectors = httpGet("/connectors"); + assertThat(connectors, containsString(connectorName)); + + GenericContainer worker = connectCluster.getWorkers().iterator().next(); + + for (String record : records) { + Utils.waitFor("Checking " + record + " is in " + file, + Duration.ofSeconds(5), + Duration.ofMinutes(1), + () -> { + try { + Container.ExecResult result = worker.execInContainer("sh", "-c", "cat " + file); + return result.getStdout().contains(record); + } catch (Exception exc) { + return false; + } + } + ); + } + } + + public String httpGet(String path) throws Exception { + HttpClient httpClient = HttpClient.newHttpClient(); + URI uri = new URI(connectCluster.getRestEndpoint() + path); + HttpRequest request = HttpRequest.newBuilder() + .GET() + .uri(uri) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode(), is(HttpURLConnection.HTTP_OK)); + return response.body(); + } + + public String httpPut(String path, String body) throws Exception { + HttpClient httpClient = HttpClient.newHttpClient(); + URI uri = new URI(connectCluster.getRestEndpoint() + path); + HttpRequest request = HttpRequest.newBuilder() + .PUT(HttpRequest.BodyPublishers.ofString(body)) + .setHeader("Content-Type", "application/json") + .uri(uri) + .build(); + HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString()); + assertThat(response.statusCode(), is(HttpURLConnection.HTTP_CREATED)); + return response.body(); + } + + String getClusterId() throws Exception { + try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster.getBootstrapServers()))) { + return admin.describeCluster().clusterId().get(); + } + } +} diff --git a/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java new file mode 100644 index 0000000..1e71734 --- /dev/null +++ b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java @@ -0,0 +1,119 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.test.container; + + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class StrimziConnectClusterTest { + + @Test + void testConnectClusterNegativeOrZeroNumberOfWorkers() { + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withNumberOfWorkers(0) + .withGroupId("groupId") + .build() + ); + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withNumberOfWorkers(-1) + .withGroupId("groupId") + .build() + ); + } + + @Test + void testConnectClusterWithoutBootstrapServers() { + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withGroupId("groupId") + .withNumberOfWorkers(1) + .build() + ); + } + + @Test + void testConnectClusterWithoutGroupId() { + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withNumberOfWorkers(1) + .build() + ); + } + + @Test + void testConnectCluster() { + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withGroupId("groupId") + .build(); + } + + @Test + void testGetRestApiEndpointThrowsBeforeStart() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withGroupId("groupId") + .build(); + assertThrows(IllegalStateException.class, cluster::getRestEndpoint); + } + + @Test + void testAllBuilderMethods() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withNumberOfWorkers(2) + .withGroupId("groupId") + .withKafkaVersion("3.8.1") + .withAdditionalConnectConfiguration(Map.of("plugin.path", "/tmp")) + .withoutFileConnectors() + .build(); + } + + @Test + void testSetPluginPathWithFileConnector() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withNumberOfWorkers(2) + .withGroupId("groupId") + .withAdditionalConnectConfiguration(Map.of("plugin.path", "/tmp")) + .build(); + } + + @Test + void testWithAdditionalConnectConfigurationNull() { + assertThrows(IllegalArgumentException.class, () -> + new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withNumberOfWorkers(2) + .withGroupId("groupId") + .withAdditionalConnectConfiguration(null) + .build()); + } +} diff --git a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java index 47ba3ab..6e80a8a 100644 --- a/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java +++ b/src/test/java/io/strimzi/test/container/StrimziKafkaContainerTest.java @@ -36,6 +36,11 @@ class StrimziKafkaContainerTest { private StrimziKafkaContainer kafkaContainer; + @BeforeEach + void setUp() { + kafkaContainer = new StrimziKafkaContainer(); + } + @Test void testDefaultInitialization() { assertThat(kafkaContainer, is(notNullValue())); @@ -107,11 +112,6 @@ void testIllegalStateWhenUsingExternalZooKeeperWithKraft() { .withExternalZookeeperConnect("localhost:2181")); } - @BeforeEach - void setUp() { - kafkaContainer = new StrimziKafkaContainer(); - } - @Test void testGetProxyWithoutConfigurationThrowsException() { assertThrows(IllegalStateException.class, kafkaContainer::getProxy); From 0b3b5a8f2014b4275df66c2dea310bc304f0b217 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:41:03 +0100 Subject: [PATCH 02/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index 3a3578d..7d99941 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -33,7 +33,7 @@ public class StrimziConnectCluster { private final List workers; public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { - kafkaCluster = builder.kafkaCluster; + this.kafkaCluster = builder.kafkaCluster; additionalConnectConfiguration = builder.additionalConnectConfiguration; kafkaVersion = builder.kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() From 2c1d4e20019f5036d3044fbed3fa76af1f2a6f81 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:41:11 +0100 Subject: [PATCH 03/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index 7d99941..6cde65c 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -34,7 +34,7 @@ public class StrimziConnectCluster { public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { this.kafkaCluster = builder.kafkaCluster; - additionalConnectConfiguration = builder.additionalConnectConfiguration; + this.additionalConnectConfiguration = builder.additionalConnectConfiguration; kafkaVersion = builder.kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : builder.kafkaVersion; From 224f3ba99ecc44b4fba2b02be727e544817f9b6a Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:41:20 +0100 Subject: [PATCH 04/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index 6cde65c..b46336d 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -35,7 +35,7 @@ public class StrimziConnectCluster { public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { this.kafkaCluster = builder.kafkaCluster; this.additionalConnectConfiguration = builder.additionalConnectConfiguration; - kafkaVersion = builder.kafkaVersion == null + this.kafkaVersion = builder.kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : builder.kafkaVersion; includeFileConnectors = builder.includeFileConnectors; From 172706beab8247486ee1377cd6af7b6762999ecd Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:41:27 +0100 Subject: [PATCH 05/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index b46336d..4d5ed79 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -38,7 +38,7 @@ public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { this.kafkaVersion = builder.kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : builder.kafkaVersion; - includeFileConnectors = builder.includeFileConnectors; + this.includeFileConnectors = builder.includeFileConnectors; groupId = builder.groupId; String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion); From 73001f16baefb1d5a564aa725894c7da2b1f8c70 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:41:34 +0100 Subject: [PATCH 06/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index 4d5ed79..d24c252 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -39,7 +39,7 @@ public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { ? KafkaVersionService.getInstance().latestRelease().getVersion() : builder.kafkaVersion; this.includeFileConnectors = builder.includeFileConnectors; - groupId = builder.groupId; + this.groupId = builder.groupId; String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion); From a38bff93f5913e00d562d3dc721ce60915c20b78 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:42:11 +0100 Subject: [PATCH 07/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index d24c252..a85635d 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -41,7 +41,7 @@ public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { this.includeFileConnectors = builder.includeFileConnectors; this.groupId = builder.groupId; - String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion); + final String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion); workers = new ArrayList<>(); for (int i = 0; i < builder.workersNum; i++) { From 891766f8f260bc5cc8c40f130997a17628f3467b Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:42:20 +0100 Subject: [PATCH 08/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index a85635d..af5ceb1 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -45,7 +45,7 @@ public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { workers = new ArrayList<>(); for (int i = 0; i < builder.workersNum; i++) { - String host = NETWORK_ALIAS_PREFIX + i; + final String host = NETWORK_ALIAS_PREFIX + i; Properties configs = buildConfigs(host); StrimziConnectContainer worker = new StrimziConnectContainer(imageName, kafkaCluster, configs) .withNetwork(kafkaCluster.getNetwork()) From 472a1c9eab30516d8fa1e9919c481e85cfa4be3c Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:42:31 +0100 Subject: [PATCH 09/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index af5ceb1..b0fcd46 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -46,7 +46,7 @@ public StrimziConnectCluster(StrimziConnectClusterBuilder builder) { workers = new ArrayList<>(); for (int i = 0; i < builder.workersNum; i++) { final String host = NETWORK_ALIAS_PREFIX + i; - Properties configs = buildConfigs(host); + final Properties configs = buildConfigs(host); StrimziConnectContainer worker = new StrimziConnectContainer(imageName, kafkaCluster, configs) .withNetwork(kafkaCluster.getNetwork()) .withNetworkAliases(host) From 0bf6e2b973dd14a312c87472f4d88b808727caa8 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:42:41 +0100 Subject: [PATCH 10/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index b0fcd46..9e308bc 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -75,7 +75,7 @@ private Properties buildConfigs(String host) { if (includeFileConnectors) { String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar"; if (properties.containsKey("plugin.path")) { - String pluginPath = properties.getProperty("plugin.path"); + final String pluginPath = properties.getProperty("plugin.path"); properties.setProperty("plugin.path", pluginPath + "," + connectFileJar); } else { properties.setProperty("plugin.path", connectFileJar); From ae2bf2eeeebaacadff6a28716768025a536df16f Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:42:57 +0100 Subject: [PATCH 11/15] Update src/main/java/io/strimzi/test/container/StrimziConnectCluster.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectCluster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java index 9e308bc..8c2bcd9 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectCluster.java @@ -73,7 +73,7 @@ private Properties buildConfigs(String host) { properties.setProperty("listeners", "http://:" + CONNECT_PORT + ",http://" + host + ":" + INTER_WORKER_PORT); properties.putAll(additionalConnectConfiguration); if (includeFileConnectors) { - String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar"; + final String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar"; if (properties.containsKey("plugin.path")) { final String pluginPath = properties.getProperty("plugin.path"); properties.setProperty("plugin.path", pluginPath + "," + connectFileJar); From 5854a28d9f735578042d9a3e4b83b5b605027d8a Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 7 Jan 2025 10:43:05 +0100 Subject: [PATCH 12/15] Update src/main/java/io/strimzi/test/container/StrimziConnectContainer.java Co-authored-by: Maros Orsak Signed-off-by: Mickael Maison --- .../java/io/strimzi/test/container/StrimziConnectContainer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java index 7802c07..b892512 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java @@ -55,7 +55,7 @@ protected void containerIsStarting(final InspectContainerResponse containerInfo, CONFIG_FILE); // Write starter script to a file in the container - String command = "/opt/kafka/bin/connect-distributed.sh " + CONFIG_FILE; + final String command = "/opt/kafka/bin/connect-distributed.sh " + CONFIG_FILE; copyFileToContainer( Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700), STARTER_SCRIPT From 3f6e692138985922be68748ad67aea365b71a809 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Fri, 10 Jan 2025 14:56:29 +0100 Subject: [PATCH 13/15] Add testcase with empty additional config Signed-off-by: Mickael Maison --- .../container/StrimziConnectClusterTest.java | 43 ++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java index 1e71734..f9930b8 100644 --- a/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java +++ b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java @@ -9,10 +9,15 @@ import java.util.Map; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; public class StrimziConnectClusterTest { + private static final String GROUP_ID = "groupId"; + private static final int WORKERS = 1; + @Test void testConnectClusterNegativeOrZeroNumberOfWorkers() { assertThrows(IllegalArgumentException.class, () -> @@ -21,7 +26,7 @@ void testConnectClusterNegativeOrZeroNumberOfWorkers() { .withNumberOfBrokers(1) .build()) .withNumberOfWorkers(0) - .withGroupId("groupId") + .withGroupId(GROUP_ID) .build() ); assertThrows(IllegalArgumentException.class, () -> @@ -30,7 +35,7 @@ void testConnectClusterNegativeOrZeroNumberOfWorkers() { .withNumberOfBrokers(1) .build()) .withNumberOfWorkers(-1) - .withGroupId("groupId") + .withGroupId(GROUP_ID) .build() ); } @@ -39,8 +44,8 @@ void testConnectClusterNegativeOrZeroNumberOfWorkers() { void testConnectClusterWithoutBootstrapServers() { assertThrows(IllegalArgumentException.class, () -> new StrimziConnectCluster.StrimziConnectClusterBuilder() - .withGroupId("groupId") - .withNumberOfWorkers(1) + .withGroupId(GROUP_ID) + .withNumberOfWorkers(WORKERS) .build() ); } @@ -52,19 +57,20 @@ void testConnectClusterWithoutGroupId() { .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() .withNumberOfBrokers(1) .build()) - .withNumberOfWorkers(1) + .withNumberOfWorkers(WORKERS) .build() ); } @Test void testConnectCluster() { - new StrimziConnectCluster.StrimziConnectClusterBuilder() + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() .withNumberOfBrokers(1) .build()) - .withGroupId("groupId") + .withGroupId(GROUP_ID) .build(); + assertThat(cluster.getWorkers().size(), is(WORKERS)); } @Test @@ -73,8 +79,9 @@ void testGetRestApiEndpointThrowsBeforeStart() { .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() .withNumberOfBrokers(1) .build()) - .withGroupId("groupId") + .withGroupId(GROUP_ID) .build(); + assertThat(cluster.getWorkers().size(), is(WORKERS)); assertThrows(IllegalStateException.class, cluster::getRestEndpoint); } @@ -85,11 +92,12 @@ void testAllBuilderMethods() { .withNumberOfBrokers(1) .build()) .withNumberOfWorkers(2) - .withGroupId("groupId") + .withGroupId(GROUP_ID) .withKafkaVersion("3.8.1") .withAdditionalConnectConfiguration(Map.of("plugin.path", "/tmp")) .withoutFileConnectors() .build(); + assertThat(cluster.getWorkers().size(), is(2)); } @Test @@ -99,9 +107,10 @@ void testSetPluginPathWithFileConnector() { .withNumberOfBrokers(1) .build()) .withNumberOfWorkers(2) - .withGroupId("groupId") + .withGroupId(GROUP_ID) .withAdditionalConnectConfiguration(Map.of("plugin.path", "/tmp")) .build(); + assertThat(cluster.getWorkers().size(), is(2)); } @Test @@ -112,8 +121,20 @@ void testWithAdditionalConnectConfigurationNull() { .withNumberOfBrokers(1) .build()) .withNumberOfWorkers(2) - .withGroupId("groupId") + .withGroupId(GROUP_ID) .withAdditionalConnectConfiguration(null) .build()); } + + @Test + void testWithEmptyAdditionalConnectConfiguration() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withGroupId(GROUP_ID) + .withAdditionalConnectConfiguration(Map.of()) + .build(); + assertThat(cluster.getWorkers().size(), is(WORKERS)); + } } From f6f1de004fd2915e48b5da22a65ceaff9997b141 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 13 Jan 2025 12:08:14 +0100 Subject: [PATCH 14/15] Add tests for plugin path Signed-off-by: Mickael Maison --- .../container/StrimziConnectContainer.java | 5 ++ .../container/StrimziConnectClusterTest.java | 47 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java index b892512..59fa599 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java @@ -67,4 +67,9 @@ public StrimziConnectContainer waitForRunning() { super.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1)); return this; } + + @DoNotMutate + /* for testing */ Properties getConfigs() { + return configs; + } } diff --git a/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java index f9930b8..dad63a5 100644 --- a/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java +++ b/src/test/java/io/strimzi/test/container/StrimziConnectClusterTest.java @@ -8,8 +8,11 @@ import org.junit.jupiter.api.Test; import java.util.Map; +import java.util.Properties; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -137,4 +140,48 @@ void testWithEmptyAdditionalConnectConfiguration() { .build(); assertThat(cluster.getWorkers().size(), is(WORKERS)); } + + @Test + void testIncludeFileConnectorsAddsPluginPath() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withGroupId("groupId") + .build(); + + Properties configs = ((StrimziConnectContainer) cluster.getWorkers().iterator().next()).getConfigs(); + assertThat(configs.getProperty("plugin.path"), containsString("connect-file")); + } + + @Test + void testAdditionalPluginPath() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withGroupId("groupId") + .withAdditionalConnectConfiguration(Map.of("plugin.path", "/other-path")) + .build(); + + Properties configs = ((StrimziConnectContainer) cluster.getWorkers().iterator().next()).getConfigs(); + assertThat(configs.getProperty("plugin.path"), containsString("connect-file")); + assertThat(configs.getProperty("plugin.path"), containsString("/other-path")); + } + + @Test + void testAdditionalPluginPathWithoutFileConnectors() { + StrimziConnectCluster cluster = new StrimziConnectCluster.StrimziConnectClusterBuilder() + .withKafkaCluster(new StrimziKafkaCluster.StrimziKafkaClusterBuilder() + .withNumberOfBrokers(1) + .build()) + .withGroupId("groupId") + .withoutFileConnectors() + .withAdditionalConnectConfiguration(Map.of("plugin.path", "/other-path")) + .build(); + + Properties configs = ((StrimziConnectContainer) cluster.getWorkers().iterator().next()).getConfigs(); + assertThat(configs.getProperty("plugin.path"), not(containsString("connect-file"))); + assertThat(configs.getProperty("plugin.path"), containsString("/other-path")); + } } From a89dcbf8b28b2d4957c03ff8612acf4f4694ac4c Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 13 Jan 2025 13:20:31 +0100 Subject: [PATCH 15/15] Address feedback from Maros Signed-off-by: Mickael Maison --- pom.xml | 4 ++-- .../io/strimzi/test/container/StrimziConnectContainer.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 66301af..bcb5981 100644 --- a/pom.xml +++ b/pom.xml @@ -454,8 +454,8 @@ org.slf4j org.apache.commons.logging - 88 - 74 + 92 + 72 true diff --git a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java index 59fa599..443aca6 100644 --- a/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java +++ b/src/main/java/io/strimzi/test/container/StrimziConnectContainer.java @@ -68,7 +68,6 @@ public StrimziConnectContainer waitForRunning() { return this; } - @DoNotMutate /* for testing */ Properties getConfigs() { return configs; }