Commit e45686e

Add length to the Transformer to allow improvements to the byte array in the future.

Signed-off-by: Aindriu Lavelle <[email protected]>
aindriu-aiven committed Jan 11, 2025
1 parent f2274a5 commit e45686e
Showing 21 changed files with 131 additions and 117 deletions.
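
At the API level, the commit threads a new streamLength parameter through Transformer.getRecords and createSpliterator. The sketch below shows the new caller-side call shape; it is illustrative only, with transformer and sourceConfig assumed to be a Transformer and SourceCommonConfig from this module, and the topic name invented.

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.stream.Stream;

    import org.apache.commons.io.function.IOSupplier;
    import org.apache.kafka.connect.data.SchemaAndValue;

    // The payload stands in for a cloud-storage object whose size is known.
    final byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
    final IOSupplier<InputStream> supplier = () -> new ByteArrayInputStream(payload);

    // New in this commit: the stream length is passed explicitly, between the
    // InputStream supplier and the topic/partition/config/skipRecords arguments.
    final Stream<SchemaAndValue> records = transformer.getRecords(
            supplier, payload.length, "my-topic", 0, sourceConfig, 0L);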
SourceCommonConfig.java
@@ -27,15 +27,15 @@

public class SourceCommonConfig extends CommonConfig {

- private final SchemaRegistryFragment schemaRegistryFragment;
+ private final TransformerFragment transformerFragment;
private final SourceConfigFragment sourceConfigFragment;
private final FileNameFragment fileNameFragment;
private final OutputFormatFragment outputFormatFragment;

public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
super(definition, originals);
// Construct Fragments
- schemaRegistryFragment = new SchemaRegistryFragment(this);
+ transformerFragment = new TransformerFragment(this);
sourceConfigFragment = new SourceConfigFragment(this);
fileNameFragment = new FileNameFragment(this);
outputFormatFragment = new OutputFormatFragment(this);
@@ -44,18 +44,18 @@ public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
}

private void validate() {
- schemaRegistryFragment.validate();
+ transformerFragment.validate();
sourceConfigFragment.validate();
fileNameFragment.validate();
outputFormatFragment.validate();
}

public InputFormat getInputFormat() {
- return schemaRegistryFragment.getInputFormat();
+ return transformerFragment.getInputFormat();
}

public String getSchemaRegistryUrl() {
- return schemaRegistryFragment.getSchemaRegistryUrl();
+ return transformerFragment.getSchemaRegistryUrl();
}

public String getTargetTopics() {
@@ -74,11 +74,11 @@ public int getMaxPollRecords() {
}

public Transformer getTransformer() {
- return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
+ return TransformerFactory.getTransformer(transformerFragment.getInputFormat());
}

- public int getByteArrayTransformerMaxBufferSize() {
-     return schemaRegistryFragment.getByteArrayTransformerMaxBufferSize();
+ public int getTransformerMaxBufferSize() {
+     return transformerFragment.getTransformerMaxBufferSize();
}

}
SchemaRegistryFragment.java → TransformerFragment.java (renamed)
@@ -24,14 +24,14 @@

import io.aiven.kafka.connect.common.source.input.InputFormat;

- public final class SchemaRegistryFragment extends ConfigFragment {
-     private static final String SCHEMAREGISTRY_GROUP = "Schema registry group";
+ public final class TransformerFragment extends ConfigFragment {
+     private static final String TRANSFORMER_GROUP = "Transformer group";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url";
public static final String AVRO_VALUE_SERIALIZER = "value.serializer";
public static final String INPUT_FORMAT_KEY = "input.format";
public static final String SCHEMAS_ENABLE = "schemas.enable";
- public static final String BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE = "byte.array.transformer.max.buffer.size";
+ public static final String TRANSFORMER_MAX_BUFFER_SIZE = "transformer.max.buffer.size";
private static final int MAX_BUFFER_SIZE = 4096;

/**
@@ -40,29 +40,30 @@ public final class SchemaRegistryFragment extends ConfigFragment {
* @param cfg
* the configuration that this fragment is associated with.
*/
- public SchemaRegistryFragment(final AbstractConfig cfg) {
+ public TransformerFragment(final AbstractConfig cfg) {
super(cfg);
}

public static ConfigDef update(final ConfigDef configDef) {
- int srCounter = 0;
+ int transformerCounter = 0;
configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
- ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++,
+ ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, transformerCounter++,
ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL);
configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null,
- new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL",
- SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
+ new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP,
+ transformerCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(),
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Input format of messages read from source avro/json/parquet/bytes", TRANSFORMER_GROUP,
transformerCounter++, // NOPMD
ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
- configDef.define(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, MAX_BUFFER_SIZE,
+ configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, MAX_BUFFER_SIZE,
new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM,
"Max Size of the byte buffer when using the BYTE Transformer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Max Size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD
ConfigDef.Width.NONE, INPUT_FORMAT_KEY);

configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM,
"Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Avro value serializer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER);
return configDef;
@@ -80,8 +81,8 @@ public Class<?> getAvroValueSerializer() {
return cfg.getClass(AVRO_VALUE_SERIALIZER);
}

- public int getByteArrayTransformerMaxBufferSize() {
-     return cfg.getInt(BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE);
+ public int getTransformerMaxBufferSize() {
+     return cfg.getInt(TRANSFORMER_MAX_BUFFER_SIZE);
}

private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator {
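
For reference, a minimal sketch of the renamed fragment in use, mirroring the updated TransformerFragmentTest further down this page (the 1024 value is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    final ConfigDef configDef = TransformerFragment.update(new ConfigDef());
    final Map<String, Object> props = new HashMap<>();
    // transformer.max.buffer.size defaults to 4096; the validator rejects 0.
    props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, 1024);
    final TransformerFragment fragment = new TransformerFragment(new AbstractConfig(configDef, props));
    assert fragment.getTransformerMaxBufferSize() == 1024;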
AvroTransformer.java
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

- import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
+ import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;

import java.io.IOException;
import java.io.InputStream;
@@ -55,8 +55,9 @@ public void configureValueConverter(final Map<String, String> config, final SourceCommonConfig sourceConfig) {
}

@Override
- public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
-     final int topicPartition, final SourceCommonConfig sourceConfig) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
+     final long streamLength, final String topic, final int topicPartition,
+     final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
private DataFileStream<GenericRecord> dataFileStream;
private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
ByteArrayTransformer.java
@@ -42,10 +42,11 @@ public void configureValueConverter(final Map<String, String> config, final SourceCommonConfig sourceConfig) {
}

@Override
- public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
-     final int topicPartition, final SourceCommonConfig sourceConfig) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
+     final long streamLength, final String topic, final int topicPartition,
+     final SourceCommonConfig sourceConfig) {
// The max buffer size for the byte array; the default is 4096 if not set by the user.
- final int maxBufferSize = sourceConfig.getByteArrayTransformerMaxBufferSize();
+ final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize();
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected InputStream inputOpened(final InputStream input) {
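
The BYTE transformer still emits records of at most maxBufferSize bytes each. The helper below is a sketch of the expected record count, inferred from the parameterized ByteArrayTransformerTest at the end of this page rather than taken from the commit:

    // A streamLength-byte object split into maxBufferSize-byte chunks yields
    // ceil(streamLength / maxBufferSize) records, the last one holding the
    // remainder: e.g. 10 bytes with maxBufferSize = 4 gives 3 records (4, 4, 2).
    static int expectedRecordCount(final long streamLength, final int maxBufferSize) {
        return (int) ((streamLength + maxBufferSize - 1) / maxBufferSize);
    }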
JsonTransformer.java
@@ -53,8 +53,9 @@ public void configureValueConverter(final Map<String, String> config, final SourceCommonConfig sourceConfig) {
}

@Override
- public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
-     final int topicPartition, final SourceCommonConfig sourceConfig) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
+     final long streamLength, final String topic, final int topicPartition,
+     final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
BufferedReader reader;

ParquetTransformer.java
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

- import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
+ import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;

import java.io.File;
import java.io.IOException;
@@ -66,8 +66,9 @@ public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic, final SourceCommonConfig sourceConfig) {
}

@Override
- public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
-     final int topicPartition, final SourceCommonConfig sourceConfig) {
+ public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
+     final long streamLength, final String topic, final int topicPartition,
+     final SourceCommonConfig sourceConfig) {

return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {

Transformer.java
@@ -33,14 +33,16 @@

public abstract class Transformer {

+ public final static long UNKNOWN_STREAM_LENGTH = -1;

public abstract void configureValueConverter(Map<String, String> config, SourceCommonConfig sourceConfig);

public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
- final String topic, final int topicPartition, final SourceCommonConfig sourceConfig,
- final long skipRecords) {
+ final long streamLength, final String topic, final int topicPartition,
+ final SourceCommonConfig sourceConfig, final long skipRecords) {

- final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, topic, topicPartition,
- sourceConfig);
+ final StreamSpliterator spliterator = createSpliterator(inputStreamIOSupplier, streamLength, topic,
+ topicPartition, sourceConfig);
return StreamSupport.stream(spliterator, false).onClose(spliterator::close).skip(skipRecords);
}

@@ -57,8 +59,8 @@ public final Stream<SchemaAndValue> getRecords(final IOSupplier<InputStream> inputStreamIOSupplier,
* the source configuration.
* @return a StreamSpliterator instance.
*/
- protected abstract StreamSpliterator createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier, String topic,
-     int topicPartition, SourceCommonConfig sourceConfig);
+ protected abstract StreamSpliterator createSpliterator(IOSupplier<InputStream> inputStreamIOSupplier,
+     long streamLength, String topic, int topicPartition, SourceCommonConfig sourceConfig);

public abstract SchemaAndValue getKeyData(Object cloudStorageKey, String topic, SourceCommonConfig sourceConfig);

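
The new UNKNOWN_STREAM_LENGTH constant gives callers that cannot cheaply size the stream a documented sentinel. A hedged usage sketch, with transformer, inputStreamIOSupplier and sourceConfig assumed in scope:

    import java.util.stream.Stream;

    import org.apache.kafka.connect.data.SchemaAndValue;

    // Pass the sentinel (-1) when the object size is unknown; implementations
    // presumably fall back to their previous, length-agnostic buffering.
    final Stream<SchemaAndValue> records = transformer.getRecords(
            inputStreamIOSupplier, Transformer.UNKNOWN_STREAM_LENGTH,
            "my-topic", 0, sourceConfig, 0L);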
TransformerFactory.java
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

- import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMAS_ENABLE;
+ import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMAS_ENABLE;

import java.util.Map;

SchemaRegistryFragmentTest.java → TransformerFragmentTest.java (renamed)
@@ -28,26 +28,26 @@

import org.junit.jupiter.api.Test;

- public class SchemaRegistryFragmentTest {// NOPMD
+ class TransformerFragmentTest {

@Test
void validateCorrectBufferSizeIsAccepted() {
final int bufferSize = 50;
- final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef());
+ final ConfigDef configDef = TransformerFragment.update(new ConfigDef());
final Map<String, Object> props = new HashMap<>();
- props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, bufferSize);
+ props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, bufferSize);

- final SchemaRegistryFragment schemaReg = new SchemaRegistryFragment(new AbstractConfig(configDef, props));
- assertThat(schemaReg.getByteArrayTransformerMaxBufferSize()).isEqualTo(bufferSize);
+ final TransformerFragment schemaReg = new TransformerFragment(new AbstractConfig(configDef, props));
+ assertThat(schemaReg.getTransformerMaxBufferSize()).isEqualTo(bufferSize);
}

@Test
void validateInvalidBufferSizeThrowsConfigException() {
- final ConfigDef configDef = SchemaRegistryFragment.update(new ConfigDef());
+ final ConfigDef configDef = TransformerFragment.update(new ConfigDef());
final Map<String, Object> props = new HashMap<>();
- props.put(SchemaRegistryFragment.BYTE_ARRAY_TRANSFORMER_MAX_BUFFER_SIZE, 0);
+ props.put(TransformerFragment.TRANSFORMER_MAX_BUFFER_SIZE, 0);

- assertThatThrownBy(() -> new SchemaRegistryFragment(new AbstractConfig(configDef, props)))
+ assertThatThrownBy(() -> new TransformerFragment(new AbstractConfig(configDef, props)))
.isInstanceOf(ConfigException.class);
}

AvroTransformerTest.java
@@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

- import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
+ import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

@@ -75,11 +75,11 @@ void testConfigureValueConverter() {
}

@Test
- void testReadAvroRecordsInvalidData() {
+ void testReadAvroRecordsInvalidData() throws IOException {
final InputStream inputStream = new ByteArrayInputStream("mock-avro-data".getBytes(StandardCharsets.UTF_8));

- final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig,
- 0);
+ final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream,
+ (long) inputStream.available(), "", 0, sourceCommonConfig, 0);

final List<Object> recs = records.collect(Collectors.toList());
assertThat(recs).isEmpty();
@@ -95,8 +95,8 @@ void testReadAvroRecords() throws Exception {
expected.add("Hello, Kafka Connect S3 Source! object " + i);
}

- final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig,
- 0);
+ final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream,
+ (long) inputStream.available(), "", 0, sourceCommonConfig, 0);

assertThat(records).extracting(SchemaAndValue::value)
.extracting(sv -> ((Struct) sv).getString("message"))
@@ -112,8 +112,8 @@ void testReadAvroRecordsSkipFew() throws Exception {
for (int i = 5; i < 20; i++) {
expected.add("Hello, Kafka Connect S3 Source! object " + i);
}
- final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig,
- 5);
+ final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream,
+ (long) inputStream.available(), "", 0, sourceCommonConfig, 5);

assertThat(records).extracting(SchemaAndValue::value)
.extracting(sv -> ((Struct) sv).getString("message"))
@@ -125,8 +125,8 @@ void testReadAvroRecordsSkipMoreRecordsThanExist() throws Exception {
final ByteArrayOutputStream avroData = generateMockAvroData(20);
final InputStream inputStream = new ByteArrayInputStream(avroData.toByteArray());

- final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream, "", 0, sourceCommonConfig,
- 25);
+ final Stream<SchemaAndValue> records = avroTransformer.getRecords(() -> inputStream,
+ (long) inputStream.available(), "", 0, sourceCommonConfig, 25);

assertThat(records).isEmpty();
}
ByteArrayTransformerTest.java
@@ -60,9 +60,9 @@ void testGetRecordsSingleChunk() {
final byte[] data = { 1, 2, 3, 4, 5 };
final InputStream inputStream = new ByteArrayInputStream(data);
final IOSupplier<InputStream> inputStreamIOSupplier = () -> inputStream;
- when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(4096);
- final Stream<SchemaAndValue> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
- sourceCommonConfig, 0);
+ when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(4096);
+ final Stream<SchemaAndValue> records = byteArrayTransformer.getRecords(inputStreamIOSupplier,
+ (long) data.length, TEST_TOPIC, 0, sourceCommonConfig, 0);

final List<SchemaAndValue> recs = records.collect(Collectors.toList());
assertThat(recs).hasSize(1);
@@ -75,7 +75,7 @@ void testGetRecordsEmptyInputStream() {

final IOSupplier<InputStream> inputStreamIOSupplier = () -> inputStream;

- final Stream<SchemaAndValue> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
+ final Stream<SchemaAndValue> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, 0, TEST_TOPIC, 0,
sourceCommonConfig, 0);

assertThat(records).hasSize(0);
@@ -93,10 +93,10 @@ void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int numberOfExpectedRecords) {
final byte[] data = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
final InputStream inputStream = new ByteArrayInputStream(data);
final IOSupplier<InputStream> inputStreamIOSupplier = () -> inputStream;
- when(sourceCommonConfig.getByteArrayTransformerMaxBufferSize()).thenReturn(maxBufferSize);
+ when(sourceCommonConfig.getTransformerMaxBufferSize()).thenReturn(maxBufferSize);

- final Stream<SchemaAndValue> records = byteArrayTransformer.getRecords(inputStreamIOSupplier, TEST_TOPIC, 0,
- sourceCommonConfig, 0);
+ final Stream<SchemaAndValue> records = byteArrayTransformer.getRecords(inputStreamIOSupplier,
+ (long) data.length, TEST_TOPIC, 0, sourceCommonConfig, 0);

final List<SchemaAndValue> recs = records.collect(Collectors.toList());
assertThat(recs).hasSize(numberOfExpectedRecords);
Expand All @@ -108,6 +108,6 @@ void testGetRecordsWithVariableMaxBufferSize(final int maxBufferSize, final int
});
assertThat(processedData.buffer()).isEqualTo(data);
// Should only get called once per splitIterator
- verify(sourceCommonConfig, times(1)).getByteArrayTransformerMaxBufferSize();
+ verify(sourceCommonConfig, times(1)).getTransformerMaxBufferSize();
}
}