Enhance ut performance (#1427)
* enhance ut performance

* enhance ut performance

* enhance ut performance

* enhance ut performance
BalaBalaYi authored Jan 6, 2025
1 parent fe85c8c commit 18b8e83
Showing 7 changed files with 35 additions and 19 deletions.
6 changes: 6 additions & 0 deletions dlrover/python/common/constants.py
@@ -358,15 +358,21 @@ class JobConstant(object):
INSUFFICIENT_NODE_TIMEOUT_DEFAULT_MIN = 600
INSUFFICIENT_NODE_TIMEOUT_DEFAULT_MAX = 3600
PENDING_NODE_TIMEOUT_DEFAULT_MIN = 600

# grpc timeout 60s
MASTER_CLIENT_GRPC_DEFAULT_TIMEOUT = 60

# sleep 3s on NetworkFailureReason.WAITING_NODE
MASTER_CLIENT_CHECK_FAULT_TIMEOUT = 3

# sleep 3s on NetworkFailureReason.WAITING_NODE
MASTER_CLIENT_CHECK_STRAGGLER_TIMEOUT = 3

# sleep 5s before next node check round
NODE_CHECK_NEXT_ROUND_TIMEOUT = 5

TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 15


class Accelerators(object):
NVIDIA_GPU = "nvidia.com/gpu"
13 changes: 6 additions & 7 deletions dlrover/python/elastic_agent/torch/training.py
@@ -122,9 +122,6 @@
__all__ = ["launch_agent"]


_DEFAULT_INTERVAL = 15


def _set_paral_config():
"""
Set up the directory and path for the parallelism configuration.
@@ -346,7 +343,9 @@ def next_rendezvous(self):
"and waits for more nodes."
)
start_pending = time.time()
time.sleep(_DEFAULT_INTERVAL)
time.sleep(
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL
)
start_join = time.time()
if start_join - start_pending > self.pend_timeout:
raise TimeoutError(
@@ -363,7 +362,7 @@ def next_rendezvous(self):
err_msg, level=TrainingExceptionLevel.RDZV_ERROR
)
raise TimeoutError(err_msg)
time.sleep(_DEFAULT_INTERVAL)
time.sleep(JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL)
rank = list(world.keys()).index(self._node_rank)
world_size = len(world)
logger.info(
@@ -866,7 +865,7 @@ def _initialize_workers(self, worker_group):
"Exit elastic-training rendezvous when there are "
"agents to join the network-check rendezvous."
)
time.sleep(_DEFAULT_INTERVAL)
time.sleep(JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL)
if time.time() - start_pending > pend_timeout:
raise TimeoutError("Timeout to wait for new nodes.")
else:
@@ -1086,7 +1085,7 @@ def _wait_async_saver(self):
# memory to the storage.
start_wait_time = time.time()
while saver.wait_saving_checkpoint():
time.sleep(_DEFAULT_INTERVAL)
time.sleep(JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL)
wait_time = round(time.time() - start_wait_time, 2)
logger.info(
"Wait for saving the checkpoint and "
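
Note: the training.py hunks above all swap the removed module-level `_DEFAULT_INTERVAL` for `JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL`, and the class attribute is looked up at each call site rather than cached. A minimal sketch of why that matters for test speed, assuming only the constant from this diff (the loop, `condition`, and `timeout` names are illustrative, not dlrover's API):

    import time

    from dlrover.python.common.constants import JobConstant


    def wait_for(condition, timeout=600):
        # The class attribute is re-read on every iteration, so rebinding it
        # (as the tests later in this diff do) immediately shortens the wait.
        start = time.time()
        while not condition():
            if time.time() - start > timeout:
                raise TimeoutError("condition not met in time")
            time.sleep(JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL)


    # One line in a test makes every wait step ~1s instead of the default 15s:
    JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 1
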
2 changes: 1 addition & 1 deletion dlrover/python/tests/test_diagnosis.py
@@ -104,7 +104,7 @@ def test_action_queue(self):
action_queue.add_action(action1)
action_queue.add_action(action2)

time.sleep(1)
time.sleep(0.1)
self.assertEqual(
action_queue.next_action(instance=1).action_type,
DiagnosisActionType.NONE,
6 changes: 3 additions & 3 deletions dlrover/python/tests/test_diagnosis_manager.py
@@ -48,17 +48,17 @@ def tearDown(self):
pass

def test_data_manager(self):
mgr = DiagnosisDataManager(5)
mgr = DiagnosisDataManager(1)
log1 = TrainingLog(0)
mgr.store_data(log1)
time.sleep(1)
time.sleep(0.01)
log2 = TrainingLog(0)
mgr.store_data(log2)

logs = mgr.get_data(DiagnosisDataType.TRAINING_LOG)
self.assertEqual(len(logs), 2)

time.sleep(6)
time.sleep(1.5)
log3 = TrainingLog(0)
mgr.store_data(log3)
logs = mgr.get_data(DiagnosisDataType.TRAINING_LOG)
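
Note: the shorter sleeps above work because the retention window passed to DiagnosisDataManager is shrunk from 5 to 1 in the same hunk, so the expiry behaviour under test is unchanged, only faster. A rough sketch of that kind of time-windowed store, under the assumption (inferred from this test, not from dlrover's implementation) that the constructor argument is the retention period in seconds:

    import time
    from collections import deque


    class TimeWindowStoreSketch:
        """Illustrative stand-in for a store that drops entries older than window_s."""

        def __init__(self, window_s):
            self._window_s = window_s
            self._items = deque()  # (timestamp, item) pairs, oldest first

        def store(self, item):
            now = time.time()
            self._items.append((now, item))
            # Evict everything that has fallen out of the retention window.
            while self._items and now - self._items[0][0] > self._window_s:
                self._items.popleft()

        def get_all(self):
            return [item for _, item in self._items]


    # With window_s=1, sleeping 1.5s between stores is enough to trigger
    # eviction, instead of the 6s the test needed when the window was 5s.
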
14 changes: 12 additions & 2 deletions dlrover/python/tests/test_elastic_training_agent.py
@@ -37,6 +37,7 @@
Accelerators,
AscendConstants,
ConfigPath,
JobConstant,
NodeEnv,
RendezvousName,
)
@@ -115,8 +116,10 @@ def setUp(self) -> None:
master_addr=master_addr,
local_addr=self.config.local_addr,
)
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 1

def tearDown(self):
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 15
self._master.stop()
os.environ.clear()

@@ -193,7 +196,7 @@ def test_rank1_rendezvous(self):
store = self.rdzv_handler._get_store(round=1, group=0)

def _set_store(store):
time.sleep(5)
time.sleep(1)
store.set("MASTER_ADDR", "127.0.0.1".encode())
store.set("MASTER_PORT", "12345".encode())

@@ -227,6 +230,7 @@ def test_get_local_ip(self):
self.assertEqual(local_ip, "127.0.0.1")

def test_initialize_worker(self):
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 1
node_id = 1
agent = ElasticTrainingAgent(
node_rank=node_id,
@@ -294,8 +298,10 @@ def setUp(self) -> None:
master_addr=master_addr,
local_addr=self.config.local_addr,
)
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 1

def tearDown(self):
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 15
self._master.stop()

def test_monitor_workers(self):
@@ -513,7 +519,7 @@ def test_stop_workers_ascend(self, cmdline=None):
)

def stop_task(agent):
time.sleep(10)
time.sleep(1)
agent._stop_workers_ascend(None)

stop_task = threading.Thread(target=stop_task, args=(agent,))
@@ -700,8 +706,10 @@ def setUp(self) -> None:
master_addr=master_addr,
local_addr=self.config.local_addr,
)
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 1

def tearDown(self):
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 15
self._master.stop()

def test_get_network_check_time(self):
@@ -800,8 +808,10 @@ class MasterRendezvousHandlerTest(unittest.TestCase):
def setUp(self) -> None:
self._master, addr = start_local_master()
MasterClient._instance = build_master_client(addr, 0.5)
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 1

def tearDown(self):
JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 15
self._master.stop()

def test_pend_timeout(self):
Expand Down
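
Note: the setUp/tearDown pairs above shrink the shared agent loop interval to 1s for the duration of each test and put 15 back afterwards. A self-contained sketch of the same pattern; saving and restoring the previous value, instead of hard-coding 15 as the diff does, is an illustrative variation that keeps the tests independent of the production default:

    import unittest

    from dlrover.python.common.constants import JobConstant


    class FastAgentLoopTestCase(unittest.TestCase):
        def setUp(self):
            # Shrink the shared interval so agent wait loops spin quickly.
            self._saved = JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL
            JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = 1

        def tearDown(self):
            # Restore whatever value was set before, not a hard-coded default.
            JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL = self._saved

        def test_interval_is_shrunk(self):
            self.assertEqual(JobConstant.TRAINING_AGENT_LOOP_DEFAULT_INTERVAL, 1)
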
6 changes: 3 additions & 3 deletions dlrover/python/tests/test_pod_scaler.py
@@ -82,16 +82,16 @@ def test_check_master_service_avaliable(self):
else:
wrong_port = 22222
passed = scaler._check_master_service_avaliable(
"elasticjob-test-master", wrong_port, 2
"elasticjob-test-master", wrong_port, 1
)
self.assertFalse(passed)

passed = scaler._check_master_service_avaliable(
"localhost", wrong_port, 2
"localhost", wrong_port, 1
)
self.assertFalse(passed)

passed = scaler._check_master_service_avaliable("localhost", port, 2)
passed = scaler._check_master_service_avaliable("localhost", port, 1)
self.assertFalse(passed)

def test_periodic_create_pod(self):
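
Note: the test above only lowers the last argument of `_check_master_service_avaliable` from 2 to 1, so each negative probe gives up sooner. Not dlrover's implementation, but a rough sketch of such a check, assuming that argument is a connect timeout in seconds (an assumption, not confirmed by the diff):

    import socket


    def check_service_available(host, port, timeout_s):
        # Try a TCP connection; an unreachable host or closed port raises
        # OSError after at most timeout_s seconds, so a smaller timeout means
        # faster negative tests.
        try:
            with socket.create_connection((host, port), timeout=timeout_s):
                return True
        except OSError:
            return False
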
7 changes: 4 additions & 3 deletions dlrover/python/tests/test_worker_manager.py
@@ -557,6 +557,7 @@ def test_is_training_hang_by_insufficient_worker(self):
worker_manager.is_training_hang_by_insufficient_worker()
)

# set the timeout interval 1s
worker_manager.update_node_required_info((4, 8, 1))
self.assertFalse(
worker_manager.is_training_hang_by_insufficient_worker()
@@ -584,7 +585,7 @@ def test_is_training_hang_by_insufficient_worker(self):
for _ in range(5):
if worker_manager.is_training_hang_by_insufficient_worker():
is_insufficient += 1
time.sleep(1)
time.sleep(0.5)
self.assertEqual(is_insufficient, 0)
mock_nodes.clear()
is_insufficient = 0
@@ -604,7 +605,7 @@ def test_is_training_hang_by_insufficient_worker(self):
for _ in range(5):
if worker_manager.is_training_hang_by_insufficient_worker():
is_insufficient += 1
time.sleep(1)
time.sleep(0.5)
self.assertTrue(is_insufficient >= 2)
mock_nodes.clear()
is_insufficient = 0
@@ -627,5 +628,5 @@ def test_is_training_hang_by_insufficient_worker(self):
for _ in range(5):
if worker_manager.is_training_hang_by_insufficient_worker():
is_insufficient += 1
time.sleep(1)
time.sleep(0.5)
self.assertTrue(is_insufficient >= 2)
