Add support for starting Kafka Connect #117

Merged
merged 15 commits on Jan 13, 2025
Changes from 1 commit
4 changes: 2 additions & 2 deletions pom.xml
@@ -454,8 +454,8 @@
<avoidCallsTo>org.slf4j</avoidCallsTo>
<avoidCallsTo>org.apache.commons.logging</avoidCallsTo>
</avoidCallsTo>
<mutationThreshold>91</mutationThreshold>
<coverageThreshold>71</coverageThreshold>
<mutationThreshold>88</mutationThreshold>
<coverageThreshold>74</coverageThreshold>
<verbose>true</verbose>
</configuration>
</plugin>
238 changes: 238 additions & 0 deletions src/main/java/io/strimzi/test/container/StrimziConnectCluster.java
@@ -0,0 +1,238 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.test.container;

import com.groupcdg.pitest.annotations.DoNotMutate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
* A Kafka Connect cluster using the latest image from quay.io/strimzi/kafka with the given version.
* Kafka Connect is started in distributed mode. Users must use the exposed REST API to start, stop and manage connectors.
*/
public class StrimziConnectCluster {

private static final String NETWORK_ALIAS_PREFIX = "connect-";
private static final int CONNECT_PORT = 8083;
private static final int INTER_WORKER_PORT = 8084;

private final StrimziKafkaCluster kafkaCluster;
private final Map<String, String> additionalConnectConfiguration;
private final String kafkaVersion;
private final boolean includeFileConnectors;
private final String groupId;
private final List<StrimziConnectContainer> workers;

/**
 * Construct the Kafka Connect cluster defined by the given builder.
 *
 * @param builder the {@code StrimziConnectClusterBuilder} holding the cluster configuration
 */
public StrimziConnectCluster(StrimziConnectClusterBuilder builder) {
kafkaCluster = builder.kafkaCluster;
additionalConnectConfiguration = builder.additionalConnectConfiguration;
kafkaVersion = builder.kafkaVersion == null
? KafkaVersionService.getInstance().latestRelease().getVersion()
: builder.kafkaVersion;
includeFileConnectors = builder.includeFileConnectors;
groupId = builder.groupId;

String imageName = KafkaVersionService.strimziTestContainerImageName(kafkaVersion);

workers = new ArrayList<>();
for (int i = 0; i < builder.workersNum; i++) {
String host = NETWORK_ALIAS_PREFIX + i;
Properties configs = buildConfigs(host);
StrimziConnectContainer worker = new StrimziConnectContainer(imageName, kafkaCluster, configs)
.withNetwork(kafkaCluster.getNetwork())
.withNetworkAliases(host)
.withExposedPorts(CONNECT_PORT)
.withEnv("LOG_DIR", "/tmp")
.waitForRunning()
.waitingFor(Wait.forHttp("/").forStatusCode(200));
workers.add(worker);
}
}

private Properties buildConfigs(String host) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaCluster.getNetworkBootstrapServers());
properties.setProperty("group.id", groupId);
properties.setProperty("key.converter", "org.apache.kafka.connect.storage.StringConverter");
properties.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter");
properties.setProperty("offset.storage.topic", "connect-offsets");
properties.setProperty("offset.storage.replication.factor", "-1");
properties.setProperty("config.storage.topic", "connect-configs");
properties.setProperty("config.storage.replication.factor", "-1");
properties.setProperty("status.storage.topic", "connect-status");
properties.setProperty("status.storage.replication.factor", "-1");
properties.setProperty("listeners", "http://:" + CONNECT_PORT + ",http://" + host + ":" + INTER_WORKER_PORT);
properties.putAll(additionalConnectConfiguration);
if (includeFileConnectors) {
String connectFileJar = "/opt/kafka/libs/connect-file-" + kafkaVersion + ".jar";
if (properties.containsKey("plugin.path")) {
String pluginPath = properties.getProperty("plugin.path");
properties.setProperty("plugin.path", pluginPath + "," + connectFileJar);
} else {
properties.setProperty("plugin.path", connectFileJar);
}
}
return properties;
}

/**
* Get the workers of this Kafka Connect cluster.
*
* @return collection of GenericContainer containers
*/
@DoNotMutate
public Collection<GenericContainer<?>> getWorkers() {
return new ArrayList<>(workers);
}

/**
* Start the Kafka Connect cluster.
* This starts all the workers and waits until they are all healthy and ready to use.
*/
@DoNotMutate
public void start() {
for (StrimziConnectContainer worker : workers) {
worker.start();
}
}

/**
* Stop the Kafka Connect cluster.
*/
@DoNotMutate
public void stop() {
workers.forEach(StrimziConnectContainer::stop);
}

/**
* Return the REST API endpoint of one of the available workers.
*
* @return the REST API endpoint
*/
@DoNotMutate
public String getRestEndpoint() {
for (StrimziConnectContainer worker : workers) {
if (worker.isRunning()) {
return "http://" + worker.getHost() + ":" + worker.getMappedPort(CONNECT_PORT);
}
}
throw new IllegalStateException("No workers are running and healthy");
}

/**
* Builder class for {@code StrimziConnectCluster}.
* <p>
* Use this builder to create instances of {@code StrimziConnectCluster}.
* You must at least call {@link #withKafkaCluster(StrimziKafkaCluster)}, and
* {@link #withGroupId(String)} before calling {@link #build()}.
* </p>
*/
public static class StrimziConnectClusterBuilder {

private Map<String, String> additionalConnectConfiguration = new HashMap<>();
private boolean includeFileConnectors = true;
private int workersNum = 1;
private String kafkaVersion;
private StrimziKafkaCluster kafkaCluster;
private String groupId;

/**
* Set the Kafka cluster the Kafka Connect cluster will connect to.
*
* @param kafkaCluster the {@link StrimziKafkaCluster} instance
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withKafkaCluster(StrimziKafkaCluster kafkaCluster) {
this.kafkaCluster = kafkaCluster;
return this;
}

/**
* Set the number of Kafka Connect workers in the cluster.
* If not called, the cluster has a single worker.
*
* @param workersNum the number of Kafka Connect workers
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withNumberOfWorkers(int workersNum) {
this.workersNum = workersNum;
return this;
}

/**
* Add additional Kafka Connect configuration parameters.
* These configurations are applied to all workers in the cluster.
*
* @param additionalConnectConfiguration a map of additional Kafka Connect configuration options
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withAdditionalConnectConfiguration(Map<String, String> additionalConnectConfiguration) {
this.additionalConnectConfiguration = additionalConnectConfiguration;
return this;
}

/**
* Specify the Kafka version to be used for the Connect workers in the cluster.
* If not called, the latest Kafka version available from {@link KafkaVersionService} will be used.
*
* @param kafkaVersion the desired Kafka version for the Connect cluster
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withKafkaVersion(String kafkaVersion) {
this.kafkaVersion = kafkaVersion;
return this;
}

/**
* Disable the FileStreams connectors.
* If not called, the FileStreams connectors are added to plugin.path.
*
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withoutFileConnectors() {
this.includeFileConnectors = false;
return this;
}

/**
* Specify the group.id of the Connect cluster.
*
* @param groupId the group id
* @return the current instance of {@code StrimziConnectClusterBuilder} for method chaining
*/
public StrimziConnectClusterBuilder withGroupId(String groupId) {
this.groupId = groupId;
return this;
}

/**
* Build and return a {@code StrimziConnectCluster} instance based on the provided configurations.
*
* @return a new instance of {@code StrimziConnectCluster}
*/
public StrimziConnectCluster build() {
if (kafkaCluster == null) {
throw new IllegalArgumentException("A Kafka cluster must be specified");
}
if (groupId == null) {
throw new IllegalArgumentException("The Connect cluster group.id configuration must be specified");
}
if (workersNum <= 0) {
throw new IllegalArgumentException("The number of workers in the Connect cluster must be greater than 0");
}
if (additionalConnectConfiguration == null) {
throw new IllegalArgumentException("The additional configuration must be specified");
}
return new StrimziConnectCluster(this);
}
}
}
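To illustrate the API introduced above, here is a hedged usage sketch. It assumes the repository's existing StrimziKafkaCluster.StrimziKafkaClusterBuilder (not part of this diff), and the group id and worker count are arbitrary example values; connectors themselves would then be created through the REST endpoint.

// Illustrative usage of the new StrimziConnectCluster API (not part of the diff).
StrimziKafkaCluster kafka = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
        .withNumberOfBrokers(1)
        .build();
kafka.start();

StrimziConnectCluster connect = new StrimziConnectCluster.StrimziConnectClusterBuilder()
        .withKafkaCluster(kafka)
        .withGroupId("my-connect-cluster")   // required, see build()
        .withNumberOfWorkers(2)
        .build();
connect.start();

// Connectors are managed through the REST API, e.g. POST <endpoint>/connectors
String endpoint = connect.getRestEndpoint();

connect.stop();
kafka.stop();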
70 changes: 70 additions & 0 deletions src/main/java/io/strimzi/test/container/StrimziConnectContainer.java
@@ -0,0 +1,70 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.test.container;

import com.github.dockerjava.api.command.InspectContainerResponse;
import com.groupcdg.pitest.annotations.DoNotMutate;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;

import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class StrimziConnectContainer extends GenericContainer<StrimziConnectContainer> {

private static final String STARTER_SCRIPT = "/start_connect.sh";
private static final String CONFIG_FILE = "/opt/kafka/config/connect.properties";

private final StrimziKafkaCluster kafkaCluster;
private final Properties configs;

public StrimziConnectContainer(String imageName, StrimziKafkaCluster kafkaCluster, Properties configs) {
super(imageName);
this.kafkaCluster = kafkaCluster;
this.configs = configs;
}

@Override
@DoNotMutate
protected void doStart() {
super.setNetwork(kafkaCluster.getNetwork());
super.setCommand("sh", "-c", "while [ ! -x " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
super.doStart();
}

@Override
@DoNotMutate
protected void containerIsStarting(final InspectContainerResponse containerInfo, final boolean reused) {
super.containerIsStarting(containerInfo, reused);

// Write configs to a file in the container
StringWriter writer = new StringWriter();
try {
configs.store(writer, null);
} catch (IOException e) {
throw new UncheckedIOException("Failed to build configuration file", e);
}
copyFileToContainer(
Transferable.of(writer.toString().getBytes(StandardCharsets.UTF_8)),
CONFIG_FILE);

// Write starter script to a file in the container
String command = "/opt/kafka/bin/connect-distributed.sh " + CONFIG_FILE;
copyFileToContainer(
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700),
STARTER_SCRIPT
);
}

@DoNotMutate
public StrimziConnectContainer waitForRunning() {
super.waitingFor(Wait.forLogMessage(".*Finished starting connectors and tasks.*", 1));
return this;
}
}
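For review context, a hedged sketch of how a test could assert on the two files these hooks copy into the container. execInContainer and ExecResult are standard Testcontainers API; the method name, the worker argument, and the asserted strings are illustrative assumptions derived from the constants above.

import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;

// Illustrative only: "worker" would come from StrimziConnectCluster#getWorkers() after start().
static void assertConnectFiles(GenericContainer<?> worker) throws Exception {
    // connect.properties is written by containerIsStarting() via Properties.store()
    Container.ExecResult config = worker.execInContainer("cat", "/opt/kafka/config/connect.properties");
    assert config.getStdout().contains("group.id");

    // the starter script simply launches connect-distributed.sh with that config file
    Container.ExecResult script = worker.execInContainer("cat", "/start_connect.sh");
    assert script.getStdout().contains("/opt/kafka/bin/connect-distributed.sh /opt/kafka/config/connect.properties");
}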