diff --git a/components/virtual-node/virtual-node-info-read-service/src/main/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessor.kt b/components/virtual-node/virtual-node-info-read-service/src/main/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessor.kt index 95bad8a87bd..af3d173c8a7 100644 --- a/components/virtual-node/virtual-node-info-read-service/src/main/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessor.kt +++ b/components/virtual-node/virtual-node-info-read-service/src/main/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessor.kt @@ -120,7 +120,13 @@ class VirtualNodeInfoProcessor(private val onStatusUpCallback: () -> Unit, priva } val currentSnapshot = virtualNodeInfoMap.getAllAsCordaObjects() - listeners.forEach { it.value.onUpdate(setOf(newRecord.key.toCorda()), currentSnapshot) } + try { + listeners.forEach { it.value.onUpdate(setOf(newRecord.key.toCorda()), currentSnapshot) } + } catch (exception: Exception) { + log.error("Virtual Node Info service could not update", exception) + onErrorCallback() + return + } } fun getAll(): List = diff --git a/components/virtual-node/virtual-node-info-read-service/src/test/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessorTest.kt b/components/virtual-node/virtual-node-info-read-service/src/test/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessorTest.kt index c0eeda368e6..495c8a67afa 100644 --- a/components/virtual-node/virtual-node-info-read-service/src/test/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessorTest.kt +++ b/components/virtual-node/virtual-node-info-read-service/src/test/kotlin/net/corda/virtualnode/read/impl/VirtualNodeInfoProcessorTest.kt @@ -4,8 +4,10 @@ import net.corda.crypto.core.SecureHashImpl import net.corda.libs.packaging.core.CpiIdentifier import net.corda.messaging.api.records.Record import net.corda.test.util.identity.createTestHoldingIdentity +import net.corda.v5.base.exceptions.CordaRuntimeException import net.corda.virtualnode.HoldingIdentity import net.corda.virtualnode.VirtualNodeInfo +import net.corda.virtualnode.read.VirtualNodeInfoListener import net.corda.virtualnode.toAvro import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Assertions.assertEquals @@ -16,6 +18,10 @@ import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertDoesNotThrow +import org.mockito.Mockito +import org.mockito.Mockito.mock +import org.mockito.kotlin.any import java.time.Instant import java.util.UUID @@ -351,4 +357,24 @@ class VirtualNodeInfoProcessorTest { processor.onSnapshot(mapOf(holdingIdentityOther.toAvro() to virtualNodeInfo.toAvro())) assertThat(onError).isTrue } + + @Test + fun `exception is caught and onError called if update fails`() { + val exceptionalListener = mock() + + var onError = false + val processor = VirtualNodeInfoProcessor({ /* don't care */ }, { onError = true }) + + processor.registerCallback(exceptionalListener) + + Mockito.`when`(exceptionalListener.onUpdate(any(), any())).thenThrow(CordaRuntimeException("")) + + val holdingIdentity = createTestHoldingIdentity("CN=Bob, O=Bob Corp, L=LDN, C=GB", "groupId") + val virtualNodeInfo = + newVirtualNodeInfo(holdingIdentity, CpiIdentifier("name", "version", secureHash)) + + assertThat(onError).isFalse + assertDoesNotThrow { processor.onNext(Record("", holdingIdentity.toAvro(), virtualNodeInfo.toAvro()), null, emptyMap()) } + assertThat(onError).isTrue + } } diff --git a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/admin/KafkaAdmin.kt b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/admin/KafkaAdmin.kt index a067dbf36f3..d1b8d47a609 100644 --- a/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/admin/KafkaAdmin.kt +++ b/libs/messaging/kafka-message-bus-impl/src/main/kotlin/net/corda/messagebus/kafka/admin/KafkaAdmin.kt @@ -1,13 +1,49 @@ package net.corda.messagebus.kafka.admin import net.corda.messagebus.api.admin.Admin +import net.corda.utilities.retry.Linear +import net.corda.utilities.retry.tryWithBackoff +import net.corda.v5.base.exceptions.CordaRuntimeException import org.apache.kafka.clients.admin.AdminClient +import org.slf4j.LoggerFactory +import java.util.concurrent.ExecutionException class KafkaAdmin(private val adminClient: AdminClient) : Admin { + + companion object { + private val logger = LoggerFactory.getLogger(this::class.java.enclosingClass) + } + override fun getTopics(): Set { - return adminClient.listTopics().names().get() + return tryWithBackoff( + logger = logger, + maxRetries = 3, + maxTimeMillis = 3000, + backoffStrategy = Linear(200), + shouldRetry = { _, _, throwable -> throwable.isRecoverable() }, + onRetryAttempt = { attempt, delayMillis, throwable -> + logger.warn("Attempt $attempt failed with \"${throwable.message}\", will try again after $delayMillis milliseconds") + }, + onRetryExhaustion = { attempts, elapsedMillis, throwable -> + val errorMessage = + "Execution failed with \"${throwable.message}\" after retrying $attempts times for $elapsedMillis milliseconds." + logger.warn(errorMessage) + CordaRuntimeException(errorMessage, throwable) + }, + { + adminClient.listTopics().names().get() + } + ) } + private fun Throwable.isRecoverable(): Boolean { + return when (this) { + is ExecutionException -> true + else -> false + } + } + + override fun close() { adminClient.close() } diff --git a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/admin/KafkaAdminTest.kt b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/admin/KafkaAdminTest.kt index 2a59a414be1..82d23844c49 100644 --- a/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/admin/KafkaAdminTest.kt +++ b/libs/messaging/kafka-message-bus-impl/src/test/kotlin/net/corda/messagebus/kafka/admin/KafkaAdminTest.kt @@ -1,19 +1,25 @@ package net.corda.messagebus.kafka.admin +import net.corda.v5.base.exceptions.CordaRuntimeException import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.ListTopicsResult import org.apache.kafka.common.KafkaFuture +import org.apache.kafka.common.errors.TimeoutException import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import org.mockito.kotlin.mock +import org.mockito.kotlin.times +import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import java.util.concurrent.ExecutionException class KafkaAdminTest { - private var adminClient = mock() - @Test - fun `When list topics then return all none internal topics from kafka`() { + fun `listTopics returns all internal topics from kafka`() { + var adminClient = mock() + val kafkaFuture = mock>>().apply { whenever(get()).thenReturn(setOf("topic1")) } @@ -23,8 +29,49 @@ class KafkaAdminTest { whenever(adminClient.listTopics()).thenReturn(result) - val target = KafkaAdmin(adminClient) + val admin = KafkaAdmin(adminClient) + + assertThat(admin.getTopics()).containsOnly("topic1") + } + + @Test + fun `getTopics will retry an exception and be successful when retries not exceeded`() { + val adminClient = mock() + val kafkaFuture = mock>>() + + val topicResult = mock() + whenever(topicResult.names()).thenReturn(kafkaFuture) + whenever(adminClient.listTopics()).thenReturn(topicResult) + + //retries hardcoded in getTopics to max 3 attempts + + whenever(kafkaFuture.get()) + .thenThrow(ExecutionException(TimeoutException("timed out"))) + .thenThrow(ExecutionException(TimeoutException("timed out"))) + .thenReturn(setOf("topic1")) + + val admin = KafkaAdmin(adminClient) + + assertThat(admin.getTopics()).containsOnly("topic1") + } + + @Test + fun `getTopics will retry an exception and rethrow when retries exceeded`() { + val adminClient = mock() + val kafkaFuture = mock>>() + + val topicResult = mock() + whenever(topicResult.names()).thenReturn(kafkaFuture) + whenever(adminClient.listTopics()).thenReturn(topicResult) + + //retries hardcoded in getTopics to max 3 attempts + + whenever(kafkaFuture.get()) + .thenThrow(ExecutionException(TimeoutException("timed out"))) + + val admin = KafkaAdmin(adminClient) - assertThat(target.getTopics()).containsOnly("topic1") + assertThrows { admin.getTopics() } + verify(kafkaFuture, times(3)).get() } }