Skip to content

Commit

Permalink
add production exception handler (#21)
Browse files Browse the repository at this point in the history
* add RecordTooLargeException exception handler

* Update kafka-streams-framework/src/main/java/org/hypertrace/core/kafkastreams/framework/exceptionhandlers/IgnoreRecordTooLargeHandler.java

Co-authored-by: surajpuvvada <[email protected]>

* externalized list of ignorable exception classes

Co-authored-by: surajpuvvada <[email protected]>
  • Loading branch information
ravisingal and surajpuvvada authored Oct 1, 2020
1 parent 99dc66b commit 499d199
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ protected void doStop() {

@Override
public boolean healthCheck() {
// Healthy only while the underlying Kafka Streams app is in RUNNING or
// REBALANCING state; any other state (CREATED, ERROR, PENDING_SHUTDOWN,
// NOT_RUNNING) reports unhealthy so orchestrators can restart the service.
// NOTE(review): the pasted diff contained both the old `return true;` and the
// new state-based return — kept only the new behavior here.
return app.state().isRunningOrRebalancing();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.hypertrace.core.kafkastreams.framework.exceptionhandlers;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * A {@link ProductionExceptionHandler} that lets a Kafka Streams application skip
 * (CONTINUE past) produce failures caused by a configurable set of exception types,
 * while failing fast for anything else.
 *
 * <p>Configure via the {@code ignore.production.exception.classes} property: a
 * comma-separated list of fully-qualified exception class names, e.g.
 * {@code org.apache.kafka.common.errors.RecordTooLargeException}.
 */
public class IgnoreProductionExceptionHandler implements ProductionExceptionHandler {
    private static final String IGNORE_PRODUCTION_EXCEPTION_CLASSES = "ignore.production.exception.classes";

    private static final Logger LOGGER = LoggerFactory.getLogger(IgnoreProductionExceptionHandler.class);

    // Exception types that should be skipped rather than failing the task.
    // Populated once in configure(); bounded wildcard keeps the element type honest.
    private final List<Class<? extends Exception>> ignoreExceptionClasses = new ArrayList<>();

    /**
     * Decides whether a failed produce should be skipped.
     *
     * @param record    the record that failed to be produced; its value may be null (tombstone)
     * @param exception the failure raised by the producer
     * @return CONTINUE if the exception matches a configured ignorable type, FAIL otherwise
     */
    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record, Exception exception) {
        for (Class<? extends Exception> exceptionClass : ignoreExceptionClasses) {
            if (exceptionClass.isInstance(exception)) {
                // record.value() can be null for tombstone records — guard before reading length.
                int valueSize = record.value() == null ? 0 : record.value().length;
                LOGGER.error("Failed to produce record to topic={}, partition={}, size={} due to exception {}. will skip this record.",
                        record.topic(), record.partition(), valueSize, exception.getLocalizedMessage());
                return ProductionExceptionHandlerResponse.CONTINUE;
            }
        }
        return ProductionExceptionHandlerResponse.FAIL;
    }

    /**
     * Loads the list of ignorable exception classes from configuration.
     * Unknown class names and names that do not resolve to Exception subtypes
     * are logged and skipped rather than aborting startup.
     */
    @Override
    public void configure(Map<String, ?> configs) {
        if (configs.containsKey(IGNORE_PRODUCTION_EXCEPTION_CLASSES)) {
            Object configValue = configs.get(IGNORE_PRODUCTION_EXCEPTION_CLASSES);
            if (configValue instanceof String) {
                LOGGER.info("found {}={}", IGNORE_PRODUCTION_EXCEPTION_CLASSES, configValue);
                // Split on commas, tolerating surrounding whitespace.
                List<String> classNameList = Arrays.asList(((String) configValue).trim().split("\\s*,\\s*"));
                for (String className : classNameList) {
                    try {
                        Class<?> clazz = Class.forName(className);
                        // Validate the type instead of blindly casting: the original
                        // (Class<Exception>) cast would silently admit non-Exception
                        // classes into the list.
                        if (Exception.class.isAssignableFrom(clazz)) {
                            ignoreExceptionClasses.add(clazz.asSubclass(Exception.class));
                        } else {
                            LOGGER.error("Class with name {} is not an Exception subtype. skipping.", className);
                        }
                    } catch (ClassNotFoundException e) {
                        LOGGER.error("Class with name {} not found.", className);
                    }
                }
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package org.hypertrace.core.kafkastreams.framework.exceptionhandlers;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
 * Tests for {@link IgnoreProductionExceptionHandler}: verifies that a produce
 * failure is skipped (CONTINUE) only when the thrown exception's class appears
 * in the configured ignore list, and fails (FAIL) otherwise.
 */
public class IgnoreProductionExceptionHandlerTest {

    private static final String IGNORE_PRODUCTION_EXCEPTION_CLASSES = "ignore.production.exception.classes";
    private static final String TOPIC = "test-topic";
    private static final byte[] KEY = new byte[]{0, 1, 2, 3};
    private static final byte[] VALUE = new byte[]{0, 1, 2, 3};

    /**
     * Builds a handler configured with the given ignore list (null = unconfigured)
     * and invokes it with a RecordTooLargeException on a fixed record.
     */
    private ProductionExceptionHandler.ProductionExceptionHandlerResponse handleTooLargeWith(String ignoredClasses) {
        IgnoreProductionExceptionHandler handler = new IgnoreProductionExceptionHandler();
        if (ignoredClasses != null) {
            Map<String, String> configs = new HashMap<>();
            configs.put(IGNORE_PRODUCTION_EXCEPTION_CLASSES, ignoredClasses);
            handler.configure(configs);
        }
        // Diamond generics instead of a raw ProducerRecord — avoids unchecked warnings.
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(TOPIC, KEY, VALUE);
        return handler.handle(record, new RecordTooLargeException());
    }

    @Test
    public void failWithoutConfiguredException() {
        // No configuration at all: every exception must FAIL.
        // assertEquals takes (expected, actual) in that order — the original had them swapped.
        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL,
                handleTooLargeWith(null));
    }

    @Test
    public void continueWithConfiguredException() {
        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE,
                handleTooLargeWith("org.apache.kafka.common.errors.RecordTooLargeException"));
    }

    @Test
    public void failWithConfiguredException() {
        // A different exception class is configured, so RecordTooLargeException must FAIL.
        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL,
                handleTooLargeWith("org.apache.kafka.common.errors.RecordBatchTooLargeException"));
    }

    @Test
    public void continueWithConfiguredMultipleExceptions() {
        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.CONTINUE,
                handleTooLargeWith("org.apache.kafka.common.errors.ProducerFencedException,org.apache.kafka.common.errors.RecordTooLargeException"));
    }

    @Test
    public void failWithConfiguredMultipleExceptions() {
        assertEquals(ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL,
                handleTooLargeWith("org.apache.kafka.common.errors.ProducerFencedException,org.apache.kafka.common.errors.RecordBatchTooLargeException"));
    }
}

0 comments on commit 499d199

Please sign in to comment.