From f350f4b57410406db0bfaca9b011529cb3bda812 Mon Sep 17 00:00:00 2001 From: BO SANG Date: Wed, 15 Jan 2025 16:05:45 -0800 Subject: [PATCH 1/3] add logs to track failure report --- dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py | 1 + dlrover/python/master/node/dist_job_manager.py | 1 + 2 files changed, 2 insertions(+) diff --git a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py index f0808b18b..833d99748 100644 --- a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py +++ b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py @@ -264,6 +264,7 @@ def diagnose_training_failure(self) -> NodeAction: def _report_failure_to_master(self, failures, restart_count): errors = {} if len(failures) == 0: + logger.info("there is no failures. skip failures report") return for rank, failure in failures.items(): dt = str(datetime.fromtimestamp(int(failure.timestamp))) diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py index 8c1f63318..d6b3b7bf6 100644 --- a/dlrover/python/master/node/dist_job_manager.py +++ b/dlrover/python/master/node/dist_job_manager.py @@ -1198,6 +1198,7 @@ def handle_training_failure( ): """Process the training failure reported by the node.""" node = self._job_context.job_node(node_type, node_id) + logger.info(f"handle failed node: {node}") if node.is_released: logger.info(f"The node {node.name} has been released.") return From b648ea41ebd039c7d619b3eedd8d4519f6cd339a Mon Sep 17 00:00:00 2001 From: BO SANG Date: Wed, 15 Jan 2025 22:29:35 -0800 Subject: [PATCH 2/3] update log format --- dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py | 4 ++-- dlrover/python/master/node/dist_job_manager.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py index 833d99748..b39ac2eaf 100644 
--- a/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py +++ b/dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py @@ -264,7 +264,7 @@ def diagnose_training_failure(self) -> NodeAction: def _report_failure_to_master(self, failures, restart_count): errors = {} if len(failures) == 0: - logger.info("there is no failures. skip failures report") + logger.info("Skip failure report due to empty failures") return for rank, failure in failures.items(): dt = str(datetime.fromtimestamp(int(failure.timestamp))) @@ -288,7 +288,7 @@ def send_heartbeat(self): action = self._client.report_heart_beat(ts) self._agent_context.enqueue_diagnosis_action(action) except Exception as e: - logger.warning(f"fail to report a heartbeat: {e}") + logger.warning(f"Fail to report a heartbeat: {e}") def _periodically_report(self): logger.info("Start diagnosis agent periodically reporter.") diff --git a/dlrover/python/master/node/dist_job_manager.py b/dlrover/python/master/node/dist_job_manager.py index d6b3b7bf6..7c3e376e6 100644 --- a/dlrover/python/master/node/dist_job_manager.py +++ b/dlrover/python/master/node/dist_job_manager.py @@ -1198,9 +1198,8 @@ def handle_training_failure( ): """Process the training failure reported by the node.""" node = self._job_context.job_node(node_type, node_id) - logger.info(f"handle failed node: {node}") + logger.info(f"Handle failed node: {node}") if node.is_released: - logger.info(f"The node {node.name} has been released.") return relaunch_node = self._process_error( node, restart_count, error_data, level From 6e79cb5dbd515ed84d9c445537cc79f02c659979 Mon Sep 17 00:00:00 2001 From: BO SANG Date: Wed, 15 Jan 2025 22:55:16 -0800 Subject: [PATCH 3/3] add unit test --- dlrover/python/tests/test_diagnosis_agent.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dlrover/python/tests/test_diagnosis_agent.py b/dlrover/python/tests/test_diagnosis_agent.py index aad5999e0..d820d0b27 100644 --- a/dlrover/python/tests/test_diagnosis_agent.py +++ 
b/dlrover/python/tests/test_diagnosis_agent.py @@ -196,6 +196,15 @@ def test_send_heartbeat(self): DiagnosisActionType.RESTART_WORKER, ) + agent._client.report_heart_beat = mock.MagicMock( + side_effect=[Exception] + ) + agent.send_heartbeat() + self.assertEqual( + context._diagnosis_action_queue.next_action().action_type, + DiagnosisActionType.NONE, + ) + def test_async_thread(self): DiagnosisConstant.AGENT_PERIODICALLY_DIAGNOSIS_INTERVAL_SECS = 1 DiagnosisConstant.AGENT_PERIODICALLY_REPORT_INTERVAL_SECS = 1