Support larger message sizes in Kafka Buffer (#3916)
* Support larger message sizes in Kafka Buffer

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments and added new integration tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
kkondaka and Krishna Kondaka authored Jan 9, 2024
1 parent e6df3eb commit 774fa21
Showing 9 changed files with 263 additions and 31 deletions.
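For context, here is a minimal sketch (not part of this commit) of the kind of buffer configuration this change enables: raising the producer's max_request_size so records of a few megabytes can be written to the buffer topic. The map keys mirror the new integration test below; the topic name, bootstrap server, and the KafkaBufferConfig import path are assumed placeholders rather than values taken from the diff.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.util.List;
import java.util.Map;
// Assumed import path for the buffer config class; it is not shown in this diff.
import org.opensearch.dataprepper.plugins.kafka.buffer.KafkaBufferConfig;

class LargeMessageBufferConfigSketch {
    static KafkaBufferConfig build() {
        // Mirrors the ObjectMapper setup used by the integration test below.
        final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
        final Map<String, Object> bufferConfigMap = Map.of(
                "topics", List.of(Map.of(
                        "name", "my-buffer-topic",            // placeholder topic name
                        "group_id", "buffergroup-example",
                        "create_topic", false)),
                // Raise the producer's max request size above the 1 MB default so the
                // buffer accepts larger records (the test writes a ~3 MB event).
                "producer_properties", Map.of("max_request_size", 4 * 1024 * 1024),
                "bootstrap_servers", List.of("localhost:9092"), // placeholder broker
                "encryption", Map.of("type", "none"));
        return objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);
    }
}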
@@ -14,10 +14,9 @@
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.codec.ByteDecoder;
import org.opensearch.dataprepper.model.codec.JsonDecoder;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
@@ -26,6 +25,8 @@
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kafka.util.TestConsumer;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -55,7 +56,12 @@
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.lenient;

@ExtendWith(MockitoExtension.class)
public class KafkaBufferIT {
@@ -68,6 +74,9 @@ public class KafkaBufferIT {
private PluginFactory pluginFactory;
@Mock
private AcknowledgementSetManager acknowledgementSetManager;
@Mock
private AcknowledgementSet acknowledgementSet;

private Random random;

private BufferTopicConfig topicConfig;
@@ -81,6 +90,12 @@ public class KafkaBufferIT {
@BeforeEach
void setUp() {
random = new Random();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
acknowledgementSet = mock(AcknowledgementSet.class);
lenient().doAnswer((a) -> {
return null;
}).when(acknowledgementSet).complete();
lenient().when(acknowledgementSetManager.create(any(), any(Duration.class))).thenReturn(acknowledgementSet);
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());

when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString());
@@ -135,6 +150,42 @@ void write_and_read() throws TimeoutException {
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}

@Test
void write_and_read_max_request_test() throws TimeoutException, NoSuchFieldException, IllegalAccessException {
KafkaProducerProperties kafkaProducerProperties = new KafkaProducerProperties();
setField(KafkaProducerProperties.class, kafkaProducerProperties, "maxRequestSize", 4*1024*1024);
final Map<String, Object> topicConfigMap = Map.of(
"name", topicName,
"group_id", "buffergroup-" + RandomStringUtils.randomAlphabetic(6),
"create_topic", false
);
final Map<String, Object> bufferConfigMap = Map.of(
"topics", List.of(topicConfigMap),
"producer_properties", kafkaProducerProperties,
"bootstrap_servers", List.of(bootstrapServersCommaDelimited),
"encryption", Map.of("type", "none")
);
kafkaBufferConfig = objectMapper.convertValue(bufferConfigMap, KafkaBufferConfig.class);
KafkaBuffer objectUnderTest = createObjectUnderTest();

Record<Event> record = createLargeRecord();
objectUnderTest.write(record, 1_000);

Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = objectUnderTest.read(10_000);

assertThat(readResult, notNullValue());
assertThat(readResult.getKey(), notNullValue());
assertThat(readResult.getKey().size(), equalTo(1));

Record<Event> onlyResult = readResult.getKey().stream().iterator().next();

assertThat(onlyResult, notNullValue());
assertThat(onlyResult.getData(), notNullValue());
// TODO: The metadata is not included. It needs to be included in the Buffer, though not in the Sink. This may be something we make configurable in the consumer/producer - whether to serialize the metadata or not.
//assertThat(onlyResult.getData().getMetadata(), equalTo(record.getData().getMetadata()));
assertThat(onlyResult.getData().toMap(), equalTo(record.getData().toMap()));
}

@Test
void writeBytes_and_read() throws Exception {
byteDecoder = new JsonDecoder();
@@ -362,6 +413,11 @@ private byte[] createRandomBytes() {
return writtenBytes;
}

private Record<Event> createLargeRecord() {
Event event = JacksonEvent.fromMessage(RandomStringUtils.randomAlphabetic(3_000_000));
return new Record<>(event);
}

private Record<Event> createRecord() {
Event event = JacksonEvent.fromMessage(UUID.randomUUID().toString());
return new Record<>(event);
@@ -37,7 +37,6 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);


@JsonProperty("encryption_key")
private String encryptionKey;

@@ -6,7 +6,22 @@
import java.util.List;

public class KafkaProducerProperties {
private static final String DEFAULT_BYTE_CAPACITY = "50mb";
static final String DEFAULT_BYTE_CAPACITY = "50mb";
static final Duration DEFAULT_DELIVERY_TIMEOUT_MS = Duration.ofMillis(120000);
static final Long DEFAULT_LINGER_MS = 0L;
static final Duration DEFAULT_MAX_BLOCK_MS = Duration.ofMillis(60000);
static final Duration DEFAULT_CONNECTION_MAX_IDLE_MS = Duration.ofMillis(540000);
static final Duration DEFAULT_REQUEST_TIMEOUT_MS = Duration.ofMillis(30000);
static final Duration DEFAULT_SOCKET_CONNECTION_SETUP_MAX_TIMEOUT = Duration.ofMillis(30000);
static final Duration DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT = Duration.ofMillis(10000);
static final Duration DEFAULT_METADATA_MAX_AGE_MS = Duration.ofMillis(300000);
static final Duration DEFAULT_METADATA_MAX_IDLE_MS = Duration.ofMillis(300000);
static final Duration DEFAULT_METRICS_SAMPLE_WINDOW_MS = Duration.ofMillis(300000);
static final Duration DEFAULT_PARTITIONER_AVAILABILITY_TIMEOUT_MS = Duration.ofMillis(0);
static final Duration DEFAULT_RECONNECT_BACKOFF_MAX_MS = Duration.ofMillis(1000);
static final Duration DEFAULT_RECONNECT_BACKOFF_MS = Duration.ofMillis(50);
static final Duration DEFAULT_RETRY_BACKOFF_MS = Duration.ofMillis(100);
public static final int DEFAULT_MAX_REQUEST_SIZE = 1024*1024;

@JsonProperty("buffer_memory")
private String bufferMemory = DEFAULT_BYTE_CAPACITY;
@@ -27,19 +42,19 @@ public class KafkaProducerProperties {
private String clientId;

@JsonProperty("connections_max_idle")
private Duration connectionsMaxIdleMs;
private Duration connectionsMaxIdleMs = DEFAULT_CONNECTION_MAX_IDLE_MS;

@JsonProperty("delivery_timeout")
private Duration deliveryTimeoutMs;
private Duration deliveryTimeoutMs = DEFAULT_DELIVERY_TIMEOUT_MS;

@JsonProperty("linger_ms")
private Long lingerMs;
private Long lingerMs = DEFAULT_LINGER_MS;

@JsonProperty("max_block")
private Duration maxBlockMs;
private Duration maxBlockMs = DEFAULT_MAX_BLOCK_MS;

@JsonProperty("max_request_size")
private int maxRequestSize;
private int maxRequestSize = DEFAULT_MAX_REQUEST_SIZE;

@JsonProperty("partitioner_class")
private Class partitionerClass;
@@ -51,16 +66,16 @@ public class KafkaProducerProperties {
private String receiveBufferBytes=DEFAULT_BYTE_CAPACITY;

@JsonProperty("request_timeout")
private Duration requestTimeoutMs;
private Duration requestTimeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;

@JsonProperty("send_buffer")
private String sendBufferBytes=DEFAULT_BYTE_CAPACITY;

@JsonProperty("socket_connection_setup_timeout_max")
private Duration socketConnectionSetupMaxTimeout;
private Duration socketConnectionSetupMaxTimeout = DEFAULT_SOCKET_CONNECTION_SETUP_MAX_TIMEOUT;

@JsonProperty("socket_connection_setup_timeout")
private Duration socketConnectionSetupTimeout;
private Duration socketConnectionSetupTimeout = DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT;

@JsonProperty("acks")
private String acks;
@@ -75,10 +90,10 @@ public class KafkaProducerProperties {
private int maxInFlightRequestsPerConnection;

@JsonProperty("metadata_max_age")
private Duration metadataMaxAgeMs;
private Duration metadataMaxAgeMs = DEFAULT_METADATA_MAX_AGE_MS;

@JsonProperty("metadata_max_idle")
private Duration metadataMaxIdleMs;
private Duration metadataMaxIdleMs = DEFAULT_METADATA_MAX_IDLE_MS;

@JsonProperty("metric_reporters")
private List metricReporters;
@@ -90,22 +105,22 @@ public class KafkaProducerProperties {
private String metricsRecordingLevel;

@JsonProperty("metrics_sample_window")
private Duration metricsSampleWindowMs;
private Duration metricsSampleWindowMs = DEFAULT_METRICS_SAMPLE_WINDOW_MS;

@JsonProperty("partitioner_adaptive_partitioning_enable")
private boolean partitionerAdaptivePartitioningEnable;

@JsonProperty("partitioner_availability_timeout")
private Duration partitionerAvailabilityTimeoutMs;
private Duration partitionerAvailabilityTimeoutMs = DEFAULT_PARTITIONER_AVAILABILITY_TIMEOUT_MS;

@JsonProperty("reconnect_backoff_max")
private Duration reconnectBackoffMaxMs;
private Duration reconnectBackoffMaxMs = DEFAULT_RECONNECT_BACKOFF_MAX_MS;

@JsonProperty("reconnect_backoff")
private Duration reconnectBackoffMs;
private Duration reconnectBackoffMs = DEFAULT_RECONNECT_BACKOFF_MS;

@JsonProperty("retry_backoff")
private Duration retryBackoffMs;
private Duration retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;


public String getCompressionType() {
@@ -21,9 +21,10 @@
import org.opensearch.dataprepper.plugins.kafka.common.key.KeyFactory;
import org.opensearch.dataprepper.plugins.kafka.common.serialization.SerializationFactory;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerProperties;

import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.service.SchemaService;
import org.opensearch.dataprepper.plugins.kafka.service.TopicService;
@@ -53,11 +54,23 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce
final boolean topicNameInMetrics) {
AwsContext awsContext = new AwsContext(kafkaProducerConfig, awsCredentialsSupplier);
KeyFactory keyFactory = new KeyFactory(awsContext);
prepareTopicAndSchema(kafkaProducerConfig);
// If either or both of Producer's max_request_size or
// Topic's max_message_bytes is set, then maximum of the
// two is set for both. If neither is set, then defaults are used.
Integer maxRequestSize = null;
KafkaProducerProperties producerProperties = kafkaProducerConfig.getKafkaProducerProperties();
if (producerProperties != null) {
int producerMaxRequestSize = producerProperties.getMaxRequestSize();
if (producerMaxRequestSize > KafkaProducerProperties.DEFAULT_MAX_REQUEST_SIZE) {
maxRequestSize = Integer.valueOf(producerMaxRequestSize);
}
}
prepareTopicAndSchema(kafkaProducerConfig, maxRequestSize);
Properties properties = SinkPropertyConfigurer.getProducerProperties(kafkaProducerConfig);
KafkaSecurityConfigurer.setAuthProperties(properties, kafkaProducerConfig, LOG);
properties = Objects.requireNonNull(properties);
TopicConfig topic = kafkaProducerConfig.getTopic();
KafkaSecurityConfigurer.setAuthProperties(properties, kafkaProducerConfig, LOG);

TopicProducerConfig topic = kafkaProducerConfig.getTopic();
KafkaDataConfig dataConfig = new KafkaDataConfigAdapter(keyFactory, topic);
Serializer<Object> keyDeserializer = (Serializer<Object>) serializationFactory.getSerializer(PlaintextKafkaDataConfig.plaintextDataConfig(dataConfig));
Serializer<Object> valueSerializer = (Serializer<Object>) serializationFactory.getSerializer(dataConfig);
@@ -70,8 +83,8 @@ public KafkaCustomProducer createProducer(final KafkaProducerConfig kafkaProduce
kafkaProducerConfig, dlqSink,
expressionEvaluator, Objects.nonNull(sinkContext) ? sinkContext.getTagsTargetKey() : null, topicMetrics, schemaService);
}
private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig) {
checkTopicCreationCriteriaAndCreateTopic(kafkaProducerConfig);
private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) {
checkTopicCreationCriteriaAndCreateTopic(kafkaProducerConfig, maxRequestSize);
final SchemaConfig schemaConfig = kafkaProducerConfig.getSchemaConfig();
if (schemaConfig != null) {
if (schemaConfig.isCreate()) {
@@ -87,14 +100,18 @@ private void prepareTopicAndSchema(final KafkaProducerConfig kafkaProducerConfig

}

private void checkTopicCreationCriteriaAndCreateTopic(final KafkaProducerConfig kafkaProducerConfig) {
private void checkTopicCreationCriteriaAndCreateTopic(final KafkaProducerConfig kafkaProducerConfig, final Integer maxRequestSize) {
final TopicProducerConfig topic = kafkaProducerConfig.getTopic();
if (!topic.isCreateTopic()) {
final TopicService topicService = new TopicService(kafkaProducerConfig);
topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor());
Long maxMessageBytes = null;
if (maxRequestSize != null) {
maxMessageBytes = Long.valueOf(maxRequestSize);
}
topicService.createTopic(kafkaProducerConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor(), maxMessageBytes);
topicService.closeAdminClient();
}


}
}
}
@@ -6,12 +6,15 @@

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.util.SinkPropertyConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class TopicService {
private static final Logger LOG = LoggerFactory.getLogger(TopicService.class);
@@ -22,9 +25,15 @@ public TopicService(final KafkaProducerConfig kafkaProducerConfig) {
this.adminClient = AdminClient.create(SinkPropertyConfigurer.getPropertiesForAdminClient(kafkaProducerConfig));
}

public void createTopic(final String topicName, final Integer numberOfPartitions, final Short replicationFactor) {
public void createTopic(final String topicName, final Integer numberOfPartitions, final Short replicationFactor, final Long maxMessageBytes) {
try {
final NewTopic newTopic = new NewTopic(topicName, numberOfPartitions, replicationFactor);
if (maxMessageBytes != null) {
Map<String, String> configOptions = new HashMap<>();
configOptions.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, Long.toString(maxMessageBytes));
newTopic.configs(configOptions);
}

adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
LOG.info(topicName + " created successfully");

@@ -117,6 +117,8 @@ public void doOutput(Collection<Record<Event>> records) {
return;
}
try {
// TODO: Looks like this call to prepareTopicAndSchema is unnecessary as it is
// done in createProducer().
prepareTopicAndSchema();
final KafkaCustomProducer producer = createProducer();
records.forEach(record -> {
@@ -152,7 +154,7 @@ private void checkTopicCreationCriteriaAndCreateTopic() {
final TopicProducerConfig topic = kafkaSinkConfig.getTopic();
if (topic.isCreateTopic()) {
final TopicService topicService = new TopicService(kafkaSinkConfig);
topicService.createTopic(kafkaSinkConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor());
topicService.createTopic(kafkaSinkConfig.getTopic().getName(), topic.getNumberOfPartitions(), topic.getReplicationFactor(), null);
topicService.closeAdminClient();
}

@@ -10,11 +10,13 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.KmsConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicProducerConfig;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.model.types.ByteCount;

public class SinkTopicConfig extends CommonTopicConfig implements TopicProducerConfig {
private static final Integer DEFAULT_NUM_OF_PARTITIONS = 1;
private static final Short DEFAULT_REPLICATION_FACTOR = 1;
private static final Long DEFAULT_RETENTION_PERIOD = 604800000L;
static final ByteCount DEFAULT_MAX_MESSAGE_BYTES = ByteCount.parse("1mb");

@JsonProperty("serde_format")
private MessageFormat serdeFormat = MessageFormat.PLAINTEXT;
@@ -31,6 +33,9 @@ public class SinkTopicConfig extends CommonTopicConfig implements TopicProducerC
@JsonProperty("create_topic")
private boolean isCreateTopic = false;

@JsonProperty("max_message_bytes")
private ByteCount maxMessageBytes = DEFAULT_MAX_MESSAGE_BYTES;

@Override
public MessageFormat getSerdeFormat() {
return serdeFormat;
@@ -53,4 +53,5 @@ void invalid_getFetchMaxBytes_zero_bytes() throws NoSuchFieldException, IllegalA
setField(BufferTopicConfig.class, objectUnderTest, "fetchMaxBytes", ByteCount.zeroBytes());
assertThrows(RuntimeException.class, () -> objectUnderTest.getFetchMaxBytes());
}
}

}