Skip to content

Commit

Permalink
CORE-20795: Add exception handling and retry to KafkaAdmin and Virtua…
Browse files Browse the repository at this point in the history
…lNodeInfoProcessor (#6296)

This issue appeared while running Kafka connection tests that kill the Kafka broker.

In a flow worker compacted subscription we got an org.apache.kafka.common.errors.TimeoutException, which was handled as fatal.

This is because our KafkaAdmin class has no error handling for the exceptions that can be thrown by KafkaFuture.get(). The KafkaAdmin client is called from the VirtualNodeInfoProcessor, which does not catch these exceptions either.

This PR adds retry logic to KafkaAdmin.getTopics and, for the case where those retries are exhausted, error handling in VirtualNodeInfoProcessor so that the exception is handled there and does not bubble up any further.
  • Loading branch information
emilybowe authored Jul 26, 2024
1 parent b6110f9 commit 4b87ed3
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,13 @@ class VirtualNodeInfoProcessor(private val onStatusUpCallback: () -> Unit, priva
}

val currentSnapshot = virtualNodeInfoMap.getAllAsCordaObjects()
listeners.forEach { it.value.onUpdate(setOf(newRecord.key.toCorda()), currentSnapshot) }
try {
listeners.forEach { it.value.onUpdate(setOf(newRecord.key.toCorda()), currentSnapshot) }
} catch (exception: Exception) {
log.error("Virtual Node Info service could not update", exception)
onErrorCallback()
return
}
}

fun getAll(): List<VirtualNodeInfo> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import net.corda.crypto.core.SecureHashImpl
import net.corda.libs.packaging.core.CpiIdentifier
import net.corda.messaging.api.records.Record
import net.corda.test.util.identity.createTestHoldingIdentity
import net.corda.v5.base.exceptions.CordaRuntimeException
import net.corda.virtualnode.HoldingIdentity
import net.corda.virtualnode.VirtualNodeInfo
import net.corda.virtualnode.read.VirtualNodeInfoListener
import net.corda.virtualnode.toAvro
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals
Expand All @@ -16,6 +18,10 @@ import org.junit.jupiter.api.Assertions.assertNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertDoesNotThrow
import org.mockito.Mockito
import org.mockito.Mockito.mock
import org.mockito.kotlin.any
import java.time.Instant
import java.util.UUID

Expand Down Expand Up @@ -351,4 +357,24 @@ class VirtualNodeInfoProcessorTest {
processor.onSnapshot(mapOf(holdingIdentityOther.toAvro() to virtualNodeInfo.toAvro()))
assertThat(onError).isTrue
}

