diff --git a/python/fedml/computing/scheduler/master/base_master_agent.py b/python/fedml/computing/scheduler/master/base_master_agent.py
index e7d18f64f..30cf5da1c 100755
--- a/python/fedml/computing/scheduler/master/base_master_agent.py
+++ b/python/fedml/computing/scheduler/master/base_master_agent.py
@@ -72,7 +72,8 @@ def logout():
         sys_utils.cleanup_all_fedml_server_api_processes()
 
     def stop(self, kill_process=False):
-        self.protocol_mgr.stop(kill_process=kill_process)
+        if self.protocol_mgr is not None:
+            self.protocol_mgr.stop(kill_process=kill_process)
 
     def _create_protocol_manager(self, role, login_result):
         if self.protocol_mgr is not None:
diff --git a/python/fedml/computing/scheduler/master/base_master_job_runner.py b/python/fedml/computing/scheduler/master/base_master_job_runner.py
index 32b285dc7..fdfff143a 100755
--- a/python/fedml/computing/scheduler/master/base_master_job_runner.py
+++ b/python/fedml/computing/scheduler/master/base_master_job_runner.py
@@ -1,4 +1,3 @@
-
 import json
 import logging
 import multiprocessing
@@ -414,9 +413,9 @@ def _generate_job_runner_instance(self, args, run_id=None, request_json=None, ag
         return None
 
     def start_runner_process(
-        self, run_id, request_json, edge_id=None, is_server_job=False,
-        sender_message_queue=None, listener_message_queue=None,
-        status_center_queue=None, process_name=None
+            self, run_id, request_json, edge_id=None, is_server_job=False,
+            sender_message_queue=None, listener_message_queue=None,
+            status_center_queue=None, process_name=None
     ):
         server_runner = self._generate_job_runner_instance(
             self.args, run_id=run_id, request_json=request_json,
@@ -440,7 +439,7 @@ def start_runner_process(
                     self.run_process_event, self.run_process_completed_event, self.run_edge_id_status_queue,
                     self.run_edge_device_info_queue, self.run_metrics_queue, self.run_events_queue,
                     self.run_artifacts_queue, self.run_logs_queue, self.run_edge_device_info_global_queue,
-                    self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue,
+                    self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue, process_name,
                 )
             )
@@ -450,7 +449,7 @@ def start_runner_process(
                     self.run_process_event, self.run_process_completed_event, self.run_edge_id_status_queue,
                     self.run_edge_device_info_queue, self.run_metrics_queue, self.run_events_queue,
                     self.run_artifacts_queue, self.run_logs_queue, self.run_edge_device_info_global_queue,
-                    self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue,
+                    self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue, process_name,
                 )
             )
@@ -731,6 +730,3 @@ def should_process_async_cluster(self):
 
     def get_client_id_list(self, server_edge_id_list):
         return server_edge_id_list
-
-
-
diff --git a/python/fedml/computing/scheduler/master/base_master_job_runner_manager.py b/python/fedml/computing/scheduler/master/base_master_job_runner_manager.py
index f462596cb..39f743869 100755
--- a/python/fedml/computing/scheduler/master/base_master_job_runner_manager.py
+++ b/python/fedml/computing/scheduler/master/base_master_job_runner_manager.py
@@ -2,7 +2,6 @@
 import json
 import logging
 import multiprocessing
-import os
 import platform
 import time
 from abc import ABC
@@ -11,7 +10,6 @@
 import fedml
 from .cloud_server_manager import FedMLCloudServerManager
 from ..comm_utils.run_process_utils import RunProcessUtils
-from ..scheduler_core.general_constants import GeneralConstants
 from ..scheduler_core.scheduler_base_job_runner_manager import FedMLSchedulerBaseJobRunnerManager
 from ..scheduler_core.account_manager import FedMLAccountManager
@@ -67,10 +65,10 @@ def stop_job_runner(
         run_id_str = str(run_id)
         if self.master_agent_instance_map.get(run_id_str, None) is not None:
-            self.master_agent_instance_map.get(run_id_str).stop()
+            self.master_agent_instance_map.get(run_id_str).stop(kill_process=True)
             self.master_agent_instance_map.pop(run_id_str)
 
-        if run_as_cloud_server:
+        if use_local_process_as_cloud_server:
             time.sleep(1)
             RunProcessUtils.kill_process(self.cloud_run_process_map[run_id_str].pid)
diff --git a/python/fedml/computing/scheduler/master/base_master_protocol_manager.py b/python/fedml/computing/scheduler/master/base_master_protocol_manager.py
index 9d3b49275..05529f8c8 100755
--- a/python/fedml/computing/scheduler/master/base_master_protocol_manager.py
+++ b/python/fedml/computing/scheduler/master/base_master_protocol_manager.py
@@ -2,6 +2,7 @@
 import base64
 import json
 import logging
+import time
 
 import fedml
 from ..comm_utils.constants import SchedulerConstants
@@ -142,6 +143,7 @@ def on_agent_communication_connected(self, mqtt_client_object):
     def callback_start_train(self, topic=None, payload=None):
         # Fetch config from MLOps
         # noinspection PyBroadException
+
         try:
             MLOpsConfigs.fetch_all_configs()
         except Exception:
@@ -290,6 +292,16 @@ def callback_stop_train(self, topic, payload, use_payload=None):
             server_agent_id = self.edge_id
             topic_stop_train_to_cloud_server = f"mlops/flserver_agent_{server_id}/stop_train"
             self.message_center.send_message(topic_stop_train_to_cloud_server, payload)
+
+            time.sleep(2)
+            MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, server_id)
+            self._get_job_runner_manager().stop_job_runner(
+                run_id, args=self.args, server_id=server_id, request_json=None,
+                run_as_cloud_agent=self.run_as_cloud_agent, run_as_cloud_server=self.run_as_cloud_server,
+                use_local_process_as_cloud_server=self.use_local_process_as_cloud_server)
+            self.generate_status_report(run_id, server_id, server_agent_id=server_agent_id). \
+                report_server_id_status(run_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_KILLED,
+                                        edge_id=server_id, server_id=server_id)
             return
 
         # Reset all edge status and server status
diff --git a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py
index 7b0d00f53..717503237 100755
--- a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py
+++ b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner.py
@@ -615,7 +615,8 @@ def job_error_processor(self, error_list):
 
     def start_runner_process(
             self, run_id, edge_id, request_json, cuda_visible_gpu_ids_str=None,
-            sender_message_queue=None, status_center_queue=None, process_name=None
+            sender_message_queue=None, listener_message_queue=None,
+            status_center_queue=None, process_name=None
     ):
         return None
 
diff --git a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner_manager.py b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner_manager.py
index 8edf57fcb..ad32f7863 100755
--- a/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner_manager.py
+++ b/python/fedml/computing/scheduler/scheduler_core/scheduler_base_job_runner_manager.py
@@ -29,6 +29,7 @@ def start_job_runner(
         )
         self.job_runners[run_id_str].start_runner_process(
             run_id, request_json, edge_id=edge_id,
+            cuda_visible_gpu_ids_str=cuda_visible_gpu_ids_str,
            sender_message_queue=sender_message_queue,
            listener_message_queue=listener_message_queue,
            status_center_queue=status_center_queue,