Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CORE-20794 Catching ProducerFencedException and rethrowing as a ProducerRequiresReset #6303

Merged
merged 2 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class CordaKafkaProducerImpl(
const val asyncChunkErrorMessage = "Tried to send record which requires chunking using an asynchronous producer"

val fatalExceptions: Set<Class<out Throwable>> = setOf(
ProducerFencedException::class.java,
UnsupportedVersionException::class.java,
UnsupportedForMessageFormatException::class.java,
AuthorizationException::class.java,
Expand Down Expand Up @@ -386,20 +385,35 @@ class CordaKafkaProducerImpl(
throw CordaMessageAPIFatalException("FatalError occurred $errorString", ex)
}

IllegalStateException::class.java -> {
// It's not clear whether the producer is ok to abort and continue or not in this case, so play it safe
// and let the client know to create a new one.
throw CordaMessageAPIProducerRequiresReset("Error occurred $errorString", ex)
}

in transientExceptions -> {
if (abortTransaction) {
abortTransaction()
}
throw CordaMessageAPIIntermittentException("Error occurred $errorString", ex)
}

in ApiExceptions -> { throw ex }

IllegalStateException::class.java -> {
// It's not clear whether the producer is ok to abort and continue or not in this case, so play it safe
// and let the client know to create a new one.
throw CordaMessageAPIProducerRequiresReset("Error occurred $errorString", ex)
}

ProducerFencedException::class.java -> {
// There are two scenarios in which a ProducerFencedException can be thrown:
//
// 1. The producer is fenced because another producer with the same transactional.id has been started.
// 2. The producer is fenced due to a timeout on the broker side.
//
// There should be no way for another producer to be started with the same transactional.id, so we can
// assume that the producer is fenced because of a timeout and trigger a reset.
throw CordaMessageAPIProducerRequiresReset(
"ProducerFencedException thrown, likely due to a timeout on the broker side. " +
"Triggering a reset of the producer. $errorString", ex
)
}

else -> {
// Here we do not know what the exact cause of the exception is, but we do know Kafka has not told us we
// must close down, nor has it told us we can abort and retry. In this instance the most sensible thing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata
import org.apache.kafka.clients.producer.MockProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.AuthorizationException
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.InvalidProducerEpochException
import org.apache.kafka.common.errors.ProducerFencedException
Expand Down Expand Up @@ -160,7 +161,7 @@ class CordaKafkaProducerImplTest {

@Test
fun testBeginTransactionFatal() {
doThrow(ProducerFencedException("")).whenever(producer).beginTransaction()
doThrow(AuthorizationException("")).whenever(producer).beginTransaction()
assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.beginTransaction() }
verify(producer, times(1)).beginTransaction()
}
Expand All @@ -180,9 +181,9 @@ class CordaKafkaProducerImplTest {
}

@Test
fun testBeginTransactionZombieProducerThrowsFatalException() {
fun testBeginTransactionZombieProducerThrowsProducerResetException() {
doThrow(ProducerFencedException("")).whenever(producer).beginTransaction()
assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.beginTransaction() }
assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.beginTransaction() }
verify(producer, times(1)).beginTransaction()
}

Expand Down Expand Up @@ -218,9 +219,9 @@ class CordaKafkaProducerImplTest {
}

@Test
fun testCommitTransactionZombieProducerThrowsFatalException() {
fun testCommitTransactionZombieProducerThrowsProducerResetException() {
doThrow(ProducerFencedException("")).whenever(producer).commitTransaction()
assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.commitTransaction() }
assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.commitTransaction() }
verify(producer, times(1)).commitTransaction()
}

Expand All @@ -245,9 +246,9 @@ class CordaKafkaProducerImplTest {
}

@Test
fun testAbortTransactionZombieProducerThrowsFatalException() {
fun testAbortTransactionZombieProducerThrowsProducerResetException() {
doThrow(ProducerFencedException("")).whenever(producer).abortTransaction()
assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.abortTransaction() }
assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.abortTransaction() }
verify(producer, times(1)).abortTransaction()
}

Expand All @@ -265,10 +266,10 @@ class CordaKafkaProducerImplTest {
}

@Test
fun testSendAllOffsetsToTransactionsZombieProducerThrowsFatalException() {
fun testSendAllOffsetsToTransactionsZombieProducerThrowsProducerResetException() {
doThrow(ProducerFencedException("")).whenever(producer)
.sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java))
assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) }
assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) }
verify(producer, times(1)).sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java))
}

Expand Down Expand Up @@ -304,10 +305,10 @@ class CordaKafkaProducerImplTest {
}

@Test
fun testSendRecordOffsetsToTransactionsZombieProducerThrowsFatalException() {
fun testSendRecordOffsetsToTransactionsZombieProducerThrowsProducerResetException() {
doThrow(ProducerFencedException("")).whenever(producer)
.sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java))
assertThrows<CordaMessageAPIFatalException> {
assertThrows<CordaMessageAPIProducerRequiresReset> {
cordaKafkaProducer.sendRecordOffsetsToTransaction(
cordaConsumer,
generateMockConsumerRecordList(3, "TOPIC1", 0).map {
Expand Down Expand Up @@ -342,10 +343,10 @@ class CordaKafkaProducerImplTest {
}

@Test
fun testSendOffsetsZombieProducerThrowsFatalException() {
fun testSendOffsetsZombieProducerThrowsProducerResetException() {
doThrow(ProducerFencedException("")).whenever(producer)
.sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java))
assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) }
assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) }
verify(producer, times(1)).sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java))
}

Expand Down