Skip to content

Commit

Permalink
Optimize status flow validation (#1408)
Browse files Browse the repository at this point in the history
* optimize status flow validation

* lint

* lint

(cherry picked from commit 9e85899)
  • Loading branch information
BalaBalaYi authored and chentianyi.cty committed Jan 2, 2025
1 parent 592bacb commit 918b5c5
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
30 changes: 29 additions & 1 deletion dlrover/python/master/node/status_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,39 @@
]


# Maps a node's current status to the list of statuses it may change to.
# Every status may stay as it is, so each entry is spelled as
# (status, other statuses reachable from it) and the comprehension puts
# the status itself at the front of its own list.
ALLOWED_TRANSITIONS = {
    current: [current, *successors]
    for current, successors in (
        (
            NodeStatus.INITIAL,
            (
                NodeStatus.PENDING,
                NodeStatus.RUNNING,
                NodeStatus.SUCCEEDED,
                NodeStatus.FAILED,
                NodeStatus.DELETED,
            ),
        ),
        (
            NodeStatus.PENDING,
            (
                NodeStatus.RUNNING,
                NodeStatus.SUCCEEDED,
                NodeStatus.FAILED,
                NodeStatus.DELETED,
            ),
        ),
        (
            NodeStatus.RUNNING,
            (NodeStatus.SUCCEEDED, NodeStatus.FAILED, NodeStatus.DELETED),
        ),
        # SUCCEEDED and FAILED can only be followed by DELETED.
        (NodeStatus.SUCCEEDED, (NodeStatus.DELETED,)),
        (NodeStatus.FAILED, (NodeStatus.DELETED,)),
        # DELETED has no successor other than itself.
        (NodeStatus.DELETED, ()),
    )
}


def get_node_state_flow(from_status, event_type, phase):
if event_type == "DELETED" and from_status == NodeStatus.PENDING:
# The phase is pending if the pending node is deleted.
phase = NodeStatus.DELETED
if from_status == phase:
if from_status == phase or phase not in ALLOWED_TRANSITIONS[from_status]:
return None
for flow in NODE_STATE_FLOWS:
if (
Expand Down
9 changes: 9 additions & 0 deletions dlrover/python/tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from dlrover.python.master.node.job_context import get_job_context
from dlrover.python.master.node.local_job_manager import LocalJobManager
from dlrover.python.master.node.status_flow import (
ALLOWED_TRANSITIONS,
NODE_STATE_FLOWS,
NodeStateFlow,
get_node_state_flow,
Expand Down Expand Up @@ -122,6 +123,14 @@ def test_get_node_state_flow(self):
self.assertEqual(flow, NODE_STATE_FLOWS[9])
self.assertTrue(flow.should_relaunch)

def test_allowed_transitions(self):
    """Check the RUNNING entry of ALLOWED_TRANSITIONS.

    RUNNING must be listed as a target of RUNNING, and PENDING must not.
    """
    running_targets = ALLOWED_TRANSITIONS[NodeStatus.RUNNING]
    # assertIn/assertNotIn print the element and the container when they
    # fail, unlike assertTrue/assertFalse on a bare ``in`` expression.
    self.assertIn(NodeStatus.RUNNING, running_targets)
    self.assertNotIn(NodeStatus.PENDING, running_targets)


class DistributedJobManagerTest(unittest.TestCase):
def setUp(self) -> None:
Expand Down

0 comments on commit 918b5c5

Please sign in to comment.