diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java index 8d3156e22..467ea2cb2 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/FileNameFragment.java @@ -43,9 +43,12 @@ public final class FileNameFragment extends ConfigFragment { static final String FILE_MAX_RECORDS = "file.max.records"; static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone"; static final String FILE_NAME_TIMESTAMP_SOURCE = "file.name.timestamp.source"; - static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; + public static final String FILE_NAME_TEMPLATE_CONFIG = "file.name.template"; static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}"; + public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template"; + static final String DEFAULT_FILE_PATH_PREFIX_TEMPLATE = "topics/{{topic}}/partition={{partition}}/"; + public FileNameFragment(final AbstractConfig cfg) { super(cfg); } @@ -109,9 +112,18 @@ public void ensureValid(final String name, final Object value) { configDef.define(FILE_NAME_TIMESTAMP_SOURCE, ConfigDef.Type.STRING, TimestampSource.Type.WALLCLOCK.name(), new TimestampSourceValidator(), ConfigDef.Importance.LOW, "Specifies the the timestamp variable source. Default is wall-clock.", GROUP_FILE, fileGroupCounter++, // NOPMD - // UnusedAssignment ConfigDef.Width.SHORT, FILE_NAME_TIMESTAMP_SOURCE); + configDef.define(FILE_PATH_PREFIX_TEMPLATE_CONFIG, ConfigDef.Type.STRING, DEFAULT_FILE_PATH_PREFIX_TEMPLATE, + new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, + "The template for file prefix on S3. " + + "Supports `{{ variable }}` placeholders for substituting variables. " + + "Currently supported variables are `topic` and `partition` " + + "and are mandatory to have these in the directory structure." + + "Example prefix : topics/{{topic}}/partition/{{partition}}/", + GROUP_FILE, fileGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.LONG, FILE_PATH_PREFIX_TEMPLATE_CONFIG); + return configDef; } @@ -185,4 +197,8 @@ public int getMaxRecordsPerFile() { return cfg.getInt(FILE_MAX_RECORDS); } + public String getFilePathPrefixTemplateConfig() { + return cfg.getString(FILE_PATH_PREFIX_TEMPLATE_CONFIG); + } + } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java index 954c9151d..2c9cafe61 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java @@ -24,6 +24,7 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; +import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy; public class SourceCommonConfig extends CommonConfig { @@ -69,6 +70,10 @@ public ErrorsTolerance getErrorsTolerance() { return sourceConfigFragment.getErrorsTolerance(); } + public ObjectDistributionStrategy getObjectDistributionStrategy() { + return sourceConfigFragment.getObjectDistributionStrategy(); + } + public int getMaxPollRecords() { return sourceConfigFragment.getMaxPollRecords(); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java index 58befa60e..f3955a7e3 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceConfigFragment.java @@ -16,12 +16,16 @@ package io.aiven.kafka.connect.common.config; +import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.OBJECT_HASH; +import static io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy.PARTITION_IN_FILENAME; + import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import io.aiven.kafka.connect.common.config.enums.ErrorsTolerance; +import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy; -import org.codehaus.plexus.util.StringUtils; +import org.apache.commons.lang3.StringUtils; public final class SourceConfigFragment extends ConfigFragment { private static final String GROUP_OTHER = "OTHER_CFG"; @@ -32,6 +36,8 @@ public final class SourceConfigFragment extends ConfigFragment { public static final String TARGET_TOPICS = "topics"; public static final String ERRORS_TOLERANCE = "errors.tolerance"; + public static final String OBJECT_DISTRIBUTION_STRATEGY = "object.distribution.strategy"; + /** * Construct the ConfigFragment.. * @@ -67,7 +73,14 @@ public static ConfigDef update(final ConfigDef configDef) { ConfigDef.Width.NONE, TARGET_TOPIC_PARTITIONS); configDef.define(TARGET_TOPICS, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "eg : connect-storage-offsets", GROUP_OFFSET_TOPIC, - offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); // NOPMD + offsetStorageGroupCounter++, ConfigDef.Width.NONE, TARGET_TOPICS); + configDef.define(OBJECT_DISTRIBUTION_STRATEGY, ConfigDef.Type.STRING, OBJECT_HASH.name(), + new ObjectDistributionStrategyValidator(), ConfigDef.Importance.MEDIUM, + "Based on tasks.max config and this strategy, objects are processed in distributed" + + " way by Kafka connect workers, supported values : " + OBJECT_HASH + ", " + + PARTITION_IN_FILENAME, + GROUP_OTHER, offsetStorageGroupCounter++, ConfigDef.Width.NONE, OBJECT_DISTRIBUTION_STRATEGY); // NOPMD + // UnusedAssignment return configDef; } @@ -92,6 +105,10 @@ public ErrorsTolerance getErrorsTolerance() { return ErrorsTolerance.forName(cfg.getString(ERRORS_TOLERANCE)); } + public ObjectDistributionStrategy getObjectDistributionStrategy() { + return ObjectDistributionStrategy.forName(cfg.getString(OBJECT_DISTRIBUTION_STRATEGY)); + } + private static class ErrorsToleranceValidator implements ConfigDef.Validator { @Override public void ensureValid(final String name, final Object value) { @@ -103,4 +120,15 @@ public void ensureValid(final String name, final Object value) { } } + private static class ObjectDistributionStrategyValidator implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + final String objectDistributionStrategy = (String) value; + if (StringUtils.isNotBlank(objectDistributionStrategy)) { + // This will throw an Exception if not a valid value. + ObjectDistributionStrategy.forName(objectDistributionStrategy); + } + } + } + } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java index c6aea0e82..8069d08c1 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java @@ -30,7 +30,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.function.IOSupplier; -import org.codehaus.plexus.util.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java new file mode 100644 index 000000000..546c0c4c4 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/utils/FilePatternUtils.java @@ -0,0 +1,88 @@ +/* + * Copyright 2025 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.source.input.utils; + +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.kafka.common.config.ConfigException; + +import org.apache.commons.lang3.StringUtils; + +public final class FilePatternUtils { + + public static final String PATTERN_PARTITION_KEY = "partition"; + public static final String PATTERN_TOPIC_KEY = "topic"; + public static final String START_OFFSET_PATTERN = "{{start_offset}}"; + public static final String TIMESTAMP_PATTERN = "{{timestamp}}"; + public static final String PARTITION_PATTERN = "{{" + PATTERN_PARTITION_KEY + "}}"; + public static final String TOPIC_PATTERN = "{{" + PATTERN_TOPIC_KEY + "}}"; + + // Use a named group to return the partition in a complex string to always get the correct information for the + // partition number. + public static final String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_PARTITION_KEY + ">\\d+)"; + public static final String NUMBER_REGEX_PATTERN = "(?:\\d+)"; + public static final String TOPIC_NAMED_GROUP_REGEX_PATTERN = "(?<" + PATTERN_TOPIC_KEY + ">[a-zA-Z0-9\\-_.]+)"; + + private FilePatternUtils() { + // hidden + } + public static Pattern configurePattern(final String expectedSourceNameFormat) { + if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) { + throw new ConfigException(String.format( + "Source name format %s missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", + expectedSourceNameFormat)); + } + // Build REGEX Matcher + String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN); + regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN); + regexString = StringUtils.replace(regexString, TOPIC_PATTERN, TOPIC_NAMED_GROUP_REGEX_PATTERN); + regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN); + try { + return Pattern.compile(regexString); + } catch (IllegalArgumentException iae) { + throw new ConfigException( + String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString), + iae); + } + } + + public static Optional getTopic(final Pattern filePattern, final String sourceName) { + return matchPattern(filePattern, sourceName).map(matcher -> matcher.group(PATTERN_TOPIC_KEY)); + } + + public static Optional getPartitionId(final Pattern filePattern, final String sourceName) { + return matchPattern(filePattern, sourceName).flatMap(matcher -> { + try { + return Optional.of(Integer.parseInt(matcher.group(PATTERN_PARTITION_KEY))); + } catch (NumberFormatException e) { + return Optional.empty(); + } + }); + } + + private static Optional matchPattern(final Pattern filePattern, final String sourceName) { + if (filePattern == null || sourceName == null) { + throw new IllegalArgumentException("filePattern and sourceName must not be null"); + } + + final Matcher matcher = filePattern.matcher(sourceName); + return matcher.find() ? Optional.of(matcher) : Optional.empty(); + } + +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java new file mode 100644 index 000000000..8d370c689 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/DistributionStrategy.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.source.task; + +import java.util.regex.Pattern; + +/** + * An {@link DistributionStrategy} provides a mechanism to share the work of processing records from objects (or files) + * into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers. + *

+ * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the + * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together + * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer + * workers than tasks, and they will be assigned the remaining tasks as work completes. + */ +public interface DistributionStrategy { + /** + * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be + * assigned deterministically to a single taskId. + * + * @param taskId + * a task ID, usually for the currently running task + * @param valueToBeEvaluated + * The value to be evaluated to determine if it should be processed by the task. + * @return true if the task should process the object, false if it should not. + */ + boolean isPartOfTask(int taskId, String valueToBeEvaluated, Pattern filePattern); + + /** + * When a connector receives a reconfigure event this method should be called to ensure that the distribution + * strategy is updated correctly. + * + * @param maxTasks + * The maximum number of tasks created for the Connector + */ + void configureDistributionStrategy(int maxTasks); +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java similarity index 75% rename from commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java rename to commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java index c39676ad0..4928f30d9 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategy.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategy.java @@ -16,25 +16,27 @@ package io.aiven.kafka.connect.common.source.task; +import java.util.regex.Pattern; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link HashObjectDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of - * the object's filename, which is uniformly distributed and deterministic across workers. + * {@link HashDistributionStrategy} evenly distributes cloud storage objects between tasks using the hashcode of the + * object's filename, which is uniformly distributed and deterministic across workers. *

* This is well-suited to use cases where the order of events between records from objects is not important, especially * when ingesting files into Kafka that were not previously created by a supported cloud storage Sink. */ -public final class HashObjectDistributionStrategy implements ObjectDistributionStrategy { - private final static Logger LOG = LoggerFactory.getLogger(HashObjectDistributionStrategy.class); +public final class HashDistributionStrategy implements DistributionStrategy { + private final static Logger LOG = LoggerFactory.getLogger(HashDistributionStrategy.class); private int maxTasks; - HashObjectDistributionStrategy(final int maxTasks) { - this.maxTasks = maxTasks; + public HashDistributionStrategy(final int maxTasks) { + configureDistributionStrategy(maxTasks); } @Override - public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated) { + public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated, final Pattern filePattern) { if (filenameToBeEvaluated == null) { LOG.warn("Ignoring as it is not passing a correct filename to be evaluated."); return false; @@ -46,8 +48,8 @@ public boolean isPartOfTask(final int taskId, final String filenameToBeEvaluated } @Override - public void reconfigureDistributionStrategy(final int maxTasks, final String expectedFormat) { - setMaxTasks(maxTasks); + public void configureDistributionStrategy(final int maxTasks) { + this.maxTasks = maxTasks; } public void setMaxTasks(final int maxTasks) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java deleted file mode 100644 index 5925d880d..000000000 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/ObjectDistributionStrategy.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.common.source.task; - -/** - * An {@link ObjectDistributionStrategy} provides a mechanism to share the work of processing records from objects (or - * files) into tasks, which are subsequently processed (potentially in parallel) by Kafka Connect workers. - *

- * The number of objects in cloud storage can be very high, and they are distributed amongst tasks to minimize the - * overhead of assigning work to Kafka worker threads. All objects assigned to the same task will be processed together - * sequentially by the same worker, which can be useful for maintaining order between objects. There are usually fewer - * workers than tasks, and they will be assigned the remaining tasks as work completes. - */ -public interface ObjectDistributionStrategy { - - /** - * Check if the object should be processed by the task with the given {@code taskId}. Any single object should be - * assigned deterministically to a single taskId. - * - * @param taskId - * a task ID, usually for the currently running task - * @param valueToBeEvaluated - * The value to be evaluated to determine if it should be processed by the task. - * @return true if the task should process the object, false if it should not. - */ - boolean isPartOfTask(int taskId, String valueToBeEvaluated); - - /** - * When a connector receives a reconfigure event this method should be called to ensure that the distribution - * strategy is updated correctly. - * - * @param maxTasks - * The maximum number of tasks created for the Connector - * @param expectedFormat - * The expected format, of files, path, table names or other ways to partition the tasks. - */ - void reconfigureDistributionStrategy(int maxTasks, String expectedFormat); - - /** - * Check if the task is responsible for this set of files by checking if the given task matches the partition id. - * - * @param taskId - * the current running task - * @param partitionId - * The partitionId recovered from the file path. - * @return true if this task is responsible for this partition. false if it is not responsible for this task. - */ - default boolean taskMatchesPartition(final int taskId, final int partitionId) { - // The partition id and task id are both expected to start at 0 but if the task id is changed to start at 1 this - // will break. - return taskId == partitionId; - } - - /** - * In the event of more partitions existing then tasks configured, the task will be required to take up additional - * tasks that match. - * - * @param taskId - * the current running task. - * @param maxTasks - * The maximum number of configured tasks allowed to run for this connector. - * @param partitionId - * The partitionId recovered from the file path. - * @return true if the task supplied should handle the supplied partition - */ - default boolean taskMatchesModOfPartitionAndMaxTask(final int taskId, final int maxTasks, final int partitionId) { - - return taskMatchesPartition(taskId, partitionId % maxTasks); - } - - default boolean toBeProcessedByThisTask(final int taskId, final int maxTasks, final int partitionId) { - return partitionId < maxTasks - ? taskMatchesPartition(taskId, partitionId) - : taskMatchesModOfPartitionAndMaxTask(taskId, maxTasks, partitionId); - - } -} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java new file mode 100644 index 000000000..25f22dfc0 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategy.java @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.source.task; + +import java.util.Optional; +import java.util.regex.Pattern; + +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The {@link PartitionDistributionStrategy} finds a partition in the object's filename by matching it to an expected + * format, and assigns all partitions to the same task. + *

+ * This useful when a sink connector has created the object name in a format like + * {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed + * within a single task. + */ +public final class PartitionDistributionStrategy implements DistributionStrategy { + private final static Logger LOG = LoggerFactory.getLogger(PartitionDistributionStrategy.class); + private int maxTasks; + + public PartitionDistributionStrategy(final int maxTasks) { + this.maxTasks = maxTasks; + } + + /** + * + * @param sourceNameToBeEvaluated + * is the filename/table name of the source for the connector. + * @return Predicate to confirm if the given source name matches + */ + @Override + public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated, final Pattern filePattern) { + if (sourceNameToBeEvaluated == null) { + LOG.warn("Ignoring as it is not passing a correct filename to be evaluated."); + return false; + } + final Optional optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, + sourceNameToBeEvaluated); + + if (optionalPartitionId.isPresent()) { + return optionalPartitionId.get() < maxTasks + ? taskMatchesPartition(taskId, optionalPartitionId.get()) + : taskMatchesPartition(taskId, optionalPartitionId.get() % maxTasks); + } + LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated); + return false; + } + + boolean taskMatchesPartition(final int taskId, final int partitionId) { + // The partition id and task id are both expected to start at 0 but if the task id is changed to start at 1 this + // will break. + return taskId == partitionId; + } + + /** + * When a connector reconfiguration event is received this method should be called to ensure the correct strategy is + * being implemented by the connector. + * + * @param maxTasks + * maximum number of configured tasks for this connector + */ + @Override + public void configureDistributionStrategy(final int maxTasks) { + this.maxTasks = maxTasks; + } +} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java deleted file mode 100644 index f74e56826..000000000 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategy.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.common.source.task; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.kafka.common.config.ConfigException; - -import org.codehaus.plexus.util.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The {@link PartitionInFilenameDistributionStrategy} finds a partition in the object's filename by matching it to an - * expected format, and assigns all partitions to the same task. - *

- * This useful when a sink connector has created the object name in a format like - * {@code topicname-{{partition}}-{{start_offset}}}, and we want all objects with the same partition to be processed - * within a single task. - */ -public final class PartitionInFilenameDistributionStrategy implements ObjectDistributionStrategy { - private final static Logger LOG = LoggerFactory.getLogger(PartitionInFilenameDistributionStrategy.class); - private final static String NUMBER_REGEX_PATTERN = "(\\d)+"; - // Use a named group to return the partition in a complex string to always get the correct information for the - // partition number. - private final static String PARTITION_NAMED_GROUP_REGEX_PATTERN = "(?\\d)+"; - private final static String PARTITION_PATTERN = "\\{\\{partition}}"; - private final static String START_OFFSET_PATTERN = "\\{\\{start_offset}}"; - private final static String TIMESTAMP_PATTERN = "\\{\\{timestamp}}"; - public static final String PARTITION = "partition"; - private Pattern partitionPattern; - - private int maxTasks; - - PartitionInFilenameDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) { - configureDistributionStrategy(maxTasks, expectedSourceNameFormat); - } - - /** - * - * @param sourceNameToBeEvaluated - * is the filename/table name of the source for the connector. - * @return Predicate to confirm if the given source name matches - */ - @Override - public boolean isPartOfTask(final int taskId, final String sourceNameToBeEvaluated) { - if (sourceNameToBeEvaluated == null) { - LOG.warn("Ignoring as it is not passing a correct filename to be evaluated."); - return false; - } - final Matcher match = partitionPattern.matcher(sourceNameToBeEvaluated); - if (match.find()) { - return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(match.group(PARTITION))); - } - LOG.warn("Unable to find the partition from this file name {}", sourceNameToBeEvaluated); - return false; - } - - /** - * When a connector reconfiguration event is received this method should be called to ensure the correct strategy is - * being implemented by the connector. - * - * @param maxTasks - * maximum number of configured tasks for this connector - * @param expectedSourceNameFormat - * what the format of the source should appear like so to configure the task distribution. - */ - @Override - public void reconfigureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) { - configureDistributionStrategy(maxTasks, expectedSourceNameFormat); - } - - private void configureDistributionStrategy(final int maxTasks, final String expectedSourceNameFormat) { - if (expectedSourceNameFormat == null || !expectedSourceNameFormat.contains(PARTITION_PATTERN)) { - throw new ConfigException(String.format( - "Source name format %s missing partition pattern {{partition}}, please configure the expected source to include the partition pattern.", - expectedSourceNameFormat)); - } - setMaxTasks(maxTasks); - // Build REGEX Matcher - String regexString = StringUtils.replace(expectedSourceNameFormat, START_OFFSET_PATTERN, NUMBER_REGEX_PATTERN); - regexString = StringUtils.replace(regexString, TIMESTAMP_PATTERN, NUMBER_REGEX_PATTERN); - regexString = StringUtils.replace(regexString, PARTITION_PATTERN, PARTITION_NAMED_GROUP_REGEX_PATTERN); - try { - partitionPattern = Pattern.compile(regexString); - } catch (IllegalArgumentException iae) { - throw new ConfigException( - String.format("Unable to compile the regex pattern %s to retrieve the partition id.", regexString), - iae); - } - } - - private void setMaxTasks(final int maxTasks) { - this.maxTasks = maxTasks; - } - -} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java deleted file mode 100644 index 85e1c3e75..000000000 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategy.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.common.source.task; - -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.connect.errors.ConnectException; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The {@link PartitionInPathDistributionStrategy} finds a partition number in the path by matching a - * {@code {{partition}} } marker in the path. - *

- * This useful when a sink connector has created the object name in a path like - * {@code /PREFIX/partition={{partition}}/YYYY/MM/DD/mm/}}, and we want all objects with the same partition to be - * processed within a single task. - *

- * Partitions are evenly distributed between tasks. For example, in Connect with 10 Partitions and 3 tasks: - * - *

- *   | Task | Partitions |
- *   |------|------------|
- *   | 0    | 0, 3, 6, 9 |
- *   | 1    | 1, 4, 7    |
- *   | 2    | 2, 5, 8    |
- * 
- */ -public final class PartitionInPathDistributionStrategy implements ObjectDistributionStrategy { - public static final String PARTITION_ID_PATTERN = "\\{\\{partition}}"; - private final static Logger LOG = LoggerFactory.getLogger(PartitionInPathDistributionStrategy.class); - - private String prefix; - private int maxTasks; - - PartitionInPathDistributionStrategy(final int maxTasks, final String expectedPathFormat) { - configureDistributionStrategy(maxTasks, expectedPathFormat); - } - - @Override - public boolean isPartOfTask(final int taskId, final String pathToBeEvaluated) { - if (pathToBeEvaluated == null || !pathToBeEvaluated.startsWith(prefix)) { - LOG.warn("Ignoring path {}, does not contain the preconfigured prefix {} set up at startup", - pathToBeEvaluated, prefix); - return false; - } - final String modifiedPath = StringUtils.substringAfter(pathToBeEvaluated, prefix); - if (!modifiedPath.contains("/")) { - LOG.warn("Ignoring path {}, does not contain any sub folders after partitionId prefix {}", - pathToBeEvaluated, prefix); - return false; - } - final String partitionId = StringUtils.substringBefore(modifiedPath, "/"); - - try { - return toBeProcessedByThisTask(taskId, maxTasks, Integer.parseInt(partitionId)); - } catch (NumberFormatException ex) { - throw new ConnectException(String - .format("Unexpected non integer value found parsing path for partitionId: %s", pathToBeEvaluated)); - } - } - - /** - * - * @param maxTasks - * The maximum number of configured tasks for this - * @param expectedPathFormat - * The format of the path and where to identify - */ - @Override - public void reconfigureDistributionStrategy(final int maxTasks, final String expectedPathFormat) { - configureDistributionStrategy(maxTasks, expectedPathFormat); - } - - private void configureDistributionStrategy(final int maxTasks, final String expectedPathFormat) { - setMaxTasks(maxTasks); - - if (StringUtils.isEmpty(expectedPathFormat) || !expectedPathFormat.contains(PARTITION_ID_PATTERN)) { - throw new ConfigException(String.format( - "Expected path format %s is missing the identifier '%s' to correctly select the partition", - expectedPathFormat, PARTITION_ID_PATTERN)); - } - prefix = StringUtils.substringBefore(expectedPathFormat, PARTITION_ID_PATTERN); - } - - private void setMaxTasks(final int maxTasks) { - this.maxTasks = maxTasks; - } - -} diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/task/enums/ObjectDistributionStrategy.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/enums/ObjectDistributionStrategy.java new file mode 100644 index 000000000..26c1efa94 --- /dev/null +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/task/enums/ObjectDistributionStrategy.java @@ -0,0 +1,48 @@ +/* + * Copyright 2025 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.source.task.enums; + +import java.util.Arrays; +import java.util.Objects; + +import org.apache.kafka.common.config.ConfigException; + +public enum ObjectDistributionStrategy { + + OBJECT_HASH("object_hash"), PARTITION_IN_FILENAME("partition_in_filename"); + + private final String name; + + public String value() { + return name; + } + + ObjectDistributionStrategy(final String name) { + this.name = name; + } + + public static ObjectDistributionStrategy forName(final String name) { + Objects.requireNonNull(name, "name cannot be null"); + for (final ObjectDistributionStrategy objectDistributionStrategy : ObjectDistributionStrategy.values()) { + if (objectDistributionStrategy.name.equalsIgnoreCase(name)) { + return objectDistributionStrategy; + } + } + throw new ConfigException(String.format("Unknown object.distribution.strategy type: %s, allowed values %s ", + name, Arrays.toString(ObjectDistributionStrategy.values()))); + } +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java similarity index 82% rename from commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategyTest.java rename to commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java index 63a6a76f5..50ef73964 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashObjectDistributionStrategyTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/HashDistributionStrategyTest.java @@ -21,10 +21,12 @@ import java.util.ArrayList; import java.util.List; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; + import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -final class HashObjectDistributionStrategyTest { +final class HashDistributionStrategyTest { @ParameterizedTest @CsvSource({ "logs-0-0002.txt", "logs-1-0002.txt", "logs-2-0002.txt", "logs-3-0002.txt", "logs-4-0002.txt", @@ -34,10 +36,11 @@ final class HashObjectDistributionStrategyTest { "reallylongfilenamecreatedonS3tohisdesomedata and alsohassome spaces.txt" }) void hashDistributionExactlyOnce(final String path) { final int maxTaskId = 10; - final ObjectDistributionStrategy taskDistribution = new HashObjectDistributionStrategy(maxTaskId); + final DistributionStrategy taskDistribution = new HashDistributionStrategy(maxTaskId); final List results = new ArrayList<>(); for (int taskId = 0; taskId < maxTaskId; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))); } assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); @@ -51,18 +54,20 @@ void hashDistributionExactlyOnce(final String path) { "reallylongfilenamecreatedonS3tohisdesomedata and alsohassome spaces.txt" }) void hashDistributionExactlyOnceWithReconfigureEvent(final String path) { int maxTasks = 10; - final ObjectDistributionStrategy taskDistribution = new HashObjectDistributionStrategy(maxTasks); + final DistributionStrategy taskDistribution = new HashDistributionStrategy(maxTasks); final List results = new ArrayList<>(); for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))); } assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); results.clear(); maxTasks = 5; - taskDistribution.reconfigureDistributionStrategy(maxTasks, null); + taskDistribution.configureDistributionStrategy(maxTasks); for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))); } assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java new file mode 100644 index 000000000..c62fbb9bc --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionDistributionStrategyTest.java @@ -0,0 +1,299 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.common.source.task; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.common.config.ConfigException; + +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +final class PartitionDistributionStrategyTest { + + @Test + void partitionInFileNameDefaultAivenS3Sink() { + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(2); + assertThat(taskDistribution.isPartOfTask(1, "logs-1-00112.gz", + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"))).isTrue(); + } + + @Test + void partitionLocationNotSetExpectException() { + assertThatThrownBy(() -> new PartitionDistributionStrategy(2).isPartOfTask(1, "", + FilePatternUtils.configurePattern("logs-23--"))) + .isInstanceOf(ConfigException.class) + .hasMessage( + "Source name format logs-23-- missing partition pattern {{partition}} please configure the expected source to include the partition pattern."); + + } + + @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}") + @CsvSource({ "{{topic}}-{{partition}}-{{start_offset}},logs-0-00112.gz", + "{{topic}}-2024-{{timestamp}}-{{partition}}-{{start_offset}},logs-2024-20220201-0-00112.gz", + "{{topic}}-2023-{{partition}}-{{start_offset}},logs-2023-0-00112.gz", + "logs-2023-{{partition}}-{{start_offset}},logs-2023-0-00112.gz", + "{{topic}}-{{timestamp}}-{{timestamp}}-{{timestamp}}-{{partition}}-{{start_offset}},logs1-2022-10-02-10-00112.gz", + "{{topic}}{{partition}}-{{start_offset}},89521-00112.gz", + "{{topic}}-{{partition}},Emergency-TEST1-00112.gz", + "Emergency-TEST1-{{partition}},Emergency-TEST1-00112.gz", + "{{topic}}-{{partition}}-{{start_offset}},PROD-logs-1-00112.gz", + "{{topic}}-{{partition}},DEV_team_1-00112.gz", + "{{topic}}-{{partition}}-{{start_offset}},timeseries-1-00112.gz" }) + void testPartitionFileNamesAndExpectedOutcomes(final String configuredFilenamePattern, final String filename) { + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(1); + // This test is testing the filename matching not the task allocation. + assertThat(taskDistribution.isPartOfTask(0, filename, + FilePatternUtils.configurePattern(configuredFilenamePattern))).isTrue(); + } + + @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}") + @CsvSource({ "different-topic-{{partition}}-{{start_offset}},logs-1-00112.gz", + "no-seperator-in-date-partition-offset-{{timestamp}}-{{partition}}-{{start_offset}},no-seperator-in-date-partition-offset-202420220201100112.gz", + "logs-2024-{{timestamp}}-{{partition}}-{{start_offset}},logs-20201-1-00112.gz", + "logs-2024-{{timestamp}}{{partition}}-{{start_offset}},logs-202011-00112.gz", + "logs-2024-{{timestamp}}{{partition}}-{{start_offset}}, ", + "logs-2023-{{partition}}-{{start_offset}},logs-2023-one-00112.gz" }) + void expectFalseOnMalformedFilenames(final String configuredFilenamePattern, final String filename) { + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(1); + // This test is testing the filename matching not the task allocation. + assertThat(taskDistribution.isPartOfTask(0, filename, + FilePatternUtils.configurePattern(configuredFilenamePattern))).isFalse(); + } + + @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1}, Filename: {1}") + @CsvSource({ "0,10,topics/logs/0/logs-0-0002.txt", "1,10,topics/logs/1/logs-1-0002.txt", + "2,10,topics/logs/2/logs-2-0002.txt", "3,10,topics/logs/3/logs-3-0002.txt", + "4,10,topics/logs/4/logs-4-0002.txt", "5,10,topics/logs/5/logs-5-0002.txt", + "6,10,topics/logs/6/logs-6-0002.txt", "7,10,topics/logs/7/logs-7-0002.txt", + "8,10,topics/logs/8/logs-8-0002.txt", "9,10,topics/logs/9/logs-9-0002.txt" }) + void checkCorrectDistributionAcrossTasksOnFileName(final int taskId, final int maxTasks, final String path) { + + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + + assertThat(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("logs-{{partition}}-{{start_offset}}"))).isTrue(); + } + + @ParameterizedTest(name = "[{index}] MaxTasks: {0}, Filename: {1}") + @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.txt", "10,topics/logs/2/logs-0002.txt", + "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt", + "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt", + "10,topics/logs/9/logs-0002.txt" }) + void filenameDistributionExactlyOnceDistribution(final int maxTasks, final String path) { + + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + final List results = new ArrayList<>(); + for (int taskId = 0; taskId < maxTasks; taskId++) { + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("logs-{{partition}}.txt"))); + } + assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + } + + @ParameterizedTest(name = "[{index}] MaxTasks: {0}, TaskId: {1}, Filename: {2}") + @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt", + "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt", + "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt", + "10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" }) + void filenameDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks, + final int maxTaskAfterReConfig, final String path) { + + final String expectedSourceNameFormat = "logs-{{partition}}.txt"; + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + final List results = new ArrayList<>(); + for (int taskId = 0; taskId < maxTasks; taskId++) { + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern(expectedSourceNameFormat))); + } + assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig); + + results.clear(); + for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) { + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern(expectedSourceNameFormat))); + } + assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + Boolean.FALSE); + } + + @ParameterizedTest + @CsvSource({ + "{topic}}-1.txt,'Source name format {topic}}-1.txt missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'", + " ,'Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'", + "empty-pattern,'Source name format empty-pattern missing partition pattern {{partition}} please configure the expected source to include the partition pattern.'" }) + void malformedFilenameSetup(final String expectedSourceFormat, final String expectedErrorMessage) { + final int maxTaskId = 1; + assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "", + FilePatternUtils.configurePattern(expectedSourceFormat))).isInstanceOf(ConfigException.class) + .hasMessage(expectedErrorMessage); + } + + @Test + void errorExpectedNullGivenForSourceNameFormat() { + final int maxTaskId = 1; + assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "", + FilePatternUtils.configurePattern(null))).isInstanceOf(ConfigException.class) + .hasMessage("Source name format null missing partition pattern {{partition}} please configure" + + " the expected source to include the partition pattern."); + } + + @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") + @CsvSource({ "0,1,topics/logs/partition=5/logs+5+0002.txt,true", + "0,4,topics/logs/partition=5/logs+5+0002.txt,false", "1,4,topics/logs/partition=5/logs+5+0002.txt,true", + "0,3,topics/logs/partition=5/logs+5+0002.txt,false", "0,5,topics/logs/partition=5/logs+5+0002.txt,true", + "2,3,topics/logs/partition=5/logs+5+0002.txt,true" }) + void withLeadingStringPartitionNamingConvention(final int taskId, final int maxTasks, final String path, + final boolean expectedResult) { + + final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + + assertThat(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("topics/{{topic}}/partition={{partition}}/.*$"))) + .isEqualTo(expectedResult); + } + + @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") + @CsvSource({ "0,1,bucket/topics/topic-1/5/logs+5+0002.txt,true", + "0,4,bucket/topics/topic-1/5/logs+5+0002.txt,false", "1,4,bucket/topics/topic-1/5/logs+5+0002.txt,true", + "0,3,bucket/topics/topic-1/5/logs+5+0002.txt,false", "0,5,bucket/topics/topic-1/5/logs+5+0002.txt,true", + "2,3,bucket/topics/topic-1/5/logs+5+0002.txt,true" }) + void partitionInPathConvention(final int taskId, final int maxTaskId, final String path, + final boolean expectedResult) { + + final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); + + assertThat(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("bucket/topics/{{topic}}/{{partition}}/.*$"))) + .isEqualTo(expectedResult); + } + + @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") + @CsvSource({ "0,10,topics/logs/0/logs-0002.txt", "1,10,topics/logs/1/logs-0002.txt", + "2,10,topics/logs/2/logs-0002.txt", "3,10,topics/logs/3/logs-0002.txt", "4,10,topics/logs/4/logs-0002.txt", + "5,10,topics/logs/5/logs-0002.txt", "6,10,topics/logs/6/logs-0002.txt", "7,10,topics/logs/7/logs-0002.txt", + "8,10,topics/logs/8/logs-0002.txt", "9,10,topics/logs/9/logs-0002.txt" }) + void checkCorrectDistributionAcrossTasks(final int taskId, final int maxTaskId, final String path) { + + final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); + + assertThat(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isTrue(); + } + + @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") + @CsvSource({ "1,10,topcs/logs/0/logs-0002.txt", "2,10,topics/logs/1", "3,10,S3/logs/2/logs-0002.txt", + "4,10,topics/log/3/logs-0002.txt", "5,10,prod/logs/4/logs-0002.txt", "6,10,misspelt/logs/5/logs-0002.txt", + "7,10,test/logs/6/logs-0002.txt", "8,10,random/logs/7/logs-0002.txt", "9,10,DEV/logs/8/logs-0002.txt", + "10,10,poll/logs/9/logs-0002.txt" }) + void expectNoMatchOnUnconfiguredPaths(final int taskId, final int maxTaskId, final String path) { + + final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); + + assertThat(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isFalse(); + } + + @Test + void expectExceptionOnNonIntPartitionSupplied() { + final int taskId = 1; + final int maxTaskId = 1; + final String path = "topics/logs/one/test-001.txt"; + + final PartitionDistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTaskId); + assertThat(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))).isFalse(); + } + + @Test + void malformedRegexSetup() { + final int maxTaskId = 1; + + assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, "", + FilePatternUtils.configurePattern("topics/{{topic}}/"))).isInstanceOf(ConfigException.class) + .hasMessage( + "Source name format topics/{{topic}}/ missing partition pattern {{partition}} please configure the expected source to include the partition pattern."); + } + + @ParameterizedTest + @CsvSource({ + ",Source name format null missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", + "@adsfs,Source name format @adsfs missing partition pattern {{partition}} please configure the expected source to include the partition pattern.", + "empty-path,Source name format empty-path missing partition pattern {{partition}} please configure the expected source to include the partition pattern." }) + void malformedPathSetup(final String expectedPathFormat, final String expectedErrorMessage) { + final int maxTaskId = 1; + + assertThatThrownBy(() -> new PartitionDistributionStrategy(maxTaskId).isPartOfTask(1, expectedPathFormat, + FilePatternUtils.configurePattern(expectedPathFormat))).isInstanceOf(ConfigException.class) + .hasMessage(expectedErrorMessage); + } + + @ParameterizedTest + @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.log", "10,topics/logs/2/logs-0002.txt", + "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt", + "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt", + "10,topics/logs/9/logs-0002.txt" }) + void partitionPathDistributionExactlyOnceDistribution(final int maxTasks, final String path) { + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + final List results = new ArrayList<>(); + for (int taskId = 0; taskId < maxTasks; taskId++) { + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern("topics/{{topic}}/{{partition}}/.*$"))); + } + assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + } + + @ParameterizedTest + @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt", + "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt", + "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt", + "10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" }) + void partitionPathDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks, + final int maxTaskAfterReConfig, final String path) { + + final String expectedSourceNameFormat = "topics/{{topic}}/{{partition}}/.*$"; + final DistributionStrategy taskDistribution = new PartitionDistributionStrategy(maxTasks); + final List results = new ArrayList<>(); + for (int taskId = 0; taskId < maxTasks; taskId++) { + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern(expectedSourceNameFormat))); + } + assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); + taskDistribution.configureDistributionStrategy(maxTaskAfterReConfig); + + results.clear(); + for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) { + results.add(taskDistribution.isPartOfTask(taskId, path, + FilePatternUtils.configurePattern(expectedSourceNameFormat))); + } + assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, + Boolean.FALSE); + } + +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategyTest.java deleted file mode 100644 index f1993ecba..000000000 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionInFilenameDistributionStrategyTest.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.common.source.task; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.kafka.common.config.ConfigException; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; - -final class PartitionInFilenameDistributionStrategyTest { - - @Test - void partitionInFileNameDefaultAivenS3Sink() { - final ObjectDistributionStrategy taskDistribution = new PartitionInFilenameDistributionStrategy(2, - "logs-\\{\\{partition}}-\\{\\{start_offset}}"); - assertThat(taskDistribution.isPartOfTask(1, "logs-1-00112.gz")).isTrue(); - } - - @Test - void partitionLocationNotSetExpectException() { - assertThatThrownBy(() -> new PartitionInFilenameDistributionStrategy(2, "logs-23--")) - .isInstanceOf(ConfigException.class) - .hasMessage( - "Source name format logs-23-- missing partition pattern {{partition}}, please configure the expected source to include the partition pattern."); - - } - - @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}") - @CsvSource({ "logs-\\{\\{partition}}-\\{\\{start_offset}},logs-0-00112.gz", - "logs-2024-\\{\\{timestamp}}-\\{\\{partition}}-\\{\\{start_offset}},logs-2024-20220201-0-00112.gz", - "logs-2023-\\{\\{partition}}-\\{\\{start_offset}},logs-2023-0-00112.gz", - "logs1-\\{\\{timestamp}}-\\{\\{timestamp}}-\\{\\{timestamp}}-\\{\\{partition}}-\\{\\{start_offset}},logs1-2022-10-02-10-00112.gz", - "8952\\{\\{partition}}-\\{\\{start_offset}},89521-00112.gz", - "Emergency-TEST\\{\\{partition}}-\\{\\{start_offset}},Emergency-TEST1-00112.gz", - "PROD-logs-\\{\\{partition}}-\\{\\{start_offset}},PROD-logs-1-00112.gz", - "DEV_team_\\{\\{partition}}-\\{\\{start_offset}},DEV_team_1-00112.gz", - "timeseries-\\{\\{partition}}-\\{\\{start_offset}},timeseries-1-00112.gz" }) - void testPartitionFileNamesAndExpectedOutcomes(final String configuredFilenamePattern, final String filename) { - final ObjectDistributionStrategy taskDistribution = new PartitionInFilenameDistributionStrategy(1, - configuredFilenamePattern); - // This test is testing the filename matching not the task allocation. - assertThat(taskDistribution.isPartOfTask(0, filename)).isTrue(); - } - - @ParameterizedTest(name = "[{index}] Pattern: {0}, Filename: {1}") - @CsvSource({ "different-topic-\\{\\{partition}}-\\{\\{start_offset}},logs-1-00112.gz", - "no-seperator-in-date-partition-offset-\\{\\{timestamp}}-\\{\\{partition}}-\\{\\{start_offset}},no-seperator-in-date-partition-offset-202420220201100112.gz", - "logs-2024-\\{\\{timestamp}}-\\{\\{partition}}-\\{\\{start_offset}},logs-20201-1-00112.gz", - "logs-2024-\\{\\{timestamp}}\\{\\{partition}}-\\{\\{start_offset}},logs-202011-00112.gz", - "logs-2024-\\{\\{timestamp}}\\{\\{partition}}-\\{\\{start_offset}}, ", - "logs-2023-\\{\\{partition}}-\\{\\{start_offset}},logs-2023-one-00112.gz" }) - void expectFalseOnMalformedFilenames(final String configuredFilenamePattern, final String filename) { - final ObjectDistributionStrategy taskDistribution = new PartitionInFilenameDistributionStrategy(1, - configuredFilenamePattern); - // This test is testing the filename matching not the task allocation. - assertThat(taskDistribution.isPartOfTask(0, filename)).isFalse(); - } - - @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1}, Filename: {1}") - @CsvSource({ "0,10,topics/logs/0/logs-0-0002.txt", "1,10,topics/logs/1/logs-1-0002.txt", - "2,10,topics/logs/2/logs-2-0002.txt", "3,10,topics/logs/3/logs-3-0002.txt", - "4,10,topics/logs/4/logs-4-0002.txt", "5,10,topics/logs/5/logs-5-0002.txt", - "6,10,topics/logs/6/logs-6-0002.txt", "7,10,topics/logs/7/logs-7-0002.txt", - "8,10,topics/logs/8/logs-8-0002.txt", "9,10,topics/logs/9/logs-9-0002.txt" }) - void checkCorrectDistributionAcrossTasks(final int taskId, final int maxTasks, final String path) { - - final ObjectDistributionStrategy taskDistribution = new PartitionInFilenameDistributionStrategy(maxTasks, - "logs-\\{\\{partition}}-\\{\\{start_offset}}"); - - assertThat(taskDistribution.isPartOfTask(taskId, path)).isTrue(); - } - - @ParameterizedTest(name = "[{index}] MaxTasks: {0}, Filename: {1}") - @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.txt", "10,topics/logs/2/logs-0002.txt", - "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt", - "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt", - "10,topics/logs/9/logs-0002.txt" }) - void filenameDistributionExactlyOnceDistribution(final int maxTasks, final String path) { - - final ObjectDistributionStrategy taskDistribution = new PartitionInFilenameDistributionStrategy(maxTasks, - "logs-\\{\\{partition}}.txt"); - final List results = new ArrayList<>(); - for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); - } - - @ParameterizedTest(name = "[{index}] MaxTasks: {0}, TaskId: {1}, Filename: {2}") - @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt", - "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt", - "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt", - "10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" }) - void filenameDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks, - final int maxTaskAfterReConfig, final String path) { - - final String expectedSourceNameFormat = "logs-\\{\\{partition}}.txt"; - final ObjectDistributionStrategy taskDistribution = new PartitionInFilenameDistributionStrategy(maxTasks, - expectedSourceNameFormat); - final List results = new ArrayList<>(); - for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); - taskDistribution.reconfigureDistributionStrategy(maxTaskAfterReConfig, expectedSourceNameFormat); - - results.clear(); - for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE); - } - - @ParameterizedTest - @CsvSource({ - "logs-{{partition}}.txt,'Source name format logs-{{partition}}.txt missing partition pattern {{partition}}, please configure the expected source to include the partition pattern.'", - " ,'Source name format null missing partition pattern {{partition}}, please configure the expected source to include the partition pattern.'", - "empty-pattern,'Source name format empty-pattern missing partition pattern {{partition}}, please configure the expected source to include the partition pattern.'" }) - void malformedFilenameSetup(final String expectedSourceFormat, final String expectedErrorMessage) { - final int maxTaskId = 1; - - assertThatThrownBy(() -> new PartitionInFilenameDistributionStrategy(maxTaskId, expectedSourceFormat)) - .isInstanceOf(ConfigException.class) - .hasMessage(expectedErrorMessage); - } - - @Test - void errorExpectedNullGivenForSourceNameFormat() { - final int maxTaskId = 1; - - assertThatThrownBy(() -> new PartitionInFilenameDistributionStrategy(maxTaskId, null)) - .isInstanceOf(ConfigException.class) - .hasMessage( - "Source name format null missing partition pattern {{partition}}, please configure the expected source to include the partition pattern."); - } - -} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategyTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategyTest.java deleted file mode 100644 index 4c2a6fede..000000000 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/task/PartitionInPathDistributionStrategyTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright 2024 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.connect.common.source.task; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.kafka.common.config.ConfigException; -import org.apache.kafka.connect.errors.ConnectException; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; - -final class PartitionInPathDistributionStrategyTest { - - @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") - @CsvSource({ "0,1,topics/logs/partition=5/logs+5+0002.txt,true", - "0,4,topics/logs/partition=5/logs+5+0002.txt,false", "1,4,topics/logs/partition=5/logs+5+0002.txt,true", - "0,3,topics/logs/partition=5/logs+5+0002.txt,false", "0,5,topics/logs/partition=5/logs+5+0002.txt,true", - "2,3,topics/logs/partition=5/logs+5+0002.txt,true" }) - void withLeadingStringPartitionNamingConvention(final int taskId, final int maxTasks, final String path, - final boolean expectedResult) { - - final PartitionInPathDistributionStrategy taskDistribution = new PartitionInPathDistributionStrategy(maxTasks, - "topics/logs/partition=\\{\\{partition}}/"); - - assertThat(taskDistribution.isPartOfTask(taskId, path)).isEqualTo(expectedResult); - } - - @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") - @CsvSource({ "0,1,bucket/topics/topic-1/5/logs+5+0002.txt,true", - "0,4,bucket/topics/topic-1/5/logs+5+0002.txt,false", "1,4,bucket/topics/topic-1/5/logs+5+0002.txt,true", - "0,3,bucket/topics/topic-1/5/logs+5+0002.txt,false", "0,5,bucket/topics/topic-1/5/logs+5+0002.txt,true", - "2,3,bucket/topics/topic-1/5/logs+5+0002.txt,true" }) - void partitionInPathConvention(final int taskId, final int maxTaskId, final String path, - final boolean expectedResult) { - - final PartitionInPathDistributionStrategy taskDistribution = new PartitionInPathDistributionStrategy(maxTaskId, - "bucket/topics/topic-1/\\{\\{partition}}/"); - - assertThat(taskDistribution.isPartOfTask(taskId, path)).isEqualTo(expectedResult); - } - - @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") - @CsvSource({ "0,10,topics/logs/0/logs-0002.txt", "1,10,topics/logs/1/logs-0002.txt", - "2,10,topics/logs/2/logs-0002.txt", "3,10,topics/logs/3/logs-0002.txt", "4,10,topics/logs/4/logs-0002.txt", - "5,10,topics/logs/5/logs-0002.txt", "6,10,topics/logs/6/logs-0002.txt", "7,10,topics/logs/7/logs-0002.txt", - "8,10,topics/logs/8/logs-0002.txt", "9,10,topics/logs/9/logs-0002.txt" }) - void checkCorrectDistributionAcrossTasks(final int taskId, final int maxTaskId, final String path) { - - final PartitionInPathDistributionStrategy taskDistribution = new PartitionInPathDistributionStrategy(maxTaskId, - "topics/logs/\\{\\{partition}}/"); - - assertThat(taskDistribution.isPartOfTask(taskId, path)).isTrue(); - } - - @ParameterizedTest(name = "[{index}] TaskId: {0}, MaxTasks: {1} Filename: {2}") - @CsvSource({ "1,10,topcs/logs/0/logs-0002.txt", "2,10,topics/logs/1", "3,10,S3/logs/2/logs-0002.txt", - "4,10,topics/log/3/logs-0002.txt", "5,10,prod/logs/4/logs-0002.txt", "6,10,misspelt/logs/5/logs-0002.txt", - "7,10,test/logs/6/logs-0002.txt", "8,10,random/logs/7/logs-0002.txt", "9,10,DEV/logs/8/logs-0002.txt", - "10,10,poll/logs/9/logs-0002.txt" }) - void expectNoMatchOnUnconfiguredPaths(final int taskId, final int maxTaskId, final String path) { - - final PartitionInPathDistributionStrategy taskDistribution = new PartitionInPathDistributionStrategy(maxTaskId, - "topics/logs/\\{\\{partition}}/"); - - assertThat(taskDistribution.isPartOfTask(taskId, path)).isFalse(); - } - - @Test - void expectExceptionOnNonIntPartitionSupplied() { - final int taskId = 1; - final int maxTaskId = 1; - final String path = "topics/logs/one/test-001.txt"; - - final PartitionInPathDistributionStrategy taskDistribution = new PartitionInPathDistributionStrategy(maxTaskId, - "topics/logs/\\{\\{partition}}/"); - assertThatThrownBy(() -> taskDistribution.isPartOfTask(taskId, path)).isInstanceOf(ConnectException.class) - .hasMessage( - "Unexpected non integer value found parsing path for partitionId: topics/logs/one/test-001.txt"); - } - - @Test - void malformedRegexSetup() { - final int maxTaskId = 1; - - assertThatThrownBy(() -> new PartitionInPathDistributionStrategy(maxTaskId, "topics/logs/{{partition}}/")) - .isInstanceOf(ConfigException.class) - .hasMessage( - "Expected path format topics/logs/{{partition}}/ is missing the identifier '\\{\\{partition}}' to correctly select the partition"); - } - - @ParameterizedTest - @CsvSource({ - ",Expected path format null is missing the identifier '\\{\\{partition}}' to correctly select the partition", - "@adsfs,Expected path format @adsfs is missing the identifier '\\{\\{partition}}' to correctly select the partition", - "empty-path,Expected path format empty-path is missing the identifier '\\{\\{partition}}' to correctly select the partition" }) - void malformedPathSetup(final String expectedPathFormat, final String expectedErrorMessage) { - final int maxTaskId = 1; - - assertThatThrownBy(() -> new PartitionInPathDistributionStrategy(maxTaskId, expectedPathFormat)) - .isInstanceOf(ConfigException.class) - .hasMessage(expectedErrorMessage); - } - - @ParameterizedTest - @CsvSource({ "10,topics/logs/0/logs-0002.txt", "10,topics/logs/1/logs-001.log", "10,topics/logs/2/logs-0002.txt", - "10,topics/logs/3/logs-0002.txt", "10,topics/logs/4/logs-0002.txt", "10,topics/logs/5/logs-0002.txt", - "10,topics/logs/6/logs-0002.txt", "10,topics/logs/7/logs-0002.txt", "10,topics/logs/8/logs-0002.txt", - "10,topics/logs/9/logs-0002.txt" }) - void partitionPathDistributionExactlyOnceDistribution(final int maxTasks, final String path) { - - final ObjectDistributionStrategy taskDistribution = new PartitionInPathDistributionStrategy(maxTasks, - "topics/logs/\\{\\{partition}}"); - final List results = new ArrayList<>(); - for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); - } - - @ParameterizedTest - @CsvSource({ "10,5,topics/logs/0/logs-0002.txt", "10,5,topics/logs/1/logs-001.txt", - "10,5,topics/logs/2/logs-0002.txt", "10,5,topics/logs/3/logs-0002.txt", "10,5,topics/logs/4/logs-0002.txt", - "10,5,topics/logs/5/logs-0002.txt", "10,5,topics/logs/6/logs-0002.txt", "10,5,topics/logs/7/logs-0002.txt", - "10,5,topics/logs/8/logs-0002.txt", "10,5,topics/logs/9/logs-0002.txt" }) - void partitionPathDistributionExactlyOnceDistributionWithTaskReconfiguration(final int maxTasks, - final int maxTaskAfterReConfig, final String path) { - - final String expectedSourceNameFormat = "topics/logs/\\{\\{partition}}"; - final ObjectDistributionStrategy taskDistribution = new PartitionInPathDistributionStrategy(maxTasks, - expectedSourceNameFormat); - final List results = new ArrayList<>(); - for (int taskId = 0; taskId < maxTasks; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE); - taskDistribution.reconfigureDistributionStrategy(maxTaskAfterReConfig, expectedSourceNameFormat); - - results.clear(); - for (int taskId = 0; taskId < maxTaskAfterReConfig; taskId++) { - results.add(taskDistribution.isPartOfTask(taskId, path)); - } - assertThat(results).containsExactlyInAnyOrder(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE, Boolean.FALSE, - Boolean.FALSE); - } - -} diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index 42d10aad7..5d95d6ebd 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -23,7 +23,6 @@ import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; -import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG; import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR; @@ -34,8 +33,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -49,6 +46,8 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.TransformerFactory; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; @@ -58,7 +57,6 @@ import org.apache.avro.Schema; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -76,28 +74,16 @@ class AwsIntegrationTest implements IntegrationBase { @Container public static final LocalStackContainer LOCALSTACK = IntegrationBase.createS3Container(); - private static String s3Prefix; - private S3Client s3Client; private String s3Endpoint; private BucketAccessor testBucketAccessor; - @Override - public String getS3Prefix() { - return s3Prefix; - } - @Override public S3Client getS3Client() { return s3Client; } - @BeforeAll - static void setUpAll() { - s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/"; - } - @BeforeEach void setupAWS() { s3Client = IntegrationBase.createS3Client(LOCALSTACK); @@ -118,7 +104,6 @@ private Map getConfig(final String topics, final int maxTasks) { config.put(AWS_SECRET_ACCESS_KEY_CONFIG, S3_SECRET_ACCESS_KEY); config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint); config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME); - config.put(AWS_S3_PREFIX_CONFIG, getS3Prefix()); config.put(TARGET_TOPIC_PARTITIONS, "0,1"); config.put(TARGET_TOPICS, topics); config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); @@ -146,14 +131,14 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { final List offsetKeys = new ArrayList<>(); final List expectedKeys = new ArrayList<>(); // write 2 objects to s3 - expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000")); - expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000")); - expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001")); - expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001")); + expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0")); + expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0")); + expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1")); + expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1")); // we don't expext the empty one. offsetKeys.addAll(expectedKeys); - offsetKeys.add(writeToS3(topicName, new byte[0], "00003")); + offsetKeys.add(writeToS3(topicName, new byte[0], "3")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -165,10 +150,11 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig); - final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>()); + final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig); final Iterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, - TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient); + TransformerFactory.getTransformer(InputFormat.BYTES), sourceClient, new HashDistributionStrategy(1), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0); final HashSet seenKeys = new HashSet<>(); while (sourceRecordIterator.hasNext()) { @@ -183,8 +169,10 @@ void sourceRecordIteratorBytesTest(final TestInfo testInfo) { @Test void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); + final int maxTasks = 1; + final int taskId = 0; - final Map configData = getConfig(topicName, 1); + final Map configData = getConfig(topicName, maxTasks); configData.put(INPUT_FORMAT_KEY, InputFormat.AVRO.getValue()); configData.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter"); @@ -211,12 +199,12 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final Set offsetKeys = new HashSet<>(); - offsetKeys.add(writeToS3(topicName, outputStream1, "00001")); - offsetKeys.add(writeToS3(topicName, outputStream2, "00001")); + offsetKeys.add(writeToS3(topicName, outputStream1, "1")); + offsetKeys.add(writeToS3(topicName, outputStream2, "1")); - offsetKeys.add(writeToS3(topicName, outputStream3, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream4, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream5, "00002")); + offsetKeys.add(writeToS3(topicName, outputStream3, "2")); + offsetKeys.add(writeToS3(topicName, outputStream4, "2")); + offsetKeys.add(writeToS3(topicName, outputStream5, "2")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -228,10 +216,12 @@ void sourceRecordIteratorAvroTest(final TestInfo testInfo) throws IOException { final OffsetManager offsetManager = new OffsetManager(context, s3SourceConfig); - final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>()); + final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig); final Iterator sourceRecordIterator = new SourceRecordIterator(s3SourceConfig, offsetManager, - TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient); + TransformerFactory.getTransformer(InputFormat.AVRO), sourceClient, + new HashDistributionStrategy(maxTasks), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), taskId); final HashSet seenKeys = new HashSet<>(); final Map> seenRecords = new HashMap<>(); @@ -275,15 +265,15 @@ void verifyIteratorRehydration(final TestInfo testInfo) { final List actualKeys = new ArrayList<>(); // write 2 objects to s3 - expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000") + expectedKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0") .substring((OBJECT_KEY + SEPARATOR).length())); - expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000") + expectedKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0") .substring((OBJECT_KEY + SEPARATOR).length())); assertThat(testBucketAccessor.listObjects()).hasSize(2); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configData); - final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig, new HashSet<>()); + final AWSV2SourceClient sourceClient = new AWSV2SourceClient(s3SourceConfig); final Iterator iter = sourceClient.getS3ObjectIterator(null); assertThat(iter).hasNext(); @@ -296,7 +286,7 @@ void verifyIteratorRehydration(final TestInfo testInfo) { assertThat(actualKeys).containsAll(expectedKeys); // write 3rd object to s3 - expectedKeys.add(writeToS3(topicName, testData3.getBytes(StandardCharsets.UTF_8), "00000") + expectedKeys.add(writeToS3(topicName, testData3.getBytes(StandardCharsets.UTF_8), "0") .substring((OBJECT_KEY + SEPARATOR).length())); assertThat(testBucketAccessor.listObjects()).hasSize(3); diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index a8b91a197..fa4f60b76 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -102,8 +102,6 @@ static byte[] generateNextAvroMessagesStartingFromId(final int messageId, final S3Client getS3Client(); - String getS3Prefix(); - /** * Write file to s3 with the specified key and data. * @@ -134,8 +132,7 @@ default void writeToS3WithKey(final String objectKey, final byte[] testDataBytes * {@link io.aiven.kafka.connect.s3.source.utils.OffsetManager#SEPARATOR} */ default String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId) { - final String objectKey = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topicName - + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt"; + final String objectKey = topicName + "-" + partitionId + "-" + System.currentTimeMillis() + ".txt"; writeToS3WithKey(objectKey, testDataBytes); return OBJECT_KEY + SEPARATOR + objectKey; } diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index 083d8627e..ad31acc88 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -16,10 +16,13 @@ package io.aiven.kafka.connect.s3.source; +import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; +import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.SourceConfigFragment.OBJECT_DISTRIBUTION_STRATEGY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; @@ -27,6 +30,8 @@ import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_PREFIX_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG; +import static io.aiven.kafka.connect.s3.source.S3SourceTask.OBJECT_KEY; +import static io.aiven.kafka.connect.s3.source.utils.OffsetManager.SEPARATOR; import static java.util.Map.entry; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -36,8 +41,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -55,17 +58,21 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import io.aiven.kafka.connect.common.source.input.InputFormat; +import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy; import io.aiven.kafka.connect.s3.source.testutils.BucketAccessor; import io.aiven.kafka.connect.s3.source.testutils.ContentUtils; import com.fasterxml.jackson.databind.JsonNode; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.localstack.LocalStackContainer; @@ -80,7 +87,6 @@ final class IntegrationTest implements IntegrationBase { private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationTest.class); private static final String CONNECTOR_NAME = "aiven-s3-source-connector"; - private static final String COMMON_PREFIX = "s3-source-connector-for-apache-kafka-test-"; private static final int OFFSET_FLUSH_INTERVAL_MS = 500; private static String s3Endpoint; @@ -95,22 +101,16 @@ final class IntegrationTest implements IntegrationBase { private ConnectRunner connectRunner; private static S3Client s3Client; + private TestInfo testInfo; @Override public S3Client getS3Client() { return s3Client; } - @Override - public String getS3Prefix() { - return s3Prefix; - } - public @BeforeAll static void setUpAll() throws IOException, InterruptedException { - s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/"; - s3Client = IntegrationBase.createS3Client(LOCALSTACK); s3Endpoint = LOCALSTACK.getEndpoint().toString(); testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET_NAME); @@ -122,6 +122,7 @@ public String getS3Prefix() { @BeforeEach void setUp(final TestInfo testInfo) throws Exception { testBucketAccessor.createBucket(); + this.testInfo = testInfo; connectRunner = new ConnectRunner(OFFSET_FLUSH_INTERVAL_MS); final List ports = IntegrationBase.getKafkaListenerPorts(); @@ -151,10 +152,25 @@ void tearDown() { testBucketAccessor.removeBucket(); } - @Test - void bytesTest(final TestInfo testInfo) { + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void bytesTest(final boolean addPrefix) { final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1); + final ObjectDistributionStrategy objectDistributionStrategy; + final int partitionId = 0; + final String prefixPattern = "topics/{{topic}}/partition={{partition}}/"; + String s3Prefix = ""; + if (addPrefix) { + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME; + s3Prefix = "topics/" + topicName + "/partition=" + partitionId + "/"; + } else { + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME; + } + + final String fileNamePatternSeparator = "_"; + + final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1, objectDistributionStrategy, + addPrefix, s3Prefix, prefixPattern, fileNamePatternSeparator); connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue()); connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); @@ -165,11 +181,15 @@ void bytesTest(final TestInfo testInfo) { final List offsetKeys = new ArrayList<>(); // write 2 objects to s3 - offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00000")); - offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00000")); - offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "00001")); - offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "00001")); - offsetKeys.add(writeToS3(topicName, new byte[0], "00003")); + offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "0", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "0", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, testData1.getBytes(StandardCharsets.UTF_8), "1", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, testData2.getBytes(StandardCharsets.UTF_8), "1", s3Prefix, + fileNamePatternSeparator)); + offsetKeys.add(writeToS3(topicName, new byte[0], "3", s3Prefix, "-")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -190,7 +210,9 @@ void bytesTest(final TestInfo testInfo) { @Test void avroTest(final TestInfo testInfo) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getAvroConfig(topicName, InputFormat.AVRO); + final boolean addPrefix = false; + final Map connectorConfig = getAvroConfig(topicName, InputFormat.AVRO, addPrefix, "", "", + ObjectDistributionStrategy.OBJECT_HASH); connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); @@ -215,12 +237,14 @@ void avroTest(final TestInfo testInfo) throws IOException { final Set offsetKeys = new HashSet<>(); - offsetKeys.add(writeToS3(topicName, outputStream1, "00001")); - offsetKeys.add(writeToS3(topicName, outputStream2, "00001")); + final String s3Prefix = ""; + + offsetKeys.add(writeToS3(topicName, outputStream1, "1", s3Prefix, "-")); + offsetKeys.add(writeToS3(topicName, outputStream2, "1", s3Prefix, "-")); - offsetKeys.add(writeToS3(topicName, outputStream3, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream4, "00002")); - offsetKeys.add(writeToS3(topicName, outputStream5, "00002")); + offsetKeys.add(writeToS3(topicName, outputStream3, "2", s3Prefix, "-")); + offsetKeys.add(writeToS3(topicName, outputStream4, "2", s3Prefix, "-")); + offsetKeys.add(writeToS3(topicName, outputStream5, "2", s3Prefix, "-")); assertThat(testBucketAccessor.listObjects()).hasSize(5); @@ -244,16 +268,26 @@ void avroTest(final TestInfo testInfo) throws IOException { connectRunner.getBootstrapServers()); } - @Test - void parquetTest(final TestInfo testInfo) throws IOException { + @ParameterizedTest + @ValueSource(booleans = { true, false }) + void parquetTest(final boolean addPrefix) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); - final String partition = "00000"; - final String fileName = org.apache.commons.lang3.StringUtils.defaultIfBlank(getS3Prefix(), "") + topicName + "-" - + partition + "-" + System.currentTimeMillis() + ".txt"; + final String partition = "0"; + final ObjectDistributionStrategy objectDistributionStrategy; + final String prefixPattern = "bucket/topics/{{topic}}/partition/{{partition}}/"; + String s3Prefix = ""; + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME; + if (addPrefix) { + s3Prefix = "bucket/topics/" + topicName + "/partition/" + partition + "/"; + } + + final String fileName = (StringUtils.isNotBlank(s3Prefix) ? s3Prefix : "") + topicName + "-" + partition + "-" + + System.currentTimeMillis() + ".txt"; final String name = "testuser"; - final Map connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET); + final Map connectorConfig = getAvroConfig(topicName, InputFormat.PARQUET, addPrefix, s3Prefix, + prefixPattern, objectDistributionStrategy); connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); final Path path = ContentUtils.getTmpFilePath(name); @@ -275,8 +309,11 @@ void parquetTest(final TestInfo testInfo) throws IOException { .containsExactlyInAnyOrderElementsOf(expectedRecordNames); } - private Map getAvroConfig(final String topicName, final InputFormat inputFormat) { - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 4); + private Map getAvroConfig(final String topicName, final InputFormat inputFormat, + final boolean addPrefix, final String s3Prefix, final String prefixPattern, + final ObjectDistributionStrategy objectDistributionStrategy) { + final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 4, objectDistributionStrategy, + addPrefix, s3Prefix, prefixPattern, "-"); connectorConfig.put(INPUT_FORMAT_KEY, inputFormat.getValue()); connectorConfig.put(SCHEMA_REGISTRY_URL, schemaRegistry.getSchemaRegistryUrl()); connectorConfig.put(VALUE_CONVERTER_KEY, "io.confluent.connect.avro.AvroConverter"); @@ -288,7 +325,8 @@ private Map getAvroConfig(final String topicName, final InputFor @Test void jsonTest(final TestInfo testInfo) { final var topicName = IntegrationBase.topicName(testInfo); - final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1); + final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1, + ObjectDistributionStrategy.PARTITION_IN_FILENAME, false, "", "", "-"); connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.JSONL.getValue()); connectorConfig.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.json.JsonConverter"); @@ -301,7 +339,7 @@ void jsonTest(final TestInfo testInfo) { } final byte[] jsonBytes = jsonBuilder.toString().getBytes(StandardCharsets.UTF_8); - final String offsetKey = writeToS3(topicName, jsonBytes, "00001"); + final String offsetKey = writeToS3(topicName, jsonBytes, "1", "", "-"); // Poll Json messages from the Kafka topic and deserialize them final List records = IntegrationBase.consumeJsonMessages(topicName, 500, @@ -316,25 +354,36 @@ void jsonTest(final TestInfo testInfo) { verifyOffsetPositions(Map.of(offsetKey, 500), connectRunner.getBootstrapServers()); } - private Map getConfig(final String connectorName, final String topics, final int maxTasks) { - final Map config = new HashMap<>(basicS3ConnectorConfig()); + private Map getConfig(final String connectorName, final String topics, final int maxTasks, + final ObjectDistributionStrategy taskDistributionConfig, final boolean addPrefix, final String s3Prefix, + final String prefixPattern, final String fileNameSeparator) { + final Map config = new HashMap<>(basicS3ConnectorConfig(addPrefix, s3Prefix)); config.put("name", connectorName); config.put(TARGET_TOPICS, topics); config.put("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); config.put(VALUE_CONVERTER_KEY, "org.apache.kafka.connect.converters.ByteArrayConverter"); config.put("tasks.max", String.valueOf(maxTasks)); + config.put(OBJECT_DISTRIBUTION_STRATEGY, taskDistributionConfig.value()); + config.put(FILE_NAME_TEMPLATE_CONFIG, + "{{topic}}" + fileNameSeparator + "{{partition}}" + fileNameSeparator + "{{start_offset}}"); + if (addPrefix) { + config.put(FILE_PATH_PREFIX_TEMPLATE_CONFIG, prefixPattern); + } return config; } - private static Map basicS3ConnectorConfig() { + private static Map basicS3ConnectorConfig(final boolean addPrefix, final String s3Prefix) { final Map config = new HashMap<>(); config.put("connector.class", AivenKafkaConnectS3SourceConnector.class.getName()); config.put(AWS_ACCESS_KEY_ID_CONFIG, S3_ACCESS_KEY_ID); config.put(AWS_SECRET_ACCESS_KEY_CONFIG, S3_SECRET_ACCESS_KEY); config.put(AWS_S3_ENDPOINT_CONFIG, s3Endpoint); config.put(AWS_S3_BUCKET_NAME_CONFIG, TEST_BUCKET_NAME); - config.put(AWS_S3_PREFIX_CONFIG, s3Prefix); + if (addPrefix) { + config.put(AWS_S3_PREFIX_CONFIG, s3Prefix); + } config.put(TARGET_TOPIC_PARTITIONS, "0,1"); + return config; } @@ -351,4 +400,12 @@ static void verifyOffsetPositions(final Map expectedRecords, fin }); } } + + String writeToS3(final String topicName, final byte[] testDataBytes, final String partitionId, + final String s3Prefix, final String separator) { + final String objectKey = (StringUtils.isNotBlank(s3Prefix) ? s3Prefix : "") + topicName + separator + + partitionId + separator + System.currentTimeMillis() + ".txt"; + writeToS3WithKey(objectKey, testDataBytes); + return OBJECT_KEY + SEPARATOR + objectKey; + } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java index 1bfc55580..3ed3fdafd 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java @@ -16,17 +16,21 @@ package io.aiven.kafka.connect.s3.source; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.Set; +import java.util.regex.Pattern; import org.apache.kafka.connect.source.SourceRecord; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.source.AbstractSourceTask; import io.aiven.kafka.connect.common.source.input.Transformer; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.DistributionStrategy; +import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.PartitionDistributionStrategy; +import io.aiven.kafka.connect.common.source.task.enums.ObjectDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import io.aiven.kafka.connect.s3.source.utils.AWSV2SourceClient; import io.aiven.kafka.connect.s3.source.utils.OffsetManager; @@ -63,12 +67,13 @@ public class S3SourceTask extends AbstractSourceTask { /** The AWS Source client */ private AWSV2SourceClient awsv2SourceClient; - /** The list of failed object keys */ - private final Set failedObjectKeys = new HashSet<>(); /** The offset manager this task uses */ private OffsetManager offsetManager; private S3SourceConfig s3SourceConfig; + private int taskId; + private Pattern filePattern; + public S3SourceTask() { super(LOGGER); } @@ -130,9 +135,9 @@ protected SourceCommonConfig configure(final Map props) { this.s3SourceConfig = new S3SourceConfig(props); this.transformer = s3SourceConfig.getTransformer(); offsetManager = new OffsetManager(context, s3SourceConfig); - awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig, failedObjectKeys); - setS3SourceRecordIterator( - new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, awsv2SourceClient)); + awsv2SourceClient = new AWSV2SourceClient(s3SourceConfig); + setS3SourceRecordIterator(new SourceRecordIterator(s3SourceConfig, offsetManager, this.transformer, + awsv2SourceClient, initializeObjectDistributionStrategy(), filePattern, taskId)); return s3SourceConfig; } @@ -173,4 +178,23 @@ protected void closeResources() { public Transformer getTransformer() { return transformer; } + + private DistributionStrategy initializeObjectDistributionStrategy() { + final ObjectDistributionStrategy objectDistributionStrategy = s3SourceConfig.getObjectDistributionStrategy(); + final int maxTasks = Integer.parseInt(s3SourceConfig.originals().get("tasks.max").toString()); + this.taskId = Integer.parseInt(s3SourceConfig.originals().get("task.id").toString()) % maxTasks; + DistributionStrategy distributionStrategy; + + if (objectDistributionStrategy == ObjectDistributionStrategy.PARTITION_IN_FILENAME) { + this.filePattern = FilePatternUtils + .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); + distributionStrategy = new PartitionDistributionStrategy(maxTasks); + } else { + this.filePattern = FilePatternUtils + .configurePattern(s3SourceConfig.getS3FileNameFragment().getFilenameTemplate().toString()); + distributionStrategy = new HashDistributionStrategy(maxTasks); + } + + return distributionStrategy; + } } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index 23dc69e9e..ebcffdba5 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -42,9 +42,11 @@ final public class S3SourceConfig extends SourceCommonConfig { public static final Logger LOGGER = LoggerFactory.getLogger(S3SourceConfig.class); private final S3ConfigFragment s3ConfigFragment; + private final FileNameFragment s3FileNameFragment; public S3SourceConfig(final Map properties) { super(configDef(), handleDeprecatedYyyyUppercase(properties)); s3ConfigFragment = new S3ConfigFragment(this); + s3FileNameFragment = new FileNameFragment(this); validate(); // NOPMD ConstructorCallsOverridableMethod getStsRole is called } @@ -129,4 +131,8 @@ public S3ConfigFragment getS3ConfigFragment() { return s3ConfigFragment; } + public FileNameFragment getS3FileNameFragment() { + return s3FileNameFragment; + } + } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java index ed460a500..d9dbc0d45 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java @@ -17,10 +17,8 @@ package io.aiven.kafka.connect.s3.source.utils; import java.io.InputStream; -import java.util.HashSet; import java.util.Iterator; import java.util.Objects; -import java.util.Set; import java.util.function.Predicate; import java.util.stream.Stream; @@ -29,8 +27,6 @@ import org.apache.commons.io.function.IOSupplier; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -44,26 +40,19 @@ */ public class AWSV2SourceClient { - private static final Logger LOGGER = LoggerFactory.getLogger(AWSV2SourceClient.class); public static final int PAGE_SIZE_FACTOR = 2; private final S3SourceConfig s3SourceConfig; private final S3Client s3Client; private final String bucketName; private Predicate filterPredicate = s3Object -> s3Object.size() > 0; - private final Set failedObjectKeys; - - private final int taskId; - private final int maxTasks; /** * @param s3SourceConfig * configuration for Source connector - * @param failedObjectKeys - * all objectKeys which have already been tried but have been unable to process. */ - public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set failedObjectKeys) { - this(new S3ClientFactory().createAmazonS3Client(s3SourceConfig), s3SourceConfig, failedObjectKeys); + public AWSV2SourceClient(final S3SourceConfig s3SourceConfig) { + this(new S3ClientFactory().createAmazonS3Client(s3SourceConfig), s3SourceConfig); } /** @@ -73,47 +62,11 @@ public AWSV2SourceClient(final S3SourceConfig s3SourceConfig, final Set * amazonS3Client * @param s3SourceConfig * configuration for Source connector - * @param failedObjectKeys - * all objectKeys which have already been tried but have been unable to process. */ - AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig, - final Set failedObjectKeys) { + AWSV2SourceClient(final S3Client s3Client, final S3SourceConfig s3SourceConfig) { this.s3SourceConfig = s3SourceConfig; this.s3Client = s3Client; this.bucketName = s3SourceConfig.getAwsS3BucketName(); - this.failedObjectKeys = new HashSet<>(failedObjectKeys); - - // TODO the code below should be configured in some sort of taks assignement method/process/call. - int maxTasks; - try { - final Object value = s3SourceConfig.originals().get("tasks.max"); - if (value == null) { - LOGGER.info("Setting tasks.max to 1"); - maxTasks = 1; - } else { - maxTasks = Integer.parseInt(value.toString()); - } - } catch (NumberFormatException e) { // NOPMD catch null pointer - LOGGER.warn("Invalid tasks.max: {}", e.getMessage()); - LOGGER.info("Setting tasks.max to 1"); - maxTasks = 1; - } - this.maxTasks = maxTasks; - int taskId; - try { - final Object value = s3SourceConfig.originals().get("task.id"); - if (value == null) { - LOGGER.info("Setting task.id to 0"); - taskId = 0; - } else { - taskId = Integer.parseInt(value.toString()) % maxTasks; - } - } catch (NumberFormatException e) { // NOPMD catch null pointer - LOGGER.warn("Invalid task.id: {}", e.getMessage()); - LOGGER.info("Setting task.id to 0"); - taskId = 0; - } - this.taskId = taskId; } /** @@ -142,12 +95,7 @@ private Stream getS3ObjectStream(final String startToken) { return null; } - }) - .flatMap(response -> response.contents() - .stream() - .filter(filterPredicate) - .filter(objectSummary -> assignObjectToTask(objectSummary.key())) - .filter(objectSummary -> !failedObjectKeys.contains(objectSummary.key()))); + }).flatMap(response -> response.contents().stream().filter(filterPredicate)); } /** @@ -180,23 +128,14 @@ public IOSupplier getObject(final String objectKey) { return s3ObjectResponse::asInputStream; } - public void addFailedObjectKeys(final String objectKey) { - this.failedObjectKeys.add(objectKey); - } - - public void setFilterPredicate(final Predicate predicate) { - filterPredicate = predicate; - } - - private boolean assignObjectToTask(final String objectKey) { - final int taskAssignment = Math.floorMod(objectKey.hashCode(), maxTasks); - return taskAssignment == taskId; - } - public void shutdown() { s3Client.close(); } + public void addPredicate(final Predicate objectPredicate) { + this.filterPredicate = this.filterPredicate.and(objectPredicate); + } + /** * An iterator that reads from */ diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java index e945c2565..cab511693 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessor.java @@ -42,7 +42,6 @@ public static SourceRecord createSourceRecord(final S3SourceRecord s3SourceRecor if (ErrorsTolerance.NONE.equals(s3SourceConfig.getErrorsTolerance())) { throw new ConnectException("Data Exception caught during S3 record to source record transformation", e); } else { - sourceClient.addFailedObjectKeys(s3SourceRecord.getObjectKey()); LOGGER.warn( "Data Exception caught during S3 record to source record transformation {} . errors.tolerance set to 'all', logging warning and continuing to process.", e.getMessage(), e); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index bded51d1b..820be20aa 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -19,9 +19,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; +import java.util.Optional; import java.util.function.Function; -import java.util.function.Predicate; -import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Stream; @@ -29,9 +28,10 @@ import io.aiven.kafka.connect.common.source.input.ByteArrayTransformer; import io.aiven.kafka.connect.common.source.input.Transformer; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.DistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; -import org.apache.commons.collections4.IteratorUtils; import software.amazon.awssdk.services.s3.model.S3Object; /** @@ -39,11 +39,6 @@ * Parquet). */ public final class SourceRecordIterator implements Iterator { - public static final String PATTERN_TOPIC_KEY = "topicName"; - public static final String PATTERN_PARTITION_KEY = "partitionId"; - - public static final Pattern FILE_DEFAULT_PATTERN = Pattern.compile("(?[^/]+?)-" - + "(?\\d{5})-" + "(?[a-zA-Z0-9]+)" + "\\.(?[^.]+)$"); // topic-00001.txt public static final long BYTES_TRANSFORMATION_NUM_OF_RECS = 1L; private final OffsetManager offsetManager; @@ -59,25 +54,17 @@ public final class SourceRecordIterator implements Iterator { private String topic; private int partitionId; + private final DistributionStrategy distributionStrategy; + private final int taskId; + private final Iterator inner; private Iterator outer; - - private final Predicate fileNamePredicate = s3Object -> { - - final Matcher fileMatcher = FILE_DEFAULT_PATTERN.matcher(s3Object.key()); - - if (fileMatcher.find()) { - // TODO move this from the SourceRecordIterator so that we can decouple it from S3 and make it API agnostic - topic = fileMatcher.group(PATTERN_TOPIC_KEY); - partitionId = Integer.parseInt(fileMatcher.group(PATTERN_PARTITION_KEY)); - return true; - } - return false; - }; + private final Pattern filePattern; public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetManager offsetManager, - final Transformer transformer, final AWSV2SourceClient sourceClient) { + final Transformer transformer, final AWSV2SourceClient sourceClient, + final DistributionStrategy distributionStrategy, final Pattern filePattern, final int taskId) { super(); this.s3SourceConfig = s3SourceConfig; this.offsetManager = offsetManager; @@ -85,13 +72,35 @@ public SourceRecordIterator(final S3SourceConfig s3SourceConfig, final OffsetMan this.bucketName = s3SourceConfig.getAwsS3BucketName(); this.transformer = transformer; this.sourceClient = sourceClient; + this.filePattern = filePattern; + this.distributionStrategy = distributionStrategy; + this.taskId = taskId; + + // Initialize predicates + sourceClient.addPredicate(this::isFileMatchingPattern); + sourceClient.addPredicate(this::isFileAssignedToTask); // call filters out bad file names and extracts topic/partition - inner = IteratorUtils.filteredIterator(sourceClient.getS3ObjectIterator(null), - s3Object -> this.fileNamePredicate.test(s3Object)); + inner = sourceClient.getS3ObjectIterator(null); outer = Collections.emptyIterator(); } + public boolean isFileMatchingPattern(final S3Object s3Object) { + final Optional optionalTopic = FilePatternUtils.getTopic(filePattern, s3Object.key()); + final Optional optionalPartitionId = FilePatternUtils.getPartitionId(filePattern, s3Object.key()); + + if (optionalTopic.isPresent() && optionalPartitionId.isPresent()) { + topic = optionalTopic.get(); + partitionId = optionalPartitionId.get(); + return true; + } + return false; + } + + public boolean isFileAssignedToTask(final S3Object s3Object) { + return distributionStrategy.isPartOfTask(taskId, s3Object.key(), filePattern); + } + @Override public boolean hasNext() { while (!outer.hasNext() && inner.hasNext()) { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index 944ccbfdf..c915376c9 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -174,6 +174,7 @@ private void setBasicProperties() { properties.putIfAbsent("key.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); properties.putIfAbsent("value.converter", "org.apache.kafka.connect.converters.ByteArrayConverter"); properties.putIfAbsent("tasks.max", "1"); + properties.put("task.id", "1"); properties.putIfAbsent("connector.class", AivenKafkaConnectS3SourceConnector.class.getName()); properties.putIfAbsent(TARGET_TOPIC_PARTITIONS, "0,1"); properties.putIfAbsent(TARGET_TOPICS, "testtopic"); diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java index beed0681c..1a160d780 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java @@ -35,8 +35,6 @@ import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import software.amazon.awssdk.services.s3.S3Client; @@ -53,19 +51,16 @@ class AWSV2SourceClientTest { @Captor ArgumentCaptor requestCaptor; - private static Map getConfigMap(final int maxTasks, final int taskId) { + private static Map getConfigMap() { final Map configMap = new HashMap<>(); - configMap.put("tasks.max", String.valueOf(maxTasks)); - configMap.put("task.id", String.valueOf(taskId)); configMap.put(AWS_S3_BUCKET_NAME_CONFIG, "test-bucket"); return configMap; } - @ParameterizedTest - @CsvSource({ "3, 1" }) - void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) { - initializeWithTaskConfigs(maxTasks, taskId); + @Test + void testFetchObjectSummariesWithNoObjects() { + initializeWithTaskConfigs(); final ListObjectsV2Response listObjectsV2Response = createListObjectsV2Response(Collections.emptyList(), null); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); @@ -73,54 +68,31 @@ void testFetchObjectSummariesWithNoObjects(final int maxTasks, final int taskId) assertThat(summaries).isExhausted(); } - @ParameterizedTest - @CsvSource({ "1, 0" }) - void testFetchObjectSummariesWithOneObjectWithBasicConfig(final int maxTasks, final int taskId) { + @Test + void testFetchObjectSummariesWithOneObjectWithBasicConfig() { final String objectKey = "any-key"; - initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectKeysIterator(objectKey); - assertThat(summaries).hasNext(); - } - - @ParameterizedTest - @CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, - final String objectKey) { - initializeWithTaskConfigs(maxTasks, taskId); + initializeWithTaskConfigs(); final Iterator summaries = getS3ObjectKeysIterator(objectKey); assertThat(summaries).hasNext(); } - @ParameterizedTest - @CsvSource({ "4, 1, key1", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3", - "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" }) - void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, - final String objectKey) { - initializeWithTaskConfigs(maxTasks, taskId); - final Iterator summaries = getS3ObjectKeysIterator(objectKey); - - assertThat(summaries).isExhausted(); - } - - @ParameterizedTest - @CsvSource({ "4, 3", "4, 0" }) - void testFetchObjectSummariesWithZeroByteObject(final int maxTasks, final int taskId) { - initializeWithTaskConfigs(maxTasks, taskId); + @Test + void testFetchObjectSummariesWithZeroByteObject() { + initializeWithTaskConfigs(); final ListObjectsV2Response listObjectsV2Response = getListObjectsV2Response(); when(s3Client.listObjectsV2(any(ListObjectsV2Request.class))).thenReturn(listObjectsV2Response); final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); - // assigned 1 object to taskid - assertThat(summaries).hasNext(); + assertThat(summaries.next()).isNotBlank(); assertThat(summaries.next()).isNotBlank(); assertThat(summaries).isExhausted(); } @Test void testFetchObjectSummariesWithPagination() throws IOException { - initializeWithTaskConfigs(4, 3); + initializeWithTaskConfigs(); final S3Object object1 = createObjectSummary(1, "key1"); final S3Object object2 = createObjectSummary(2, "key2"); final List firstBatch = List.of(object1); @@ -134,19 +106,19 @@ void testFetchObjectSummariesWithPagination() throws IOException { final Iterator summaries = awsv2SourceClient.getListOfObjectKeys(null); verify(s3Client, times(1)).listObjectsV2(any(ListObjectsV2Request.class)); assertThat(summaries.next()).isNotNull(); - assertThat(summaries).isExhausted(); + assertThat(summaries.next()).isNotNull(); } @Test void testFetchObjectWithPrefix() { - final Map configMap = getConfigMap(1, 0); + final Map configMap = getConfigMap(); configMap.put(AWS_S3_PREFIX_CONFIG, "test/"); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); - final S3Object object1 = createObjectSummary(1, "key1"); - final S3Object object2 = createObjectSummary(1, "key2"); + final S3Object object1 = createObjectSummary(1, "topics/key1/1/key1.txt"); + final S3Object object2 = createObjectSummary(1, "topics/key2/2/key2.txt"); final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken"); final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null); @@ -167,19 +139,18 @@ void testFetchObjectWithPrefix() { // Not required with continuation token assertThat(allRequests.get(1).prefix()).isNull(); assertThat(allRequests.get(1).continuationToken()).isEqualTo("nextToken"); - } @Test void testFetchObjectWithInitialStartAfter() { - final Map configMap = getConfigMap(1, 0); + final Map configMap = getConfigMap(); final String startAfter = "file-option-1-12000.txt"; final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig); requestCaptor = ArgumentCaptor.forClass(ListObjectsV2Request.class); - final S3Object object1 = createObjectSummary(1, "key1"); - final S3Object object2 = createObjectSummary(1, "key2"); + final S3Object object1 = createObjectSummary(1, "key1-1-10000"); + final S3Object object2 = createObjectSummary(1, "key2-2-20000"); final ListObjectsV2Response firstResult = createListObjectsV2Response(List.of(object1), "nextToken"); final ListObjectsV2Response secondResult = createListObjectsV2Response(List.of(object2), null); @@ -227,12 +198,11 @@ private Iterator getS3ObjectKeysIterator(final String objectKey) { return awsv2SourceClient.getListOfObjectKeys(null); } - public void initializeWithTaskConfigs(final int maxTasks, final int taskId) { - final Map configMap = getConfigMap(maxTasks, taskId); + private void initializeWithTaskConfigs() { + final Map configMap = getConfigMap(); final S3SourceConfig s3SourceConfig = new S3SourceConfig(configMap); s3Client = mock(S3Client.class); - awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig, Collections.emptySet()); - + awsv2SourceClient = new AWSV2SourceClient(s3Client, s3SourceConfig); } private ListObjectsV2Response getListObjectsV2Response() { diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index af9b679fa..f7559ddfd 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -16,6 +16,8 @@ package io.aiven.kafka.connect.s3.source.utils; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY; +import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_TOPIC_KEY; import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.anyMap; @@ -35,6 +37,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Iterator; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Stream; import org.apache.kafka.connect.data.SchemaAndValue; @@ -44,10 +49,14 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; import io.aiven.kafka.connect.common.source.input.Transformer; import io.aiven.kafka.connect.common.source.input.TransformerFactory; +import io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils; +import io.aiven.kafka.connect.common.source.task.HashDistributionStrategy; import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import software.amazon.awssdk.services.s3.model.S3Object; final class SourceRecordIteratorTest { @@ -78,35 +87,38 @@ void testIteratorProcessesS3Objects() throws Exception { mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + final Pattern filePattern = mock(Pattern.class); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Collections.emptyIterator()); Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, - mockSourceApiClient); + mockSourceApiClient, new HashDistributionStrategy(1), + FilePatternUtils.configurePattern("{{topic}}-{{partition}}-{{start_offset}}"), 0); assertThat(iterator.hasNext()).isFalse(); + mockPatternMatcher(filePattern); final S3Object obj = S3Object.builder().key(key).build(); final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); - iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient); + iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient, + new HashDistributionStrategy(1), filePattern, 0); - assertThat(iterator).hasNext(); + assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull(); - assertThat(iterator).isExhausted(); } } @Test void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { - final String key = "topic-00001-abc123.txt"; final S3Object s3Object = S3Object.builder().key(key).build(); // With ByteArrayTransformer try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream); + final Pattern filePattern = mock(Pattern.class); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); @@ -120,10 +132,11 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { .thenReturn(Collections.singletonList(key).listIterator()); when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); + mockPatternMatcher(filePattern); // should skip if any records were produced by source record iterator. final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, - mockTransformer, mockSourceApiClient); + mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isFalse(); verify(mockSourceApiClient, never()).getObject(any()); verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong()); @@ -132,6 +145,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { // With AvroTransformer try (InputStream inputStream = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { when(mockSourceApiClient.getObject(key)).thenReturn(() -> inputStream); + final Pattern filePattern = mock(Pattern.class); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); mockTransformer = mock(AvroTransformer.class); when(mockSourceApiClient.getListOfObjectKeys(any())) @@ -139,18 +153,73 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { when(mockOffsetManager.recordsProcessedForObjectKey(anyMap(), anyString())) .thenReturn(BYTES_TRANSFORMATION_NUM_OF_RECS); + mockPatternMatcher(filePattern); when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL); when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream()); final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, - mockTransformer, mockSourceApiClient); - assertThat(iterator.hasNext()).isTrue(); - iterator.next(); + mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); + assertThat(iterator.hasNext()).isFalse(); - verify(mockTransformer, times(1)).getRecords(any(), anyString(), anyInt(), any(), anyLong()); + verify(mockTransformer, times(0)).getRecords(any(), anyString(), anyInt(), any(), anyLong()); } } + @ParameterizedTest + @CsvSource({ "4, 2, key1", "4, 3, key2", "4, 0, key3", "4, 1, key4" }) + void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdAssigned(final int maxTasks, final int taskId, + final String objectKey) { + + mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + final Pattern filePattern = mock(Pattern.class); + + mockPatternMatcher(filePattern); + + final S3Object obj = S3Object.builder().key(objectKey).build(); + + final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); + when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); + when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); + final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId); + final Predicate s3ObjectPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object) + && iterator.isFileAssignedToTask(s3Object); + // Assert + assertThat(s3ObjectPredicate).accepts(obj); + } + + @ParameterizedTest + @CsvSource({ "4, 1, topic1-2-0", "4, 3, key1", "4, 0, key1", "4, 1, key2", "4, 2, key2", "4, 0, key2", "4, 1, key3", + "4, 2, key3", "4, 3, key3", "4, 0, key4", "4, 2, key4", "4, 3, key4" }) + void testFetchObjectSummariesWithOneNonZeroByteObjectWithTaskIdUnassigned(final int maxTasks, final int taskId, + final String objectKey) { + mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); + final Pattern filePattern = mock(Pattern.class); + + mockPatternMatcher(filePattern); + + final S3Object obj = S3Object.builder().key(objectKey).build(); + + final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); + when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); + when(mockSourceApiClient.getObject(any())).thenReturn(() -> bais); + final SourceRecordIterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, + mockSourceApiClient, new HashDistributionStrategy(maxTasks), filePattern, taskId); + final Predicate stringPredicate = s3Object -> iterator.isFileMatchingPattern(s3Object) + && iterator.isFileAssignedToTask(s3Object); + // Assert + assertThat(stringPredicate.test(obj)).as("Predicate should accept the objectKey: " + objectKey).isFalse(); + } + + private static void mockPatternMatcher(final Pattern filePattern) { + final Matcher fileMatcher = mock(Matcher.class); + when(filePattern.matcher(any())).thenReturn(fileMatcher); + when(fileMatcher.find()).thenReturn(true); + when(fileMatcher.group(PATTERN_TOPIC_KEY)).thenReturn("testtopic"); + when(fileMatcher.group(PATTERN_PARTITION_KEY)).thenReturn("0"); + } }