Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow users to vary the size of the byteArrayBuffer in source connectors #386

Open
wants to merge 2 commits into
base: s3-source-release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@

public class SourceCommonConfig extends CommonConfig {

private final SchemaRegistryFragment schemaRegistryFragment;
private final TransformerFragment transformerFragment;
private final SourceConfigFragment sourceConfigFragment;
private final FileNameFragment fileNameFragment;
private final OutputFormatFragment outputFormatFragment;

public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
super(definition, originals);
// Construct Fragments
schemaRegistryFragment = new SchemaRegistryFragment(this);
transformerFragment = new TransformerFragment(this);
sourceConfigFragment = new SourceConfigFragment(this);
fileNameFragment = new FileNameFragment(this);
outputFormatFragment = new OutputFormatFragment(this);
Expand All @@ -45,18 +45,18 @@ public SourceCommonConfig(ConfigDef definition, Map<?, ?> originals) {// NOPMD
}

// Runs each config fragment's own validation; any invalid setting surfaces as a ConfigException.
// NOTE(review): both schemaRegistryFragment.validate() and transformerFragment.validate() appear
// here — this looks like diff residue from the SchemaRegistryFragment -> TransformerFragment
// rename; only one of the two fields should exist in the final class. TODO confirm.
private void validate() {
schemaRegistryFragment.validate();
transformerFragment.validate();
sourceConfigFragment.validate();
fileNameFragment.validate();
outputFormatFragment.validate();
}

public InputFormat getInputFormat() {
return schemaRegistryFragment.getInputFormat();
return transformerFragment.getInputFormat();
}

public String getSchemaRegistryUrl() {
return schemaRegistryFragment.getSchemaRegistryUrl();
return transformerFragment.getSchemaRegistryUrl();
}

public String getTargetTopics() {
Expand All @@ -79,7 +79,11 @@ public int getMaxPollRecords() {
}

public Transformer getTransformer() {
return TransformerFactory.getTransformer(schemaRegistryFragment.getInputFormat());
return TransformerFactory.getTransformer(transformerFragment.getInputFormat());
}

/**
 * Returns the maximum byte-buffer size used by the byte-array transformer.
 *
 * @return the configured {@code transformer.max.buffer.size} value (defaults to 4096 via the
 *         fragment's ConfigDef definition).
 */
public int getTransformerMaxBufferSize() {
final int maxBufferSize = transformerFragment.getTransformerMaxBufferSize();
return maxBufferSize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,42 +20,48 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import io.aiven.kafka.connect.common.source.input.InputFormat;

public final class SchemaRegistryFragment extends ConfigFragment {
private static final String SCHEMAREGISTRY_GROUP = "Schema registry group";
public final class TransformerFragment extends ConfigFragment {
private static final String TRANSFORMER_GROUP = "Transformer group";
public static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
public static final String VALUE_CONVERTER_SCHEMA_REGISTRY_URL = "value.converter.schema.registry.url";
public static final String AVRO_VALUE_SERIALIZER = "value.serializer";
public static final String INPUT_FORMAT_KEY = "input.format";
public static final String SCHEMAS_ENABLE = "schemas.enable";
public static final String TRANSFORMER_MAX_BUFFER_SIZE = "transformer.max.buffer.size";
aindriu-aiven marked this conversation as resolved.
Show resolved Hide resolved
private static final int DEFAULT_MAX_BUFFER_SIZE = 4096;

/**
* Construct the ConfigFragment..
*
* @param cfg
* the configuration that this fragment is associated with.
*/
public SchemaRegistryFragment(final AbstractConfig cfg) {
public TransformerFragment(final AbstractConfig cfg) {
super(cfg);
}

public static ConfigDef update(final ConfigDef configDef) {
int srCounter = 0;
int transformerCounter = 0;
configDef.define(SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", SCHEMAREGISTRY_GROUP, srCounter++,
ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP, transformerCounter++,
ConfigDef.Width.NONE, SCHEMA_REGISTRY_URL);
configDef.define(VALUE_CONVERTER_SCHEMA_REGISTRY_URL, ConfigDef.Type.STRING, null,
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL",
SCHEMAREGISTRY_GROUP, srCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM, "SCHEMA REGISTRY URL", TRANSFORMER_GROUP,
transformerCounter++, ConfigDef.Width.NONE, VALUE_CONVERTER_SCHEMA_REGISTRY_URL);
configDef.define(INPUT_FORMAT_KEY, ConfigDef.Type.STRING, InputFormat.BYTES.getValue(),
new ConfigDef.NonEmptyString(), ConfigDef.Importance.MEDIUM,
"Input format of messages read from source avro/json/parquet/bytes", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Input format of messages read from source avro/json/parquet/bytes", TRANSFORMER_GROUP,
transformerCounter++, ConfigDef.Width.NONE, INPUT_FORMAT_KEY);
configDef.define(TRANSFORMER_MAX_BUFFER_SIZE, ConfigDef.Type.INT, DEFAULT_MAX_BUFFER_SIZE,
new ByteArrayTransformerMaxBufferSizeValidator(), ConfigDef.Importance.MEDIUM,
"Max Size of the byte buffer when using the BYTE Transformer", TRANSFORMER_GROUP, transformerCounter++,
ConfigDef.Width.NONE, INPUT_FORMAT_KEY);

configDef.define(AVRO_VALUE_SERIALIZER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM,
"Avro value serializer", SCHEMAREGISTRY_GROUP, srCounter++, // NOPMD
"Avro value serializer", TRANSFORMER_GROUP, transformerCounter++, // NOPMD
// UnusedAssignment
ConfigDef.Width.NONE, AVRO_VALUE_SERIALIZER);
return configDef;
Expand All @@ -73,4 +79,21 @@ public Class<?> getAvroValueSerializer() {
return cfg.getClass(AVRO_VALUE_SERIALIZER);
}

/**
 * Reads the configured maximum buffer size for the byte-array transformer.
 *
 * @return the {@code transformer.max.buffer.size} setting; never null because the
 *         ConfigDef definition supplies a default (4096).
 */
public int getTransformerMaxBufferSize() {
final Integer configuredSize = cfg.getInt(TRANSFORMER_MAX_BUFFER_SIZE);
return configuredSize;
}

private static class ByteArrayTransformerMaxBufferSizeValidator implements ConfigDef.Validator {
@Override
public void ensureValid(final String name, final Object value) {

// ConfigDef will throw an error if this is not an int that is supplied
aindriu-aiven marked this conversation as resolved.
Show resolved Hide resolved
if ((int) value <= 0) {
throw new ConfigException(
String.format("%s must be larger then 0 and less then %s", name, Integer.MAX_VALUE));
}

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;

import io.confluent.connect.avro.AvroData;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
Expand All @@ -49,13 +50,14 @@ public class AvroTransformer extends Transformer {
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
public void configureValueConverter(final Map<String, String> config, final SourceCommonConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final AbstractConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
private DataFileStream<GenericRecord> dataFileStream;
private final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
Expand Down Expand Up @@ -91,7 +93,7 @@ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
final SourceCommonConfig sourceConfig) {
return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA,
((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,41 @@
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.SchemaAndValue;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.function.IOSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* ByteArrayTransformer chunks an entire object into a maximum size specified by the
* {@link io.aiven.kafka.connect.common.config.TransformerFragment#TRANSFORMER_MAX_BUFFER_SIZE} configuration option.
* This will split large files into multiple records and each record will have the same key.
*/
public class ByteArrayTransformer extends Transformer {
aindriu-aiven marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger LOGGER = LoggerFactory.getLogger(ByteArrayTransformer.class);

private static final int MAX_BUFFER_SIZE = 4096;

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
public void configureValueConverter(final Map<String, String> config, final SourceCommonConfig sourceConfig) {
// For byte array transformations, ByteArrayConverter is the converter which is the default config.

}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final AbstractConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {
if (streamLength == 0) {
LOGGER.warn(
"Object sent for processing has an invalid streamLength of {}, object is empty returning an empty spliterator.",
streamLength);
return emptySpliterator(inputStreamIOSupplier);
}
// The max buffer size for the byte array the default is 4096 if not set by the user.
final int maxBufferSize = sourceConfig.getTransformerMaxBufferSize();
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected InputStream inputOpened(final InputStream input) {
Expand All @@ -57,13 +71,13 @@ protected void doClose() {

@Override
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
final byte[] buffer = new byte[MAX_BUFFER_SIZE];
final byte[] buffer = new byte[maxBufferSize];
aindriu-aiven marked this conversation as resolved.
Show resolved Hide resolved
try {
final int bytesRead = IOUtils.read(inputStream, buffer);
if (bytesRead == 0) {
return false;
}
if (bytesRead < MAX_BUFFER_SIZE) {
if (bytesRead < maxBufferSize) {
aindriu-aiven marked this conversation as resolved.
Show resolved Hide resolved
action.accept(new SchemaAndValue(null, Arrays.copyOf(buffer, bytesRead)));
} else {
action.accept(new SchemaAndValue(null, buffer));
Expand All @@ -77,9 +91,35 @@ protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
};
}

/**
* This method returns an empty spliterator when an empty input stream is supplied to be split
*
* @param inputStreamIOSupplier
* The empty input stream that was supplied
* @return an Empty spliterator to return to the calling method.
*/
private static StreamSpliterator emptySpliterator(final IOSupplier<InputStream> inputStreamIOSupplier) {
aindriu-aiven marked this conversation as resolved.
Show resolved Hide resolved
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
@Override
protected boolean doAdvance(final Consumer<? super SchemaAndValue> action) {
return false;
}

@Override
protected void doClose() {
// nothing to do
}

@Override
protected InputStream inputOpened(final InputStream input) throws IOException {
return InputStream.nullInputStream();
}
};
}

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
final SourceCommonConfig sourceConfig) {
return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.function.IOSupplier;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -48,12 +49,13 @@ public class JsonTransformer extends Transformer {
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
public void configureValueConverter(final Map<String, String> config, final SourceCommonConfig sourceConfig) {
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final AbstractConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {
return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {
BufferedReader reader;

Expand Down Expand Up @@ -99,7 +101,7 @@ public boolean doAdvance(final Consumer<? super SchemaAndValue> action) {

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
final SourceCommonConfig sourceConfig) {
return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package io.aiven.kafka.connect.common.source.input;

import static io.aiven.kafka.connect.common.config.SchemaRegistryFragment.SCHEMA_REGISTRY_URL;
import static io.aiven.kafka.connect.common.config.TransformerFragment.SCHEMA_REGISTRY_URL;

import java.io.File;
import java.io.IOException;
Expand All @@ -29,9 +29,9 @@
import java.util.Map;
import java.util.function.Consumer;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.SchemaAndValue;

import io.aiven.kafka.connect.common.config.SourceCommonConfig;
import io.aiven.kafka.connect.common.source.input.parquet.LocalInputFile;

import io.confluent.connect.avro.AvroData;
Expand All @@ -55,19 +55,20 @@ public class ParquetTransformer extends Transformer {
}

@Override
public void configureValueConverter(final Map<String, String> config, final AbstractConfig sourceConfig) {
public void configureValueConverter(final Map<String, String> config, final SourceCommonConfig sourceConfig) {
config.put(SCHEMA_REGISTRY_URL, sourceConfig.getString(SCHEMA_REGISTRY_URL));
}

@Override
public SchemaAndValue getKeyData(final Object cloudStorageKey, final String topic,
final AbstractConfig sourceConfig) {
final SourceCommonConfig sourceConfig) {
return new SchemaAndValue(null, ((String) cloudStorageKey).getBytes(StandardCharsets.UTF_8));
}

@Override
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier, final String topic,
final int topicPartition, final AbstractConfig sourceConfig) {
public StreamSpliterator createSpliterator(final IOSupplier<InputStream> inputStreamIOSupplier,
final long streamLength, final String topic, final int topicPartition,
final SourceCommonConfig sourceConfig) {

return new StreamSpliterator(LOGGER, inputStreamIOSupplier) {

Expand Down
Loading
Loading