Skip to content

Commit

Permalink
CORE-20794 Catching ProducerFencedException and rethrowing as a Produ…
Browse files Browse the repository at this point in the history
…cerRequiresReset
  • Loading branch information
ben-millar committed Jul 29, 2024
1 parent 4b87ed3 commit 3dcecb9
Showing 1 changed file with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class CordaKafkaProducerImpl(
const val asyncChunkErrorMessage = "Tried to send record which requires chunking using an asynchronous producer"

val fatalExceptions: Set<Class<out Throwable>> = setOf(
ProducerFencedException::class.java,
UnsupportedVersionException::class.java,
UnsupportedForMessageFormatException::class.java,
AuthorizationException::class.java,
Expand Down Expand Up @@ -386,20 +385,35 @@ class CordaKafkaProducerImpl(
throw CordaMessageAPIFatalException("FatalError occurred $errorString", ex)
}

IllegalStateException::class.java -> {
// It's not clear whether the producer is ok to abort and continue or not in this case, so play it safe
// and let the client know to create a new one.
throw CordaMessageAPIProducerRequiresReset("Error occurred $errorString", ex)
}

in transientExceptions -> {
if (abortTransaction) {
abortTransaction()
}
throw CordaMessageAPIIntermittentException("Error occurred $errorString", ex)
}

in ApiExceptions -> { throw ex }

IllegalStateException::class.java -> {
// It's not clear whether the producer is ok to abort and continue or not in this case, so play it safe
// and let the client know to create a new one.
throw CordaMessageAPIProducerRequiresReset("Error occurred $errorString", ex)
}

ProducerFencedException::class.java -> {
// There are two scenarios in which a ProducerFencedException can be thrown:
//
// 1. The producer is fenced because another producer with the same transactional.id has been started.
// 2. The producer is fenced due to a timeout on the broker side.
//
// There should be no way for another producer to be started with the same transactional.id, so we can
// assume that the producer is fenced because of a timeout and trigger a reset.
throw CordaMessageAPIProducerRequiresReset(
"ProducerFencedException thrown, likely due to a timeout on the broker side. " +
"Triggering a reset of the producer. $errorString", ex
)
}

else -> {
// Here we do not know what the exact cause of the exception is, but we do know Kafka has not told us we
// must close down, nor has it told us we can abort and retry. In this instance the most sensible thing
Expand Down

0 comments on commit 3dcecb9

Please sign in to comment.