diff --git a/commons/build.gradle.kts b/commons/build.gradle.kts
index 23240446..101ef8db 100644
--- a/commons/build.gradle.kts
+++ b/commons/build.gradle.kts
@@ -87,6 +87,7 @@ dependencies {
testImplementation(jackson.databind)
testImplementation(testinglibs.mockito.core)
testImplementation(testinglibs.assertj.core)
+ testImplementation(testinglibs.awaitility)
testImplementation(testFixtures(project(":commons")))
testImplementation(testinglibs.woodstox.stax2.api)
testImplementation(apache.hadoop.mapreduce.client.core)
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
index 7fb8cd9b..954c9151 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java
@@ -22,6 +22,8 @@
import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.input.InputFormat;
+import io.aiven.kafka.connect.common.source.input.Transformer;
+import io.aiven.kafka.connect.common.source.input.TransformerFactory;
public class SourceCommonConfig extends CommonConfig {
@@ -64,11 +66,15 @@ public String getTargetTopicPartitions() {
}
public ErrorsTolerance getErrorsTolerance() {
- return ErrorsTolerance.forName(sourceConfigFragment.getErrorsTolerance());
+ return sourceConfigFragment.getErrorsTolerance();
}
public int getMaxPollRecords() {
return sourceConfigFragment.getMaxPollRecords();
}
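+ /**
+ * Gets the Transformer for the configured input format. A new Transformer instance is created on each call.
+ *
+ * @return the Transformer for the configured {@code InputFormat}.
+ */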
+ public Transformer getTransformer() {
+ return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
+ }
+
}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
index c62431dc..58befa60 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java
@@ -88,8 +88,8 @@ public int getExpectedMaxMessageBytes() {
return cfg.getInt(EXPECTED_MAX_MESSAGE_BYTES);
}
- public String getErrorsTolerance() {
- return cfg.getString(ERRORS_TOLERANCE);
+ public ErrorsTolerance getErrorsTolerance() {
+ return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
}
private static class ErrorsToleranceValidator implements ConfigDef.Validator {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
new file mode 100644
index 00000000..f55257f4
--- /dev/null
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
@@ -0,0 +1,511 @@
+/*
+ * Copyright 2024-2025 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+
+import io.aiven.kafka.connect.common.config.SourceCommonConfig;
+import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles extracting records from an iterator and returning them to Kafka. It uses an exponential backoff
+ * with jitter to reduce the number of calls to the backend when there is no data. This solution:
+ * <ul>
+ * <li>When polled this implementation moves available records from the SourceRecord iterator to the return array.</li>
+ * <li>If there are no records:
+ * <ul>
+ * <li>{@link #poll()} will return null.</li>
+ * <li>The poll will delay no more than approximately 5 seconds.</li>
+ * </ul>
+ * </li>
+ * <li>Up to {@link #maxPollRecords} records will be sent in a single poll request.</li>
+ * <li>When the connector is stopped any collected records are returned to Kafka before stopping.</li>
+ * </ul>
+ *
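+ * <p>
+ * A minimal sketch of a concrete task is shown below; {@code MySourceConfig} and {@code MyBackendIterator} are
+ * illustrative assumptions, not part of this API:
+ * </p>
+ *
+ * <pre>{@code
+ * public class MySourceTask extends AbstractSourceTask {
+ *     private MySourceConfig config;
+ *
+ *     public MySourceTask() {
+ *         super(LoggerFactory.getLogger(MySourceTask.class));
+ *     }
+ *
+ *     @Override
+ *     protected SourceCommonConfig configure(final Map<String, String> props) {
+ *         config = new MySourceConfig(props); // assumed to extend SourceCommonConfig
+ *         return config;
+ *     }
+ *
+ *     @Override
+ *     protected Iterator<SourceRecord> getIterator(final BackoffConfig backoffConfig) {
+ *         return new MyBackendIterator(config, backoffConfig); // re-reads the backend when exhausted
+ *     }
+ *
+ *     @Override
+ *     protected void closeResources() {
+ *         // release backend clients here
+ *     }
+ *
+ *     @Override
+ *     public String version() {
+ *         return "0.0.1";
+ *     }
+ * }
+ * }</pre>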
+ */
+public abstract class AbstractSourceTask extends SourceTask {
+
+ public static final List<SourceRecord> NULL_RESULT = null;
+
+ /**
+ * The maximum time to spend polling. This is set to 5 seconds as that is the time allotted to a task for
+ * shutdown.
+ */
+ public static final Duration MAX_POLL_TIME = Duration.ofSeconds(5);
+ /**
+ * The boolean that indicates the connector is stopped.
+ */
+ private final AtomicBoolean connectorStopped;
+
+ /**
+ * The logger to use. Set from the class implementing AbstractSourceTask.
+ */
+ private final Logger logger;
+
+ /**
+ * The maximum number of records to put in a poll. Specified in the configuration.
+ */
+ private int maxPollRecords;
+
+ /**
+ * The Backoff implementation that executes the delay in the poll loop.
+ */
+ private final Backoff backoff;
+
+ private final Timer timer;
+
+ /**
+ * The configuration
+ */
+ private SourceCommonConfig config;
+
+ private Iterator<SourceRecord> sourceRecordIterator;
+
+ /**
+ * Constructor.
+ *
+ * @param logger
+ * the logger to use.
+ */
+ protected AbstractSourceTask(final Logger logger) {
+ super();
+ this.logger = logger;
+ connectorStopped = new AtomicBoolean();
+ timer = new Timer(MAX_POLL_TIME);
+ backoff = new Backoff(timer.getBackoffConfig());
+ }
+
+ /**
+ * Gets the iterator from which SourceRecords are extracted during a poll event. When this iterator runs out of
+ * records it should attempt to reset and read more records from the backend on the next {@code hasNext()} call. In
+ * this way it can detect when new data has been added to the backend and continue processing.
+ *
+ * This method should handle any backend exception that can be retried. Any runtime exceptions that are thrown when
+ * this iterator executes may cause the task to abort.
+ *
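+ * A sketch of the expected behaviour is shown below; {@code fetchBatchFromBackend} is an illustrative assumption:
+ *
+ * <pre>{@code
+ * Iterator<SourceRecord> iterator = new Iterator<SourceRecord>() {
+ *     private Iterator<SourceRecord> batch = Collections.emptyIterator();
+ *
+ *     @Override
+ *     public boolean hasNext() {
+ *         if (!batch.hasNext()) {
+ *             batch = fetchBatchFromBackend(); // re-read so newly added data is detected
+ *         }
+ *         return batch.hasNext();
+ *     }
+ *
+ *     @Override
+ *     public SourceRecord next() {
+ *         return batch.next();
+ *     }
+ * };
+ * }</pre>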
+ *
+ * @param config
+ * the configuration for the Backoff.
+ * @return The iterator of SourceRecords.
+ */
+ abstract protected Iterator<SourceRecord> getIterator(BackoffConfig config);
+
+ /**
+ * Called by {@link #start} to allow the concrete implementation to configure itself based on properties.
+ *
+ * @param props
+ * the properties to use for configuration.
+ */
+ abstract protected SourceCommonConfig configure(Map<String, String> props);
+
+ @Override
+ public final void start(final Map<String, String> props) {
+ logger.debug("Starting");
+ config = configure(props);
+ maxPollRecords = config.getMaxPollRecords();
+ sourceRecordIterator = getIterator(timer.getBackoffConfig());
+ }
+
+ /**
+ * Try to add a SourceRecord to the results.
+ *
+ * @param results
+ * the list to add the record to.
+ * @param sourceRecordIterator
+ * the source record iterator.
+ * @return true if successful, false if the iterator is empty.
+ */
+ private boolean tryAdd(final List<SourceRecord> results, final Iterator<SourceRecord> sourceRecordIterator) {
+ if (sourceRecordIterator.hasNext()) {
+ backoff.reset();
+ final SourceRecord sourceRecord = sourceRecordIterator.next();
+ if (logger.isDebugEnabled()) {
+ logger.debug("tryAdd() : read record {}", sourceRecord.sourceOffset());
+ }
+ results.add(sourceRecord);
+ return true;
+ }
+ logger.info("No records found in tryAdd call");
+ return false;
+ }
+
+ /**
+ * Returns {@code true} if the connector is not stopped and the timer has not expired.
+ *
+ * @return {@code true} if the connector is not stopped and the timer has not expired.
+ */
+ protected boolean stillPolling() {
+ final boolean result = !connectorStopped.get() && !timer.isExpired();
+ logger.debug("Still polling: {}", result);
+ return result;
+ }
+
+ @Override
+ public final List<SourceRecord> poll() {
+ logger.debug("Polling");
+ if (connectorStopped.get()) {
+ logger.info("Stopping");
+ closeResources();
+ return NULL_RESULT;
+ } else {
+ timer.start();
+ try {
+ final List<SourceRecord> result = populateList();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Poll() returning {} SourceRecords.", result == null ? null : result.size());
+ }
+ return result;
+ } finally {
+ timer.stop();
+ timer.reset();
+ }
+ }
+ }
+
+ /**
+ * Attempts to populate the return list. Will read as many records into the list as it can until the timer expires
+ * or the task is shut down.
+ *
+ * @return A list of SourceRecords, or {@code null} if no records were collected.
+ */
+ private List<SourceRecord> populateList() {
+ final List<SourceRecord> results = new ArrayList<>();
+ try {
+ while (stillPolling() && results.size() < maxPollRecords) {
+ if (!tryAdd(results, sourceRecordIterator)) {
+ if (!results.isEmpty()) {
+ logger.debug("tryAdd() did not add to the list, returning current results.");
+ // if we could not get a record and the results are not empty return them
+ break;
+ }
+ logger.debug("Attempting {}", backoff);
+ backoff.cleanDelay();
+ }
+ }
+
+ } catch (RuntimeException e) { // NOPMD must catch runtime here.
+ logger.error("Error during poll(): {}", e.getMessage(), e);
+ if (config.getErrorsTolerance() == ErrorsTolerance.NONE) {
+ logger.error("Stopping Task");
+ throw e;
+ }
+ }
+ return results.isEmpty() ? NULL_RESULT : results;
+ }
+
+ @Override
+ public final void stop() {
+ logger.debug("Stopping");
+ connectorStopped.set(true);
+ }
+
+ /**
+ * Returns the running state of the task.
+ *
+ * @return {@code true} if the connector is running, {@code false} otherwise.
+ */
+ public final boolean isRunning() {
+ return !connectorStopped.get();
+ }
+
+ /**
+ * Close any resources the source has open. Called from {@link #poll()} once the connector has been stopped.
+ */
+ abstract protected void closeResources();
+
+ /**
+ * Calculates elapsed time and flags when expired.
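+ *
+ * <p>
+ * A typical cycle, mirroring the poll loop (sketch):
+ * </p>
+ *
+ * <pre>{@code
+ * Timer timer = new Timer(Duration.ofSeconds(5));
+ * timer.start();
+ * while (!timer.isExpired()) {
+ *     // do work bounded by the timer
+ * }
+ * timer.stop();
+ * timer.reset();
+ * }</pre>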
+ */
+ protected static class Timer extends StopWatch {
+ /**
+ * The length of time that the timer should run.
+ */
+ private final long duration;
+
+ /**
+ * The flag that indicates the timer has been aborted.
+ */
+ private boolean hasAborted;
+
+ /**
+ * Constructor.
+ *
+ * @param duration
+ * the length of time the timer should run.
+ */
+ Timer(final Duration duration) {
+ super();
+ this.duration = duration.toMillis();
+ }
+
+ /**
+ * Gets the number of milliseconds remaining on this timer.
+ *
+ * @return the number of milliseconds until the timer expires, or the full duration if the timer is not started.
+ */
+ public long millisecondsRemaining() {
+ return super.isStarted() ? duration - super.getTime() : duration;
+ }
+
+ /**
+ * Returns {@code true} if the timer has expired.
+ *
+ * @return {@code true} if the timer has expired.
+ */
+ public boolean isExpired() {
+ return hasAborted || super.getTime() >= duration;
+ }
+
+ /**
+ * Aborts the timer. Timer will report that it has expired until reset is called.
+ */
+ public void abort() {
+ hasAborted = true;
+ }
+
+ @Override
+ public void start() {
+ try {
+ hasAborted = false;
+ super.start();
+ } catch (IllegalStateException e) {
+ throw new IllegalStateException("Timer: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ super.stop();
+ } catch (IllegalStateException e) {
+ throw new IllegalStateException("Timer: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void reset() {
+ try {
+ hasAborted = false;
+ super.reset();
+ } catch (IllegalStateException e) {
+ throw new IllegalStateException("Timer: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Gets a Backoff Config for this timer.
+ *
+ * @return a backoff Configuration.
+ */
+ public BackoffConfig getBackoffConfig() {
+ return new BackoffConfig() {
+
+ @Override
+ public SupplierOfLong getSupplierOfTimeRemaining() {
+ return Timer.this::millisecondsRemaining;
+ }
+
+ @Override
+ public AbortTrigger getAbortTrigger() {
+ return Timer.this::abort;
+ }
+ };
+ }
+ }
+
+ /**
+ * Performs a delay based on the number of successive {@link #delay()} or {@link #cleanDelay()} calls without a
+ * {@link #reset()}. Delay increases exponentially but never exceeds the time remaining by more than 0.512 seconds.
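+ *
+ * <p>
+ * For example, with 1000 milliseconds remaining on the timer, successive delays roughly follow the powers of two
+ * (2, 4, 8, ... milliseconds, +/- jitter) until the remaining time is reached. A typical polling loop is sketched
+ * below; {@code recordsAvailable()} is an illustrative assumption:
+ * </p>
+ *
+ * <pre>{@code
+ * Backoff backoff = new Backoff(timer.getBackoffConfig());
+ * while (!recordsAvailable()) {
+ *     backoff.cleanDelay(); // each call roughly doubles the wait
+ * }
+ * backoff.reset(); // data found: drop back to the minimum delay
+ * }</pre>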
+ */
+ public static class Backoff {
+ /** The logger to write to */
+ private static final Logger LOGGER = LoggerFactory.getLogger(Backoff.class);
+ /**
+ * The maximum jitter random number. Should be a power of 2 for speed.
+ */
+ public static final int MAX_JITTER = 1024;
+
+ public static final int JITTER_SUBTRAHEND = MAX_JITTER / 2;
+ /**
+ * A supplier of the time remaining (in milliseconds) on the overriding timer.
+ */
+ private final SupplierOfLong timeRemaining;
+
+ /**
+ * A function to call to abort the timer.
+ */
+ private final AbortTrigger abortTrigger;
+
+ /**
+ * The maximum number of times {@link #delay()} will be called before the delay reaches the time remaining.
+ */
+ private int maxCount;
+ /**
+ * The number of times {@link #delay()} has been called.
+ */
+ private int waitCount;
+
+ /**
+ * A random number generator to construct jitter.
+ */
+ Random random = new Random();
+
+ /**
+ * Constructor.
+ *
+ * @param config
+ * The configuration for the backoff.
+ */
+ public Backoff(final BackoffConfig config) {
+ this.timeRemaining = config.getSupplierOfTimeRemaining();
+ this.abortTrigger = config.getAbortTrigger();
+ reset();
+ }
+
+ /**
+ * Reset the backoff time so that delay is again at the minimum.
+ */
+ public final void reset() {
+ // if the remaining time is 0 or negative the log calculation below is invalid,
+ // so make sure that maxCount is 0 in that case.
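+ // e.g. 1000 milliseconds remaining yields maxCount = (int) log2(1000) = 9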
+ final long remainingTime = timeRemaining.get();
+ maxCount = remainingTime < 1L ? 0 : (int) (Math.log10(remainingTime) / Math.log10(2));
+ waitCount = 0;
+ LOGGER.debug("Reset {}", this);
+ }
+
+ /**
+ * Handle adjustment when maxCount could not be set.
+ *
+ * @return the corrected maxCount
+ */
+ private int getMaxCount() {
+ if (maxCount == 0) {
+ reset();
+ }
+ return maxCount;
+ }
+
+ /**
+ * Calculates the delay without jitter.
+ *
+ * @return the number of milliseconds the delay will be.
+ */
+ public long estimatedDelay() {
+ long sleepTime = timeRemaining.get();
+ if (sleepTime > 0 && waitCount < maxCount) {
+ sleepTime = (long) Math.min(sleepTime, Math.pow(2, waitCount + 1));
+ }
+ return sleepTime < 0 ? 0 : sleepTime;
+ }
+
+ /**
+ * Calculates the range of jitter in milliseconds.
+ *
+ * @return the maximum jitter in milliseconds. Jitter is +/- this value.
+ */
+ public int getMaxJitter() {
+ return MAX_JITTER - JITTER_SUBTRAHEND;
+ }
+
+ private long timeWithJitter() {
+ // generate approx +/- 0.512 seconds of jitter
+ final int jitter = random.nextInt(MAX_JITTER) - JITTER_SUBTRAHEND;
+ return (long) Math.pow(2, waitCount) + jitter;
+ }
+
+ /**
+ * Delay execution based on the number of times this method has been called.
+ *
+ * @throws InterruptedException
+ * If any thread interrupts this thread.
+ */
+ public void delay() throws InterruptedException {
+ final long sleepTime = timeRemaining.get();
+ if (sleepTime > 0 && waitCount < (maxCount == 0 ? getMaxCount() : maxCount)) {
+ waitCount++;
+ final long nextSleep = timeWithJitter();
+ // don't sleep negative time. Jitter can introduce negative time.
+ if (nextSleep > 0) {
+ if (nextSleep >= sleepTime) {
+ LOGGER.debug("Backoff aborting timer");
+ abortTrigger.apply();
+ } else {
+ LOGGER.debug("Backoff sleepiing {}", nextSleep);
+ Thread.sleep(nextSleep);
+ }
+ }
+ }
+ }
+
+ /**
+ * Like {@link #delay} but swallows the {@link InterruptedException}.
+ */
+ public void cleanDelay() {
+ try {
+ delay();
+ } catch (InterruptedException exception) {
+ // ignore the interruption and return
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Backoff %s/%s, %s milliseconds remaining.", waitCount, maxCount, timeRemaining.get());
+ }
+ }
+
+ /**
+ * A functional interface to return long values.
+ */
+ @FunctionalInterface
+ public interface SupplierOfLong {
+ long get();
+ }
+
+ /**
+ * A functional interface that will abort the timer. After being called, the timer will indicate that it is
+ * expired until it is reset.
+ */
+ @FunctionalInterface
+ public interface AbortTrigger {
+ void apply();
+ }
+
+ /**
+ * An interface to define the Backoff configuration. Used for convenience with Timer.
+ */
+ public interface BackoffConfig {
+ SupplierOfLong getSupplierOfTimeRemaining();
+ AbortTrigger getAbortTrigger();
+ }
+}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
index de770cbc..760d074d 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java
@@ -37,7 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AvroTransformer extends Transformer<GenericRecord> {
+public class AvroTransformer extends Transformer {
private final AvroData avroData;
@@ -54,9 +54,9 @@ public void configureValueConverter(final Map<String, String> config, final Abst
}
@Override
- public StreamSpliterator<GenericRecord> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
- final String topic, final int topicPartition, final AbstractConfig sourceConfig) {
- return new StreamSpliterator<>(LOGGER, inputStreamIOSupplier) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
private DataFileStream<GenericRecord> dataFileStream;
private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
@@ -78,9 +78,10 @@ public void doClose() {
}
@Override
- protected boolean doAdvance(final Consumer<? super GenericRecord> action) {
+ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
if (dataFileStream.hasNext()) {
- action.accept(dataFileStream.next());
+ final GenericRecord record = dataFileStream.next();
+ action.accept(avroData.toConnectData(record.getSchema(), record));
return true;
}
return false;
@@ -88,12 +89,6 @@ protected boolean doAdvance(final Consumer<? super GenericRecord> action) {
};
}
- @Override
- public SchemaAndValue getValueData(final GenericRecord record, final String topic,
- final AbstractConfig sourceConfig) {
- return avroData.toConnectData(record.getSchema(), record);
- }
-
@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
index f571062d..232aaef2 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java
@@ -31,7 +31,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ByteArrayTransformer extends Transformer<byte[]> {
+public class ByteArrayTransformer extends Transformer {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class);
private static final int MAX_BUFFER_SIZE = 4096;
@@ -42,9 +42,9 @@ public void configureValueConverter(final Map<String, String> config, final Abst
}
@Override
- public StreamSpliterator<byte[]> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
- final String topic, final int topicPartition, final AbstractConfig sourceConfig) {
- return new StreamSpliterator<byte[]>(LOGGER, inputStreamIOSupplier) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected InputStream inputOpened(final InputStream input) {
return input;
@@ -56,7 +56,7 @@ protected void doClose() {
}
@Override
- protected boolean doAdvance(final Consumer<? super byte[]> action) {
+ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
final byte[] buffer = new byte[MAX_BUFFER_SIZE];
try {
final int bytesRead = IOUtils.read(inputStream, buffer);
@@ -64,9 +64,9 @@ protected boolean doAdvance(final Consumer<? super byte[]> action) {
return false;
}
if (bytesRead < MAX_BUFFER_SIZE) {
- action.accept(Arrays.copyOf(buffer, bytesRead));
+ action.accept(new SchemaAndValue(null, Arrays.copyOf(buffer, bytesRead)));
} else {
- action.accept(buffer);
+ action.accept(new SchemaAndValue(null, buffer));
}
return true;
} catch (IOException e) {
@@ -77,11 +77,6 @@ protected boolean doAdvance(final Consumer<? super byte[]> action) {
};
}
- @Override
- public SchemaAndValue getValueData(final byte[] record, final String topic, final AbstractConfig sourceConfig) {
- return new SchemaAndValue(null, record);
- }
-
@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
index 4ff0f1a2..c6aea0e8 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java
@@ -34,7 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class JsonTransformer extends Transformer<byte[]> {
+public class JsonTransformer extends Transformer {
private final JsonConverter jsonConverter;
@@ -52,9 +52,9 @@ public void configureValueConverter(final Map<String, String> config, final Abst
}
@Override
- public StreamSpliterator<byte[]> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
- final String topic, final int topicPartition, final AbstractConfig sourceConfig) {
- final StreamSpliterator<byte[]> spliterator = new StreamSpliterator<>(LOGGER, inputStreamIOSupplier) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
BufferedReader reader;
@Override
@@ -75,7 +75,7 @@ public void doClose() {
}
@Override
- public boolean doAdvance(final Consumer<? super byte[]> action) {
+ public boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
String line = null;
try {
// remove blank and empty lines.
@@ -87,7 +87,7 @@ public boolean doAdvance(final Consumer<? super byte[]> action) {
}
}
line = line.trim();
- action.accept(line.getBytes(StandardCharsets.UTF_8));
+ action.accept(jsonConverter.toConnectData(topic, line.getBytes(StandardCharsets.UTF_8)));
return true;
} catch (IOException e) {
LOGGER.error("Error reading input stream: {}", e.getMessage(), e);
@@ -95,13 +95,6 @@ public boolean doAdvance(final Consumer<? super byte[]> action) {
}
}
};
-
- return spliterator;
- }
-
- @Override
- public SchemaAndValue getValueData(final byte[] record, final String topic, final AbstractConfig sourceConfig) {
- return jsonConverter.toConnectData(topic, record);
}
@Override
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
index 7da61c41..2c47d510 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java
@@ -43,7 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ParquetTransformer extends Transformer<GenericRecord> {
+public class ParquetTransformer extends Transformer {
private final AvroData avroData;
@@ -59,12 +59,6 @@ public void configureValueConverter(final Map<String, String> config, final Abst
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}
- @Override
- public SchemaAndValue getValueData(final GenericRecord record, final String topic,
- final AbstractConfig sourceConfig) {
- return avroData.toConnectData(record.getSchema(), record);
- }
-
@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
@@ -72,10 +66,10 @@ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topi
}
@Override
- public StreamSpliterator<GenericRecord> createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
- final String topic, final int topicPartition, final AbstractConfig sourceConfig) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
+ final int topicPartition, final AbstractConfig sourceConfig) {
- final StreamSpliterator<GenericRecord> spliterator = new StreamSpliterator<>(LOGGER, inputStreamIOSupplier) {
+ return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
private ParquetReader<GenericRecord> reader;
private File parquetFile;
@@ -114,11 +108,11 @@ protected void doClose() {
}
@Override
- protected boolean doAdvance(final Consumer<? super GenericRecord> action) {
+ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
try {
final GenericRecord record = reader.read();
if (record != null) {
- action.accept(record); // Pass record to the stream
+ action.accept(avroData.toConnectData(record.getSchema(), record)); // Pass record to the stream
return true;
}
} catch (IOException e) {
@@ -127,7 +121,6 @@ protected boolean doAdvance(final Consumer<? super GenericRecord> action) {
return false;
}
};
- return spliterator;
}
static void deleteTmpFile(final Path parquetFile) {
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
index 196d9ae3..09e8c0ca 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java
@@ -30,14 +30,14 @@
import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
-public abstract class Transformer<T> {
+public abstract class Transformer {
public abstract void configureValueConverter(Map<String, String> config, AbstractConfig sourceConfig);
- public final Stream<T> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
- final int topicPartition, final AbstractConfig sourceConfig, final long skipRecords) {
+ public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
+ final String topic, final int topicPartition, final AbstractConfig sourceConfig, final long skipRecords) {
- final StreamSpliterator<T> spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition,
+ final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition,
sourceConfig);
return StreamSupport.stream(spliterator, false).onClose(spliterator::close).skip(skipRecords);
}
@@ -55,20 +55,15 @@ public final Stream<T> getRecords(final IOSupplier<InputStream> inputStreamIOSup
* the source configuration.
* @return a StreamSpliterator instance.
*/
- protected abstract StreamSpliterator<T> createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier,
- String topic, int topicPartition, AbstractConfig sourceConfig);
-
- public abstract SchemaAndValue getValueData(T record, String topic, AbstractConfig sourceConfig);
+ protected abstract StreamSpliterator createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier, String topic,
+ int topicPartition, AbstractConfig sourceConfig);
public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, AbstractConfig sourceConfig);
/**
* A Spliterator that performs various checks on the opening/closing of the input stream.
- *
- * @param <T>
- * the type of item created by this Spliterator.
*/
- protected abstract static class StreamSpliterator<T> implements Spliterator<T> {
+ protected abstract static class StreamSpliterator implements Spliterator<SchemaAndValue> {
/**
* The input stream supplier.
*/
@@ -109,7 +104,7 @@ protected StreamSpliterator(final Logger logger, final IOSupplier<InputStream> i
* the Consumer to call if record is created.
* @return {@code true} if a record was processed, {@code false} otherwise.
*/
- abstract protected boolean doAdvance(Consumer<? super T> action);
+ abstract protected boolean doAdvance(Consumer<? super SchemaAndValue> action);
/**
* Method to close additional inputs if needed.
@@ -121,6 +116,7 @@ public final void close() {
try {
if (inputStream != null) {
inputStream.close();
+ inputStream = null; // NOPMD setting null to release resources
closed = true;
}
} catch (IOException e) {
@@ -143,15 +139,16 @@ public final void close() {
abstract protected InputStream inputOpened(InputStream input) throws IOException;
@Override
- public final boolean tryAdvance(final Consumer<? super T> action) {
- boolean result = false;
+ public final boolean tryAdvance(final Consumer<? super SchemaAndValue> action) {
if (closed) {
- logger.error("Attempt to advance after closed");
+ return false;
}
+ boolean result = false;
try {
if (inputStream == null) {
try {
- inputStream = inputOpened(inputStreamIOSupplier.get());
+ inputStream = inputStreamIOSupplier.get();
+ inputOpened(inputStream);
} catch (IOException e) {
logger.error("Error trying to open inputStream: {}", e.getMessage(), e);
close();
@@ -169,7 +166,7 @@ public final boolean tryAdvance(final Consumer<? super T> action) {
}
@Override
- public final Spliterator<T> trySplit() { // NOPMD returning null is required by API
+ public final Spliterator<SchemaAndValue> trySplit() { // NOPMD returning null is required by API
return null;
}
diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java
index 43a1b0ef..57460430 100644
--- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java
+++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java
@@ -16,48 +16,46 @@
package io.aiven.kafka.connect.common.source.input;
-import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY;
import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE;
-import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.json.JsonConverter;
-import io.aiven.kafka.connect.common.config.SchemaRegistryFragment;
-import io.aiven.kafka.connect.common.config.SourceCommonConfig;
-
import io.confluent.connect.avro.AvroData;
+/**
+ * A factory to create Transformers.
+ */
public final class TransformerFactory {
-
+ /** The cache size for systems that read Avro data */
public static final int CACHE_SIZE = 100;
private TransformerFactory() {
// hidden
}
- public static Transformer getTransformer(final SourceCommonConfig sourceConfig) {
- final InputFormat inputFormatEnum = new SchemaRegistryFragment(sourceConfig).getInputFormat();
- switch (inputFormatEnum) {
+
+ /**
+ * Gets a configured Transformer.
+ *
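+ * <p>
+ * For example (sketch):
+ * </p>
+ *
+ * <pre>{@code
+ * Transformer transformer = TransformerFactory.getTransformer(InputFormat.JSONL);
+ * }</pre>
+ *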
+ * @param inputFormat
+ * The input format for the transformer.
+ * @return the Transformer for the specified input format.
+ */
+ public static Transformer getTransformer(final InputFormat inputFormat) {
+ switch (inputFormat) {
case AVRO :
return new AvroTransformer(new AvroData(CACHE_SIZE));
case PARQUET :
return new ParquetTransformer(new AvroData(CACHE_SIZE));
case JSONL :
final JsonConverter jsonConverter = new JsonConverter();
- configureJsonConverter(jsonConverter);
+ jsonConverter.configure(Map.of(SCHEMAS_ENABLE, "false"), false);
return new JsonTransformer(jsonConverter);
case BYTES :
return new ByteArrayTransformer();
default :
- throw new IllegalArgumentException(
- "Unknown input format in configuration: " + sourceConfig.getString(INPUT_FORMAT_KEY));
+ throw new IllegalArgumentException("Unknown input format in configuration: " + inputFormat);
}
}
-
- private static void configureJsonConverter(final JsonConverter jsonConverter) {
- final Map<String, String> config = new HashMap<>();
- config.put(SCHEMAS_ENABLE, "false");
- jsonConverter.configure(config, false);
- }
}
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java
new file mode 100644
index 00000000..9b3a581e
--- /dev/null
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/AbstractSourceTaskTest.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2024 Aiven Oy
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.aiven.kafka.connect.common.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.awaitility.Awaitility.await;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.time.StopWatch;
+import org.junit.jupiter.api.Test;
+
+class AbstractSourceTaskTest {
+
+ @Test
+ void timerTest() {
+ final AbstractSourceTask.Timer timer = new AbstractSourceTask.Timer(Duration.ofSeconds(1));
+ assertThat(timer.millisecondsRemaining()).isEqualTo(Duration.ofSeconds(1).toMillis());
+ timer.start();
+ await().atMost(Duration.ofSeconds(2)).until(timer::isExpired);
+ assertThat(timer.millisecondsRemaining()).isLessThanOrEqualTo(0);
+ timer.stop();
+ assertThat(timer.millisecondsRemaining()).isEqualTo(Duration.ofSeconds(1).toMillis());
+ }
+
+ @Test
+ void timerSequenceTest() {
+ final AbstractSourceTask.Timer timer = new AbstractSourceTask.Timer(Duration.ofSeconds(1));
+ // stopped state does not allow stop
+ assertThatExceptionOfType(IllegalStateException.class).as("stop while not running")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ timer.reset(); // verify that an exception is not thrown.
+
+ // started state does not allow start
+ timer.start();
+ assertThatExceptionOfType(IllegalStateException.class).as("start while running")
+ .isThrownBy(timer::start)
+ .withMessageStartingWith("Timer: ");
+ timer.reset();
+ timer.start(); // restart the timer.
+ timer.stop();
+
+ // stopped state does not allow stop or start
+ assertThatExceptionOfType(IllegalStateException.class).as("stop after stop")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ assertThatExceptionOfType(IllegalStateException.class).as("start after stop")
+ .isThrownBy(timer::start)
+ .withMessageStartingWith("Timer: ");
+ timer.reset();
+
+ // stopped + reset does not allow stop.
+ assertThatExceptionOfType(IllegalStateException.class).as("stop after reset (1)")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ timer.start();
+ timer.reset();
+
+ // started + reset does not allow stop.
+ assertThatExceptionOfType(IllegalStateException.class).as("stop after reset (2)")
+ .isThrownBy(timer::stop)
+ .withMessageStartingWith("Timer: ");
+ }
+
+ @Test
+ void backoffTest() throws InterruptedException {
+ final AbstractSourceTask.Timer timer = new AbstractSourceTask.Timer(Duration.ofSeconds(1));
+ final AbstractSourceTask.Backoff backoff = new AbstractSourceTask.Backoff(timer.getBackoffConfig());
+ final long estimatedDelay = backoff.estimatedDelay();
+ assertThat(estimatedDelay).isLessThan(500);
+
+ // execute delay without timer running.
+ final StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ backoff.delay();
+ stopWatch.stop();
+ assertThat(stopWatch.getTime()).as("Result without timer running")
+ .isBetween(estimatedDelay - backoff.getMaxJitter(), estimatedDelay + backoff.getMaxJitter());
+
+ timer.start();
+ for (int i = 0; i < 9; i++) {
+ stopWatch.reset();
+ timer.reset();
+ timer.start();
+ stopWatch.start();
+ await().atMost(Duration.ofSeconds(2)).until(() -> {
+ backoff.delay();
+ return backoff.estimatedDelay() == 0 || timer.isExpired();
+ });
+ stopWatch.stop();
+ timer.stop();
+ final int step = i;
+ if (!timer.isExpired()) {
+ assertThat(stopWatch.getTime()).as(() -> String.format("Result with timer running at step %s", step))
+ .isBetween(Duration.ofSeconds(1).toMillis() - backoff.getMaxJitter(),
+ Duration.ofSeconds(1).toMillis() + backoff.getMaxJitter());
+ }
+ }
+ }
+
+ @Test
+ void backoffIncrementalTimeTest() throws InterruptedException {
+ final AtomicBoolean abortTrigger = new AtomicBoolean();
+ // delay increases in powers of 2.
+ final long maxDelay = 1000; // not a power of 2
+ final AbstractSourceTask.BackoffConfig config = new AbstractSourceTask.BackoffConfig() {
+ @Override
+ public AbstractSourceTask.SupplierOfLong getSupplierOfTimeRemaining() {
+ return () -> maxDelay;
+ }
+
+ @Override
+ public AbstractSourceTask.AbortTrigger getAbortTrigger() {
+ return () -> abortTrigger.set(true);
+ }
+ };
+
+ final AbstractSourceTask.Backoff backoff = new AbstractSourceTask.Backoff(config);
+ long expected = 2;
+ while (backoff.estimatedDelay() < maxDelay) {
+ assertThat(backoff.estimatedDelay()).isEqualTo(expected);
+ backoff.delay();
+ expected *= 2;
+ }
+ assertThat(backoff.estimatedDelay()).isEqualTo(maxDelay);
+ assertThat(abortTrigger).isFalse();
+ }
+}
diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java
index 50e54a28..617dd290 100644
--- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java
+++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java
@@ -32,6 +32,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+
import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.confluent.connect.avro.AvroData;
@@ -75,7 +78,7 @@ void testConfigureValueConverter() {
void testReadAvroRecordsInvalidData() {
final InputStream inputStream = new ByteArrayInputStream("mock-avro-data".getBytes(StandardCharsets.UTF_8));
- final Stream<GenericRecord> records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig,
+ final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig,
0);
final List