Skip to content

Commit

Permalink
Add logs to track failure report (#1441)
Browse files: browse the repository at this point in the history.
* add logs to track failure report

* update log format

* add unit test

---------

Co-authored-by: BO SANG <[email protected]>
Co-authored-by: Tianyi Chen <[email protected]>
  • Loading branch information
3 people authored Jan 16, 2025
1 parent 227940f commit 34f7c4b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
3 changes: 2 additions & 1 deletion dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def diagnose_training_failure(self) -> NodeAction:
def _report_failure_to_master(self, failures, restart_count):
errors = {}
if len(failures) == 0:
logger.info("Skip failure report due to empty failures")
return
for rank, failure in failures.items():
dt = str(datetime.fromtimestamp(int(failure.timestamp)))
Expand All @@ -287,7 +288,7 @@ def send_heartbeat(self):
action = self._client.report_heart_beat(ts)
self._agent_context.enqueue_diagnosis_action(action)
except Exception as e:
logger.warning(f"fail to report a heartbeat: {e}")
logger.warning(f"Fail to report a heartbeat: {e}")

def _periodically_report(self):
logger.info("Start diagnosis agent periodically reporter.")
Expand Down
2 changes: 1 addition & 1 deletion dlrover/python/master/node/dist_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,8 @@ def handle_training_failure(
):
"""Process the training failure reported by the node."""
node = self._job_context.job_node(node_type, node_id)
logger.info(f"Handle failed node: {node}")
if node.is_released:
logger.info(f"The node {node.name} has been released.")
return
relaunch_node = self._process_error(
node, restart_count, error_data, level
Expand Down
9 changes: 9 additions & 0 deletions dlrover/python/tests/test_diagnosis_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ def test_send_heartbeat(self):
DiagnosisActionType.RESTART_WORKER,
)

agent._client.report_heart_beat = mock.MagicMock(
side_effect=[Exception]
)
agent.send_heartbeat()
self.assertTrue(
context._diagnosis_action_queue.next_action().action_type,
DiagnosisActionType.NONE,
)

def test_async_thread(self):
DiagnosisConstant.AGENT_PERIODICALLY_DIAGNOSIS_INTERVAL_SECS = 1
DiagnosisConstant.AGENT_PERIODICALLY_REPORT_INTERVAL_SECS = 1
Expand Down

0 comments on commit 34f7c4b

Please sign in to comment.