Skip to content

Commit

Permalink
Tasks assignment strategy - commons integration - [KCON-63] (#384)
Browse files Browse the repository at this point in the history
[KCON-63]

- Integrate Task assignment strategies of common module into s3 release
feature branch
- Delete hard coding of file pattern from s3 iterator class
- Update existing tests
- Added new integration tests to verify other strategy use cases
  • Loading branch information
muralibasani authored Jan 14, 2025
1 parent c4604f4 commit 6b967d3
Show file tree
Hide file tree
Showing 27 changed files with 951 additions and 901 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ public final class FileNameFragment extends ConfigFragment {
static final String FILE_MAX_RECORDS = "file.max.records";
static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source";
static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template";
static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}";

public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template";
static final String DEFAULT_FILE_PATH_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/";

public FileNameFragment(final AbstractConfig cfg) {
super(cfg);
}
Expand Down Expand Up @@ -109,9 +112,18 @@ public void ensureValid(final String name, final Object value) {
configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(),
new TimestampSourceValidator(), ConfigDef.Importance.LOW,
"Specifies the the timestamp variable source. Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE);

configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"The template for file prefix on S3. "
+ "Supports `{{ variable }}` placeholders for substituting variables. "
+ "Currently supported variables are `topic` and `partition` "
+ "and are mandatory to have these in the directory structure."
+ "Example prefix : topics/{{topic}}/partition/{{partition}}/",
GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG);

return configDef;
}

Expand Down Expand Up @@ -185,4 +197,8 @@ public int getMaxRecordsPerFile() {
return cfg.getInt(FILE_MAX_RECORDS);
}

/**
 * Returns the configured S3 file path prefix template ({@code file.prefix.template}), e.g. the default
 * {@code topics/{{topic}}/partition={{partition}}/}.
 *
 * @return the raw template string as configured; never validated here beyond ConfigDef's NonEmptyString check.
 */
