Skip to content

Commit

Permalink
Fix Errandboy (#9)
Browse files Browse the repository at this point in the history
Previously Errandboy was blocking our termination. I think this should resolve the issue.
  • Loading branch information
aidangomez authored Nov 7, 2018
1 parent 1019e9b commit c1bceec
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 18 deletions.
4 changes: 2 additions & 2 deletions cloud/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
import cloud
from cloud import registry as reg
from cloud import Instance
from cloud.envs.utils import config_path
from cloud.envs import utils


def connect():
with open(config_path(), "r") as cf:
with open(utils.config_path(), "r") as cf:
config = toml.load(cf)
provider = config.pop("provider").lower()
cloud.instance = reg.retrieve(provider, config=config)
Expand Down
17 changes: 9 additions & 8 deletions cloud/envs/env.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import logging
import multiprocessing
import traceback
import sys
import time

from cloud.envs import utils
from errand_boy.run import main as eb_main

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -41,13 +39,13 @@ def __init__(self, manager=None, **kwargs):
super().__init__(manager=manager)
self.resource_managers = []

self._p = multiprocessing.Process(target=eb_main, args=([None],))
self._p.start()
time.sleep(2)
assert utils.get_server().is_alive()

def __del__(self):
self._p.terminate()
self._p.join(timeout=5)
def _kill_command_server(self):
    """Shut down the errand-boy client transport, then the server process.

    Delegates to ``utils.kill_transport()`` followed by
    ``utils.kill_server()``, logging a warning before each step.
    """
    # ``Logger.warn`` is a deprecated alias; ``warning`` is the supported
    # spelling and emits the same record at the same level.
    logger.warning("Killing transport")
    utils.kill_transport()
    logger.warning("Killing server")
    utils.kill_server()

@property
def driver(self):
Expand Down Expand Up @@ -80,6 +78,8 @@ def down(self, async=False, delete_resources=True):
rm.delete(async=async)
else:
rm.down(async=async)

self._kill_command_server()
self.driver.ex_stop_node(self.node)

def delete(self, async=False, confirm=True):
Expand All @@ -94,6 +94,7 @@ def delete(self, async=False, confirm=True):

super().delete(async=async)

self._kill_command_server()
self.driver.destroy_node(self.node, destroy_boot_disk=True)


Expand Down
8 changes: 4 additions & 4 deletions cloud/envs/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self, name, manager=None):
super().__init__(manager=manager)
self._name = name
details = self.details
self.ip = details["ipAddress"]
self.ip = details.get("ipAddress")
self.preemptible = details.get("preemptible") == "true"

@property
Expand All @@ -78,8 +78,8 @@ def details(self):
@property
def usable(self):
details = self.details
return (details["state"] in ["READY", "RUNNING"] and
details["health"] == "HEALTHY")
return (details.get("state") in ["READY", "RUNNING"] and
details.get("health") == "HEALTHY")

def up(self, async=False):
cmd = ["gcloud", "alpha", "compute", "tpus", "start", self.name]
Expand Down Expand Up @@ -136,7 +136,7 @@ def collect_existing(self):

def clean(self, async=True):
for tpu in self.resources:
if tpu.details["health"] != "HEALTHY":
if tpu.details.get("health") != "HEALTHY":
tpu.delete(async=async)

def _new_name(self, length=5):
Expand Down
54 changes: 51 additions & 3 deletions cloud/envs/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,59 @@
import logging
import multiprocessing
import os
import subprocess
import time

from errand_boy.transports.unixsocket import UNIXSocketTransport
from errand_boy.run import main as eb_main

EB_TRANSPORT = None
EB_SERVER = None


def get_transport():
    """Return the shared ``UNIXSocketTransport``, creating it on first use.

    The instance is cached in the module-level ``EB_TRANSPORT`` so later
    calls return the same object until ``kill_transport()`` resets it.
    """
    global EB_TRANSPORT
    if EB_TRANSPORT is not None:
        return EB_TRANSPORT

    EB_TRANSPORT = UNIXSocketTransport()
    return EB_TRANSPORT


def kill_transport():
    """Drop the cached client transport so the next use builds a fresh one.

    Safe to call repeatedly; when no transport is cached this is a no-op.
    """
    global EB_TRANSPORT
    # Rebinding to None already releases the reference. A preceding
    # ``del`` would only leave the module global briefly unbound.
    EB_TRANSPORT = None


def _server_fn():
    """Process entry point: build a UNIX-socket transport and run its server.

    ``get_server()`` passes this as the ``target`` of the
    ``multiprocessing.Process`` it starts.
    """
    UNIXSocketTransport().run_server()


def get_server():
    """Return the command-server process, starting it on the first call.

    The process runs ``_server_fn`` and is cached in the module-level
    ``EB_SERVER``; later calls return that same ``multiprocessing.Process``
    until ``kill_server()`` clears it.
    """
    global EB_SERVER
    if EB_SERVER is not None:
        return EB_SERVER

    server_process = multiprocessing.Process(target=_server_fn)
    # Publish the handle before starting, matching the original ordering.
    EB_SERVER = server_process
    server_process.start()
    # Fixed pause after start - presumably to let the child begin listening
    # before callers connect (no readiness check is performed here).
    time.sleep(1)
    # Only errors from the errand_boy logger are let through from here on.
    logging.getLogger("errand_boy").setLevel(logging.ERROR)
    return server_process


def kill_server():
    """Stop the command-server process, if any, and forget the handle.

    When a server process is cached and still alive it is terminated,
    given a short pause, and joined with a 5 second timeout. In every
    case ``EB_SERVER`` ends up as ``None`` so a later ``get_server()``
    starts a new process. Calling this with no cached server is a no-op.
    """
    global EB_SERVER
    if EB_SERVER is None:
        return

    if EB_SERVER.is_alive():
        EB_SERVER.terminate()
        # Brief pause before joining, kept from the original sequence.
        time.sleep(0.5)
        EB_SERVER.join(timeout=5)

    # Rebinding to None releases the handle. A preceding ``del`` would
    # only leave the module global briefly unbound.
    EB_SERVER = None

EB_TRANSPORT = UNIXSocketTransport()

logger = logging.getLogger(__name__)

Expand All @@ -13,8 +62,7 @@ def call(cmd):
if isinstance(cmd, list):
cmd = " ".join(cmd)

global EB_TRANSPORT
stdout, stderr, returncode = EB_TRANSPORT.run_cmd(cmd)
stdout, stderr, returncode = get_transport().run_cmd(cmd)
return returncode, stdout.decode("utf-8"), stderr.decode("utf-8")


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

setup(
name='dl-cloud',
version='0.0.2',
version='0.0.3',
description='Cloud resource management for deep learning applications.',
long_description=long_description,
long_description_content_type='text/markdown',
Expand Down

0 comments on commit c1bceec

Please sign in to comment.