Kubernetes: report cpu/gpu/memory from parent node (#4413)
* Report cpu/gpu/memory from parent node

* Upgrade kind to fix bug

* swap

* Add resources test

* fix runtime -- bytes reporting

* comment

* int

* cmt

* fix

* fix

* fmt

* fix

* update

* Update test_cli.py

* Fix parse

* rm return

* fix

* Fix test

* Fix

* Update test_cli.py

* fix

* fix

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
epicfaace and mergify[bot] authored Jun 14, 2023
1 parent a139943 commit 9374ae8
Showing 6 changed files with 69 additions and 1 deletion.
3 changes: 3 additions & 0 deletions codalab/worker/runtime/__init__.py
@@ -75,3 +75,6 @@ def kill(self, container_id: str):

def remove(self, container_id: str):
raise NotImplementedError

def get_node_availability_stats(self) -> dict:
raise NotImplementedError
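
This adds get_node_availability_stats to the abstract Runtime interface, so each concrete runtime has to provide its own implementation. As a rough illustration only (not part of this commit), a runtime running directly on a host could derive the same fields locally; this sketch assumes psutil is installed and simply reports no GPUs:

import shutil

import psutil  # assumption: illustrative dependency, not used by CodaLab here


class LocalRuntimeSketch:
    """Hypothetical runtime that reports availability stats for the local host."""

    def get_node_availability_stats(self) -> dict:
        disk = shutil.disk_usage("/")
        return {
            'cpus': psutil.cpu_count(logical=True),
            'gpus': 0,  # GPU detection omitted in this sketch
            'memory_bytes': psutil.virtual_memory().total,
            'free_disk_bytes': disk.free,
        }
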
13 changes: 13 additions & 0 deletions codalab/worker/runtime/kubernetes_runtime.py
@@ -12,6 +12,7 @@
from codalab.common import BundleRuntime
from codalab.worker.runtime import Runtime

import os
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -232,3 +233,15 @@ def remove(self, pod_name: str):
f'Exception when calling Kubernetes api->delete_namespaced_pod...: {e}'
)
raise e

def get_node_availability_stats(self) -> dict:
node_name = os.getenv("CODALAB_KUBERNETES_NODE_NAME")
node = self.k8_api.read_node(name=node_name)
allocatable = node.status.allocatable

return {
'cpus': int(allocatable.get('cpu')),
'gpus': int(allocatable.get('nvidia.com/gpu') or '0'),
'memory_bytes': int(utils.parse_quantity(allocatable.get('memory'))),
'free_disk_bytes': int(utils.parse_quantity(allocatable.get('ephemeral-storage'))),
}
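
Kubernetes reports allocatable quantities as strings such as "16", "7910132Ki", or "184026455104". parse_quantity (from kubernetes.utils in the Python client, which this module is assumed to import elsewhere in the file) converts them to a Decimal number of base units, while the cpu field is read with a plain int(), which assumes the node reports whole cores rather than a millicore string like "7910m". A quick illustrative check with made-up values:

from decimal import Decimal

from kubernetes import utils

# Illustrative values only; the real numbers come from node.status.allocatable.
assert utils.parse_quantity("7910132Ki") == Decimal(7910132) * 1024
assert int(utils.parse_quantity("2Gi")) == 2 * 1024 ** 3
assert utils.parse_quantity("500m") == Decimal("0.5")  # 500 millicores
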
11 changes: 11 additions & 0 deletions codalab/worker/worker.py
@@ -500,6 +500,17 @@ def checkin(self):
'is_terminating': self.terminate or self.terminate_and_restage,
'preemptible': self.preemptible,
}
if self.bundle_runtime.name == BundleRuntime.KUBERNETES.value:
stats = self.bundle_runtime.get_node_availability_stats()
request = dict(
request,
**{
'cpus': stats['cpus'],
'gpus': stats['gpus'],
'memory_bytes': stats['memory_bytes'],
'free_disk_bytes': stats['free_disk_bytes'],
},
)
try:
response = self.bundle_service.checkin(self.id, request)
logger.info('Connected! Successful check in!')
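
With this change, a Kubernetes worker fills the cpus, gpus, memory_bytes, and free_disk_bytes fields of its check-in request from the parent node's allocatable resources rather than from static worker configuration. A minimal sketch of the merge, with placeholder numbers standing in for get_node_availability_stats():

# Placeholder values; in worker.py the stats come from
# self.bundle_runtime.get_node_availability_stats().
request = {'tag': 'k8s-worker', 'is_terminating': False, 'preemptible': False}
stats = {'cpus': 16, 'gpus': 1, 'memory_bytes': 64 * 1024 ** 3, 'free_disk_bytes': 500 * 1024 ** 3}

request = dict(request, **{k: stats[k] for k in ('cpus', 'gpus', 'memory_bytes', 'free_disk_bytes')})
assert request['cpus'] == 16 and request['gpus'] == 1
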
4 changes: 4 additions & 0 deletions codalab/worker_manager/kubernetes_worker_manager.py
@@ -156,6 +156,10 @@ def start_worker_job(self) -> None:
'env': [
{'name': 'CODALAB_USERNAME', 'value': self.codalab_username},
{'name': 'CODALAB_PASSWORD', 'value': self.codalab_password},
{
'name': 'CODALAB_KUBERNETES_NODE_NAME',
'valueFrom': {'fieldRef': {'fieldPath': 'spec.nodeName'}},
},
],
'resources': {'limits': limits, 'requests': requests},
'volumeMounts': [
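
The new CODALAB_KUBERNETES_NODE_NAME variable is populated through the Kubernetes Downward API (fieldRef on spec.nodeName), so the worker pod learns which node it was scheduled onto and KubernetesRuntime.get_node_availability_stats can query that node's allocatable resources. The same env entry expressed with the typed Python client, as an equivalent sketch rather than the code in this commit:

from kubernetes import client

# Equivalent of the dict-based env entry added above.
node_name_env = client.V1EnvVar(
    name='CODALAB_KUBERNETES_NODE_NAME',
    value_from=client.V1EnvVarSource(
        field_ref=client.V1ObjectFieldSelector(field_path='spec.nodeName')
    ),
)
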
2 changes: 1 addition & 1 deletion scripts/local-k8s/kind-config.yaml
@@ -6,4 +6,4 @@ networking:
apiServerPort: 6443
nodes:
- role: control-plane
image: kindest/node:v1.21.10@sha256:84709f09756ba4f863769bdcabe5edafc2ada72d3c8c44d6515fc581b66b029c
image: kindest/node:v1.22.15@sha256:7d9708c4b0873f0fe2e171e2b1b7f45ae89482617778c1c875f1053d4cef2e41
37 changes: 37 additions & 0 deletions tests/cli/test_cli.py
@@ -2950,6 +2950,10 @@ def test_unicode(ctx):

@TestModule.register('workers')
def test_workers(ctx):
# Spin up a run so that, if no worker is already running, the worker manager will start one.
uuid = _run_command([cl, 'run', 'echo'])
wait(uuid)

result = _run_command([cl, 'workers'])
lines = result.split("\n")

@@ -2983,6 +2987,39 @@ def test_workers(ctx):
worker_info = lines[2].split()
assert len(worker_info) >= 10

# Make sure that when we run a bundle that consumes resources, the worker's available resources are decremented accordingly.
cpus_original, gpus_original, free_memory_original, free_disk_original = worker_info[1:5]
cpus_available, cpus_total = (int(i) for i in cpus_original.split("/"))
gpus_available, gpus_total = (int(i) for i in gpus_original.split("/"))
uuid = _run_command(
[
cl,
'run',
'sleep 100',
'--request-cpus',
str(cpus_available),
'--request-gpus',
str(gpus_available),
],
request_memory="100m",
request_disk="100m",
)
wait_until_state(uuid, State.RUNNING)
result = _run_command([cl, 'workers'])
lines = result.split("\n")
worker_info = lines[2].split()
cpus, gpus, free_memory, free_disk = worker_info[1:5]
check_equals(f'0/{cpus_total}', cpus)
check_equals(f'0/{gpus_total}', gpus)

wait(uuid)
result = _run_command([cl, 'workers'])
lines = result.split("\n")
worker_info = lines[2].split()
cpus, gpus, free_memory, free_disk = worker_info[1:5]
check_equals(cpus_original, cpus)
check_equals(gpus_original, gpus)


@TestModule.register('sharing_workers')
def test_sharing_workers(ctx):
