merged with latest (#3182)
Signed-off-by: Krishna Kondaka <[email protected]>
Co-authored-by: Krishna Kondaka <[email protected]>
kkondaka and Krishna Kondaka authored Aug 17, 2023
1 parent 1f4b48e commit 3527424
Showing 2 changed files with 91 additions and 4 deletions.
@@ -19,6 +19,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventMetadata;
import org.opensearch.dataprepper.acknowledgements.DefaultAcknowledgementSetManager;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaSourceConfig;
import org.opensearch.dataprepper.plugins.kafka.configuration.KafkaKeyMode;
import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionType;
@@ -45,6 +46,8 @@
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import java.time.Duration;
@@ -97,7 +100,8 @@ public void setup() {
buffer = mock(Buffer.class);
encryptionConfig = mock(KafkaSourceConfig.EncryptionConfig.class);
receivedRecords = new ArrayList<>();
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
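// Use a real acknowledgement set manager (rather than a mock) so that
// event handle release() calls actually drive acknowledgement callbacks
// in the negative-acknowledgement test below.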
ExecutorService executor = Executors.newFixedThreadPool(2);
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
pipelineDescription = mock(PipelineDescription.class);
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(false);
when(sourceConfig.getAcknowledgementsTimeout()).thenReturn(KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT);
@@ -193,15 +197,94 @@ public void TestJsonRecordsWithNullKey() throws Exception {
}

@Test
public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
public void TestJsonRecordsWithNegativeAcknowledgements() throws Exception {
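// End-to-end check of negative-acknowledgement handling: records released
// with release(false) must be re-delivered by the source, and the
// re-delivered records are then released with release(true) to commit
// their offsets.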
final int numRecords = 1;
when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);
when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.DISCARD);
when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic));
when(sourceConfig.getAuthConfig()).thenReturn(null);
when(sourceConfig.getAcknowledgementsEnabled()).thenReturn(true);
kafkaSource = createObjectUnderTest();

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AtomicBoolean created = new AtomicBoolean(false);
final String topicName = jsonTopic.getName();
try (AdminClient adminClient = AdminClient.create(props)) {
try {
adminClient.createTopics(
Collections.singleton(new NewTopic(topicName, 1, (short)1)))
.all().get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
created.set(true);
}
while (!created.get()) {
Thread.sleep(1000);
}
kafkaSource.start(buffer);
produceJsonRecords(bootstrapServers, topicName, numRecords);
int numRetries = 0;
while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) {
Thread.sleep(1000);
}
assertThat(receivedRecords.size(), equalTo(numRecords));
for (int i = 0; i < numRecords; i++) {
Record<Event> record = receivedRecords.get(i);
Event event = (Event)record.getData();
EventMetadata metadata = event.getMetadata();
Map<String, Object> map = event.toMap();
assertThat(map.get("name"), equalTo("testName"+i));
assertThat(map.get("id"), equalTo(TEST_ID+i));
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
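// Release with false to negatively acknowledge; these records are
// expected to be re-delivered.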
event.getEventHandle().release(false);
}
receivedRecords.clear();
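// Wait for the negative acknowledgement to propagate so the consumer
// seeks back to the committed offset and re-delivers the records.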
Thread.sleep(10000);
numRetries = 0;
while (numRetries++ < 10 && (receivedRecords.size() != numRecords)) {
Thread.sleep(1000);
}
assertThat(receivedRecords.size(), equalTo(numRecords));
for (int i = 0; i < numRecords; i++) {
Record<Event> record = receivedRecords.get(i);
Event event = (Event)record.getData();
EventMetadata metadata = event.getMetadata();
Map<String, Object> map = event.toMap();
assertThat(map.get("name"), equalTo("testName"+i));
assertThat(map.get("id"), equalTo(TEST_ID+i));
assertThat(map.get("status"), equalTo(true));
assertThat(metadata.getAttributes().get("kafka_topic"), equalTo(topicName));
assertThat(metadata.getAttributes().get("kafka_partition"), equalTo("0"));
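// Release with true this time; the offsets are committed and the records
// should not be delivered again.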
event.getEventHandle().release(true);
}
try (AdminClient adminClient = AdminClient.create(props)) {
try {
adminClient.deleteTopics(Collections.singleton(topicName))
.all().get(30, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
created.set(false);
}
while (created.get()) {
Thread.sleep(1000);
}
}

@Test
public void TestJsonRecordsWithKafkaKeyModeDiscard() throws Exception {
final int numRecords = 1;
when(encryptionConfig.getType()).thenReturn(EncryptionType.NONE);
when(jsonTopic.getConsumerMaxPollRecords()).thenReturn(numRecords);
when(jsonTopic.getKafkaKeyMode()).thenReturn(KafkaKeyMode.DISCARD);
when(sourceConfig.getTopics()).thenReturn(List.of(jsonTopic));
when(sourceConfig.getAuthConfig()).thenReturn(null);
kafkaSource = createObjectUnderTest();

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
AtomicBoolean created = new AtomicBoolean(false);
@@ -165,7 +165,7 @@ public <T> void consumeRecords() throws Exception {
LOG.warn("Deserialization error - topic {} partition {} offset {}",
e.topicPartition().topic(), e.topicPartition().partition(), e.offset());
if (e.getCause() instanceof AWSSchemaRegistryException) {
LOG.warn("AWSSchemaRegistryException. Retrying after 30 seconds");
LOG.warn("AWSSchemaRegistryException: {}. Retrying after 30 seconds", e.getMessage());
Thread.sleep(30000);
} else {
LOG.warn("Seeking past the error record", e);
@@ -180,7 +180,11 @@ private void resetOffsets() {
partitionsToReset.forEach(partition -> {
try {
final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
consumer.seek(partition, offsetAndMetadata);
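// committed() returns null when no offset has ever been committed for
// this partition; in that case rewind to the beginning of the partition.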
if (Objects.isNull(offsetAndMetadata)) {
consumer.seek(partition, 0L);
} else {
consumer.seek(partition, offsetAndMetadata);
}
} catch (Exception e) {
LOG.error("Failed to seek to last committed offset upon negative acknowledgement {}", partition, e);
}
