Skip to content

Commit

Permalink
Merge branch 'master' into new-job-master
Browse files Browse the repository at this point in the history
  • Loading branch information
workingloong authored Jan 19, 2025
2 parents 2d31d1b + 34f7c4b commit 0fd773c
Show file tree
Hide file tree
Showing 22 changed files with 3,139 additions and 3,806 deletions.
13 changes: 13 additions & 0 deletions .github/actions/elasticjob-controller-test/action.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
name: elasticjob-controller-test
description: run gotest to execute go test cases of ElasticJob operator
runs:
using: 'docker'
image: "easydl/dlrover:ci"
args:
- "/bin/bash"
- "-c"
- "rm -rf /usr/local/go && \
wget -q https://go.dev/dl/go1.23.4.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go1.23.4.linux-amd64.tar.gz && \
cd go/elasticjob && go test ./..."
9 changes: 3 additions & 6 deletions .github/actions/go-master-test/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ name: go-master-test
description: run gotest to execute go test cases of ElasticJob operator
runs:
using: 'docker'
image: "easydl/dlrover:ci"
image: "golang:1.23.4"
args:
- "/bin/bash"
- "-c"
- "rm -rf /usr/local/go && \
wget -q https://go.dev/dl/go1.23.4.linux-amd64.tar.gz && \
tar -C /usr/local -xzf go1.23.4.linux-amd64.tar.gz && \
go install github.com/onsi/ginkgo/v2/[email protected] &&
cd go/master && ginkgo -v ./..."
- "go install github.com/onsi/ginkgo/v2/[email protected] && \
cd go/master && ginkgo -v ./..."
10 changes: 0 additions & 10 deletions .github/actions/operator-test/action.yml

This file was deleted.

4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ jobs:
# This step references the directory that contains the action.
- name: RUN gotest
uses: ./.github/actions/go-master-test
operator-test:
elasticjob-controller-test:
runs-on: ubuntu-latest
steps:
# This step checks out a copy of your repository.
- uses: actions/checkout@v3
# This step references the directory that contains the action.
- name: RUN gotest
uses: ./.github/actions/operator-test
uses: ./.github/actions/elasticjob-controller-test
brain-test:
runs-on: ubuntu-latest
steps:
Expand Down
13 changes: 9 additions & 4 deletions dlrover/python/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,20 @@ class JobConstant(object):
INSUFFICIENT_NODE_TIMEOUT_DEFAULT_MIN = 600
INSUFFICIENT_NODE_TIMEOUT_DEFAULT_MAX = 3600
PENDING_NODE_TIMEOUT_DEFAULT_MIN = 600
NODE_CHECK_TIMEOUT = 300

# grpc timeout 60s
MASTER_CLIENT_GRPC_DEFAULT_TIMEOUT = 60

# sleep 3s on NetworkFailureReason.WAITING_NODE
MASTER_CLIENT_CHECK_FAULT_TIMEOUT = 3
# timeout value for master_client.check_fault_node/check_straggler;
# it must be greater than NODE_CHECK_TIMEOUT
MASTER_CLIENT_CHECK_NODE_TIMEOUT = 360

# sleep 3s on NetworkFailureReason.WAITING_NODE
MASTER_CLIENT_CHECK_STRAGGLER_TIMEOUT = 3
# sleep 1s on NetworkFailureReason.WAITING_NODE
MASTER_CLIENT_CHECK_FAULT_SLEEP_TIMEOUT = 1

# sleep 1s on NetworkFailureReason.WAITING_NODE
MASTER_CLIENT_CHECK_STRAGGLER_SLEEP_TIMEOUT = 1

