diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py index d1b34c864..8c1f63318 100644 --- a/dlrover/python/master/node/dist_job_manager.py +++ b/dlrover/python/master/node/dist_job_manager.py @@ -510,7 +510,7 @@ def _get_dead_node_event(self, window_interval=600) -> List[NodeEvent]: and node.start_time and node.create_time and node.status == NodeStatus.RUNNING - and not node.is_exited_reported() + and not node.is_succeeded_and_exited() ): if ( node.heartbeat_time <= node.start_time.timestamp() diff --git a/dlrover/python/tests/test_job_manager.py b/dlrover/python/tests/test_job_manager.py index 4d4265291..0e691decc 100644 --- a/dlrover/python/tests/test_job_manager.py +++ b/dlrover/python/tests/test_job_manager.py @@ -431,6 +431,21 @@ def test_get_dead_node_event(self): events = manager._get_dead_node_event() self.assertEqual(len(events), 0) + job_nodes = self.job_context.job_nodes() + for index, node in enumerate(job_nodes[NodeType.WORKER].values()): + node.status = NodeStatus.RUNNING + now = datetime.now() + node.heartbeat_time = (now - timedelta(seconds=1000)).timestamp() + if index == 0: + node.reported_status = NodeEventType.SUCCEEDED_EXITED + else: + node.reported_status = NodeEventType.FAILED_EXITED + node.create_time = now - timedelta(seconds=1400) + node.start_time = now - timedelta(seconds=1200) + self.job_context.update_job_node(node) + events = manager._get_dead_node_event() + self.assertEqual(len(events), 2) + def test_relaunch_training_master(self): params = MockK8sPSJobArgs() params.initilize()