Commit: Rebasing changes from main branch
srikanthjg committed Jun 3, 2024
1 parent 1bae313 commit fc15b70
Showing 10 changed files with 25 additions and 153 deletions.
@@ -100,19 +100,8 @@ public void setAttribute(final String key, final Object value) {
public Object getAttribute(final String attributeKey) {
String key = (attributeKey.charAt(0) == '/') ? attributeKey.substring(1) : attributeKey;

Map<String, Object> mapObject = attributes;
if (key.contains("/")) {
String[] keys = key.split("/");
for (int i = 0; i < keys.length-1; i++) {
Object value = mapObject.get(keys[i]);
if (value == null || !(value instanceof Map)) {
return null;
}
mapObject = (Map<String, Object>)value;
key = keys[i+1];
}
}
return mapObject.get(key);
// Does not support recursive or inner-object lookups for now.
return attributes.get(key);
}

@Override
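The hunk above replaces the slash-delimited nested lookup with a plain flat map lookup. A minimal sketch of the behavioral difference, using an ordinary Map in place of the metadata attribute store (class and method names below are illustrative, not part of the plugin API):

```java
import java.util.Map;

public class AttributeLookupSketch {

    public static void main(String[] args) {
        Map<String, Object> attributes = Map.of(
                "k1", "v1",
                "k2", Map.of("k3", "v3"));

        // Flat lookup - all that getAttribute() supports after this commit:
        System.out.println(attributes.get("k1"));     // v1
        System.out.println(attributes.get("k2/k3"));  // null, no nested traversal

        // The removed branch walked each path segment into nested maps:
        System.out.println(nestedGet(attributes, "k2/k3"));  // v3
    }

    @SuppressWarnings("unchecked")
    static Object nestedGet(Map<String, Object> map, final String attributeKey) {
        String[] keys = attributeKey.split("/");
        for (int i = 0; i < keys.length - 1; i++) {
            Object value = map.get(keys[i]);
            if (!(value instanceof Map)) {
                return null;
            }
            map = (Map<String, Object>) value;
        }
        return map.get(keys[keys.length - 1]);
    }
}
```

This is also why the nested-lookup parameterized tests are deleted in the next file.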
@@ -134,25 +134,6 @@ public void testSetAttribute(String key, final Object value) {
assertThat(eventMetadata.getAttribute(key), equalTo(value));
}

private static Stream<Arguments> getNestedAttributeTestInputs() {
return Stream.of(Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", "v3")), "k1", "v1"),
Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", "v3")), "k2/k3", "v3"),
Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", Map.of("k4", 4))), "k2/k3/k4", 4),
Arguments.of(Map.of("k1", "v1", "k2", Map.of("k3", 4)), "k2/k3/k4", null),
Arguments.of(Map.of("k1","v1"),"k1", "v1"));
}

@ParameterizedTest
@MethodSource("getNestedAttributeTestInputs")
public void testNestedGetAttribute(Map<String, Object> attributes, final String key, final Object expectedValue) {
eventMetadata = DefaultEventMetadata.builder()
.withEventType(testEventType)
.withTimeReceived(testTimeReceived)
.withAttributes(attributes)
.build();
assertThat(eventMetadata.getAttribute(key), equalTo(expectedValue));
}

@Test
public void test_with_ExternalOriginationTime() {
Instant now = Instant.now();
@@ -13,9 +13,6 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -39,10 +36,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -54,15 +49,13 @@

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;


public class KafkaSourceJsonTypeIT {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceJsonTypeIT.class);
private static final int TEST_ID = 123456;
@@ -105,21 +98,13 @@ public class KafkaSourceJsonTypeIT {
private String testKey;
private String testTopic;
private String testGroup;
private String headerKey1;
private byte[] headerValue1;
private String headerKey2;
private byte[] headerValue2;

public KafkaSource createObjectUnderTest() {
return new KafkaSource(sourceConfig, pluginMetrics, acknowledgementSetManager, pipelineDescription, kafkaClusterConfigSupplier, pluginConfigObservable);
}

@BeforeEach
public void setup() throws Throwable {
headerKey1 = RandomStringUtils.randomAlphabetic(6);
headerValue1 = RandomStringUtils.randomAlphabetic(10).getBytes(StandardCharsets.UTF_8);
headerKey2 = RandomStringUtils.randomAlphabetic(5);
headerValue2 = RandomStringUtils.randomAlphabetic(15).getBytes(StandardCharsets.UTF_8);
sourceConfig = mock(KafkaSourceConfig.class);
pluginMetrics = mock(PluginMetrics.class);
counter = mock(Counter.class);
@@ -224,13 +209,6 @@ public void TestJsonRecordsWithNullKey() throws Exception {
assertThat(map.get("kafka_key"), equalTo(null));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
}
}

@@ -262,13 +240,6 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
event.getEventHandle().release(false);
}
receivedRecords.clear();
@@ -287,13 +258,6 @@ public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
event.getEventHandle().release(true);
}
}
@@ -325,13 +289,6 @@ public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
}
}

@@ -363,13 +320,6 @@ public void TestJsonRecordsWithKafkaKeyModeAsField() throws Exception {
assertThat(map.get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
}
}

@@ -401,13 +351,6 @@ public void TestJsonRecordsWithKafkaKeyModeAsMetadata() throws Exception {
assertThat(metadata.getAttributes().get("kafka_key"), equalTo(testKey));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(testTopic));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
Map<String, byte[]> kafkaHeaders = (Map<String, byte[]>) metadata.getAttributes().get("kafka_headers");
assertThat(kafkaHeaders.get(headerKey1), equalTo(headerValue1));
assertThat(kafkaHeaders.get(headerKey2), equalTo(headerValue2));
assertThat(metadata.getAttributes().get("kafka_timestamp"), not(equalTo(null)));
assertThat(metadata.getAttributes().get("kafka_timestamp_type"), equalTo("CreateTime"));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey1), equalTo(headerValue1));
assertThat(metadata.getAttribute("kafka_headers/"+headerKey2), equalTo(headerValue2));
}
}

@@ -421,12 +364,8 @@ public void produceJsonRecords(final String servers, final String topicName, fin
KafkaProducer producer = new KafkaProducer(props);
for (int i = 0; i < numRecords; i++) {
String value = "{\"name\":\"testName" + i + "\", \"id\":" + (TEST_ID + i) + ", \"status\":true}";
List<Header> headers = Arrays.asList(
new RecordHeader(headerKey1, headerValue1),
new RecordHeader(headerKey2, headerValue2)
);
ProducerRecord<String, String> record =
new ProducerRecord<>(topicName, null, testKey, value, new RecordHeaders(headers));
new ProducerRecord<>(topicName, testKey, value);
producer.send(record);
try {
Thread.sleep(100);
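For reference, the two ProducerRecord forms this test switches between are both part of the standard Kafka producer API. A minimal, self-contained sketch (broker address, topic, key, and header values are placeholders, not values from this repository):

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerRecordSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String value = "{\"name\":\"testName0\", \"id\":123456, \"status\":true}";

            // Form kept by this commit: topic, key, value - no headers attached.
            producer.send(new ProducerRecord<>("test-topic", "test-key", value));

            // Form removed by this commit: explicit (null) partition plus record headers.
            List<Header> headers = List.of(
                    new RecordHeader("header-key", "header-value".getBytes(StandardCharsets.UTF_8)));
            producer.send(new ProducerRecord<>("test-topic", null, "test-key", value,
                    new RecordHeaders(headers)));
        }
    }
}
```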
@@ -65,11 +65,6 @@ class BufferTopicConfig extends CommonTopicConfig implements TopicProducerConfig
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
private String groupId;

@JsonProperty("client_id")
@Valid
@Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
private String clientId;

@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
@@ -140,11 +135,6 @@ public String getGroupId() {
return groupId;
}

@Override
public String getClientId() {
return clientId;
}

@Override
public Duration getCommitInterval() {
return commitInterval;
@@ -16,8 +16,6 @@ public interface TopicConsumerConfig extends TopicConfig {

String getGroupId();

String getClientId();

Boolean getAutoCommit();

String getAutoOffsetReset();
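Dropping getClientId() from the consumer-facing config means the plugin no longer maps a per-topic client_id onto the Kafka consumer. If a fixed client id were still wanted, it would have to be set directly through the standard Kafka property; a small sketch using the stock ConsumerConfig constants (the values shown are illustrative only):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerIdSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // group.id is still taken from the topic config after this commit.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");       // illustrative value
        // client.id is the standard Kafka property the removed client_id option used to populate.
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "example-client-id");  // illustrative value
        properties.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```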
@@ -17,8 +17,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.RecordDeserializationException;
@@ -428,16 +426,6 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
if (kafkaKeyMode == KafkaKeyMode.INCLUDE_AS_METADATA) {
eventMetadata.setAttribute("kafka_key", key);
}
Headers headers = consumerRecord.headers();
if (headers != null) {
Map<String, byte[]> headerData = new HashMap<>();
for (Header header: headers) {
headerData.put(header.key(), header.value());
}
eventMetadata.setAttribute("kafka_headers", headerData);
}
eventMetadata.setAttribute("kafka_timestamp", consumerRecord.timestamp());
eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString());
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));
eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
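Note that although the kafka_timestamp and kafka_timestamp_type attributes are removed here, the unchanged line above still copies the record timestamp into the event's external origination time. A tiny sketch of that conversion (the millisecond value is illustrative):

```java
import java.time.Instant;

public class TimestampSketch {

    public static void main(String[] args) {
        // consumerRecord.timestamp() returns epoch milliseconds; the unchanged line in
        // getRecord() still converts it into the event's external origination time:
        long recordTimestampMillis = 1_717_372_800_000L; // illustrative value
        Instant externalOriginationTime = Instant.ofEpochMilli(recordTimestampMillis);
        System.out.println(externalOriginationTime); // 2024-06-03T00:00:00Z
    }
}
```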
@@ -134,19 +134,14 @@ private Properties getConsumerProperties(final KafkaConsumerConfig sourceConfig,
break;
}
}
setConsumerTopicProperties(properties, topicConfig, topicConfig.getGroupId());
setConsumerTopicProperties(properties, topicConfig);
setSchemaRegistryProperties(sourceConfig, properties, topicConfig);
LOG.debug("Starting consumer with the properties : {}", properties);
return properties;
}


public static void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig,
final String groupId) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
if (Objects.nonNull(topicConfig.getClientId())) {
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, topicConfig.getClientId());
}
private void setConsumerTopicProperties(final Properties properties, final TopicConsumerConfig topicConfig) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, topicConfig.getGroupId());
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
@@ -37,7 +37,6 @@
import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaRegistryType;
import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConfig;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumer;
import org.opensearch.dataprepper.plugins.kafka.consumer.KafkaCustomConsumerFactory;
import org.opensearch.dataprepper.plugins.kafka.consumer.PauseConsumePredicate;
import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier;
import org.opensearch.dataprepper.plugins.kafka.util.ClientDNSLookupType;
@@ -319,7 +318,25 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic
}

private void setConsumerTopicProperties(Properties properties, TopicConsumerConfig topicConfig) {
KafkaCustomConsumerFactory.setConsumerTopicProperties(properties, topicConfig, consumerGroupID);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
topicConfig.getAutoCommit());
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
((Long) topicConfig.getCommitInterval().toMillis()).intValue());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
((Long) topicConfig.getMaxPollInterval().toMillis()).intValue());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes());
}

private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
@@ -49,11 +49,6 @@ class SourceTopicConfig extends CommonTopicConfig implements TopicConsumerConfig
@Size(min = 1, max = 255, message = "size of group id should be between 1 and 255")
private String groupId;

@JsonProperty("client_id")
@Valid
@Size(min = 1, max = 255, message = "size of client id should be between 1 and 255")
private String clientId;

@JsonProperty("workers")
@Valid
@Size(min = 1, max = 200, message = "Number of worker threads should lies between 1 and 200")
@@ -126,11 +121,6 @@ public String getGroupId() {
return groupId;
}

@Override
public String getClientId() {
return clientId;
}

@Override
public MessageFormat getSerdeFormat() {
return serdeFormat;
(1 more changed file not shown)
