Skip to content

Commit

Permalink
Minor fixes to Kafka Source
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Aug 16, 2023
1 parent f11d882 commit eb2af77
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void TestJsonRecordsWithNullKey() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(map.get("kafka_key"), equalTo(null));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -235,7 +235,7 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
assertThat(map.get("id"), equalTo(TEST_ID+i));
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -295,7 +295,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(map.get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down Expand Up @@ -356,7 +356,7 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo(0));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TopicConfig {
static final Duration DEFAULT_RETRY_BACKOFF = Duration.ofSeconds(10);
static final Duration DEFAULT_RECONNECT_BACKOFF = Duration.ofSeconds(10);
static final Integer DEFAULT_MAX_PARTITION_FETCH_BYTES = 1048576;
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300000);
static final Duration DEFAULT_MAX_POLL_INTERVAL = Duration.ofSeconds(300);
static final Integer DEFAULT_CONSUMER_MAX_POLL_RECORDS = 500;
static final Integer DEFAULT_NUM_OF_WORKERS = 2;
static final Duration DEFAULT_HEART_BEAT_INTERVAL_DURATION = Duration.ofSeconds(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,20 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
@Override
public void run() {
consumer.subscribe(Arrays.asList(topicName));
boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
try {
if (retryingAfterException) {
Thread.sleep(10000);
}
resetOffsets();
commitOffsets();
consumeRecords();
topicMetrics.update(consumer);
retryingAfterException = false;
} catch (Exception exp) {
LOG.error("Error while reading the records from the topic {}", topicName, exp);
LOG.error("Error while reading the records from the topic {}. Retry after 10 seconds", topicName, exp);
retryingAfterException = true;
}
}
}
Expand Down Expand Up @@ -292,7 +298,7 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
eventMetadata.setAttribute("kafka_key", key);
}
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", partition);
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));

return new Record<Event>(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

package org.opensearch.dataprepper.plugins.kafka.util;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -19,19 +20,18 @@ public enum MessageFormat {
PLAINTEXT("plaintext"), JSON("json"), AVRO("avro");

private static final Map<String, MessageFormat> MESSAGE_FORMAT_MAP = Arrays.stream(MessageFormat.values())
.collect(Collectors.toMap(MessageFormat::toString, Function.identity()));

private final String messageFormatName;
.collect(Collectors.toMap(
value -> value.type,
value -> value
));

MessageFormat(final String name) {
this.messageFormatName = name;
}
private final String type;

@Override
public String toString() {
return this.messageFormatName;
MessageFormat(final String type) {
this.type = type;
}

@JsonCreator
public static MessageFormat getByMessageFormatByName(final String name) {
return MESSAGE_FORMAT_MAP.get(name.toLowerCase());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void testConfigValues_from_yaml() {
assertEquals(500L, topicConfig.getFetchMaxWait().longValue());
assertEquals(1L, topicConfig.getFetchMinBytes().longValue());
assertEquals(Duration.ofSeconds(100), topicConfig.getRetryBackoff());
assertEquals(Duration.ofSeconds(300000), topicConfig.getMaxPollInterval());
assertEquals(Duration.ofSeconds(300), topicConfig.getMaxPollInterval());
assertEquals(500L, topicConfig.getConsumerMaxPollRecords().longValue());
assertEquals(5, topicConfig.getWorkers().intValue());
assertEquals(Duration.ofSeconds(3), topicConfig.getHeartBeatInterval());
Expand Down

0 comments on commit eb2af77

Please sign in to comment.