diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
index 7930d78ba3..e093139b71 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java
@@ -4,7 +4,6 @@
  */
 package org.opensearch.dataprepper.plugins.kafka.consumer;
 
-import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
 import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
@@ -37,6 +36,7 @@ import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.AccessDeniedException;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -53,6 +53,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCause;
+
 /**
  * * A utility class which will handle the core Kafka consumer operation.
  */
@@ -180,11 +182,10 @@ public void consumeRecords() throws Exception {
             topicMetrics.getNumberOfPollAuthErrors().increment();
             Thread.sleep(10000);
         } catch (RecordDeserializationException e) {
-            LOG.warn("Deserialization error - topic {} partition {} offset {}. Error message: {}", e.topicPartition().topic(), e.topicPartition().partition(), e.offset(), e.getMessage());
-            if (e.getCause() instanceof AWSSchemaRegistryException) {
-                LOG.warn("Retrying after 30 seconds");
+            if (getRootCause(e) instanceof AccessDeniedException) {
+                LOG.warn("AccessDenied for AWSGlue schema registry, retrying after 30 seconds");
                 Thread.sleep(30000);
             } else {
                 LOG.warn("Seeking past the error record");
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java
index 9d2a2a9e37..43312becfa 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java
@@ -5,6 +5,7 @@
  */
 package org.opensearch.dataprepper.plugins.kafka.consumer;
 
+import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -207,7 +208,7 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
         }
         // Wait for acknowledgement callback function to run
         try {
-            Thread.sleep(10000);
+            Thread.sleep(100);
         } catch (Exception e){}
 
         consumer.processAcknowledgedOffsets();
@@ -254,7 +255,7 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int
         }
         // Wait for acknowledgement callback function to run
         try {
-            Thread.sleep(10000);
+            Thread.sleep(100);
         } catch (Exception e){}
 
         consumer.processAcknowledgedOffsets();
@@ -352,7 +353,74 @@ public void testJsonDeserializationErrorWithAcknowledgements() throws Exception
         }
         // Wait for acknowledgement callback function to run
         try {
-            Thread.sleep(10000);
+            Thread.sleep(100);
+        } catch (Exception e){}
+
+        consumer.processAcknowledgedOffsets();
+        offsetsToCommit = consumer.getOffsetsToCommit();
+        Assertions.assertEquals(offsetsToCommit.size(), 1);
+        offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> {
+            Assertions.assertEquals(topicPartition.partition(), testJsonPartition);
+            Assertions.assertEquals(topicPartition.topic(), topic);
+            Assertions.assertEquals(103L, offsetAndMetadata.offset());
+        });
+    }
+
+    @Test
+    public void testAwsGlueErrorWithAcknowledgements() throws Exception {
+        String topic = topicConfig.getName();
+        final ObjectMapper mapper = new ObjectMapper();
+        when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON);
+        when(topicConfig.getKafkaKeyMode()).thenReturn(KafkaKeyMode.INCLUDE_AS_FIELD);
+
+        consumer = createObjectUnderTest("json", true);
+        consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testJsonPartition)));
+
+        // Send one json record
+        Map<TopicPartition, List<ConsumerRecord<String, Object>>> records = new HashMap<>();
+        ConsumerRecord<String, Object> record1 = new ConsumerRecord<>(topic, testJsonPartition, 100L, testKey1, mapper.convertValue(testMap1, JsonNode.class));
+        records.put(new TopicPartition(topic, testJsonPartition), Arrays.asList(record1));
+        consumerRecords = new ConsumerRecords(records);
+        when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+        consumer.consumeRecords();
+
+        // Send non-json record that results in deser exception
+        RecordDeserializationException exc = new RecordDeserializationException(new TopicPartition(topic, testJsonPartition),
+                101L, "Deserialization exception", new AWSSchemaRegistryException("AWS glue parse exception"));
+        when(kafkaConsumer.poll(any(Duration.class))).thenThrow(exc);
+        consumer.consumeRecords();
+
+        // Send one more json record
+        ConsumerRecord<String, Object> record2 = new ConsumerRecord<>(topic, testJsonPartition, 102L, testKey2,
+                mapper.convertValue(testMap2, JsonNode.class));
+        records.clear();
+        records.put(new TopicPartition(topic, testJsonPartition), Arrays.asList(record2));
+        consumerRecords = new ConsumerRecords(records);
+        when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+        consumer.consumeRecords();
+
+        Map.Entry<Collection<Record<Event>>, CheckpointState> bufferRecords = buffer.read(1000);
+        ArrayList<Record<Event>> bufferedRecords = new ArrayList<>(bufferRecords.getKey());
+        Assertions.assertEquals(2, bufferedRecords.size());
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = consumer.getOffsetsToCommit();
+        Assertions.assertEquals(offsetsToCommit.size(), 0);
+
+        for (Record<Event> record: bufferedRecords) {
+            Event event = record.getData();
+            Map<String, Object> eventMap = event.toMap();
+            String kafkaKey = event.get("kafka_key", String.class);
+            assertTrue(kafkaKey.equals(testKey1) || kafkaKey.equals(testKey2));
+            if (kafkaKey.equals(testKey1)) {
+                testMap1.forEach((k, v) -> assertThat(eventMap, hasEntry(k,v)));
+            }
+            if (kafkaKey.equals(testKey2)) {
+                testMap2.forEach((k, v) -> assertThat(eventMap, hasEntry(k,v)));
+            }
+            event.getEventHandle().release(true);
+        }
+        // Wait for acknowledgement callback function to run
+        try {
+            Thread.sleep(100);
         } catch (Exception e){}
 
         consumer.processAcknowledgedOffsets();
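
Note (not part of the patch): the production change above switches the check from e.getCause() to Commons Lang's getRootCause(e), because the Glue AccessDeniedException usually sits deeper in the cause chain, underneath the schema-registry wrapper, rather than being the direct cause. The following minimal, standalone Java sketch illustrates that difference. The topic name, offset, exception messages, and the exact way the Glue deserializer nests its wrappers are illustrative assumptions, not taken from the PR.

import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCause;

import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import software.amazon.awssdk.services.glue.model.AccessDeniedException;

public class GlueAccessDeniedCheckSketch {
    public static void main(String[] args) {
        // Innermost failure: Glue rejects the schema lookup (illustrative message).
        AccessDeniedException accessDenied = AccessDeniedException.builder()
                .message("User is not authorized to perform glue:GetSchemaVersion")
                .build();

        // The schema-registry layer wraps it; the real nesting depends on the
        // Glue deserializer internals and is assumed here for illustration.
        AWSSchemaRegistryException registryFailure =
                new AWSSchemaRegistryException("Exception occurred while fetching schema version");
        registryFailure.initCause(accessDenied);

        // The Kafka consumer surfaces the whole chain as a RecordDeserializationException.
        RecordDeserializationException pollFailure = new RecordDeserializationException(
                new TopicPartition("my-topic", 0), 101L, "Error deserializing record", registryFailure);

        // getCause() only sees the schema-registry wrapper, so the old check misses AccessDenied.
        System.out.println(pollFailure.getCause() instanceof AccessDeniedException);    // false
        // getRootCause() walks the chain down to the Glue AccessDeniedException,
        // which is the condition the patched consumeRecords() waits and retries on.
        System.out.println(getRootCause(pollFailure) instanceof AccessDeniedException); // true
    }
}

The sleep-and-retry branch is taken only for this root cause; any other deserialization failure still falls through to the existing "seek past the error record" path.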