# sleep 5s before next node check round
NODE_CHECK_NEXT_ROUND_TIMEOUT = 5
Expand Down
3 changes: 2 additions & 1 deletion dlrover/python/elastic_agent/diagnosis/diagnosis_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ def diagnose_training_failure(self) -> NodeAction:
def _report_failure_to_master(self, failures, restart_count):
errors = {}
if len(failures) == 0:
logger.info("Skip failure report due to empty failures")
return
for rank, failure in failures.items():
dt = str(datetime.fromtimestamp(int(failure.timestamp)))
Expand All @@ -287,7 +288,7 @@ def send_heartbeat(self):
action = self._client.report_heart_beat(ts)
self._agent_context.enqueue_diagnosis_action(action)
except Exception as e:
logger.warning(f"fail to report a heartbeat: {e}")
logger.warning(f"Fail to report a heartbeat: {e}")

def _periodically_report(self):
logger.info("Start diagnosis agent periodically reporter.")
Expand Down
10 changes: 6 additions & 4 deletions dlrover/python/elastic_agent/master_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,9 @@ def check_fault_node(self, timeout=300):
result: grpc.NetworkCheckResult = self._get(request)
if (
result.reason == NetworkFailureReason.WAITING_NODE
and time.time() - start < timeout
):
time.sleep(JobConstant.MASTER_CLIENT_CHECK_FAULT_TIMEOUT)
or result.reason == NetworkFailureReason.NO_INIT
) and time.time() - start < timeout:
time.sleep(JobConstant.MASTER_CLIENT_CHECK_FAULT_SLEEP_TIMEOUT)
continue
break
return result.nodes, result.reason
Expand All @@ -411,7 +411,9 @@ def check_straggler(self, timeout=300):
result.reason == NetworkFailureReason.WAITING_NODE
and time.time() - start < timeout
):
time.sleep(JobConstant.MASTER_CLIENT_CHECK_STRAGGLER_TIMEOUT)
time.sleep(
JobConstant.MASTER_CLIENT_CHECK_STRAGGLER_SLEEP_TIMEOUT
)
continue
break
return result.nodes, result.reason
Expand Down
15 changes: 12 additions & 3 deletions dlrover/python/elastic_agent/torch/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,9 @@ def __init__(
self._check_round = check_round
self._config: ElasticLaunchConfig = config

def _get_check_node_timeout(self):
return JobConstant.MASTER_CLIENT_CHECK_NODE_TIMEOUT

def run(self, role: str = DEFAULT_ROLE) -> bool:
spec = self._worker_group.spec
role = spec.role
Expand All @@ -1409,7 +1412,9 @@ def run(self, role: str = DEFAULT_ROLE) -> bool:
fault_nodes = []
stragglers = []
for i in range(self._check_round):
result, elapsed_time = self._run_node_check()
result, elapsed_time = self._run_node_check(
timeout=JobConstant.NODE_CHECK_TIMEOUT
)
elapsed_time = round(elapsed_time, 3)
logger.info(
f"Network check time of round {i} is {elapsed_time}"
Expand All @@ -1426,8 +1431,12 @@ def run(self, role: str = DEFAULT_ROLE) -> bool:
elapsed_time,
)
success = success or result
fault_nodes, fault_reason = self._client.check_fault_node()
stragglers, straggler_reason = self._client.check_straggler()
fault_nodes, fault_reason = self._client.check_fault_node(
timeout=self._get_check_node_timeout()
)
stragglers, straggler_reason = self._client.check_straggler(
timeout=self._get_check_node_timeout()
)
logger.info(
f"Fault nodes are: {fault_nodes} with {fault_reason} "
f" and stragglers are: {stragglers} with {straggler_reason}"
Expand Down
2 changes: 1 addition & 1 deletion dlrover/python/master/node/dist_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,8 @@ def handle_training_failure(
):
"""Process the training failure reported by the node."""
node = self._job_context.job_node(node_type, node_id)
logger.info(f"Handle failed node: {node}")
if node.is_released:
logger.info(f"The node {node.name} has been released.")
return
relaunch_node = self._process_error(
node, restart_count, error_data, level
Expand Down
9 changes: 9 additions & 0 deletions dlrover/python/tests/test_diagnosis_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ def test_send_heartbeat(self):
DiagnosisActionType.RESTART_WORKER,
)

agent._client.report_heart_beat = mock.MagicMock(
side_effect=[Exception]
)
agent.send_heartbeat()
self.assertTrue(
context._diagnosis_action_queue.next_action().action_type,
DiagnosisActionType.NONE,
)

def test_async_thread(self):
DiagnosisConstant.AGENT_PERIODICALLY_DIAGNOSIS_INTERVAL_SECS = 1
DiagnosisConstant.AGENT_PERIODICALLY_REPORT_INTERVAL_SECS = 1
Expand Down
17 changes: 17 additions & 0 deletions dlrover/python/tests/test_elastic_training_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,8 @@ def test_run_agent(self):
)

# with no fault and no stragglers
agent._client.check_fault_node = mock.MagicMock(return_value=([], ""))
agent._client.check_straggler = mock.MagicMock(return_value=([], ""))
agent._run_node_check = mock.MagicMock(return_value=(True, 100))
agent._stop_workers = mock.MagicMock(return_value=True)
self.assertTrue(agent.run())
Expand Down Expand Up @@ -821,6 +823,21 @@ def test_comm_perf_test(self, mock_run):
comm_perf_check(config, entrypoint, args)
mock_run.assert_called()

def test_get_check_node_timeout(self):
config = ElasticLaunchConfig(4, 4, 8)

agent = _create_check_agent(
config=config,
entrypoint="python",
args=[],
rdzv_name="elastic-training",
check_round=2,
)
self.assertEqual(
agent._get_check_node_timeout(),
JobConstant.MASTER_CLIENT_CHECK_NODE_TIMEOUT,
)


class MasterRendezvousHandlerTest(unittest.TestCase):
def setUp(self) -> None:
Expand Down
2 changes: 1 addition & 1 deletion dlrover/python/tests/test_master_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def test_get(self):
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0].type, NodeType.WORKER)

