From 7a343570409e390cc40ebe099f4e0eabf1e9f8a8 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Wed, 8 Jan 2025 17:59:37 +0800 Subject: [PATCH] revert(NodeState): revert `SHUTDOWN` and `SHUTTING_DOWN` to `FENCED` and `CONTROLLED_SHUTDOWN` Signed-off-by: Ning Yu --- .../kafka/controller/BrokerHeartbeatManager.java | 6 +++--- .../kafka/controller/stream/NodeState.java | 16 +++++----------- .../controller/BrokerHeartbeatManagerTest.java | 8 ++++---- .../stream/NodeControlManagerTest.java | 4 ++-- 4 files changed, 14 insertions(+), 20 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index 5d3bc72935..8be03d2280 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -518,14 +518,14 @@ public NodeState brokerState(int brokerId, long shutdownTimeoutNs) { return NodeState.UNKNOWN; } if (broker.shuttingDown()) { - return NodeState.SHUTTING_DOWN; + return NodeState.CONTROLLED_SHUTDOWN; } if (broker.fenced()) { if (broker.lastControlledShutdownNs() + shutdownTimeoutNs > time.nanoseconds()) { // The broker is still in controlled shutdown. - return NodeState.SHUTTING_DOWN; + return NodeState.CONTROLLED_SHUTDOWN; } - return NodeState.SHUTDOWN; + return NodeState.FENCED; } return NodeState.ACTIVE; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java index 97eb8fc497..6b7c28b18c 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/NodeState.java @@ -11,6 +11,8 @@ package org.apache.kafka.controller.stream; +import org.apache.kafka.controller.BrokerControlState; + public enum NodeState { /** * The node is active and can handle requests. @@ -18,21 +20,13 @@ public enum NodeState { ACTIVE, /** * The node is shutting down in a controlled manner. + * Note: In AutoMQ, this state is different from {@link BrokerControlState#CONTROLLED_SHUTDOWN}. In some cases, + * a node in {@link BrokerControlState#FENCED} state may still be shutting down in a controlled manner. */ - SHUTTING_DOWN, - /** - * The node is shut down and cannot handle requests. - */ - SHUTDOWN, - /** - * Use @{@link #SHUTTING_DOWN} instead. - */ - @Deprecated CONTROLLED_SHUTDOWN, /** - * Use @{@link #SHUTDOWN} instead. + * The node is shut down and cannot handle requests. */ - @Deprecated FENCED, /** * The state of the node is unknown, possibly because it has not yet registered. diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index 294909475d..9c96de0b8b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -374,7 +374,7 @@ public void testBrokerState() { manager.register(0, true); // FENCED Broker - assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); // UNFENCED Broker manager.touch(0, false, 100); @@ -382,16 +382,16 @@ public void testBrokerState() { // CONTROLLED_SHUTDOWN Broker manager.maybeUpdateControlledShutdownOffset(0, 100); - assertEquals(NodeState.SHUTTING_DOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); // SHUTDOWN_NOW Broker within shutdownTimeoutNs manager.touch(0, true, 100); manager.time().sleep(5); - assertEquals(NodeState.SHUTTING_DOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.CONTROLLED_SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); // SHUTDOWN_NOW Broker after shutdownTimeoutNs manager.time().sleep(6); - assertEquals(NodeState.SHUTDOWN, manager.brokerState(0, shutdownTimeoutNs)); + assertEquals(NodeState.FENCED, manager.brokerState(0, shutdownTimeoutNs)); // UNFENCED Broker after SHUTDOWN manager.touch(0, false, 100); diff --git a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java index 1b0266837e..b725d9e69a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/stream/NodeControlManagerTest.java @@ -95,7 +95,7 @@ public void testRegister() { assertTrue(nodeControlManager.nodeMetadataMap.containsKey(0)); when(nodeRuntimeInfoGetter.hasOpeningStreams(eq(0))).thenReturn(true); - when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.SHUTDOWN); + when(nodeRuntimeInfoGetter.state(eq(0))).thenReturn(NodeState.FENCED); ControllerResult getRst = nodeControlManager.getMetadata( new AutomqGetNodesRequest(new AutomqGetNodesRequestData().setNodeIds(List.of(0, 1)), @@ -107,7 +107,7 @@ public void testRegister() { assertEquals(0, nodes.get(0).nodeId()); assertEquals(2L, nodes.get(0).nodeEpoch()); assertEquals("wal2", nodes.get(0).walConfig()); - assertEquals("SHUTDOWN", nodes.get(0).state()); + assertEquals(NodeState.FENCED.name(), nodes.get(0).state()); } AutomqRegisterNodeRequestData.TagCollection tags(Map tags) {