diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java
index a615a24d5..211214f55 100644
--- a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java
+++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/KafkaStreamsApp.java
@@ -128,7 +128,7 @@ protected void doStop() {
 
   @Override
   public boolean healthCheck() {
-    return true;
+    return app.state().isRunningOrRebalancing();
   }
 
   /**
diff --git a/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreProductionExceptionHandler.java b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreProductionExceptionHandler.java
new file mode 100644
index 000000000..151a5b4dc
--- /dev/null
+++ b/kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreProductionExceptionHandler.java
@@ -0,0 +1,57 @@
+package org.hypertrace.core.kafkastreams.framework.exceptionhandlers;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link ProductionExceptionHandler} that CONTINUEs past (skips) records whose
+ * production failed with one of the exception types listed in the
+ * "ignore.production.exception.classes" config (comma-separated FQCNs), and FAILs otherwise.
+ */
+public class IgnoreProductionExceptionHandler implements ProductionExceptionHandler {
+  private static final String IGNORE_PRODUCTION_EXCEPTION_CLASSES = "ignore.production.exception.classes";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(IgnoreProductionExceptionHandler.class);
+
+  private final List<Class<? extends Exception>> ignoreExceptionClasses = new ArrayList<>();
+
+  @Override
+  public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
+    for (Class<? extends Exception> exceptionClass : ignoreExceptionClasses) {
+      if (exceptionClass.isInstance(exception)) {
+        // record.value() may be null (tombstone record); guard before reading its length.
+        int valueSize = record.value() == null ? 0 : record.value().length;
+        LOGGER.error("Failed to produce record to topic={}, partition={}, size={} due to exception {}. will skip this record.",
+            record.topic(), record.partition(), valueSize, exception.getLocalizedMessage());
+        return ProductionExceptionHandlerResponse.CONTINUE;
+      }
+    }
+    return ProductionExceptionHandlerResponse.FAIL;
+  }
+
+  @Override
+  public void configure(Map<String, ?> configs) {
+    if (configs.containsKey(IGNORE_PRODUCTION_EXCEPTION_CLASSES)) {
+      Object configValue = configs.get(IGNORE_PRODUCTION_EXCEPTION_CLASSES);
+      if (configValue instanceof String) {
+        LOGGER.info("found {}={}", IGNORE_PRODUCTION_EXCEPTION_CLASSES, configValue);
+        List<String> classNameList = Arrays.asList(((String) configValue).trim().split("\\s*,\\s*"));
+        for (String className : classNameList) {
+          try {
+            ignoreExceptionClasses.add(Class.forName(className).asSubclass(Exception.class));
+          } catch (ClassNotFoundException | ClassCastException e) {
+            // Misconfigured entries are logged (with cause) and skipped rather than failing startup.
+            LOGGER.error("Unable to register ignored exception class with name {}.", className, e);
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreProductionExceptionHandlerTest.java b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreProductionExceptionHandlerTest.java
new file mode 100644
index 000000000..d40770bd9
--- /dev/null
+++ b/kafka-streams-framework/src/test/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreProductionExceptionHandlerTest.java
@@ -0,0 +1,90 @@
+package org.hypertrace.core.kafkastreams.framework.exceptionhandlers;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class IgnoreProductionExceptionHandlerTest {
+
+  private static final String IGNORE_PRODUCTION_EXCEPTION_CLASSES = "ignore.production.exception.classes";
+  private static final String TOPIC = "test-topic";
+  private static final byte[] KEY = new byte[]{0, 1, 2, 3};
+  private static final byte[] VALUE = new byte[]{0, 1, 2, 3};
+
+  @Test
+  public void failWithoutConfiguredException() {
+    IgnoreProductionExceptionHandler handler = new IgnoreProductionExceptionHandler();
+
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(TOPIC, KEY, VALUE);
+    Exception exception = new RecordTooLargeException();
+
+    ProductionExceptionHandler.ProductionExceptionHandlerResponse response = handler.handle(record, exception);
+    assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, response);
+  }
+
+  @Test
+  public void continueWithConfiguredException() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(IGNORE_PRODUCTION_EXCEPTION_CLASSES, "org.apache.kafka.common.errors.RecordTooLargeException");
+
+    IgnoreProductionExceptionHandler handler = new IgnoreProductionExceptionHandler();
+    handler.configure(configs);
+
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(TOPIC, KEY, VALUE);
+    Exception exception = new RecordTooLargeException();
+
+    ProductionExceptionHandler.ProductionExceptionHandlerResponse response = handler.handle(record, exception);
+    assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, response);
+  }
+
+  @Test
+  public void failWithConfiguredException() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(IGNORE_PRODUCTION_EXCEPTION_CLASSES, "org.apache.kafka.common.errors.RecordBatchTooLargeException");
+
+    IgnoreProductionExceptionHandler handler = new IgnoreProductionExceptionHandler();
+    handler.configure(configs);
+
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(TOPIC, KEY, VALUE);
+    Exception exception = new RecordTooLargeException();
+
+    ProductionExceptionHandler.ProductionExceptionHandlerResponse response = handler.handle(record, exception);
+    assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, response);
+  }
+
+  @Test
+  public void continueWithConfiguredMultipleExceptions() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(IGNORE_PRODUCTION_EXCEPTION_CLASSES, "org.apache.kafka.common.errors.ProducerFencedException,org.apache.kafka.common.errors.RecordTooLargeException");
+
+    IgnoreProductionExceptionHandler handler = new IgnoreProductionExceptionHandler();
+    handler.configure(configs);
+
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(TOPIC, KEY, VALUE);
+    Exception exception = new RecordTooLargeException();
+
+    ProductionExceptionHandler.ProductionExceptionHandlerResponse response = handler.handle(record, exception);
+    assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE, response);
+  }
+
+  @Test
+  public void failWithConfiguredMultipleExceptions() {
+    Map<String, Object> configs = new HashMap<>();
+    configs.put(IGNORE_PRODUCTION_EXCEPTION_CLASSES, "org.apache.kafka.common.errors.ProducerFencedException,org.apache.kafka.common.errors.RecordBatchTooLargeException");
+
+    IgnoreProductionExceptionHandler handler = new IgnoreProductionExceptionHandler();
+    handler.configure(configs);
+
+    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(TOPIC, KEY, VALUE);
+    Exception exception = new RecordTooLargeException();
+
+    ProductionExceptionHandler.ProductionExceptionHandlerResponse response = handler.handle(record, exception);
+    assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL, response);
+  }
+}