nodes, _ = self._master_client.check_fault_node()
nodes, _ = self._master_client.check_fault_node(timeout=1)
self.assertListEqual(nodes, [])

round = self._master_client.join_rendezvous(0, 8, "elastic-training")
Expand Down
6 changes: 5 additions & 1 deletion docs/developer_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,16 @@ make install
make run
```

- Deploy the controller with GO 1.18.
- Deploy the controller with Go 1.23.4.

```bash
make deploy IMG=easydl/elasticjob-controller:master
```

If you cannot use curl to download kustomize when running `make deploy`,
you can download kustomize manually from the [release page](https://github.com/kubernetes-sigs/kustomize/releases)
and move the kustomize binary to `go/elasticjob/bin/`.

### 3. Grant Permission for the DLRover Master to Access CRDs

```bash
Expand Down
2 changes: 1 addition & 1 deletion go/elasticjob/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ ENVTEST ?= $(LOCALBIN)/setup-envtest

## Tool Versions
KUSTOMIZE_VERSION ?= v3.8.7
CONTROLLER_TOOLS_VERSION ?= v0.9.2
CONTROLLER_TOOLS_VERSION ?= v0.14.0

KUSTOMIZE_INSTALL_SCRIPT ?= "https://raw.githubusercontent.com/kubernetes-sigs/kustomize/master/hack/install_kustomize.sh"
.PHONY: kustomize
Expand Down
1 change: 1 addition & 0 deletions go/elasticjob/api/v1alpha1/scaleplan_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type ScaleSpec struct {
// {
// "PS": ReplicaResourceSpec,
// "worker": ReplicaResourceSpec,
// "dlrover-master": ReplicaResourceSpec,
// }
ReplicaResourceSpecs map[commonv1.ReplicaType]ReplicaResourceSpec `json:"replicaResourceSpecs,omitempty"`

Expand Down
Loading

0 comments on commit 0fd773c

Please sign in to comment.