From 3dcecb9e8540c4501e2e4064f95f7081d71c4894 Mon Sep 17 00:00:00 2001 From: ben-millar Date: Tue, 23 Jul 2024 15:56:32 +0100 Subject: [PATCH 1/2] CORE-20794 Catching ProducerFencedException and rethrowing as a ProducerRequiresReset --- .../kafka/producer/CordaKafkaProducerImpl.kt | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImpl.kt b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImpl.kt index 0d33746e35f..c104a478169 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImpl.kt +++ b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImpl.kt @@ -65,7 +65,6 @@ class CordaKafkaProducerImpl( const val asyncChunkErrorMessage = "Tried to send record which requires chunking using an asynchronous producer" val fatalExceptions: Set<Class<out Throwable>> = setOf( - ProducerFencedException::class.java, UnsupportedVersionException::class.java, UnsupportedForMessageFormatException::class.java, AuthorizationException::class.java, @@ -386,20 +385,35 @@ class CordaKafkaProducerImpl( throw CordaMessageAPIFatalException("FatalError occurred $errorString", ex) } - IllegalStateException::class.java -> { - // It's not clear whether the producer is ok to abort and continue or not in this case, so play it safe - // and let the client know to create a new one. 
- throw CordaMessageAPIProducerRequiresReset("Error occurred $errorString", ex) - } - in transientExceptions -> { if (abortTransaction) { abortTransaction() } throw CordaMessageAPIIntermittentException("Error occurred $errorString", ex) } + in ApiExceptions -> { throw ex } + IllegalStateException::class.java -> { + // It's not clear whether the producer is ok to abort and continue or not in this case, so play it safe + // and let the client know to create a new one. + throw CordaMessageAPIProducerRequiresReset("Error occurred $errorString", ex) + } + + ProducerFencedException::class.java -> { + // There are two scenarios in which a ProducerFencedException can be thrown: + // + // 1. The producer is fenced because another producer with the same transactional.id has been started. + // 2. The producer is fenced due to a timeout on the broker side. + // + // There should be no way for another producer to be started with the same transactional.id, so we can + // assume that the producer is fenced because of a timeout and trigger a reset. + throw CordaMessageAPIProducerRequiresReset( + "ProducerFencedException thrown, likely due to a timeout on the broker side. " + + "Triggering a reset of the producer. $errorString", ex + ) + } + else -> { // Here we do not know what the exact cause of the exception is, but we do know Kafka has not told us we // must close down, nor has it told us we can abort and retry. 
In this instance the most sensible thing From 0a03f8d847e00f3a8fdbcef9bc5cc80b6cde111f Mon Sep 17 00:00:00 2001 From: ben-millar Date: Mon, 29 Jul 2024 13:13:16 +0100 Subject: [PATCH 2/2] CORE-20794 Updating unit tests --- .../producer/CordaKafkaProducerImplTest.kt | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImplTest.kt b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImplTest.kt index a9408c4b365..7bd83c62e8e 100644 --- a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImplTest.kt +++ b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/producer/CordaKafkaProducerImplTest.kt @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata import org.apache.kafka.clients.producer.MockProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.errors.AuthorizationException import org.apache.kafka.common.errors.InterruptException import org.apache.kafka.common.errors.InvalidProducerEpochException import org.apache.kafka.common.errors.ProducerFencedException @@ -160,7 +161,7 @@ class CordaKafkaProducerImplTest { @Test fun testBeginTransactionFatal() { - doThrow(ProducerFencedException("")).whenever(producer).beginTransaction() + doThrow(AuthorizationException("")).whenever(producer).beginTransaction() assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.beginTransaction() } verify(producer, times(1)).beginTransaction() } @@ -180,9 +181,9 @@ class CordaKafkaProducerImplTest { } @Test - fun testBeginTransactionZombieProducerThrowsFatalException() { + fun testBeginTransactionZombieProducerThrowsProducerResetException() { doThrow(ProducerFencedException("")).whenever(producer).beginTransaction() 
- assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.beginTransaction() } + assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.beginTransaction() } verify(producer, times(1)).beginTransaction() } @@ -218,9 +219,9 @@ class CordaKafkaProducerImplTest { } @Test - fun testCommitTransactionZombieProducerThrowsFatalException() { + fun testCommitTransactionZombieProducerThrowsProducerResetException() { doThrow(ProducerFencedException("")).whenever(producer).commitTransaction() - assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.commitTransaction() } + assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.commitTransaction() } verify(producer, times(1)).commitTransaction() } @@ -245,9 +246,9 @@ class CordaKafkaProducerImplTest { } @Test - fun testAbortTransactionZombieProducerThrowsFatalException() { + fun testAbortTransactionZombieProducerThrowsProducerResetException() { doThrow(ProducerFencedException("")).whenever(producer).abortTransaction() - assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.abortTransaction() } + assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.abortTransaction() } verify(producer, times(1)).abortTransaction() } @@ -265,10 +266,10 @@ class CordaKafkaProducerImplTest { } @Test - fun testSendAllOffsetsToTransactionsZombieProducerThrowsFatalException() { + fun testSendAllOffsetsToTransactionsZombieProducerThrowsProducerResetException() { doThrow(ProducerFencedException("")).whenever(producer) .sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java)) - assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) } + assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) } verify(producer, times(1)).sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java)) } @@ -304,10 +305,10 @@ class CordaKafkaProducerImplTest { } @Test - fun testSendRecordOffsetsToTransactionsZombieProducerThrowsFatalException() { + fun testSendRecordOffsetsToTransactionsZombieProducerThrowsProducerResetException() { doThrow(ProducerFencedException("")).whenever(producer) 
.sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java)) - assertThrows<CordaMessageAPIFatalException> { + assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.sendRecordOffsetsToTransaction( cordaConsumer, generateMockConsumerRecordList(3, "TOPIC1", 0).map { @@ -342,10 +343,10 @@ class CordaKafkaProducerImplTest { } @Test - fun testSendOffsetsZombieProducerThrowsFatalException() { + fun testSendOffsetsZombieProducerThrowsProducerResetException() { doThrow(ProducerFencedException("")).whenever(producer) .sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java)) - assertThrows<CordaMessageAPIFatalException> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) } + assertThrows<CordaMessageAPIProducerRequiresReset> { cordaKafkaProducer.sendAllOffsetsToTransaction(cordaConsumer) } verify(producer, times(1)).sendOffsetsToTransaction(any(), Mockito.any(ConsumerGroupMetadata::class.java)) }