Skip to content

Commit

Permalink
fixed test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
¨Claude committed Dec 27, 2024
1 parent 22fa3a0 commit b6f2d08
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class AbstractSourceTask extends SourceTask {
* The maximum time to spend polling. This is set to 5 seconds as that is the time that is allotted to a system for
* shutdown.
*/
protected static final Duration MAX_POLL_TIME = Duration.ofSeconds(5);
public static final Duration MAX_POLL_TIME = Duration.ofSeconds(5);
/**
* The boolean that indicates the connector is stopped.
*/
Expand Down Expand Up @@ -97,8 +97,8 @@ protected AbstractSourceTask(final Logger logger) {
super();
this.logger = logger;
connectorStopped = new AtomicBoolean();
backoff = new Backoff(MAX_POLL_TIME);
timer = new Timer(MAX_POLL_TIME);
backoff = new Backoff(timer::millisecondsRemaining);
}

/**
Expand All @@ -111,9 +111,11 @@ protected AbstractSourceTask(final Logger logger) {
* this iterator executes may cause the task to abort.
* </p>
*
* @param timer
* a SupplierOfLong that provides the amount of time remaining before the polling expires.
* @return The iterator of SourceRecords.
*/
abstract protected Iterator<SourceRecord> getIterator();
abstract protected Iterator<SourceRecord> getIterator(SupplierOfLong timer);

/**
* Called by {@link #start} to allows the concrete implementation to configure itself based on properties.
Expand All @@ -128,7 +130,7 @@ public final void start(final Map<String, String> props) {
logger.debug("Starting");
config = configure(props);
maxPollRecords = config.getMaxPollRecords();
sourceRecordIterator = getIterator();
sourceRecordIterator = getIterator(timer::millisecondsRemaining);
}

/**
Expand Down Expand Up @@ -184,13 +186,16 @@ private List<SourceRecord> populateList() {
final List<SourceRecord> results = new ArrayList<>();
try {
while (stillPolling() && results.size() < maxPollRecords) {
// if we could not get a record and the results are not empty return them
if (!tryAdd(results, sourceRecordIterator) && !results.isEmpty()) {
break;
if (!tryAdd(results, sourceRecordIterator)) {
if (!results.isEmpty()) {
// if we could not get a record and the results are not empty return them
break;
}
// attempt a backoff
backoff.cleanDelay();
}
// attempt a backoff
backoff.cleanDelay();
}

} catch (RuntimeException e) { // NOPMD must catch runtime here.
logger.error("Error during poll(): {}", e.getMessage(), e);
if (config.getErrorsTolerance() == ErrorsTolerance.NONE) {
Expand Down Expand Up @@ -241,6 +246,15 @@ private Timer(final Duration duration) {
this.duration = duration.toMillis();
}

/**
* Gets the maximum duration for this timer.
*
* @return the maximum duration for the timer.
*/
public long millisecondsRemaining() {
return super.isStarted() ? super.getTime() - duration : duration;
}

/**
* Returns {@code true} if the timer has expired.
*
Expand All @@ -252,18 +266,19 @@ public boolean expired() {
}

/**
* Calculates the amount of time to sleep during a backoff performs the sleep. Backoff calculation uses an
* expenantially increasing delay until the maxDelay is reached. Then all delays are maxDelay length.
* Performs a delay based on the number of successive {@link #delay()} or {@link #cleanDelay()} calls without a
* {@link #reset()}. Delay increases exponentially but never exceeds the time remaining by more than 0.512 seconds.
*/
protected static class Backoff {
/**
* The maximum wait time.
* A supplier of the time remaining (in milliseconds) on the overriding timer.
*/
private final long maxWait;
private final SupplierOfLong timeRemaining;

/**
* The maximum number of times {@link #delay()} will be called before maxWait is reached.
*/
private final int maxCount;
private int maxCount;
/**
* The number of times {@link #delay()} has been called.
*/
Expand All @@ -277,40 +292,44 @@ protected static class Backoff {
/**
* Constructor.
*
* @param maxDelay
* The maximum delay that this instance will use.
* @param timeRemaining
* A supplier of long as milliseconds remaining before time expires.
*/
public Backoff(final Duration maxDelay) {
// calculate the approx wait time.
maxWait = maxDelay.toMillis();
maxCount = (int) (Math.log10(maxWait) / Math.log10(2));
waitCount = 0;
public Backoff(final SupplierOfLong timeRemaining) {
this.timeRemaining = timeRemaining;
reset();
}

/**
* Reset the backoff time so that delay is again at the minimum.
*/
public void reset() {
public final void reset() {
// calculate the approx wait count.
maxCount = (int) (Math.log10(timeRemaining.get()) / Math.log10(2));
waitCount = 0;
}

private long timeWithJitter() {
// generate approx +/- 0.512 seconds of jitter
final int jitter = random.nextInt(1024) - 512;
return (long) Math.pow(2, waitCount) + jitter;
}
/**
* Delay execution based on the number of times this method has been called.
*
* @throws InterruptedException
* If any thread interrupts this thread.
*/
public void delay() throws InterruptedException {
// power of 2 next int is faster and so we generate approx +/- 0.512 seconds of jitter
final int jitter = random.nextInt(1024) - 512;

long sleepTime = timeRemaining.get();
if (waitCount < maxCount) {
waitCount++;
final long sleep = (long) Math.pow(2, waitCount) + jitter;
// don't allow jitter to set sleep argument negative.
Thread.sleep(Math.max(0, sleep));
} else {
Thread.sleep(maxWait + jitter);
sleepTime = Math.min(sleepTime, timeWithJitter());
}
// don't sleep negative time.
if (sleepTime > 0) {
Thread.sleep(sleepTime);
}
}

Expand All @@ -325,4 +344,12 @@ public void cleanDelay() {
}
}
}

/**
* A functional interface to return long values.
*/
@FunctionalInterface
public interface SupplierOfLong {
long get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,23 @@

package io.aiven.kafka.connect.s3.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
import org.apache.kafka.connect.source.SourceRecord;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.source.AbstractSourceTask;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig;
import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient;
import io.aiven.kafka.connect.s3.source.utils.OffsetManager;
import io.aiven.kafka.connect.s3.source.utils.RecordProcessor;
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord;
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator;
import io.aiven.kafka.connect.s3.source.utils.Version;
Expand All @@ -52,21 +55,19 @@ public class S3SourceTask extends AbstractSourceTask {
public static final String OBJECT_KEY = "object_key";
public static final String PARTITION = "topicPartition";


/** An iterator or S3SourceRecords */
private Iterator<S3SourceRecord> s3SourceRecordIterator;
/**
* The transformer that we are using TODO move this to AbstractSourceTask
*/
private Transformer transformer;
/** The task initialized flag */
private boolean taskInitialized;
private Transformer<?> transformer;
/** The AWS Source client */
private AWSV2SourceClient awsv2SourceClient;
/** The list of failed object keys */
private final Set<String> failedObjectKeys = new HashSet<>();
/** The offset manager this task uses */
private OffsetManager offsetManager;
private S3SourceConfig s3SourceConfig;

public S3SourceTask() {
super(LOGGER);
Expand All @@ -78,10 +79,10 @@ public String version() {
}

@Override
protected Iterator<SourceRecord> getIterator() { // NOPMD cognatavie complexity
return new Iterator<SourceRecord>() {
protected Iterator<SourceRecord> getIterator(SupplierOfLong timer) { // NOPMD cognatavie complexity
return new Iterator<>() {
/** The backoff for Amazon retryable exceptions */
final Backoff backoff = new Backoff(MAX_POLL_TIME);
final Backoff backoff = new Backoff(timer);
@Override
public boolean hasNext() {
try {
Expand Down Expand Up @@ -116,35 +117,40 @@ public boolean hasNext() {
@Override
public SourceRecord next() {
final S3SourceRecord s3SourceRecord = s3SourceRecordIterator.next();
offsetManager.incrementAndUpdateOffsetMap(s3SourceRecord.getPartitionMap(), s3SourceRecord.getObjectKey(), 1L);
return s3SourceRecord.getSourceRecord(s3SourceRecord.getTopic());
offsetManager.incrementAndUpdateOffsetMap(s3SourceRecord.getPartitionMap(),
s3SourceRecord.getObjectKey(), 1L);
final List<SourceRecord> result = RecordProcessor.processRecords(
Collections.singletonList(s3SourceRecord).iterator(), new ArrayList<>(), s3SourceConfig,
S3SourceTask.this::stillPolling, awsv2SourceClient, offsetManager);
return result.get(0);
}
};
}

@Override
protected SourceCommonConfig configure(final Map<String, String> props) {
LOGGER.info("S3 Source task started.");
final S3SourceConfig s3SourceConfig = new S3SourceConfig(props);
this.transformer = TransformerFactory.getTransformer(s3SourceConfig);
this.s3SourceConfig = new S3SourceConfig(props);
this.transformer = s3SourceConfig.getTransformer();
offsetManager = new OffsetManager(context, s3SourceConfig);
awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys);
setS3SourceRecordIterator(
new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, awsv2SourceClient));
this.taskInitialized = true;
return s3SourceConfig;
}

@Override
public void commit() throws InterruptedException {
public void commit() {
LOGGER.info("Committed all records through last poll()");
}

@Override
public void commitRecord(final SourceRecord record) throws InterruptedException {
public void commitRecord(final SourceRecord record) {
if (LOGGER.isInfoEnabled()) {
final Map<String, Object> map = (Map<String, Object>) record.sourceOffset();
LOGGER.info("Committed individual record {} {} {} committed", map.get(BUCKET), map.get(OBJECT_KEY), offsetManager.recordsProcessedForObjectKey((Map)record.sourcePartition(), map.get(OBJECT_KEY).toString()));
LOGGER.info("Committed individual record {} {} {} committed", map.get(BUCKET), map.get(OBJECT_KEY),
offsetManager.recordsProcessedForObjectKey((Map<String, Object>) record.sourcePartition(),
map.get(OBJECT_KEY).toString()));
}
}

Expand All @@ -170,16 +176,7 @@ protected void closeResources() {
*
* @return the transformer that we are using.
*/
public Transformer getTransformer() {
public Transformer<?> getTransformer() {
return transformer;
}

/**
* Get the initialized flag.
*
* @return {@code true} if the task has been initialized, {@code false} otherwise.
*/
public boolean isTaskInitialized() {
return taskInitialized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ final public class ConnectUtils {
private ConnectUtils() {
// hidden
}
static Map<String, Object> getPartitionMap(final String topicName, final Integer defaultPartitionId,
public static Map<String, Object> getPartitionMap(final String topicName, final Integer defaultPartitionId,
final String bucketName) {
final Map<String, Object> partitionMap = new HashMap<>();
partitionMap.put(BUCKET, bucketName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
Expand All @@ -39,12 +39,12 @@ private RecordProcessor() {
}

public static List<SourceRecord> processRecords(final Iterator<S3SourceRecord> sourceRecordIterator,
final List<SourceRecord> results, final S3SourceConfig s3SourceConfig, final AtomicBoolean connectorStopped,
final List<SourceRecord> results, final S3SourceConfig s3SourceConfig, final Supplier<Boolean> stillPolling,
final AWSV2SourceClient sourceClient, final OffsetManager offsetManager) {

final int maxPollRecords = s3SourceConfig.getMaxPollRecords();

for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && !connectorStopped.get(); i++) {
for (int i = 0; sourceRecordIterator.hasNext() && i < maxPollRecords && stillPolling.get(); i++) {
final S3SourceRecord s3SourceRecord = sourceRecordIterator.next();
if (s3SourceRecord != null) {
final SourceRecord sourceRecord = createSourceRecord(s3SourceRecord, s3SourceConfig, sourceClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public String getObjectKey() {
return objectKey;
}

public SchemaAndValue getKey() {
return new SchemaAndValue(keyData.schema(), keyData.value());
}

public SchemaAndValue getValue() {
return new SchemaAndValue(valueData.schema(), valueData.value());
}

public void setOffsetMap(final Map<String, Object> offsetMap) {
this.offsetMap = new HashMap<>(offsetMap);
}
Expand Down
Loading

0 comments on commit b6f2d08

Please sign in to comment.