Skip to content

Commit

Permalink
Polling efficiency (#378)
Browse files Browse the repository at this point in the history
Fixes for KCON-26 - Backoff when no data available.
Fixes for KCON-28 - Improve poll method

Creates an AbstractSourceTask in commons to handle response to poll and
backoff calculations as well as start, stop. Implementations need to
implement an Iterator that poll will call to retrieve data.

Private classes Timer and Backoff are created in AbstractSourceTask and
may be moved out at a later date if needed elsewhere.

Changes made to configurations to support configuration extraction in
AbstractSourceTask.

Modifications to S3SourceTask to operate under AbstractSourceTask.

Additional tests added

---------

Co-authored-by: ¨Claude <¨[email protected]¨>
Co-authored-by: Jarkko Jaakola <[email protected]>
Co-authored-by: Murali Basani <[email protected]>
  • Loading branch information
4 people authored Jan 9, 2025
1 parent f6d3087 commit c4604f4
Show file tree
Hide file tree
Showing 31 changed files with 1,932 additions and 752 deletions.
1 change: 1 addition & 0 deletions commons/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ dependencies {
testImplementation(jackson.databind)
testImplementation(testinglibs.mockito.core)
testImplementation(testinglibs.assertj.core)
testImplementation(testinglibs.awaitility)
testImplementation(testFixtures(project(":commons")))
testImplementation(testinglibs.woodstox.stax2.api)
testImplementation(apache.hadoop.mapreduce.client.core)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;

public class SourceCommonConfig extends CommonConfig {

Expand Down Expand Up @@ -64,11 +66,15 @@ public String getTargetTopicPartitions() {
}

public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(sourceConfigFragment.getErrorsTolerance());
return sourceConfigFragment.getErrorsTolerance();
}

public int getMaxPollRecords() {
return sourceConfigFragment.getMaxPollRecords();
}

public Transformer getTransformer() {
return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public int getExpectedMaxMessageBytes() {
return cfg.getInt(EXPECTED_MAX_MESSAGE_BYTES);
}

public String getErrorsTolerance() {
return cfg.getString(ERRORS_TOLERANCE);
public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
}

private static class ErrorsToleranceValidator implements ConfigDef.Validator {
Expand Down
Loading

0 comments on commit c4604f4

Please sign in to comment.