Skip to content

Commit

Permalink
CORE-20886 stop the registry from being updated when a coordinator is…
Browse files Browse the repository at this point in the history
… in status ERROR
  • Loading branch information
LWogan committed Oct 17, 2024
1 parent 47994ec commit 9d1149d
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package net.corda.lifecycle.impl

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledFuture
import net.corda.lifecycle.DependentComponents
import net.corda.lifecycle.ErrorEvent
import net.corda.lifecycle.LifecycleCoordinator
Expand All @@ -18,6 +16,8 @@ import net.corda.lifecycle.registry.LifecycleRegistryException
import net.corda.utilities.debug
import net.corda.utilities.trace
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ScheduledFuture

/**
* Perform processing of lifecycle events.
Expand Down Expand Up @@ -228,12 +228,15 @@ internal class LifecycleProcessor(
* coordinators of the status change and informing the registry.
*/
private fun updateStatus(coordinator: LifecycleCoordinatorInternal, newStatus: LifecycleStatus, reason: String) {
    val currentStatus = state.status

    // An unchanged status needs no notification, and must not be reported as a blocked transition
    // (for example a repeated ERROR while the coordinator is already in ERROR).
    if (currentStatus == newStatus) return

    // ERROR is terminal: once a coordinator has errored, neither the local state, the dependent
    // registrations nor the registry may be moved to any other status.
    if (currentStatus == LifecycleStatus.ERROR) {
        logger.warn(
            "Attempted to update ${coordinator.name} from ERROR to ${newStatus.name}. Transition blocked as ERROR " +
                "is a terminal state."
        )
        return
    }

    logger.info("Attempting to update coordinator ${coordinator.name} from status $currentStatus to $newStatus. Reason: $reason")
    // Record the new status locally first, then fan it out to dependents and finally to the registry.
    state.status = newStatus
    state.registrations.forEach { it.updateCoordinatorStatus(coordinator, newStatus) }
    registry.updateStatus(coordinator.name, newStatus, reason)
}

private fun runUserEventHandler(event: LifecycleEvent, coordinator: LifecycleCoordinator): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ class LifecycleRegistryImpl : LifecycleRegistry, LifecycleRegistryCoordinatorAcc
// Without `computeIfPresent`, `updateStatus` could (in theory) re-introduce a just-removed coordinator into the `statuses` map
// without restoring it in the `coordinators` map.
val coordinatorStatus = CoordinatorStatus(name, status, reason)
statuses.computeIfPresent(name) { _, _ -> coordinatorStatus }
statuses.computeIfPresent(name) { key, currentValue ->
if (currentValue.status == LifecycleStatus.ERROR) {
logger.warn("Attempted to update ${key.componentName} from ERROR to ${status.name}. Transition blocked as ERROR " +
"is a terminal state.")
currentValue
} else {
coordinatorStatus
}
}
logger.trace { "Coordinator status update: $name is now $status ($reason)" }
}
}
Expand Down Expand Up @@ -80,8 +88,17 @@ class LifecycleRegistryImpl : LifecycleRegistry, LifecycleRegistryCoordinatorAcc
*/
override fun removeCoordinator(name: LifecycleCoordinatorName) {
    logger.debug { "Removing coordinator $name from registry" }

    // The decision and the removal both happen inside `compute` so that the status entry is inspected
    // and dropped in a single step; the coordinator itself is only removed when its status entry is.
    statuses.compute(name) { _, currentStatus ->
        if (currentStatus?.status == LifecycleStatus.ERROR) {
            logger.warn("Attempt was made to remove coordinator $name with status ERROR. Blocked attempt to allow for worker " +
                "health endpoint to report ERROR correctly.")
            // Returning the existing value leaves the ERROR entry (and the coordinator) in place.
            currentStatus
        } else {
            coordinators.remove(name)
            // Returning null tells `compute` to remove the status entry (a no-op if it was absent).
            null
        }
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package net.corda.lifecycle.impl

import java.util.concurrent.Delayed
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import net.corda.lifecycle.DependentComponents
import net.corda.lifecycle.ErrorEvent
import net.corda.lifecycle.LifecycleCoordinatorName
Expand Down Expand Up @@ -31,6 +28,9 @@ import org.mockito.Mockito.verify
import org.mockito.kotlin.any
import org.mockito.kotlin.verifyNoInteractions
import org.mockito.kotlin.whenever
import java.util.concurrent.Delayed
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

class LifecycleProcessorTest {

Expand Down Expand Up @@ -402,6 +402,33 @@ class LifecycleProcessorTest {
assertEquals(0, processedOtherEvents)
}

@Test
fun `stop event when coordinator is in ERROR state does not update coordinator status`() {
    // Arrange: a running coordinator that has already reached the terminal ERROR status.
    val state = LifecycleStateManager(5).apply {
        isRunning = true
        status = LifecycleStatus.ERROR
    }
    val registry = mock<LifecycleRegistryCoordinatorAccess>()
    // What the user handler does with the events is irrelevant to this test, so it is a no-op.
    val processor = LifecycleProcessor(NAME, state, registry, mock()) { _, _ -> }
    val registrations = listOf(mock<Registration>(), mock<Registration>())
    registrations.forEach { state.registrations.add(it) }
    val coordinator = setupCoordinatorMock()

    // Act: a stop would normally move the coordinator out of its current status.
    state.postEvent(StopEvent())
    process(processor, coordinator = coordinator)

    // Assert: the status stays ERROR and nobody is told about a status change.
    assertEquals(LifecycleStatus.ERROR, state.status)
    registrations.forEach { registration ->
        verify(registration, org.mockito.Mockito.never()).updateCoordinatorStatus(any(), any())
    }
    verify(registry, org.mockito.Mockito.never()).updateStatus(any(), any(), any())
}

@Test
fun `start event causes a request to notify about all current tracked registrations`() {
val state = LifecycleStateManager(5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,32 @@ class LifecycleRegistryImplTest {
registry.getCoordinator(aliceName)
}
}

@Test
fun `status update is blocked when current status is ERROR`() {
    // Given a registered coordinator that has been moved to ERROR
    val erroredStatus = CoordinatorStatus(aliceName, LifecycleStatus.ERROR, "Simulated error")
    val registry = LifecycleRegistryImpl()
    registry.registerCoordinator(aliceName, mock())
    registry.updateStatus(aliceName, LifecycleStatus.ERROR, "Simulated error")

    // When a later update tries to bring it back UP
    registry.updateStatus(aliceName, LifecycleStatus.UP, "Alice is up")

    // Then the registry still reports the original ERROR status and reason
    assertEquals(erroredStatus, registry.componentStatus()[aliceName])
}

@Test
fun `removing a coordinator with ERROR status is blocked`() {
    // Given a registered coordinator that has been moved to ERROR
    val erroredStatus = CoordinatorStatus(aliceName, LifecycleStatus.ERROR, "Simulated error")
    val registry = LifecycleRegistryImpl()
    registry.registerCoordinator(aliceName, mock())
    registry.updateStatus(aliceName, LifecycleStatus.ERROR, "Simulated error")

    // When removal of that coordinator is requested
    registry.removeCoordinator(aliceName)

    // Then its ERROR status is still the one and only entry reported
    val reportedStatuses = registry.componentStatus()
    assertEquals(1, reportedStatuses.size)
    assertEquals(erroredStatus, reportedStatuses[aliceName])
}
}

0 comments on commit 9d1149d

Please sign in to comment.