@Test
fun `exception is caught and onError called if update fails`() {
    // Flag flipped by the processor's error callback.
    var errorReported = false
    val throwingListener = mock<VirtualNodeInfoListener>()
    val processor = VirtualNodeInfoProcessor({ /* don't care */ }, { errorReported = true })

    processor.registerCallback(throwingListener)

    // Any update delivered to the listener blows up.
    Mockito.`when`(throwingListener.onUpdate(any(), any())).thenThrow(CordaRuntimeException(""))

    val identity = createTestHoldingIdentity("CN=Bob, O=Bob Corp, L=LDN, C=GB", "groupId")
    val nodeInfo = newVirtualNodeInfo(identity, CpiIdentifier("name", "version", secureHash))

    assertThat(errorReported).isFalse
    // The listener failure must be swallowed by onNext and surfaced through the error callback instead.
    assertDoesNotThrow { processor.onNext(Record("", identity.toAvro(), nodeInfo.toAvro()), null, emptyMap()) }
    assertThat(errorReported).isTrue
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,49 @@
package net.corda.messagebus.kafka.admin

import net.corda.messagebus.api.admin.Admin
import net.corda.utilities.retry.Linear
import net.corda.utilities.retry.tryWithBackoff
import net.corda.v5.base.exceptions.CordaRuntimeException
import org.apache.kafka.clients.admin.AdminClient
import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutionException

class KafkaAdmin(private val adminClient: AdminClient) : Admin {

companion object {
// Inside the companion `this::class.java` is the companion's own class, so `enclosingClass`
// is used to name the logger after the class that declares this companion.
private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass)
}

/**
 * Returns the topic names reported by the admin client.
 *
 * The blocking `get()` on the future is executed through [tryWithBackoff], so a failure that
 * [isRecoverable] accepts is retried with a linear backoff instead of escaping on the first attempt.
 *
 * @return the set of topic names returned by `listTopics().names()`.
 * @throws CordaRuntimeException when the retries are exhausted; it wraps the last failure as its cause.
 */
override fun getTopics(): Set<String> {
    // The call must be the only return path: a direct `listTopics().names().get()` before it would
    // bypass the retry handling entirely.
    return tryWithBackoff(
        logger = logger,
        maxRetries = 3,
        maxTimeMillis = 3000,
        backoffStrategy = Linear(200),
        shouldRetry = { _, _, throwable -> throwable.isRecoverable() },
        onRetryAttempt = { attempt, delayMillis, throwable ->
            logger.warn("Attempt $attempt failed with \"${throwable.message}\", will try again after $delayMillis milliseconds")
        },
        onRetryExhaustion = { attempts, elapsedMillis, throwable ->
            val errorMessage =
                "Execution failed with \"${throwable.message}\" after retrying $attempts times for $elapsedMillis milliseconds."
            logger.warn(errorMessage)
            CordaRuntimeException(errorMessage, throwable)
        },
        {
            adminClient.listTopics().names().get()
        }
    )
}

/**
 * Tells the retry logic whether this failure is worth another attempt.
 *
 * Only an [ExecutionException] counts as recoverable; every other throwable is reported as not recoverable.
 */
private fun Throwable.isRecoverable(): Boolean = this is ExecutionException


/** Closes the wrapped [AdminClient]. */
override fun close() = adminClient.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package net.corda.messagebus.kafka.admin

import net.corda.v5.base.exceptions.CordaRuntimeException
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.ListTopicsResult
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.errors.TimeoutException
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import java.util.concurrent.ExecutionException

class KafkaAdminTest {

private var adminClient = mock<AdminClient>()

@Test
fun `When list topics then return all none internal topics from kafka`() {
fun `listTopics returns all internal topics from kafka`() {
var adminClient = mock<AdminClient>()

val kafkaFuture = mock<KafkaFuture<Set<String>>>().apply {
whenever(get()).thenReturn(setOf("topic1"))
}
Expand All @@ -23,8 +29,49 @@ class KafkaAdminTest {

whenever(adminClient.listTopics()).thenReturn(result)

val target = KafkaAdmin(adminClient)
val admin = KafkaAdmin(adminClient)

assertThat(admin.getTopics()).containsOnly("topic1")
}

@Test
fun `getTopics will retry an exception and be successful when retries not exceeded`() {
    // Two failures followed by a success: this stays within the max 3 attempts hardcoded in getTopics.
    val namesFuture = mock<KafkaFuture<Set<String>>>()
    whenever(namesFuture.get())
        .thenThrow(ExecutionException(TimeoutException("timed out")))
        .thenThrow(ExecutionException(TimeoutException("timed out")))
        .thenReturn(setOf("topic1"))

    val listTopicsResult = mock<ListTopicsResult>()
    whenever(listTopicsResult.names()).thenReturn(namesFuture)

    val client = mock<AdminClient>()
    whenever(client.listTopics()).thenReturn(listTopicsResult)

    val topics = KafkaAdmin(client).getTopics()

    assertThat(topics).containsOnly("topic1")
}

@Test
fun `getTopics will retry an exception and rethrow when retries exceeded`() {
    val adminClient = mock<AdminClient>()
    val kafkaFuture = mock<KafkaFuture<Set<String>>>()

    val topicResult = mock<ListTopicsResult>()
    whenever(topicResult.names()).thenReturn(kafkaFuture)
    whenever(adminClient.listTopics()).thenReturn(topicResult)

    // Every call fails, so the retry budget hardcoded in getTopics (max 3 attempts) is used up.
    whenever(kafkaFuture.get())
        .thenThrow(ExecutionException(TimeoutException("timed out")))

    val admin = KafkaAdmin(adminClient)

    // Exhausted retries surface as a CordaRuntimeException, after exactly three calls to the future.
    assertThrows<CordaRuntimeException> { admin.getTopics() }
    verify(kafkaFuture, times(3)).get()
}
}

0 comments on commit 4b87ed3

Please sign in to comment.