From a453a270577aefcb6587a2a3312138edeee3d29d Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Fri, 10 Jan 2025 13:51:51 +0000 Subject: [PATCH 1/2] Allow users to vary the size of the byteArrayBuffer Signed-off-by: Aindriu Lavelle --- .../common/config/SchemaRegistryFragment.java | 23 ++++++++ .../common/config/SourceCommonConfig.java | 4 ++ .../common/source/input/AvroTransformer.java | 9 ++-- .../source/input/ByteArrayTransformer.java | 18 ++++--- .../common/source/input/JsonTransformer.java | 9 ++-- .../source/input/ParquetTransformer.java | 8 +-- .../common/source/input/Transformer.java | 12 +++-- .../config/SchemaRegistryFragmentTest.java | 54 +++++++++++++++++++ .../input/ByteArrayTransformerTest.java | 38 ++++++++++++- .../input/TransformerStreamingTest.java | 30 ++++++----- .../utils/SourceRecordIteratorTest.java | 1 + 11 files changed, 168 insertions(+), 38 deletions(-) create mode 100644 commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java index 8ea7b7f95..48d5e139f 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import io.aiven.kafka.connect.common.source.input.InputFormat; @@ -30,6 +31,8 @@ public final class SchemaRegistryFragment extends ConfigFragment { public static final String AVRO_VALUE_SERIALIZER = "value.serializer"; public static final String INPUT_FORMAT_KEY = "input.format"; public static final String SCHEMAS_ENABLE = "schemas.enable"; + public static final String 
BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE = "byte.array.transformer.max.buffer.size"; + private static final int MAX_BUFFER_SIZE = 4096; /** * Construct the ConfigFragment.. @@ -53,6 +56,10 @@ public static ConfigDef update(final ConfigDef configDef) { new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD ConfigDef.Width.NONE, INPUT_FORMAT_KEY); + configDef.define(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, MAX_BUFFER_SIZE, + new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM, + "Max Size of the byte buffer when using the BYTE Transformer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + ConfigDef.Width.NONE, INPUT_FORMAT_KEY); configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, "Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD @@ -73,4 +80,20 @@ public Class getAvroValueSerializer() { return cfg.getClass(AVRO_VALUE_SERIALIZER); } + public int getByteArrayTransformerMaxBufferSize() { + return cfg.getInt(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE); + } + + private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + + final int size = (int) value; + if (size <= 0) { + throw new ConfigException(String.format("%s must be larger then 0", name)); + } + + } + } + } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java index 2c9cafe61..db6f2d15e 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java @@ -82,4 +82,8 @@ public Transformer getTransformer() { return 
TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat()); } + public int getByteArrayTransformerMaxBufferSize() { + return schemaRegistryFragment.getByteArrayTransformerMaxBufferSize(); + } + } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java index 760d074d2..35039c083 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java @@ -24,10 +24,11 @@ import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; + import io.confluent.connect.avro.AvroData; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -49,13 +50,13 @@ public class AvroTransformer extends Transformer { } @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { + public void configureValueConverter(final Map config, final SourceCommonConfig sourceConfig) { config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL)); } @Override public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final AbstractConfig sourceConfig) { + final int topicPartition, final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { private DataFileStream dataFileStream; private final DatumReader datumReader = new GenericDatumReader<>(); @@ -91,7 +92,7 @@ protected boolean doAdvance(final Consumer action) { @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final 
AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java index 232aaef24..98e01b15e 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java @@ -23,9 +23,10 @@ import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; + import org.apache.commons.io.IOUtils; import org.apache.commons.io.function.IOSupplier; import org.slf4j.Logger; @@ -34,16 +35,17 @@ public class ByteArrayTransformer extends Transformer { private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class); - private static final int MAX_BUFFER_SIZE = 4096; - @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { + public void configureValueConverter(final Map config, final SourceCommonConfig sourceConfig) { // For byte array transformations, ByteArrayConverter is the converter which is the default config. + } @Override public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final AbstractConfig sourceConfig) { + final int topicPartition, final SourceCommonConfig sourceConfig) { + // The max buffer size for the byte array the default is 4096 if not set by the user. 
+ final int maxBufferSize = sourceConfig.getByteArrayTransformerMaxBufferSize(); return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { @Override protected InputStream inputOpened(final InputStream input) { @@ -57,13 +59,13 @@ protected void doClose() { @Override protected boolean doAdvance(final Consumer action) { - final byte[] buffer = new byte[MAX_BUFFER_SIZE]; + final byte[] buffer = new byte[maxBufferSize]; try { final int bytesRead = IOUtils.read(inputStream, buffer); if (bytesRead == 0) { return false; } - if (bytesRead < MAX_BUFFER_SIZE) { + if (bytesRead < maxBufferSize) { action.accept(new SchemaAndValue(null, Arrays.copyOf(buffer, bytesRead))); } else { action.accept(new SchemaAndValue(null, buffer)); @@ -79,7 +81,7 @@ protected boolean doAdvance(final Consumer action) { @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java index 8069d08c1..8228038e9 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java @@ -24,10 +24,11 @@ import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverter; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; + import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.io.function.IOSupplier; import org.apache.commons.lang3.StringUtils; @@ -48,12 +49,12 @@ public class JsonTransformer extends Transformer 
{ } @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { + public void configureValueConverter(final Map config, final SourceCommonConfig sourceConfig) { } @Override public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final AbstractConfig sourceConfig) { + final int topicPartition, final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { BufferedReader reader; @@ -99,7 +100,7 @@ public boolean doAdvance(final Consumer action) { @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java index 2c47d5103..eb4b1abf8 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java @@ -29,9 +29,9 @@ import java.util.Map; import java.util.function.Consumer; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.source.input.parquet.LocalInputFile; import io.confluent.connect.avro.AvroData; @@ -55,19 +55,19 @@ public class ParquetTransformer extends Transformer { } @Override - public void configureValueConverter(final Map config, final AbstractConfig sourceConfig) { + public void configureValueConverter(final Map config, final SourceCommonConfig sourceConfig) { config.put(SCHEMA_REGISTRY_URL, 
sourceConfig.getString(SCHEMA_REGISTRY_URL)); } @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, - final AbstractConfig sourceConfig) { + final SourceCommonConfig sourceConfig) { return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8)); } @Override public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final AbstractConfig sourceConfig) { + final int topicPartition, final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java index 09e8c0ca5..7dfc5f327 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java @@ -24,18 +24,20 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.connect.data.SchemaAndValue; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; + import org.apache.commons.io.function.IOSupplier; import org.slf4j.Logger; public abstract class Transformer { - public abstract void configureValueConverter(Map config, AbstractConfig sourceConfig); + public abstract void configureValueConverter(Map config, SourceCommonConfig sourceConfig); public final Stream getRecords(final IOSupplier inputStreamIOSupplier, - final String topic, final int topicPartition, final AbstractConfig sourceConfig, final long skipRecords) { + final String topic, final int topicPartition, final SourceCommonConfig sourceConfig, + final long skipRecords) { final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition, sourceConfig); @@ -56,9 
+58,9 @@ public final Stream getRecords(final IOSupplier inp * @return a StreamSpliterator instance. */ protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier, String topic, - int topicPartition, AbstractConfig sourceConfig); + int topicPartition, SourceCommonConfig sourceConfig); - public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, AbstractConfig sourceConfig); + public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, SourceCommonConfig sourceConfig); /** * A Spliterator that performs various checks on the opening/closing of the input stream. diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java new file mode 100644 index 000000000..6c12a3b93 --- /dev/null +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.aiven.kafka.connect.common.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import org.junit.jupiter.api.Test; + +public class SchemaRegistryFragmentTest {// NOPMD + + @Test + void validateCorrectBufferSizeIsAccepted() { + final int bufferSize = 50; + final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef()); + final Map props = new HashMap<>(); + props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, bufferSize); + + final SchemaRegistryFragment schemaReg = new SchemaRegistryFragment(new AbstractConfig(configDef, props)); + assertThat(schemaReg.getByteArrayTransformerMaxBufferSize()).isEqualTo(bufferSize); + } + + @Test + void validateInvalidBufferSizeThrowsConfigException() { + final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef()); + final Map props = new HashMap<>(); + props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, 0); + + assertThatThrownBy(() -> new SchemaRegistryFragment(new AbstractConfig(configDef, props))) + .isInstanceOf(ConfigException.class); + } + +} diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java index 80820e13b..4d8aed153 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java @@ -17,6 +17,9 @@ package io.aiven.kafka.connect.common.source.input; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.times; 
+import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -29,9 +32,12 @@ import io.aiven.kafka.connect.common.config.SourceCommonConfig; import org.apache.commons.io.function.IOSupplier; +import org.apache.http.util.ByteArrayBuffer; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -54,7 +60,7 @@ void testGetRecordsSingleChunk() { final byte[] data = { 1, 2, 3, 4, 5 }; final InputStream inputStream = new ByteArrayInputStream(data); final IOSupplier inputStreamIOSupplier = () -> inputStream; - + when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(4096); final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, sourceCommonConfig, 0); @@ -74,4 +80,34 @@ void testGetRecordsEmptyInputStream() { assertThat(records).hasSize(0); } + + /** + * @param maxBufferSize + * the maximum buffer size + * @param numberOfExpectedRecords + * the number of records the byte array is split into based off the max buffer size + */ + @ParameterizedTest + @CsvSource({ "1,10", "2,5", "3,4", "4,3", "5,2", "6,2", "7,2", "8,2", "9,2", "10,1", "11,1", "12,1" }) + void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int numberOfExpectedRecords) { + final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + final InputStream inputStream = new ByteArrayInputStream(data); + final IOSupplier inputStreamIOSupplier = () -> inputStream; + when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(maxBufferSize); + + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, + sourceCommonConfig, 0); + + final List recs = 
records.collect(Collectors.toList()); + assertThat(recs).hasSize(numberOfExpectedRecords); + final ByteArrayBuffer processedData = new ByteArrayBuffer(10); + + recs.forEach(rec -> { + final byte[] val = (byte[]) rec.value(); + processedData.append(val, 0, val.length); + }); + assertThat(processedData.buffer()).isEqualTo(data); + // Should only get called once per splitIterator + verify(sourceCommonConfig, times(1)).getByteArrayTransformerMaxBufferSize(); + } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java index 73b27b01f..a4b3c522a 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.common.source.input; +import static io.aiven.kafka.connect.common.config.OutputFormatFragmentFixture.OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -32,11 +33,12 @@ import java.util.List; import java.util.stream.Stream; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.data.SchemaAndValue; -import io.aiven.kafka.connect.common.config.CommonConfig; +import io.aiven.kafka.connect.common.config.OutputFormatFragment; +import io.aiven.kafka.connect.common.config.SchemaRegistryFragment; +import io.aiven.kafka.connect.common.config.SourceCommonConfig; import org.apache.commons.io.function.IOSupplier; import org.junit.jupiter.params.ParameterizedTest; @@ -50,8 +52,8 @@ class TransformerStreamingTest { @ParameterizedTest @MethodSource("testData") - void verifyExceptionDuringIOOpen(final Transformer 
transformer, final byte[] testData, final AbstractConfig config, - final int expectedCount) throws IOException { + void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] testData, + final SourceCommonConfig config, final int expectedCount) throws IOException { final IOSupplier ioSupplier = mock(IOSupplier.class); when(ioSupplier.get()).thenThrow(new IOException("Test IOException during initialization")); final Stream objStream = transformer.getRecords(ioSupplier, "topic", 1, config, 0); @@ -60,8 +62,8 @@ void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] tes @ParameterizedTest @MethodSource("testData") - void verifyExceptionDuringRead(final Transformer transformer, final byte[] testData, final AbstractConfig config, - final int expectedCount) throws IOException { + void verifyExceptionDuringRead(final Transformer transformer, final byte[] testData, + final SourceCommonConfig config, final int expectedCount) throws IOException { try (InputStream inputStream = mock(InputStream.class)) { when(inputStream.read()).thenThrow(new IOException("Test IOException during read")); when(inputStream.read(any())).thenThrow(new IOException("Test IOException during read")); @@ -81,7 +83,7 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD @ParameterizedTest @MethodSource("testData") - void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final AbstractConfig config, + void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); @@ -93,7 +95,7 @@ void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData @ParameterizedTest @MethodSource("testData") void 
verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] testData, - final AbstractConfig config, final int expectedCount) throws IOException { + final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); final Iterator iter = objStream.iterator(); @@ -108,19 +110,23 @@ void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] static Stream testData() throws IOException { final List lst = new ArrayList<>(); + final var props = new HashMap<>(); + props.put(FORMAT_OUTPUT_TYPE_CONFIG.key(), "avro"); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.AVRO), AvroTransformerTest.generateMockAvroData(100).toByteArray(), - new CommonConfig(new ConfigDef(), new HashMap<>()) { + new SourceCommonConfig(OutputFormatFragment.update(new ConfigDef(), null), props) { }, 100)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.BYTES), - "Hello World".getBytes(StandardCharsets.UTF_8), new CommonConfig(new ConfigDef(), new HashMap<>()) { + "Hello World".getBytes(StandardCharsets.UTF_8), new SourceCommonConfig( + SchemaRegistryFragment.update(OutputFormatFragment.update(new ConfigDef(), null)), props) { }, 1)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.JSONL), JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8), - new CommonConfig(new ConfigDef(), new HashMap<>()) { + new SourceCommonConfig(OutputFormatFragment.update(new ConfigDef(), null), props) { }, 100)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.PARQUET), - ParquetTransformerTest.generateMockParquetData(), new CommonConfig(new ConfigDef(), new HashMap<>()) { + ParquetTransformerTest.generateMockParquetData(), + new SourceCommonConfig(OutputFormatFragment.update(new ConfigDef(), null), props) 
{ }, 100)); return lst.stream(); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index f7559ddfd..a5a1179f3 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -85,6 +85,7 @@ void testIteratorProcessesS3Objects() throws Exception { when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); + when(mockConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(4096); when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); final Pattern filePattern = mock(Pattern.class); From f6b6adf27211bd77c43368fbf0a6989655a1af27 Mon Sep 17 00:00:00 2001 From: Aindriu Lavelle Date: Sat, 11 Jan 2025 03:01:33 +0000 Subject: [PATCH 2/2] Add length to the Transformer to allow improvements to the byte array in the future. 
Signed-off-by: Aindriu Lavelle --- .../common/config/SourceCommonConfig.java | 16 +++--- ...Fragment.java => TransformerFragment.java} | 40 +++++++------- .../common/source/input/AvroTransformer.java | 7 +-- .../source/input/ByteArrayTransformer.java | 44 +++++++++++++-- .../common/source/input/JsonTransformer.java | 5 +- .../source/input/ParquetTransformer.java | 7 +-- .../common/source/input/Transformer.java | 19 ++++--- .../source/input/TransformerFactory.java | 2 +- ...Test.java => TransformerFragmentTest.java} | 23 +++++--- .../source/input/AvroTransformerTest.java | 19 +++---- .../input/ByteArrayTransformerTest.java | 16 +++--- .../source/input/JsonTransformerTest.java | 25 ++++----- .../source/input/ParquetTransformerTest.java | 22 ++++---- .../input/TransformerStreamingTest.java | 15 +++--- s3-source-connector/README.md | 4 ++ .../connect/s3/source/AwsIntegrationTest.java | 4 +- .../connect/s3/source/IntegrationBase.java | 14 +++++ .../connect/s3/source/IntegrationTest.java | 53 +++++++++++++++++-- .../s3/source/config/S3SourceConfig.java | 6 +-- .../s3/source/utils/SourceRecordIterator.java | 5 +- .../connect/s3/source/S3SourceTaskTest.java | 2 +- .../s3/source/config/S3SourceConfigTest.java | 4 +- .../utils/SourceRecordIteratorTest.java | 16 +++--- 23 files changed, 246 insertions(+), 122 deletions(-) rename commons/src/main/java/io/aiven/kafka/connect/common/config/{SchemaRegistryFragment.java => TransformerFragment.java} (70%) rename commons/src/test/java/io/aiven/kafka/connect/common/config/{SchemaRegistryFragmentTest.java => TransformerFragmentTest.java} (55%) diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java index db6f2d15e..12a45ea64 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/SourceCommonConfig.java @@ 
-28,7 +28,7 @@ public class SourceCommonConfig extends CommonConfig { - private final SchemaRegistryFragment schemaRegistryFragment; + private final TransformerFragment transformerFragment; private final SourceConfigFragment sourceConfigFragment; private final FileNameFragment fileNameFragment; private final OutputFormatFragment outputFormatFragment; @@ -36,7 +36,7 @@ public class SourceCommonConfig extends CommonConfig { public SourceCommonConfig(ConfigDef definition, Map originals) {// NOPMD super(definition, originals); // Construct Fragments - schemaRegistryFragment = new SchemaRegistryFragment(this); + transformerFragment = new TransformerFragment(this); sourceConfigFragment = new SourceConfigFragment(this); fileNameFragment = new FileNameFragment(this); outputFormatFragment = new OutputFormatFragment(this); @@ -45,18 +45,18 @@ public SourceCommonConfig(ConfigDef definition, Map originals) {// NOPMD } private void validate() { - schemaRegistryFragment.validate(); + transformerFragment.validate(); sourceConfigFragment.validate(); fileNameFragment.validate(); outputFormatFragment.validate(); } public InputFormat getInputFormat() { - return schemaRegistryFragment.getInputFormat(); + return transformerFragment.getInputFormat(); } public String getSchemaRegistryUrl() { - return schemaRegistryFragment.getSchemaRegistryUrl(); + return transformerFragment.getSchemaRegistryUrl(); } public String getTargetTopics() { @@ -79,11 +79,11 @@ public int getMaxPollRecords() { } public Transformer getTransformer() { - return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat()); + return TransformerFactory.getTransformer(transformerFragment.getInputFormat()); } - public int getByteArrayTransformerMaxBufferSize() { - return schemaRegistryFragment.getByteArrayTransformerMaxBufferSize(); + public int getTransformerMaxBufferSize() { + return transformerFragment.getTransformerMaxBufferSize(); } } diff --git 
a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java b/commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java similarity index 70% rename from commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java rename to commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java index 48d5e139f..4651244b1 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragment.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/config/TransformerFragment.java @@ -24,15 +24,15 @@ import io.aiven.kafka.connect.common.source.input.InputFormat; -public final class SchemaRegistryFragment extends ConfigFragment { - private static final String SCHEMAREGISTRY_GROUP = "Schema registry group"; +public final class TransformerFragment extends ConfigFragment { + private static final String TRANSFORMER_GROUP = "Transformer group"; public static final String SCHEMA_REGISTRY_URL = "schema.registry.url"; public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url"; public static final String AVRO_VALUE_SERIALIZER = "value.serializer"; public static final String INPUT_FORMAT_KEY = "input.format"; public static final String SCHEMAS_ENABLE = "schemas.enable"; - public static final String BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE = "byte.array.transformer.max.buffer.size"; - private static final int MAX_BUFFER_SIZE = 4096; + public static final String TRANSFORMER_MAX_BUFFER_SIZE = "transformer.max.buffer.size"; + private static final int DEFAULT_MAX_BUFFER_SIZE = 4096; /** * Construct the ConfigFragment.. @@ -40,29 +40,28 @@ public final class SchemaRegistryFragment extends ConfigFragment { * @param cfg * the configuration that this fragment is associated with. 
*/ - public SchemaRegistryFragment(final AbstractConfig cfg) { + public TransformerFragment(final AbstractConfig cfg) { super(cfg); } public static ConfigDef update(final ConfigDef configDef) { - int srCounter = 0; + int transformerCounter = 0; configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), - ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++, + ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, transformerCounter++, ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL); configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, - new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", - SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL); + new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, + transformerCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL); configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(), new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, - "Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD - ConfigDef.Width.NONE, INPUT_FORMAT_KEY); - configDef.define(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, MAX_BUFFER_SIZE, + "Input format of messages read from source avro/json/parquet/bytes", TRANSFORMER_GROUP, + transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY); + configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, DEFAULT_MAX_BUFFER_SIZE, new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM, - "Max Size of the byte buffer when using the BYTE Transformer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + "Max Size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, transformerCounter++, ConfigDef.Width.NONE, 
INPUT_FORMAT_KEY); - configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, - "Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD + "Avro value serializer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD // UnusedAssignment ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER); return configDef; @@ -80,17 +79,18 @@ public Class getAvroValueSerializer() { return cfg.getClass(AVRO_VALUE_SERIALIZER); } - public int getByteArrayTransformerMaxBufferSize() { - return cfg.getInt(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE); + public int getTransformerMaxBufferSize() { + return cfg.getInt(TRANSFORMER_MAX_BUFFER_SIZE); } private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator { @Override public void ensureValid(final String name, final Object value) { - final int size = (int) value; - if (size <= 0) { - throw new ConfigException(String.format("%s must be larger then 0", name)); + // ConfigDef will throw an error if the supplied value is not an int + if ((int) value <= 0) { + throw new ConfigException( + String.format("%s must be larger than 0 and less than %s", name, Integer.MAX_VALUE)); } } diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java index 35039c083..2125acfec 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/AvroTransformer.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; import java.io.IOException; import java.io.InputStream; @@ -55,8 +55,9 @@ public void configureValueConverter(final Map config, final Sour }
@Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { private DataFileStream dataFileStream; private final DatumReader datumReader = new GenericDatumReader<>(); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java index 98e01b15e..d76b019df 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformer.java @@ -32,6 +32,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * ByteArrayTransformer chunks an entire object into a maximum size specified by the + * {@link io.aiven.kafka.connect.common.config.TransformerFragment#TRANSFORMER_MAX_BUFFER_SIZE} configuration option. + * This will split large files into multiple records and each record will have the same key. 
+ */ public class ByteArrayTransformer extends Transformer { private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class); @@ -42,10 +47,17 @@ public void configureValueConverter(final Map config, final Sour } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig) { + if (streamLength == 0) { + LOGGER.warn( + "Object sent for processing has an invalid streamLength of {}, object is empty returning an empty spliterator.", + streamLength); + return emptySpliterator(inputStreamIOSupplier); + } // The max buffer size for the byte array the default is 4096 if not set by the user. - final int maxBufferSize = sourceConfig.getByteArrayTransformerMaxBufferSize(); + final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize(); return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { @Override protected InputStream inputOpened(final InputStream input) { @@ -79,6 +91,32 @@ protected boolean doAdvance(final Consumer action) { }; } + /** + * This method returns an empty spliterator when an empty input stream is supplied to be split + * + * @param inputStreamIOSupplier + * The empty input stream that was supplied + * @return an Empty spliterator to return to the calling method. 
+ */ + private static StreamSpliterator emptySpliterator(final IOSupplier inputStreamIOSupplier) { + return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { + @Override + protected boolean doAdvance(final Consumer action) { + return false; + } + + @Override + protected void doClose() { + // nothing to do + } + + @Override + protected InputStream inputOpened(final InputStream input) throws IOException { + return InputStream.nullInputStream(); + } + }; + } + @Override public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, final SourceCommonConfig sourceConfig) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java index 8228038e9..03fbaec56 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/JsonTransformer.java @@ -53,8 +53,9 @@ public void configureValueConverter(final Map config, final Sour } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { BufferedReader reader; diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java index eb4b1abf8..05512e9e3 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/ParquetTransformer.java @@ -16,7 +16,7 @@ 
package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; import java.io.File; import java.io.IOException; @@ -66,8 +66,9 @@ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topi } @Override - public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, final String topic, - final int topicPartition, final SourceCommonConfig sourceConfig) { + public StreamSpliterator createSpliterator(final IOSupplier inputStreamIOSupplier, + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig) { return new StreamSpliterator(LOGGER, inputStreamIOSupplier) { diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java index 7dfc5f327..64e8a90c2 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/Transformer.java @@ -33,14 +33,16 @@ public abstract class Transformer { + public final static long UNKNOWN_STREAM_LENGTH = -1; + public abstract void configureValueConverter(Map config, SourceCommonConfig sourceConfig); public final Stream getRecords(final IOSupplier inputStreamIOSupplier, - final String topic, final int topicPartition, final SourceCommonConfig sourceConfig, - final long skipRecords) { + final long streamLength, final String topic, final int topicPartition, + final SourceCommonConfig sourceConfig, final long skipRecords) { - final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition, - sourceConfig); + final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, streamLength, topic, + topicPartition, 
sourceConfig); return StreamSupport.stream(spliterator, false).onClose(spliterator::close).skip(skipRecords); } @@ -49,16 +51,19 @@ public final Stream getRecords(final IOSupplier inp * * @param inputStreamIOSupplier * the input stream supplier. + * @param streamLength + * the length of the input stream, {@link #UNKNOWN_STREAM_LENGTH} may be used to specify a stream with an + * unknown length, streams of length zero will log a warning and return an empty stream + * @param topic * the topic. * @param topicPartition * the partition. * @param sourceConfig - * the source configuraiton. + * the source configuration. * @return a StreamSpliterator instance. */ - protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier, String topic, - int topicPartition, SourceCommonConfig sourceConfig); + protected abstract StreamSpliterator createSpliterator(IOSupplier inputStreamIOSupplier, + long streamLength, String topic, int topicPartition, SourceCommonConfig sourceConfig); public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, SourceCommonConfig sourceConfig); diff --git a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java index 574604306..06d872be2 100644 --- a/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java +++ b/commons/src/main/java/io/aiven/kafka/connect/common/source/input/TransformerFactory.java @@ -16,7 +16,7 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE; import java.util.Map; diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java 
similarity index 55% rename from commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java rename to commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java index 6c12a3b93..1daeb3c8a 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/config/SchemaRegistryFragmentTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/config/TransformerFragmentTest.java @@ -28,27 +28,34 @@ import org.junit.jupiter.api.Test; -public class SchemaRegistryFragmentTest {// NOPMD +class TransformerFragmentTest { @Test void validateCorrectBufferSizeIsAccepted() { final int bufferSize = 50; - final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef()); + final ConfigDef configDef = TransformerFragment.update(new ConfigDef()); final Map props = new HashMap<>(); - props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, bufferSize); + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, bufferSize); - final SchemaRegistryFragment schemaReg = new SchemaRegistryFragment(new AbstractConfig(configDef, props)); - assertThat(schemaReg.getByteArrayTransformerMaxBufferSize()).isEqualTo(bufferSize); + final TransformerFragment schemaReg = new TransformerFragment(new AbstractConfig(configDef, props)); + assertThat(schemaReg.getTransformerMaxBufferSize()).isEqualTo(bufferSize); } @Test void validateInvalidBufferSizeThrowsConfigException() { - final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef()); + final ConfigDef configDef = TransformerFragment.update(new ConfigDef()); final Map props = new HashMap<>(); - props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, 0); + // Too small + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, 0); - assertThatThrownBy(() -> new SchemaRegistryFragment(new AbstractConfig(configDef, props))) + assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props))) 
.isInstanceOf(ConfigException.class); + // Too large + props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, Integer.MAX_VALUE + "1"); + assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props))) + .isInstanceOf(ConfigException.class) + .hasMessage( + "Invalid value 21474836471 for configuration transformer.max.buffer.size: Not a number of type INT"); } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java index 617dd290a..885a3beb8 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/AvroTransformerTest.java @@ -16,7 +16,8 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; @@ -78,8 +79,8 @@ void testConfigureValueConverter() { void testReadAvroRecordsInvalidData() { final InputStream inputStream = new ByteArrayInputStream("mock-avro-data".getBytes(StandardCharsets.UTF_8)); - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 0); + final Stream records = avroTransformer.getRecords(() -> inputStream, UNKNOWN_STREAM_LENGTH, "", + 0, sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); assertThat(recs).isEmpty(); @@ -95,8 +96,8 @@ void testReadAvroRecords() throws Exception { expected.add("Hello, Kafka Connect S3 Source! 
object " + i); } - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 0); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), "", 0, + sourceCommonConfig, 0); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Struct) sv).getString("message")) @@ -112,8 +113,8 @@ void testReadAvroRecordsSkipFew() throws Exception { for (int i = 5; i < 20; i++) { expected.add("Hello, Kafka Connect S3 Source! object " + i); } - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 5); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), "", 0, + sourceCommonConfig, 5); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Struct) sv).getString("message")) @@ -125,8 +126,8 @@ void testReadAvroRecordsSkipMoreRecordsThanExist() throws Exception { final ByteArrayOutputStream avroData = generateMockAvroData(20); final InputStream inputStream = new ByteArrayInputStream(avroData.toByteArray()); - final Stream records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig, - 25); + final Stream records = avroTransformer.getRecords(() -> inputStream, avroData.size(), "", 0, + sourceCommonConfig, 25); assertThat(records).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java index 4d8aed153..93c0b5f2c 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ByteArrayTransformerTest.java @@ -60,9 +60,9 @@ void testGetRecordsSingleChunk() { final byte[] data = { 1, 2, 3, 4, 5 }; final InputStream inputStream = new ByteArrayInputStream(data); final IOSupplier inputStreamIOSupplier = () -> 
inputStream; - when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(4096); - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, - sourceCommonConfig, 0); + when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(4096); + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, + (long) data.length, TEST_TOPIC, 0, sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); assertThat(recs).hasSize(1); @@ -75,7 +75,7 @@ void testGetRecordsEmptyInputStream() { final IOSupplier inputStreamIOSupplier = () -> inputStream; - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, 0, TEST_TOPIC, 0, sourceCommonConfig, 0); assertThat(records).hasSize(0); @@ -93,10 +93,10 @@ void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; final InputStream inputStream = new ByteArrayInputStream(data); final IOSupplier inputStreamIOSupplier = () -> inputStream; - when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(maxBufferSize); + when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(maxBufferSize); - final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0, - sourceCommonConfig, 0); + final Stream records = byteArrayTransformer.getRecords(inputStreamIOSupplier, + (long) data.length, TEST_TOPIC, 0, sourceCommonConfig, 0); final List recs = records.collect(Collectors.toList()); assertThat(recs).hasSize(numberOfExpectedRecords); @@ -108,6 +108,6 @@ void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int }); assertThat(processedData.buffer()).isEqualTo(data); // Should only get called once per splitIterator - verify(sourceCommonConfig, times(1)).getByteArrayTransformerMaxBufferSize(); + 
verify(sourceCommonConfig, times(1)).getTransformerMaxBufferSize(); } } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java index e482fd61c..10ef3cbd7 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/JsonTransformerTest.java @@ -16,7 +16,8 @@ package io.aiven.kafka.connect.common.source.input; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -74,7 +75,7 @@ void destroy() { } @Test - void testHandleValueDataWithValidJson() { + void testHandleValueDataWithValidJson() throws IOException { final InputStream validJsonInputStream = new ByteArrayInputStream( getJsonRecs(100).getBytes(StandardCharsets.UTF_8)); @@ -83,8 +84,8 @@ void testHandleValueDataWithValidJson() { expected.add("value" + i); } - final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, TESTTOPIC, 1, - sourceCommonConfig, 0); + final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, + UNKNOWN_STREAM_LENGTH, TESTTOPIC, 1, sourceCommonConfig, 0); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Map) sv).get("key")) @@ -92,7 +93,7 @@ void testHandleValueDataWithValidJson() { } @Test - void testHandleValueDataWithValidJsonSkipFew() { + void testHandleValueDataWithValidJsonSkipFew() throws IOException { final InputStream validJsonInputStream = new ByteArrayInputStream( getJsonRecs(100).getBytes(StandardCharsets.UTF_8)); @@ 
-101,8 +102,8 @@ void testHandleValueDataWithValidJsonSkipFew() { expected.add("value" + i); } - final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, TESTTOPIC, 1, - sourceCommonConfig, 25L); + final Stream records = jsonTransformer.getRecords(() -> validJsonInputStream, + UNKNOWN_STREAM_LENGTH, TESTTOPIC, 1, sourceCommonConfig, 25L); assertThat(records).extracting(SchemaAndValue::value) .extracting(sv -> ((Map) sv).get("key")) @@ -111,13 +112,13 @@ void testHandleValueDataWithValidJsonSkipFew() { } @Test - void testHandleValueDataWithInvalidJson() { + void testHandleValueDataWithInvalidJson() throws IOException { final InputStream invalidJsonInputStream = new ByteArrayInputStream( "invalid-json".getBytes(StandardCharsets.UTF_8)); final IOSupplier inputStreamIOSupplier = () -> invalidJsonInputStream; - final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, TESTTOPIC, 1, - sourceCommonConfig, 0); + final Stream jsonNodes = jsonTransformer.getRecords(inputStreamIOSupplier, + UNKNOWN_STREAM_LENGTH, TESTTOPIC, 1, sourceCommonConfig, 0); assertThat(jsonNodes).isEmpty(); @@ -126,7 +127,7 @@ void testHandleValueDataWithInvalidJson() { @Test void testGetRecordsWithIOException() throws IOException { when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException")); - final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0); + final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, UNKNOWN_STREAM_LENGTH, "topic", 0, null, 0); assertThat(resultStream).isEmpty(); } @@ -134,7 +135,7 @@ void testGetRecordsWithIOException() throws IOException { @Test void testCustomSpliteratorWithIOExceptionDuringInitialization() throws IOException { when(inputStreamIOSupplierMock.get()).thenThrow(new IOException("Test IOException during initialization")); - final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, "topic", 0, null, 0); + 
final Stream resultStream = jsonTransformer.getRecords(inputStreamIOSupplierMock, UNKNOWN_STREAM_LENGTH, "topic", 0, null, 0); assertThat(resultStream).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java index 2f7a405fe..0a9037855 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java +++ b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/ParquetTransformerTest.java @@ -58,7 +58,7 @@ public void setUp() { } @Test - void testHandleValueDataWithZeroBytes() { + void testHandleValueDataWithZeroBytes() throws IOException { final byte[] mockParquetData = new byte[0]; final InputStream inputStream = new ByteArrayInputStream(mockParquetData); final IOSupplier inputStreamIOSupplier = () -> inputStream; @@ -66,8 +66,8 @@ void testHandleValueDataWithZeroBytes() { final String topic = "test-topic"; final int topicPartition = 0; - final Stream recs = parquetTransformer.getRecords(inputStreamIOSupplier, topic, topicPartition, - s3SourceConfig, 0L); + final Stream recs = parquetTransformer.getRecords(inputStreamIOSupplier, mockParquetData.length, + topic, topicPartition, s3SourceConfig, 0L); assertThat(recs).isEmpty(); } @@ -86,7 +86,7 @@ void testGetRecordsWithValidData() throws Exception { expected.add("name" + i); } final List records = parquetTransformer - .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 0L) + .getRecords(inputStreamIOSupplier, mockParquetData.length, topic, topicPartition, s3SourceConfig, 0L) .collect(Collectors.toList()); assertThat(records).extracting(SchemaAndValue::value) @@ -110,7 +110,7 @@ void testGetRecordsWithValidDataSkipFew() throws Exception { } final List records = parquetTransformer - .getRecords(inputStreamIOSupplier, topic, topicPartition, s3SourceConfig, 25L) + 
.getRecords(inputStreamIOSupplier, mockParquetData.length, topic, topicPartition, s3SourceConfig, 25L) .collect(Collectors.toList()); assertThat(records).extracting(SchemaAndValue::value) @@ -129,8 +129,8 @@ void testGetRecordsWithInvalidData() { final String topic = "test-topic"; final int topicPartition = 0; - final Stream records = parquetTransformer.getRecords(inputStreamIOSupplier, topic, - topicPartition, s3SourceConfig, 0L); + final Stream records = parquetTransformer.getRecords(inputStreamIOSupplier, invalidData.length, + topic, topicPartition, s3SourceConfig, 0L); assertThat(records).isEmpty(); } @@ -155,8 +155,8 @@ void testIOExceptionCreatingTempFile() { .thenThrow(new IOException("Test IOException for temp file")); final IOSupplier inputStreamSupplier = mock(IOSupplier.class); - final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", - 1, null, 0L); + final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, 0L, + "test-topic", 1, null, 0L); assertThat(resultStream).isEmpty(); } @@ -168,8 +168,8 @@ void testIOExceptionDuringDataCopy() throws IOException { when(inputStreamMock.read(any(byte[].class))).thenThrow(new IOException("Test IOException during copy")); final IOSupplier inputStreamSupplier = () -> inputStreamMock; - final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, "test-topic", - 1, null, 0L); + final Stream resultStream = parquetTransformer.getRecords(inputStreamSupplier, 0L, + "test-topic", 1, null, 0L); assertThat(resultStream).isEmpty(); } diff --git a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java index a4b3c522a..8895f4d2c 100644 --- a/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java +++ 
b/commons/src/test/java/io/aiven/kafka/connect/common/source/input/TransformerStreamingTest.java @@ -17,6 +17,7 @@ package io.aiven.kafka.connect.common.source.input; import static io.aiven.kafka.connect.common.config.OutputFormatFragmentFixture.OutputFormatArgs.FORMAT_OUTPUT_TYPE_CONFIG; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -37,8 +38,8 @@ import org.apache.kafka.connect.data.SchemaAndValue; import io.aiven.kafka.connect.common.config.OutputFormatFragment; -import io.aiven.kafka.connect.common.config.SchemaRegistryFragment; import io.aiven.kafka.connect.common.config.SourceCommonConfig; +import io.aiven.kafka.connect.common.config.TransformerFragment; import org.apache.commons.io.function.IOSupplier; import org.junit.jupiter.params.ParameterizedTest; @@ -56,7 +57,7 @@ void verifyExceptionDuringIOOpen(final Transformer transformer, final byte[] tes final SourceCommonConfig config, final int expectedCount) throws IOException { final IOSupplier ioSupplier = mock(IOSupplier.class); when(ioSupplier.get()).thenThrow(new IOException("Test IOException during initialization")); - final Stream objStream = transformer.getRecords(ioSupplier, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(ioSupplier, UNKNOWN_STREAM_LENGTH, "topic", 1, config, 0); assertThat(objStream).isEmpty(); } @@ -74,7 +75,8 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD when(inputStream.readNBytes(anyInt())).thenThrow(new IOException("Test IOException during read")); when(inputStream.readAllBytes()).thenThrow(new IOException("Test IOException during read")); try (CloseTrackingStream stream = new CloseTrackingStream(inputStream)) { - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream 
objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, "topic", 1, + config, 0); assertThat(objStream).isEmpty(); assertThat(stream.closeCount).isGreaterThan(0); } @@ -86,7 +88,7 @@ void verifyExceptionDuringRead(final Transformer transformer, final byte[] testD void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, "topic", 1, config, 0); final long count = objStream.count(); assertThat(count).isEqualTo(expectedCount); assertThat(stream.closeCount).isGreaterThan(0); @@ -97,7 +99,8 @@ void verifyCloseCalledAtEnd(final Transformer transformer, final byte[] testData void verifyCloseCalledAtIteratorEnd(final Transformer transformer, final byte[] testData, final SourceCommonConfig config, final int expectedCount) throws IOException { final CloseTrackingStream stream = new CloseTrackingStream(new ByteArrayInputStream(testData)); - final Stream objStream = transformer.getRecords(() -> stream, "topic", 1, config, 0); + final Stream objStream = transformer.getRecords(() -> stream, UNKNOWN_STREAM_LENGTH, "topic", 1, + config, 0); final Iterator iter = objStream.iterator(); long count = 0L; while (iter.hasNext()) { @@ -118,7 +121,7 @@ static Stream testData() throws IOException { }, 100)); lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.BYTES), "Hello World".getBytes(StandardCharsets.UTF_8), new SourceCommonConfig( - SchemaRegistryFragment.update(OutputFormatFragment.update(new ConfigDef(), null)), props) { + TransformerFragment.update(OutputFormatFragment.update(new ConfigDef(), null)), props) { }, 1)); 
lst.add(Arguments.of(TransformerFactory.getTransformer(InputFormat.JSONL), JsonTransformerTest.getJsonRecs(100).getBytes(StandardCharsets.UTF_8), diff --git a/s3-source-connector/README.md b/s3-source-connector/README.md index 3c236d4d0..a5952008d 100644 --- a/s3-source-connector/README.md +++ b/s3-source-connector/README.md @@ -201,6 +201,9 @@ List of new configuration parameters: - `aws.sts.role.session.name` - AWS session name for cross-account access role - `aws.sts.role.session.duration` - Session duration for cross-account access role in Seconds. Minimum value - 900. - `aws.sts.config.endpoint` - AWS STS endpoint for cross-account access role. +- `transformer.max.buffer.size` - [Optional] When using the ByteArrayTransformer you can alter the buffer size from 1 up to 2147483647; the default is 4096 +- `input.format` - Specify the format of the files being read from S3. Supported values are avro, parquet, jsonl, and bytes; bytes is the default +- `schema.registry.url` - [Optional] The URL of the schema registry you want to use ## Configuration diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java index 5d95d6ebd..a90582a83 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java @@ -16,10 +16,10 @@ package io.aiven.kafka.connect.s3.source; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static
io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; diff --git a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java index fa4f60b76..f0ae307f1 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationBase.java @@ -210,6 +210,20 @@ static List consumeByteMessages(final String topic, final int expectedMe return objects.stream().map(String::new).collect(Collectors.toList()); } + static List consumeRawByteMessages(final String topic, final int expectedMessageCount, + String bootstrapServers) { + final Properties consumerProperties = getConsumerProperties(bootstrapServers, ByteArrayDeserializer.class, + ByteArrayDeserializer.class); + final List objects = consumeMessages(topic, expectedMessageCount, Duration.ofSeconds(60), + consumerProperties); + return objects.stream().map(obj -> { + final byte[] byteArray = new byte[obj.length]; + System.arraycopy(obj, 0, byteArray, 0, obj.length); + return byteArray; + }).collect(Collectors.toList()); + + } + static List consumeAvroMessages(final String topic, final int expectedMessageCount, final Duration expectedMaxDuration, final String bootstrapServers, final String schemaRegistryUrl) { final Properties consumerProperties = getConsumerProperties(bootstrapServers, StringDeserializer.class, diff --git 
a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java index ad31acc88..c68abc79d 100644 --- a/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java +++ b/s3-source-connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/IntegrationTest.java @@ -18,13 +18,13 @@ import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_NAME_TEMPLATE_CONFIG; import static io.aiven.kafka.connect.common.config.FileNameFragment.FILE_PATH_PREFIX_TEMPLATE_CONFIG; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.AVRO_VALUE_SERIALIZER; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.OBJECT_DISTRIBUTION_STRATEGY; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.AVRO_VALUE_SERIALIZER; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; +import static io.aiven.kafka.connect.common.config.TransformerFragment.VALUE_CONVERTER_SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG; import static io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG; import static 
io.aiven.kafka.connect.config.s3.S3ConfigFragment.AWS_S3_ENDPOINT_CONFIG; @@ -42,6 +42,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -207,6 +208,50 @@ void bytesTest(final boolean addPrefix) { verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers()); } + @Test + void bytesDefaultBufferTest() { + final int maxBufferSize = 4096; + final var topicName = IntegrationBase.topicName(testInfo); + final ObjectDistributionStrategy objectDistributionStrategy; + final String prefixPattern = "topics/{{topic}}/partition={{partition}}/"; + objectDistributionStrategy = ObjectDistributionStrategy.PARTITION_IN_FILENAME; + + final String fileNamePatternSeparator = "_"; + + final Map connectorConfig = getConfig(CONNECTOR_NAME, topicName, 1, objectDistributionStrategy, + false, null, prefixPattern, fileNamePatternSeparator); + + connectorConfig.put(INPUT_FORMAT_KEY, InputFormat.BYTES.getValue()); + connectRunner.configureConnector(CONNECTOR_NAME, connectorConfig); + + final int byteArraySize = 6000; + final byte[] testData1 = new byte[byteArraySize]; + for (int i = 0; i < byteArraySize; i++) { + testData1[i] = ((Integer) i).byteValue(); + } + final List offsetKeys = new ArrayList<>(); + + offsetKeys.add(writeToS3(topicName, testData1, "0", s3Prefix, fileNamePatternSeparator)); + + assertThat(testBucketAccessor.listObjects()).hasSize(1); + + // Poll messages from the Kafka topic and verify the consumed data + final List records = IntegrationBase.consumeRawByteMessages(topicName, 2, + connectRunner.getBootstrapServers()); + + assertThat(records.get(0)).hasSize(maxBufferSize); + assertThat(records.get(1)).hasSize(byteArraySize - maxBufferSize); + + assertThat(records.get(0)).isEqualTo(Arrays.copyOfRange(testData1, 0, maxBufferSize)); + assertThat(records.get(1)).isEqualTo(Arrays.copyOfRange(testData1, 
maxBufferSize, testData1.length)); + + // Verify offset positions + final Map expectedOffsetRecords = offsetKeys.subList(0, 0) + .stream() + .collect(Collectors.toMap(Function.identity(), s -> 1)); + verifyOffsetPositions(expectedOffsetRecords, connectRunner.getBootstrapServers()); + } + @Test void avroTest(final TestInfo testInfo) throws IOException { final var topicName = IntegrationBase.topicName(testInfo); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index ebcffdba5..dae9b6b06 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -25,9 +25,9 @@ import io.aiven.kafka.connect.common.config.FileNameFragment; import io.aiven.kafka.connect.common.config.OutputFieldType; import io.aiven.kafka.connect.common.config.OutputFormatFragment; -import io.aiven.kafka.connect.common.config.SchemaRegistryFragment; import io.aiven.kafka.connect.common.config.SourceCommonConfig; import io.aiven.kafka.connect.common.config.SourceConfigFragment; +import io.aiven.kafka.connect.common.config.TransformerFragment; import io.aiven.kafka.connect.config.s3.S3ConfigFragment; import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; import io.aiven.kafka.connect.iam.AwsStsRole; @@ -56,7 +56,7 @@ public static ConfigDef configDef() { S3ConfigFragment.update(configDef); SourceConfigFragment.update(configDef); FileNameFragment.update(configDef); - SchemaRegistryFragment.update(configDef); + TransformerFragment.update(configDef); OutputFormatFragment.update(configDef, OutputFieldType.VALUE); return configDef; @@ -66,7 +66,7 @@ private void validate() { // s3ConfigFragment is validated in this method as it is created here. 
// Other Fragments created in the ConfigDef are validated in the parent classes their instances are created in. - // e.g. SourceConfigFragment, FileNameFragment, SchemaRegistryFragment and OutputFormatFragment are all + // e.g. SourceConfigFragment, FileNameFragment, TransformerFragment and OutputFormatFragment are all // validated in SourceCommonConfig. s3ConfigFragment.validate(); } diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java index 820be20aa..655567aed 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java @@ -138,8 +138,9 @@ private Stream convert(final S3Object s3Object) { final SchemaAndValue keyData = transformer.getKeyData(s3Object.key(), topic, s3SourceConfig); - return transformer - .getRecords(sourceClient.getObject(s3Object.key()), topic, partitionId, s3SourceConfig, recordCount) + return transformer // s3Object.Size() in bytes of the object + .getRecords(sourceClient.getObject(s3Object.key()), s3Object.size(), topic, partitionId, s3SourceConfig, + recordCount) .map(new Mapper(partitionMap, recordCount, keyData, s3Object.key())); } diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java index c915376c9..d007a713a 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java @@ -16,9 +16,9 @@ package io.aiven.kafka.connect.s3.source; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; import static 
io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java index 10939c511..031b4d1c3 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java +++ b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfigTest.java @@ -16,10 +16,10 @@ package io.aiven.kafka.connect.s3.source.config; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.INPUT_FORMAT_KEY; -import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPICS; import static io.aiven.kafka.connect.common.config.SourceConfigFragment.TARGET_TOPIC_PARTITIONS; +import static io.aiven.kafka.connect.common.config.TransformerFragment.INPUT_FORMAT_KEY; +import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL; import static org.assertj.core.api.Assertions.assertThat; import java.util.HashMap; diff --git a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java index a5a1179f3..ebf66f727 100644 --- a/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java +++ 
b/s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIteratorTest.java @@ -16,6 +16,7 @@ package io.aiven.kafka.connect.s3.source.utils; +import static io.aiven.kafka.connect.common.source.input.Transformer.UNKNOWN_STREAM_LENGTH; import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_PARTITION_KEY; import static io.aiven.kafka.connect.common.source.input.utils.FilePatternUtils.PATTERN_TOPIC_KEY; import static io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator.BYTES_TRANSFORMATION_NUM_OF_RECS; @@ -85,7 +86,7 @@ void testIteratorProcessesS3Objects() throws Exception { when(mockSourceApiClient.getObject(anyString())).thenReturn(() -> mockInputStream); mockTransformer = TransformerFactory.getTransformer(InputFormat.BYTES); - when(mockConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(4096); + when(mockConfig.getTransformerMaxBufferSize()).thenReturn(4096); when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); final Pattern filePattern = mock(Pattern.class); @@ -98,7 +99,7 @@ mockSourceApiClient, new HashDistributionStrategy(1), assertThat(iterator.hasNext()).isFalse(); mockPatternMatcher(filePattern); - final S3Object obj = S3Object.builder().key(key).build(); + final S3Object obj = S3Object.builder().key(key).size(UNKNOWN_STREAM_LENGTH).build(); final ByteArrayInputStream bais = new ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8)); when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(obj).iterator()); @@ -114,7 +115,7 @@ mockSourceApiClient, new HashDistributionStrategy(1), @Test void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { final String key = "topic-00001-abc123.txt"; - final S3Object s3Object = S3Object.builder().key(key).build(); + final S3Object s3Object = S3Object.builder().key(key).size(UNKNOWN_STREAM_LENGTH).build(); // With ByteArrayTransformer try (InputStream inputStream = new 
ByteArrayInputStream("Hello World".getBytes(StandardCharsets.UTF_8))) { @@ -124,7 +125,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { when(mockSourceApiClient.getS3ObjectIterator(any())).thenReturn(Arrays.asList(s3Object).iterator()); mockTransformer = mock(ByteArrayTransformer.class); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) + when(mockTransformer.getRecords(any(), anyLong(), anyString(), anyInt(), any(), anyLong())) .thenReturn(Stream.of(SchemaAndValue.NULL)); when(mockOffsetManager.getOffsets()).thenReturn(Collections.emptyMap()); @@ -140,7 +141,7 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isFalse(); verify(mockSourceApiClient, never()).getObject(any()); - verify(mockTransformer, never()).getRecords(any(), anyString(), anyInt(), any(), anyLong()); + verify(mockTransformer, never()).getRecords(any(), anyLong(), anyString(), anyInt(), any(), anyLong()); } // With AvroTransformer @@ -157,14 +158,15 @@ void testIteratorProcessesS3ObjectsForByteArrayTransformer() throws Exception { mockPatternMatcher(filePattern); when(mockTransformer.getKeyData(anyString(), anyString(), any())).thenReturn(SchemaAndValue.NULL); - when(mockTransformer.getRecords(any(), anyString(), anyInt(), any(), anyLong())) + when(mockTransformer.getRecords(any(), anyLong(), anyString(), anyInt(), any(), anyLong())) .thenReturn(Arrays.asList(SchemaAndValue.NULL).stream()); final Iterator iterator = new SourceRecordIterator(mockConfig, mockOffsetManager, mockTransformer, mockSourceApiClient, new HashDistributionStrategy(1), filePattern, 0); assertThat(iterator.hasNext()).isFalse(); - verify(mockTransformer, times(0)).getRecords(any(), anyString(), anyInt(), any(), anyLong()); + verify(mockTransformer, times(0)).getRecords(any(), anyLong(), anyString(), anyInt(), 
any(), anyLong()); + } }