Fix for kafka source issue #3264 (AWS Glue exception handling)
Signed-off-by: Hardeep Singh <[email protected]>
hshardeesi committed Aug 26, 2023
1 parent 8114ab4 commit c28a7c5
Showing 2 changed files with 76 additions and 7 deletions.
@@ -4,7 +4,6 @@
*/
package org.opensearch.dataprepper.plugins.kafka.consumer;

-import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
@@ -37,6 +36,7 @@
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.AccessDeniedException;

import java.time.Duration;
import java.time.Instant;
@@ -53,6 +53,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

+import static org.apache.commons.lang3.exception.ExceptionUtils.getRootCause;
+
/**
* A utility class which will handle the core Kafka consumer operation.
*/
@@ -180,11 +182,10 @@ public <T> void consumeRecords() throws Exception {
topicMetrics.getNumberOfPollAuthErrors().increment();
Thread.sleep(10000);
} catch (RecordDeserializationException e) {
-
LOG.warn("Deserialization error - topic {} partition {} offset {}. Error message: {}",
e.topicPartition().topic(), e.topicPartition().partition(), e.offset(), e.getMessage());
-if (e.getCause() instanceof AWSSchemaRegistryException) {
-LOG.warn("Retrying after 30 seconds");
+if (getRootCause(e) instanceof AccessDeniedException) {
+LOG.warn("AccessDenied for AWSGlue schema registry, retrying after 30 seconds");
Thread.sleep(30000);
} else {
LOG.warn("Seeking past the error record");
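
The substance of the fix is in the hunk above. Previously the consumer retried whenever the immediate cause of a RecordDeserializationException was an AWSSchemaRegistryException, so a record that could never be deserialized could be retried indefinitely and stall the partition. The new code walks the full cause chain with getRootCause and retries only when the root cause is AWS Glue's AccessDeniedException, a transient permissions problem; every other deserialization failure is treated as a poison pill and skipped. Below is a minimal, self-contained sketch of that pattern; the class and method names are illustrative, not the actual Data Prepper consumer code.

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;
import software.amazon.awssdk.services.glue.model.AccessDeniedException;

import java.time.Duration;

class GlueAwareErrorHandlingSketch {
    private final KafkaConsumer<String, byte[]> consumer;

    GlueAwareErrorHandlingSketch(final KafkaConsumer<String, byte[]> consumer) {
        this.consumer = consumer;
    }

    void pollOnce() throws InterruptedException {
        try {
            final ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(500));
            // ... hand the records to the pipeline buffer ...
        } catch (final RecordDeserializationException e) {
            if (ExceptionUtils.getRootCause(e) instanceof AccessDeniedException) {
                // Transient Glue/IAM failure: keep the offset and retry the same record later.
                Thread.sleep(30_000);
            } else {
                // Poison pill: skip only the offending record and move on.
                consumer.seek(e.topicPartition(), e.offset() + 1);
            }
        }
    }
}

Seeking to e.offset() + 1 is one plausible reading of the "Seeking past the error record" log line; the actual seek logic sits below the visible hunk. The second changed file, the consumer's unit test, follows.
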
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.kafka.consumer;

+import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -207,7 +208,7 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
}
// Wait for acknowledgement callback function to run
try {
-Thread.sleep(10000);
+Thread.sleep(100);
} catch (Exception e){}

consumer.processAcknowledgedOffsets();
@@ -254,7 +255,7 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int
}
// Wait for acknowledgement callback function to run
try {
-Thread.sleep(10000);
+Thread.sleep(100);
} catch (Exception e){}

consumer.processAcknowledgedOffsets();
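
A note on the Thread.sleep changes in these test hunks: cutting 10 s to 100 ms makes the suite far faster, but a fixed sleep is still a race against the acknowledgement callback. A polling wait states the intent directly. The sketch below is an assumption, not part of this change: Awaitility is not a dependency shown in this diff, and it presumes processAcknowledgedOffsets() is safe to call repeatedly.

import static org.awaitility.Awaitility.await;

// Instead of:  try { Thread.sleep(100); } catch (Exception e){}
// poll until the acknowledgement callback has produced a committable offset:
await().atMost(java.time.Duration.ofSeconds(5)).untilAsserted(() -> {
    consumer.processAcknowledgedOffsets();
    Assertions.assertEquals(1, consumer.getOffsetsToCommit().size());
});
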
@@ -352,7 +353,74 @@ public void testJsonDeserializationErrorWithAcknowledgements() throws Exception
}
// Wait for acknowledgement callback function to run
try {
-Thread.sleep(10000);
+Thread.sleep(100);
+} catch (Exception e){}
+
+consumer.processAcknowledgedOffsets();
+offsetsToCommit = consumer.getOffsetsToCommit();
+Assertions.assertEquals(offsetsToCommit.size(), 1);
+offsetsToCommit.forEach((topicPartition, offsetAndMetadata) -> {
+Assertions.assertEquals(topicPartition.partition(), testJsonPartition);
+Assertions.assertEquals(topicPartition.topic(), topic);
+Assertions.assertEquals(103L, offsetAndMetadata.offset());
+});
+}
+
+@Test
+public void testAwsGlueErrorWithAcknowledgements() throws Exception {
+String topic = topicConfig.getName();
+final ObjectMapper mapper = new ObjectMapper();
+when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.JSON);
+when(topicConfig.getKafkaKeyMode()).thenReturn(KafkaKeyMode.INCLUDE_AS_FIELD);
+
+consumer = createObjectUnderTest("json", true);
+consumer.onPartitionsAssigned(List.of(new TopicPartition(topic, testJsonPartition)));
+
+// Send one json record
+Map<TopicPartition, List<ConsumerRecord>> records = new HashMap<>();
+ConsumerRecord<String, JsonNode> record1 = new ConsumerRecord<>(topic, testJsonPartition, 100L, testKey1, mapper.convertValue(testMap1, JsonNode.class));
+records.put(new TopicPartition(topic, testJsonPartition), Arrays.asList(record1));
+consumerRecords = new ConsumerRecords(records);
+when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+consumer.consumeRecords();
+
+// Send non-json record that results in deser exception
+RecordDeserializationException exc = new RecordDeserializationException(new TopicPartition(topic, testJsonPartition),
+101L, "Deserialization exception", new AWSSchemaRegistryException("AWS glue parse exception"));
+when(kafkaConsumer.poll(any(Duration.class))).thenThrow(exc);
+consumer.consumeRecords();
+
+// Send one more json record
+ConsumerRecord<String, JsonNode> record2 = new ConsumerRecord<>(topic, testJsonPartition, 102L, testKey2,
+mapper.convertValue(testMap2, JsonNode.class));
+records.clear();
+records.put(new TopicPartition(topic, testJsonPartition), Arrays.asList(record2));
+consumerRecords = new ConsumerRecords(records);
+when(kafkaConsumer.poll(any(Duration.class))).thenReturn(consumerRecords);
+consumer.consumeRecords();
+
+Map.Entry<Collection<Record<Event>>, CheckpointState> bufferRecords = buffer.read(1000);
+ArrayList<Record<Event>> bufferedRecords = new ArrayList<>(bufferRecords.getKey());
+Assertions.assertEquals(2, bufferedRecords.size());
+Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = consumer.getOffsetsToCommit();
+Assertions.assertEquals(offsetsToCommit.size(), 0);
+
+for (Record<Event> record: bufferedRecords) {
+Event event = record.getData();
+Map<String, Object> eventMap = event.toMap();
+String kafkaKey = event.get("kafka_key", String.class);
+assertTrue(kafkaKey.equals(testKey1) || kafkaKey.equals(testKey2));
+if (kafkaKey.equals(testKey1)) {
+testMap1.forEach((k, v) -> assertThat(eventMap, hasEntry(k,v)));
+}
+if (kafkaKey.equals(testKey2)) {
+testMap2.forEach((k, v) -> assertThat(eventMap, hasEntry(k,v)));
+}
+event.getEventHandle().release(true);
+}
+// Wait for acknowledgement callback function to run
+try {
+Thread.sleep(100);
} catch (Exception e){}

consumer.processAcknowledgedOffsets();
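
One subtlety in the new test: the RecordDeserializationException wraps a bare AWSSchemaRegistryException, so the root cause of the chain is the AWSSchemaRegistryException itself, not an AccessDeniedException. The test therefore exercises the seek-past branch, which matches its assertion that both good records (offsets 100 and 102) reach the buffer. The standalone sketch below shows how the two cause chains select a branch; the builder-style AccessDeniedException construction follows the AWS SDK v2 convention, and the message strings are made up.

import com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import software.amazon.awssdk.services.glue.model.AccessDeniedException;

public class RootCauseBranching {
    public static void main(String[] args) {
        final TopicPartition tp = new TopicPartition("my-topic", 0);

        // Chain 1: deserialization error -> schema-registry error. The root cause is
        // the AWSSchemaRegistryException, so the consumer seeks past the record.
        final RecordDeserializationException parseError = new RecordDeserializationException(
                tp, 101L, "Deserialization exception",
                new AWSSchemaRegistryException("AWS glue parse exception"));

        // Chain 2: deserialization error -> schema-registry error -> Glue AccessDeniedException.
        // The root cause is the AccessDeniedException, so the consumer backs off and retries.
        final AWSSchemaRegistryException schemaError =
                new AWSSchemaRegistryException("Failed to fetch schema version");
        schemaError.initCause(AccessDeniedException.builder()
                .message("not authorized to perform glue:GetSchemaVersion").build());
        final RecordDeserializationException authError = new RecordDeserializationException(
                tp, 101L, "Deserialization exception", schemaError);

        System.out.println(ExceptionUtils.getRootCause(parseError) instanceof AccessDeniedException); // false
        System.out.println(ExceptionUtils.getRootCause(authError) instanceof AccessDeniedException);  // true
    }
}
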
