Skip to content

Commit

Permalink
[CoreEngine] Make the job stopping feature work.
Browse files · Browse the repository at this point in the history
  • Loading branch information
fedml-alex committed Jun 25, 2024
1 parent fd038b5 commit 5ae6904
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 15 deletions.
3 changes: 2 additions & 1 deletion python/fedml/computing/scheduler/master/base_master_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def logout():
sys_utils.cleanup_all_fedml_server_api_processes()

def stop(self, kill_process=False):
self.protocol_mgr.stop(kill_process=kill_process)
if self.protocol_mgr is not None:
self.protocol_mgr.stop(kill_process=kill_process)

def _create_protocol_manager(self, role, login_result):
if self.protocol_mgr is not None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import json
import logging
import multiprocessing
Expand Down Expand Up @@ -414,9 +413,9 @@ def _generate_job_runner_instance(self, args, run_id=None, request_json=None, ag
return None

def start_runner_process(
self, run_id, request_json, edge_id=None, is_server_job=False,
sender_message_queue=None, listener_message_queue=None,
status_center_queue=None, process_name=None
self, run_id, request_json, edge_id=None, is_server_job=False,
sender_message_queue=None, listener_message_queue=None,
status_center_queue=None, process_name=None
):
server_runner = self._generate_job_runner_instance(
self.args, run_id=run_id, request_json=request_json,
Expand All @@ -440,7 +439,7 @@ def start_runner_process(
self.run_process_event, self.run_process_completed_event, self.run_edge_id_status_queue,
self.run_edge_device_info_queue, self.run_metrics_queue, self.run_events_queue,
self.run_artifacts_queue, self.run_logs_queue, self.run_edge_device_info_global_queue,
self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue,
self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue,
process_name,
)
)
Expand All @@ -450,7 +449,7 @@ def start_runner_process(
self.run_process_event, self.run_process_completed_event, self.run_edge_id_status_queue,
self.run_edge_device_info_queue, self.run_metrics_queue, self.run_events_queue,
self.run_artifacts_queue, self.run_logs_queue, self.run_edge_device_info_global_queue,
self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue,
self.run_extend_queue_list, sender_message_queue, listener_message_queue, status_center_queue,
process_name,
)
)
Expand Down Expand Up @@ -731,6 +730,3 @@ def should_process_async_cluster(self):

def get_client_id_list(self, server_edge_id_list):
return server_edge_id_list



Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json
import logging
import multiprocessing
import os
import platform
import time
from abc import ABC
Expand All @@ -11,7 +10,6 @@
import fedml
from .cloud_server_manager import FedMLCloudServerManager
from ..comm_utils.run_process_utils import RunProcessUtils
from ..scheduler_core.general_constants import GeneralConstants
from ..scheduler_core.scheduler_base_job_runner_manager import FedMLSchedulerBaseJobRunnerManager
from ..scheduler_core.account_manager import FedMLAccountManager

Expand Down Expand Up @@ -67,10 +65,10 @@ def stop_job_runner(

run_id_str = str(run_id)
if self.master_agent_instance_map.get(run_id_str, None) is not None:
self.master_agent_instance_map.get(run_id_str).stop()
self.master_agent_instance_map.get(run_id_str).stop(kill_process=True)
self.master_agent_instance_map.pop(run_id_str)

if run_as_cloud_server:
if use_local_process_as_cloud_server:
time.sleep(1)
RunProcessUtils.kill_process(self.cloud_run_process_map[run_id_str].pid)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import base64
import json
import logging
import time

import fedml
from ..comm_utils.constants import SchedulerConstants
Expand Down Expand Up @@ -142,6 +143,7 @@ def on_agent_communication_connected(self, mqtt_client_object):
def callback_start_train(self, topic=None, payload=None):
# Fetch config from MLOps
# noinspection PyBroadException

try:
MLOpsConfigs.fetch_all_configs()
except Exception:
Expand Down Expand Up @@ -290,6 +292,16 @@ def callback_stop_train(self, topic, payload, use_payload=None):
server_agent_id = self.edge_id
topic_stop_train_to_cloud_server = f"mlops/flserver_agent_{server_id}/stop_train"
self.message_center.send_message(topic_stop_train_to_cloud_server, payload)

time.sleep(2)
MLOpsRuntimeLogDaemon.get_instance(self.args).stop_log_processor(run_id, server_id)
self._get_job_runner_manager().stop_job_runner(
run_id, args=self.args, server_id=server_id, request_json=None,
run_as_cloud_agent=self.run_as_cloud_agent, run_as_cloud_server=self.run_as_cloud_server,
use_local_process_as_cloud_server=self.use_local_process_as_cloud_server)
self.generate_status_report(run_id, server_id, server_agent_id=server_agent_id). \
report_server_id_status(run_id, GeneralConstants.MSG_MLOPS_SERVER_STATUS_KILLED,
edge_id=server_id, server_id=server_id)
return

# Reset all edge status and server status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,8 @@ def job_error_processor(self, error_list):

def start_runner_process(
self, run_id, edge_id, request_json, cuda_visible_gpu_ids_str=None,
sender_message_queue=None, status_center_queue=None, process_name=None
sender_message_queue=None, listener_message_queue=None,
status_center_queue=None, process_name=None
):
return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def start_job_runner(
)
self.job_runners[run_id_str].start_runner_process(
run_id, request_json, edge_id=edge_id,
cuda_visible_gpu_ids_str=cuda_visible_gpu_ids_str,
sender_message_queue=sender_message_queue,
listener_message_queue=listener_message_queue,
status_center_queue=status_center_queue,
Expand Down

0 comments on commit 5ae6904

Please sign in to comment.