public String getFilePathPrefixTemplateConfig() {
return cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.aiven.kafka.connect.common.source.input.InputFormat;
import io.aiven.kafka.connect.common.source.input.Transformer;
import io.aiven.kafka.connect.common.source.input.TransformerFactory;
import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;

public class SourceCommonConfig extends CommonConfig {

Expand Down Expand Up @@ -69,6 +70,10 @@ public ErrorsTolerance getErrorsTolerance() {
return sourceConfigFragment.getErrorsTolerance();
}

/**
 * Returns the configured object distribution strategy, delegating to the source config fragment.
 *
 * @return the {@link ObjectDistributionStrategy} parsed from the {@code object.distribution.strategy} setting.
 */
public ObjectDistributionStrategy getObjectDistributionStrategy() {
return sourceConfigFragment.getObjectDistributionStrategy();
}

public int getMaxPollRecords() {
return sourceConfigFragment.getMaxPollRecords();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@

package io.aiven.kafka.connect.common.config;

import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.OBJECT_HASH;
import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance;
import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy;

import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;

public final class SourceConfigFragment extends ConfigFragment {
private static final String GROUP_OTHER = "OTHER_CFG";
Expand All @@ -32,6 +36,8 @@ public final class SourceConfigFragment extends ConfigFragment {
public static final String TARGET_TOPICS = "topics";
public static final String ERRORS_TOLERANCE = "errors.tolerance";

public static final String OBJECT_DISTRIBUTION_STRATEGY = "object.distribution.strategy";

/**
* Construct the ConfigFragment..
*
Expand Down Expand Up @@ -67,7 +73,14 @@ public static ConfigDef update(final ConfigDef configDef) {
ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS);
configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC,
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); // NOPMD
offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS);
configDef.define(OBJECT_DISTRIBUTION_STRATEGY, ConfigDef.Type.STRING, OBJECT_HASH.name(),
new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM,
"Based on tasks.max config and this strategy, objects are processed in distributed"
+ " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", "
+ PARTITION_IN_FILENAME,
GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD
// UnusedAssignment

return configDef;
}
Expand All @@ -92,6 +105,10 @@ public ErrorsTolerance getErrorsTolerance() {
return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE));
}

/**
 * Parses the {@code object.distribution.strategy} config value into its enum form.
 *
 * @return the matching {@link ObjectDistributionStrategy}; {@code forName} raises an exception for
 *         unrecognised values (also enforced earlier by the config validator).
 */
public ObjectDistributionStrategy getObjectDistributionStrategy() {
return ObjectDistributionStrategy.forName(cfg.getString(OBJECT_DISTRIBUTION_STRATEGY));
}

private static class ErrorsToleranceValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {
Expand All @@ -103,4 +120,15 @@ public void ensureValid(final String name, final Object value) {
}
}

/**
 * Validates that {@code object.distribution.strategy}, when set to a non-blank value, names a known
 * {@link ObjectDistributionStrategy}. Blank or null values are accepted here so the declared default
 * can apply.
 */
private static class ObjectDistributionStrategyValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {
// ConfigDef passes STRING-typed values, so the cast is safe; a null value simply skips validation.
final String objectDistributionStrategy = (String) value;
if (StringUtils.isNotBlank(objectDistributionStrategy)) {
// This will throw an Exception if not a valid value.
ObjectDistributionStrategy.forName(objectDistributionStrategy);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.codehaus.plexus.util.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2025 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.input.utils;

import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigException;

import org.apache.commons.lang3.StringUtils;

public final class FilePatternUtils {

    public static final String PATTERN_PARTITION_KEY = "partition";
    public static final String PATTERN_TOPIC_KEY = "topic";
    public static final String START_OFFSET_PATTERN = "{{start_offset}}";
    public static final String TIMESTAMP_PATTERN = "{{timestamp}}";
    public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}";
    public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}";

    // Use a named group to return the partition in a complex string to always get the correct information for the
    // partition number.
    public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)";
    public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)";
    public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)";

    private FilePatternUtils() {
        // hidden
    }

    /**
     * Compiles the configured source-name template (e.g. {@code {{topic}}-{{partition}}-{{start_offset}}}) into a
     * {@link Pattern} with named groups for topic and partition.
     *
     * @param expectedSourceNameFormat
     *            the template; must contain {@code {{partition}}}.
     * @return the compiled pattern.
     * @throws ConfigException
     *             if the template is null, lacks the partition placeholder, or produces an invalid regex.
     */
    public static Pattern configurePattern(final String expectedSourceNameFormat) {
        if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) {
            throw new ConfigException(String.format(
                    "Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.",
                    expectedSourceNameFormat));
        }
        // Substitute each template variable with its regex equivalent. String.replace performs literal
        // (non-regex) replacement of every occurrence, matching the previous commons-lang3
        // StringUtils.replace behaviour without the third-party dependency.
        // NOTE(review): literal portions of the template are NOT regex-quoted, so metacharacters such as
        // '.' in a template are interpreted by the regex engine — confirm templates are plain paths.
        final String regexString = expectedSourceNameFormat
                .replace(START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN)
                .replace(TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN)
                .replace(TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN)
                .replace(PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN);
        try {
            return Pattern.compile(regexString);
        } catch (IllegalArgumentException iae) {
            // PatternSyntaxException extends IllegalArgumentException; wrap it with configuration context.
            throw new ConfigException(
                    String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString),
                    iae);
        }
    }

    /**
     * Extracts the topic name from {@code sourceName} using the compiled pattern's named topic group.
     *
     * @return the topic, or empty if the pattern does not match.
     */
    public static Optional<String> getTopic(final Pattern filePattern, final String sourceName) {
        return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY));
    }

    /**
     * Extracts the partition id from {@code sourceName} using the compiled pattern's named partition group.
     *
     * @return the partition id, or empty if the pattern does not match or the group is not a parseable int
     *         (e.g. a value exceeding Integer.MAX_VALUE).
     */
    public static Optional<Integer> getPartitionId(final Pattern filePattern, final String sourceName) {
        return matchPattern(filePattern, sourceName).flatMap(matcher -> {
            try {
                return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY)));
            } catch (NumberFormatException e) {
                return Optional.empty();
            }
        });
    }

    /**
     * Runs {@code filePattern} against {@code sourceName}, returning the matcher only when a match is found.
     *
     * @throws IllegalArgumentException
     *             if either argument is null.
     */
    private static Optional<Matcher> matchPattern(final Pattern filePattern, final String sourceName) {
        if (filePattern == null || sourceName == null) {
            throw new IllegalArgumentException("filePattern and sourceName must not be null");
        }

        final Matcher matcher = filePattern.matcher(sourceName);
        return matcher.find() ? Optional.of(matcher) : Optional.empty();
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

/**
 * A {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files)
 * into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers.
 * <p>
 * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the
 * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together
 * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer
 * workers than tasks, and they will be assigned the remaining tasks as work completes.
 */
public interface DistributionStrategy {
/**
 * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be
 * assigned deterministically to a single taskId.
 *
 * @param taskId
 *            a task ID, usually for the currently running task
 * @param valueToBeEvaluated
 *            The value to be evaluated to determine if it should be processed by the task.
 * @param filePattern
 *            The compiled file-name pattern used to extract topic/partition information from
 *            {@code valueToBeEvaluated}; implementations that do not rely on the file name structure may
 *            ignore it.
 * @return true if the task should process the object, false if it should not.
 */
boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern);

/**
 * When a connector receives a reconfigure event this method should be called to ensure that the distribution
 * strategy is updated correctly.
 *
 * @param maxTasks
 *            The maximum number of tasks created for the Connector
 */
void configureDistributionStrategy(int maxTasks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,27 @@

package io.aiven.kafka.connect.common.source.task;

import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of
* the object's filename, which is uniformly distributed and deterministic across workers.
* {@link HashDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of the
* object's filename, which is uniformly distributed and deterministic across workers.
* <p>
* This is well-suited to use cases where the order of events between records from objects is not important, especially
* when ingesting files into Kafka that were not previously created by a supported cloud storage Sink.
*/
public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class);
public final class HashDistributionStrategy implements DistributionStrategy {
private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class);
private int maxTasks;
HashObjectDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
public HashDistributionStrategy(final int maxTasks) {
configureDistributionStrategy(maxTasks);
}

@Override
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated) {
public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) {
if (filenameToBeEvaluated == null) {
LOG.warn("Ignoring as it is not passing a correct filename to be evaluated.");
return false;
Expand All @@ -46,8 +48,8 @@ public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated
}

@Override
public void reconfigureDistributionStrategy(final int maxTasks, final String expectedFormat) {
setMaxTasks(maxTasks);
public void configureDistributionStrategy(final int maxTasks) {
this.maxTasks = maxTasks;
}

public void setMaxTasks(final int maxTasks) {
Expand Down
Loading

0 comments on commit 6b967d3

Please sign in to comment.