Skip to content

Commit

Permalink
Minor fixes to Kafka Source (#3174)
Browse files Browse the repository at this point in the history
* Minor fixes to Kafka Source

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed unused configs

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka and Krishna Kondaka authored Aug 16, 2023
1 parent e3b425e commit 1f0ad76
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void TestJsonRecordsWithNullKey() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(map.get("kafka_key"), equalTo(null));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -235,7 +235,7 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
assertThat(map.get("id"), equalTo(TEST_ID+i));
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -295,7 +295,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(map.get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -356,7 +356,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,29 @@ public boolean hasOnlyOneConfig() {
}


public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication
/*
* TODO
public static class SslAuthConfig {
// TODO Add Support for SSL authentication types like
// one-way or two-way authentication
public SslAuthConfig() {
public SslAuthConfig() {
}
}
@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;
public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}
}
@JsonProperty("ssl")
private SslAuthConfig sslAuthConfig;
*/

@Valid
@JsonProperty("sasl")
private SaslAuthConfig saslAuthConfig;

public SslAuthConfig getSslAuthConfig() {
return sslAuthConfig;
}

public SaslAuthConfig getSaslAuthConfig() {
return saslAuthConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,16 @@ public class TopicConfig {
static final boolean DEFAULT_AUTO_COMMIT = false;
static final Duration DEFAULT_COMMIT_INTERVAL = Duration.ofSeconds(5);
static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofSeconds(45);
static final int DEFAULT_MAX_RETRY_ATTEMPT = Integer.MAX_VALUE;
static final String DEFAULT_AUTO_OFFSET_RESET = "earliest";
static final Duration DEFAULT_THREAD_WAITING_TIME = Duration.ofSeconds(5);
static final Duration DEFAULT_MAX_RECORD_FETCH_TIME = Duration.ofSeconds(4);
static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(5);
static final Duration DEFAULT_MAX_RETRY_DELAY = Duration.ofSeconds(1);
static final Integer DEFAULT_FETCH_MAX_BYTES = 52428800;
static final Integer DEFAULT_FETCH_MAX_WAIT = 500;
static final Integer DEFAULT_FETCH_MIN_BYTES = 1;
static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300);
static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);
Expand All @@ -53,16 +50,6 @@ public class TopicConfig {
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
private Integer workers = DEFAULT_NUM_OF_WORKERS;

@JsonProperty("max_retry_attempts")
@Valid
@Size(min = 1, max = Integer.MAX_VALUE, message = " Max retry attempts should lies between 1 and Integer.MAX_VALUE")
private Integer maxRetryAttempts = DEFAULT_MAX_RETRY_ATTEMPT;

@JsonProperty("max_retry_delay")
@Valid
@Size(min = 1)
private Duration maxRetryDelay = DEFAULT_MAX_RETRY_DELAY;

@JsonProperty("serde_format")
private MessageFormat serdeFormat= MessageFormat.PLAINTEXT;

Expand All @@ -82,25 +69,12 @@ public class TopicConfig {
@JsonProperty("auto_offset_reset")
private String autoOffsetReset = DEFAULT_AUTO_OFFSET_RESET;

@JsonProperty("group_name")
@Valid
@Size(min = 1, max = 255, message = "size of group name should be between 1 and 255")
private String groupName;

@JsonProperty("thread_waiting_time")
private Duration threadWaitingTime = DEFAULT_THREAD_WAITING_TIME;

@JsonProperty("max_record_fetch_time")
private Duration maxRecordFetchTime = DEFAULT_MAX_RECORD_FETCH_TIME;

@JsonProperty("max_partition_fetch_bytes")
private Integer maxPartitionFetchBytes = DEFAULT_MAX_PARTITION_FETCH_BYTES;

@JsonProperty("buffer_default_timeout")
@Valid
@Size(min = 1)
private Duration bufferDefaultTimeout = DEFAULT_BUFFER_TIMEOUT;

@JsonProperty("fetch_max_bytes")
@Valid
@Size(min = 1, max = 52428800)
Expand Down Expand Up @@ -144,10 +118,6 @@ public void setGroupId(String groupId) {
this.groupId = groupId;
}

public void setMaxRetryAttempts(Integer maxRetryAttempts) {
this.maxRetryAttempts = maxRetryAttempts;
}

public MessageFormat getSerdeFormat() {
return serdeFormat;
}
Expand Down Expand Up @@ -176,14 +146,6 @@ public void setAutoOffsetReset(String autoOffsetReset) {
this.autoOffsetReset = autoOffsetReset;
}

public String getGroupName() {
return groupName;
}

public void setGroupName(String groupName) {
this.groupName = groupName;
}

public Duration getThreadWaitingTime() {
return threadWaitingTime;
}
Expand All @@ -192,26 +154,10 @@ public void setThreadWaitingTime(Duration threadWaitingTime) {
this.threadWaitingTime = threadWaitingTime;
}

public Duration getMaxRecordFetchTime() {
return maxRecordFetchTime;
}

public Integer getMaxPartitionFetchBytes() {
return maxPartitionFetchBytes;
}

public void setMaxRecordFetchTime(Duration maxRecordFetchTime) {
this.maxRecordFetchTime = maxRecordFetchTime;
}

public Duration getBufferDefaultTimeout() {
return bufferDefaultTimeout;
}

public void setBufferDefaultTimeout(Duration bufferDefaultTimeout) {
this.bufferDefaultTimeout = bufferDefaultTimeout;
}

public Integer getFetchMaxBytes() {
return fetchMaxBytes;
}
Expand Down Expand Up @@ -264,14 +210,6 @@ public void setWorkers(Integer workers) {
this.workers = workers;
}

public Duration getMaxRetryDelay() {
return maxRetryDelay;
}

public void setMaxRetryDelay(Duration maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}

public Duration getHeartBeatInterval() {
return heartBeatInterval;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,20 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
@Override
public void run() {
consumer.subscribe(Arrays.asList(topicName));
boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
try {
if (retryingAfterException) {
Thread.sleep(10000);
}
resetOffsets();
commitOffsets();
consumeRecords();
topicMetrics.update(consumer);
retryingAfterException = false;
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic {}", topicName, exp);
LOG.error("Error while reading the records from the topic {}. Retry after 10 seconds", topicName, exp);
retryingAfterException = true;
}
}
}
Expand Down Expand Up @@ -292,7 +298,7 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
eventMetadata.setAttribute("kafka_key", key);
}
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", partition);
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));

return new Record<Event>(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package org.opensearch.dataprepper.plugins.kafka.util;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -19,19 +20,18 @@ public enum MessageFormat {
PLAINTEXT("plaintext"), JSON("json"), AVRO("avro");

private static final Map<String, MessageFormat> MESSAGE_FORMAT_MAP = Arrays.stream(MessageFormat.values())
.collect(Collectors.toMap(MessageFormat::toString, Function.identity()));

private final String messageFormatName;
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

MessageFormat(final String name) {
this.messageFormatName = name;
}
private final String type;

@Override
public String toString() {
return this.messageFormatName;
MessageFormat(final String type) {
this.type = type;
}

@JsonCreator
public static MessageFormat getByMessageFormatByName(final String name) {
return MESSAGE_FORMAT_MAP.get(name.toLowerCase());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ void testConfigValues_default() {
assertEquals(TopicConfig.DEFAULT_SESSION_TIMEOUT, topicConfig.getSessionTimeOut());
assertEquals(TopicConfig.DEFAULT_AUTO_OFFSET_RESET, topicConfig.getAutoOffsetReset());
assertEquals(TopicConfig.DEFAULT_THREAD_WAITING_TIME, topicConfig.getThreadWaitingTime());
assertEquals(TopicConfig.DEFAULT_MAX_RECORD_FETCH_TIME, topicConfig.getMaxRecordFetchTime());
assertEquals(TopicConfig.DEFAULT_BUFFER_TIMEOUT, topicConfig.getBufferDefaultTimeout());
assertEquals(TopicConfig.DEFAULT_FETCH_MAX_BYTES, topicConfig.getFetchMaxBytes());
assertEquals(TopicConfig.DEFAULT_FETCH_MAX_WAIT, topicConfig.getFetchMaxWait());
assertEquals(TopicConfig.DEFAULT_FETCH_MIN_BYTES, topicConfig.getFetchMinBytes());
Expand All @@ -96,13 +94,11 @@ void testConfigValues_from_yaml() {
assertEquals(45000, topicConfig.getSessionTimeOut().toMillis());
assertEquals("earliest", topicConfig.getAutoOffsetReset());
assertEquals(Duration.ofSeconds(1), topicConfig.getThreadWaitingTime());
assertEquals(Duration.ofSeconds(4), topicConfig.getMaxRecordFetchTime());
assertEquals(Duration.ofSeconds(5), topicConfig.getBufferDefaultTimeout());
assertEquals(52428800, topicConfig.getFetchMaxBytes().longValue());
assertEquals(500L, topicConfig.getFetchMaxWait().longValue());
assertEquals(1L, topicConfig.getFetchMinBytes().longValue());
assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff());
assertEquals(Duration.ofSeconds(300000), topicConfig.getMaxPollInterval());
assertEquals(Duration.ofSeconds(300), topicConfig.getMaxPollInterval());
assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
assertEquals(5, topicConfig.getWorkers().intValue());
assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
Expand All @@ -118,8 +114,6 @@ void testConfigValues_from_yaml_not_null() {
assertNotNull(topicConfig.getSessionTimeOut());
assertNotNull(topicConfig.getAutoOffsetReset());
assertNotNull(topicConfig.getThreadWaitingTime());
assertNotNull(topicConfig.getMaxRecordFetchTime());
assertNotNull(topicConfig.getBufferDefaultTimeout());
assertNotNull(topicConfig.getFetchMaxBytes());
assertNotNull(topicConfig.getFetchMaxWait());
assertNotNull(topicConfig.getFetchMinBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ log-pipeline:
- 127.0.0.1:9093
topics:
- name: my-topic-2
group_name: kafka-consumer-group-2
group_id: my-test-group
- name: my-topic-1
group_id: my-test-group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ log-pipeline:
auto_commit: false
commit_interval: PT5S
session_timeout: PT45S
max_retry_attempts: 1000
auto_offset_reset: earliest
thread_waiting_time: PT1S
max_record_fetch_time: PT4S
heart_beat_interval: PT3S
buffer_default_timeout: PT5S
fetch_max_bytes: 52428800
fetch_max_wait: 500
fetch_min_bytes: 1
Expand Down

0 comments on commit 1f0ad76

Please sign in to comment.