Skip to content

Commit

Permalink
CORE-20886 stop the registry from being updated when a coordinator is…
Browse files Browse the repository at this point in the history
… in status ERROR
  • Loading branch information
LWogan committed Oct 18, 2024
1 parent 9d1149d commit 6121334
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,21 @@ internal class LifecycleProcessor(
/**
* Perform any logic for updating the status of the coordinator. This includes informing other registered
* coordinators of the status change and informing the registry.
* If a coordinator is in status ERROR then block any transition away from ERROR.
* This allows the liveness probe to determine the health of a worker correctly.
*/
private fun updateStatus(coordinator: LifecycleCoordinatorInternal, newStatus: LifecycleStatus, reason: String) {
if (state.status == LifecycleStatus.ERROR) {
logger.warn("Attempted to update ${coordinator.name} from ERROR to ${newStatus.name}. Transition blocked as ERROR " +
"is a terminal state.")
} else if (state.status != newStatus) {
return
}
if (state.status != newStatus) {
logger.info("Attempting to update coordinator ${coordinator.name} from status ${state.status} to $newStatus. Reason: $reason")
state.status = newStatus
state.registrations.forEach { it.updateCoordinatorStatus(coordinator, newStatus) }
registry.updateStatus(coordinator.name, newStatus, reason)
}
state.status = newStatus
state.registrations.forEach { it.updateCoordinatorStatus(coordinator, newStatus) }
registry.updateStatus(coordinator.name, newStatus, reason)
}

private fun runUserEventHandler(event: LifecycleEvent, coordinator: LifecycleCoordinator): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ internal class LifecycleCoordinatorImplTest {

@Test
@Suppress("TooGenericExceptionThrown")
fun `when a coordinator stops with an error the status is set to error`() {
fun `when a coordinator stops with an error the status is set to error and stays in error when start is called`() {
var startLatch = CountDownLatch(1)
val stopLatch = CountDownLatch(1)
createCoordinator { event, _ ->
Expand All @@ -1074,11 +1074,11 @@ internal class LifecycleCoordinatorImplTest {
assertTrue(stopLatch.await(TIMEOUT, TimeUnit.MILLISECONDS))
assertEquals(LifecycleStatus.ERROR, it.status)

// Restart and prove that the status is set to DOWN.
// Restart and prove that the status stays in ERROR.
startLatch = CountDownLatch(1)
it.start()
assertTrue(startLatch.await(TIMEOUT, TimeUnit.MILLISECONDS))
assertEquals(LifecycleStatus.DOWN, it.status)
assertEquals(LifecycleStatus.ERROR, it.status)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ class LifecycleProcessorTest {
}

@Test
fun `starting from an errored state causes the status to be set back to down`() {
fun `starting from an errored state leaves the status in ERROR`() {
val state = LifecycleStateManager(5)
var processedStartEvents = 0
val registry = mock<LifecycleRegistryCoordinatorAccess>()
Expand All @@ -536,9 +536,7 @@ class LifecycleProcessorTest {
state.registrations.add(registration)
state.postEvent(StartEvent())
process(processor, coordinator = coordinator)
assertEquals(LifecycleStatus.DOWN, state.status)
verify(registration).updateCoordinatorStatus(coordinator, LifecycleStatus.DOWN)
verify(registry).updateStatus(NAME, LifecycleStatus.DOWN, STARTED_REASON)
assertEquals(LifecycleStatus.ERROR, state.status)
}

@Test
Expand Down

0 comments on commit 6121334

Please sign in to comment.