feat: configurable retry interval after retriable exceptions (#54)
If the connector throws a retriable exception, there is no need
for Connect's Kafka consumer to wait as long before re-consuming
messages from the Kafka topic. It is therefore helpful to be able
to specify that the consumer should use a shorter timeout for the
polls that immediately follow a retriable exception.

This commit introduces a new config parameter, mq.retry.backoff.ms,
which allows this interval to be controlled.

By default, it is set to 1 minute, which matches how long the
consumer currently waits.

Signed-off-by: Dale Lane <[email protected]>
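
As an illustration of how the new option would be set, the snippet below sketches a sink connector configuration with `mq.retry.backoff.ms` lowered to 30 seconds. This is a minimal sketch, not taken from this commit: the queue manager, channel, connection and queue values are placeholders, and the other connection properties shown are simply the connector's usual ones.

``` json
{
  "name": "mq-sink",
  "config": {
    "connector.class": "com.ibm.eventstreams.connect.mqsink.MQSinkConnector",
    "topics": "TSINK",
    "mq.queue.manager": "QM1",
    "mq.connection.name.list": "localhost(1414)",
    "mq.channel.name": "DEV.APP.SVRCONN",
    "mq.queue": "SINK.QUEUE",
    "mq.retry.backoff.ms": "30000"
  }
}
```

A shorter backoff trades faster recovery after transient MQ failures against more frequent retry attempts while MQ remains unavailable; the 1 minute default preserves the connector's previous behaviour.
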
dalelane authored Apr 29, 2023
1 parent ea8a06c commit af9111d
Showing 5 changed files with 37 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -10,7 +10,7 @@ COPY --chown=esuser:esgroup --from=builder /opt/kafka/libs/ /opt/kafka/libs/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-distributed.properties /opt/kafka/config/
COPY --chown=esuser:esgroup --from=builder /opt/kafka/config/connect-log4j.properties /opt/kafka/config/
RUN mkdir /opt/kafka/logs && chown esuser:esgroup /opt/kafka/logs
-COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.4.0-jar-with-dependencies.jar /opt/kafka/libs/
+COPY --chown=esuser:esgroup target/kafka-connect-mq-sink-1.5.0-jar-with-dependencies.jar /opt/kafka/libs/

WORKDIR /opt/kafka

7 changes: 4 additions & 3 deletions README.md
@@ -98,13 +98,13 @@ curl -X POST -H "Content-Type: application/json" http://localhost:8083/connector
This repository includes an example Dockerfile to run Kafka Connect in distributed mode. It also adds in the MQ sink connector as an available connector plugin. It uses the default `connect-distributed.properties` and `connect-log4j.properties` files.

1. `mvn clean package`
-1. `docker build -t kafkaconnect-with-mq-sink:1.4.0 .`
-1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0`
+1. `docker build -t kafkaconnect-with-mq-sink:1.5.0 .`
+1. `docker run -p 8083:8083 kafkaconnect-with-mq-sink:1.5.0`

**NOTE:** To provide custom properties files create a folder called `config` containing the `connect-distributed.properties` and `connect-log4j.properties` files and use a Docker volume to make them available when running the container like this:

``` shell
-docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.4.0
+docker run -v $(pwd)/config:/opt/kafka/config -p 8083:8083 kafkaconnect-with-mq-sink:1.5.0
```

To start the MQ connector, you can use `config/mq-sink.json` in this repository after replacing all placeholders and use a command like this:
@@ -312,6 +312,7 @@ The configuration options for the Kafka Connect sink connector for IBM MQ are as
| mq.message.builder.partition.property | The JMS message property to set from the Kafka partition | string | | Blank or valid JMS property name |
| mq.message.builder.offset.property | The JMS message property to set from the Kafka offset | string | | Blank or valid JMS property name |
| mq.reply.queue | The name of the reply-to queue | string | | MQ queue name or queue URI |
+| mq.retry.backoff.ms | Wait time, in milliseconds, before retrying after retriable exceptions | long | 60000 | [0,...] |


### Using a CCDT file
2 changes: 1 addition & 1 deletion pom.xml
@@ -20,7 +20,7 @@
<groupId>com.ibm.eventstreams.connect</groupId>
<artifactId>kafka-connect-mq-sink</artifactId>
<packaging>jar</packaging>
-<version>1.4.0</version>
+<version>1.5.0</version>
<name>kafka-connect-mq-sink</name>
<organization>
<name>IBM Corporation</name>
11 changes: 10 additions & 1 deletion src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkConnector.java
@@ -147,7 +147,12 @@ public class MQSinkConnector extends SinkConnector {
public static final String CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Whether to copy Kafka headers to JMS message properties.";
public static final String CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES = "Copy Kafka headers to JMS message properties";

public static String VERSION = "1.4.0";
public static final String CONFIG_NAME_MQ_RETRY_BACKOFF_MS = "mq.retry.backoff.ms";
public static final String CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS = "Time to wait, in milliseconds, before retrying after retriable exceptions";
public static final String CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS = "Retry backoff (ms)";


public static String VERSION = "1.5.0";

private Map<String, String> configProps;

@@ -336,6 +341,10 @@ public class MQSinkConnector extends SinkConnector {
CONFIG_DOCUMENTATION_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES, CONFIG_GROUP_MQ, 27, Width.SHORT,
CONFIG_DISPLAY_KAFKA_HEADERS_COPY_TO_JMS_PROPERTIES);

+config.define(CONFIG_NAME_MQ_RETRY_BACKOFF_MS, Type.LONG, 60000, Range.between(0L, 99999999900L), Importance.LOW,
+CONFIG_DOCUMENTATION_MQ_RETRY_BACKOFF_MS, CONFIG_GROUP_MQ, 28, Width.SHORT,
+CONFIG_DISPLAY_MQ_RETRY_BACKOFF_MS);
+
return config;
}
}
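
As a rough illustration of what the `config.define(...)` call above provides: Kafka's `ConfigDef` fills in the 60000 ms default when the property is absent, converts supplied values to a long, and rejects values outside the declared range. The stand-alone sketch below (a hypothetical class, not part of the connector) mirrors that definition in isolation, using the shorter `define` overload so the group, ordering and display details are omitted.

``` java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Range;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

public class RetryBackoffConfigSketch {
    public static void main(String[] args) {
        // Stand-alone definition mirroring the connector's new config option
        ConfigDef def = new ConfigDef()
                .define("mq.retry.backoff.ms", Type.LONG, 60000,
                        Range.between(0L, 99999999900L), Importance.LOW,
                        "Time to wait, in milliseconds, before retrying after retriable exceptions");

        // When the property is not supplied, parse() applies the 60000 ms default
        Map<String, String> props = new HashMap<>();
        System.out.println(def.parse(props).get("mq.retry.backoff.ms")); // 60000

        // A supplied value is converted to a Long and checked against the range
        props.put("mq.retry.backoff.ms", "30000");
        System.out.println(def.parse(props).get("mq.retry.backoff.ms")); // 30000

        // Values outside [0, 99999999900] are rejected with a ConfigException
        props.put("mq.retry.backoff.ms", "-1");
        try {
            def.parse(props);
        } catch (ConfigException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```
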
25 changes: 21 additions & 4 deletions src/main/java/com/ibm/eventstreams/connect/mqsink/MQSinkTask.java
@@ -21,6 +21,7 @@

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

@@ -32,6 +33,8 @@ public class MQSinkTask extends SinkTask {

private JMSWriter writer;

+private long retryBackoffMs = 60000;
+
public MQSinkTask() {
}

@@ -61,6 +64,13 @@ public MQSinkTask() {
log.debug("Task props entry {} : {}", entry.getKey(), value);
}

+// check if a custom retry time is provided
+String retryBackoffMsStr = props.get(MQSinkConnector.CONFIG_NAME_MQ_RETRY_BACKOFF_MS);
+if (retryBackoffMsStr != null) {
+retryBackoffMs = Long.parseLong(retryBackoffMsStr);
+}
+log.debug("Setting retry backoff {}", retryBackoffMs);
+
// Construct a writer to interface with MQ
writer = new JMSWriter();
writer.configure(props);
@@ -85,12 +95,19 @@ public MQSinkTask() {
@Override public void put(Collection<SinkRecord> records) {
log.trace("[{}] Entry {}.put, records.size={}", Thread.currentThread().getId(), this.getClass().getName(), records.size());

-for (SinkRecord r: records) {
-log.debug("Putting record for topic {}, partition {} and offset {}", r.topic(), r.kafkaPartition(), r.kafkaOffset());
-writer.send(r);
+try {
+for (SinkRecord r: records) {
+log.debug("Putting record for topic {}, partition {} and offset {}", r.topic(), r.kafkaPartition(), r.kafkaOffset());
+writer.send(r);
+}
+
+writer.commit();
}
+catch (RetriableException rte) {
+context.timeout(retryBackoffMs);
+throw rte;
+}

-writer.commit();
log.trace("[{}] Exit {}.put", Thread.currentThread().getId(), this.getClass().getName());
}

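To make the reworked `put()` above concrete: when a `RetriableException` escapes `put()`, Kafka Connect keeps the failed batch and delivers the same records again on a later call, and `context.timeout(ms)` asks the framework to wait only the configured `mq.retry.backoff.ms` before that retry rather than its default interval. The sketch below is a simplified stand-in (hypothetical classes, not the connector's real `JMSWriter` or task) that simulates one retriable failure followed by a successful redelivery.

``` java
import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;

public class RetrySemanticsSketch {

    // Minimal stand-in for the connector's JMSWriter: fails once, then succeeds
    static class FlakyWriter {
        private int sendAttempts = 0;

        void send(SinkRecord record) {
            if (++sendAttempts == 1) {
                throw new RetriableException("MQ temporarily unavailable");
            }
        }

        void commit() {
            // nothing to do in this sketch
        }
    }

    // Mirrors the shape of the reworked put(): send the batch, commit, and on a
    // retriable failure note the backoff before rethrowing to the framework
    static void putOnce(FlakyWriter writer, Collection<SinkRecord> records, long retryBackoffMs) {
        try {
            for (SinkRecord r : records) {
                writer.send(r);
            }
            writer.commit();
        } catch (RetriableException rte) {
            // The real task calls context.timeout(retryBackoffMs) here
            System.out.println("Asking Connect to redeliver after " + retryBackoffMs + " ms");
            throw rte;
        }
    }

    public static void main(String[] args) {
        FlakyWriter writer = new FlakyWriter();
        Collection<SinkRecord> batch = Collections.singletonList(
                new SinkRecord("TSINK", 0, null, null, null, "hello", 0L));

        // Stand-in for the Connect framework: redeliver the same batch until put() succeeds
        for (int attempt = 1; ; attempt++) {
            try {
                putOnce(writer, batch, 30000L);
                System.out.println("Batch delivered on attempt " + attempt);
                break;
            } catch (RetriableException rte) {
                // The framework would wait the requested timeout before the next attempt
            }
        }
    }
}
```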
