Skip to content

Commit

Permalink
fix: update spinner with the service readyz status
Browse files Browse the repository at this point in the history
Signed-off-by: Frost Ming <[email protected]>
  • Loading branch information
frostming committed Oct 17, 2024
1 parent 5fca0e3 commit 7178b2f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
4 changes: 3 additions & 1 deletion src/bentoml/_internal/cloud/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ def tail_logs(
}
)

def heartbeat():
def heartbeat() -> None:
while True:
try:
ws.send_json({"type": "heartbeat"})
Expand All @@ -751,6 +751,8 @@ def heartbeat():
jsn = schema_from_object(data, LogWSResponseSchema)
if jsn.type == "error":
raise CloudRESTApiClientError(jsn.message or "Unknown error")
if jsn.type == "success" and jsn.message == "EOF":
break
if jsn.type == "heartbeat" or jsn.payload is None:
continue
for line in jsn.payload.items:
Expand Down
59 changes: 44 additions & 15 deletions src/bentoml/_internal/cloud/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ def _init_deployment_files(
check_interval = 5
start_time = time.time()
console = spinner.console
spinner_text = "🔄 Preparing development environment - status: [green]{}[/]"
spinner_text = "🔄 Preparing codespace - status: [green]{}[/]"
status = self.get_status(False).status
while time.time() - start_time < timeout:
spinner.update(spinner_text.format(status))
Expand Down Expand Up @@ -855,7 +855,7 @@ def is_bento_changed(bento_info: Bento) -> bool:
console.print("✨ [green bold]Bento change detected[/]")
spinner.update("🔄 Pushing Bento to BentoCloud")
bento_api._do_push_bento(bento_info, upload_id, bare=True) # type: ignore
spinner.update("🔄 Updating deployment with new configuration")
spinner.update("🔄 Updating codespace with new configuration")
update_config = DeploymentConfigParameters(
bento=str(bento_info.tag)
if bento_changed
Expand All @@ -869,7 +869,7 @@ def is_bento_changed(bento_info: Bento) -> bool:
self = deployment_api.update(update_config)
target = self._refetch_target(False)
else:
spinner.update("🔄 Resetting deployment")
spinner.update("🔄 Resetting codespace")
self = deployment_api.void_update(self.name, self.cluster)
needs_update = bento_changed = False
requirements_hash, setup_md5 = self._init_deployment_files(
Expand All @@ -878,8 +878,7 @@ def is_bento_changed(bento_info: Bento) -> bool:
if endpoint_url is None:
endpoint_url = self.get_endpoint_urls(True)[0]
spinner.log(f"🌐 Endpoint: {endpoint_url}")
with self._tail_logs(spinner.console, stop_event):
spinner.update("👀 Watching for changes")
with self._tail_logs(spinner, stop_event):
for changes in watchfiles.watch(
*watch_dirs, watch_filter=watch_filter, stop_event=stop_event
):
Expand Down Expand Up @@ -956,35 +955,67 @@ def is_bento_changed(bento_info: Bento) -> bool:
DeploymentStatus.Unhealthy.value,
]:
console.print(
f'🚨 [bold red]Deployment "{self.name}" aborted. Current status: "{status}"[/]'
f'🚨 [bold red]Codespace "{self.name}" aborted. Current status: "{status}"[/]'
)
return
except KeyboardInterrupt:
spinner.log(
"\nWatcher stopped. Next steps:\n"
"* Attach to this deployment again:\n"
f" [blue]$ bentoml develop --attach {self.name} --cluster {self.cluster}[/]\n\n"
"* Attach to this codespace again:\n"
f" [blue]$ bentoml code --attach {self.name} --cluster {self.cluster}[/]\n\n"
"* Push the bento to BentoCloud:\n"
" [blue]$ bentoml build --push[/]\n\n"
"* Terminate the deployment:\n"
"* Shut down the codespace:\n"
f" [blue]$ bentoml deployment terminate {self.name} --cluster {self.cluster}[/]"
)
finally:
spinner.stop()

@contextlib.contextmanager
def _tail_logs(
self, console: Console, stop_event: Event
self, spinner: Spinner, stop_event: Event
) -> t.Generator[None, None, None]:
import itertools
from collections import defaultdict

spinner.update("🟡 👀 Watching for changes")
server_ready = False
console = spinner.console

def set_server_ready(is_ready: bool) -> None:
nonlocal server_ready
if is_ready is server_ready:
return
spinner.update(
"🟢 👀 Watching for changes"
if is_ready
else "🟡 👀 Watching for changes"
)
server_ready = is_ready

pods = self._client.v2.list_deployment_pods(self.name, self.cluster)
workers: list[Thread] = []

colors = itertools.cycle(["cyan", "yellow", "blue", "magenta", "green"])
runner_color: dict[str, str] = defaultdict(lambda: next(colors))

def heartbeat(event: Event, check_interval: float = 5.0) -> None:
from httpx import NetworkError
from httpx import TimeoutException

endpoint_url = self.get_endpoint_urls(False)[0]
while not event.is_set():
try:
resp = self._client.session.get(f"{endpoint_url}/readyz", timeout=5)
except (TimeoutException, NetworkError):
set_server_ready(False)
else:
if resp.is_success:
set_server_ready(True)
else:
set_server_ready(False)
event.wait(check_interval)

def pod_log_worker(pod: KubePodSchema, stop_event: Event) -> None:
current = ""
color = runner_color[pod.runner_name]
Expand All @@ -1008,14 +1039,12 @@ def pod_log_worker(pod: KubePodSchema, stop_event: Event) -> None:
current = line
break
console.print(f"[{color}]\\[{pod.runner_name}][/] {line}")
if (
"BentoMLDevSupervisor has been shut down" in line
and not stop_event.is_set()
):
stop_event.set()
if current:
console.print(f"[{color}]\\[{pod.runner_name}][/] {current}")

heartbeat_thread = Thread(target=heartbeat, args=(stop_event,))
heartbeat_thread.start()
workers.append(heartbeat_thread)
try:
for pod in pods:
if pod.labels.get("yatai.ai/is-bento-image-builder") == "true":
Expand Down
4 changes: 2 additions & 2 deletions src/bentoml/_internal/cloud/schemas/schemasv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ class KubePodSchema:
@attr.define
class LogSchema:
__forbid_extra_keys__ = False
items: t.List[str]
type: str
items: t.List[str] = attr.field(factory=list)
type: str = "append"


@attr.define
Expand Down
8 changes: 4 additions & 4 deletions src/bentoml_cli/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def develop_command(
if attach:
deployment = bentoml.deployment.get(attach)
else:
with console.status("Fetching deployments..."):
with console.status("Fetching codespaces..."):
current_user = _rest_client.v1.get_current_user()
if current_user is None:
raise CLIException("current user is not found")
Expand All @@ -287,9 +287,9 @@ def develop_command(
]

chosen = questionary.select(
message="Select a deployment to attach to or create a new one",
message="Select a codespace to attach to or create a new one",
choices=[{"name": d.name, "value": d} for d in deployments]
+ [{"name": "Create a new deployment", "value": "new"}],
+ [{"name": "Create a new codespace", "value": "new"}],
).ask()

if chosen == "new":
Expand All @@ -306,7 +306,7 @@ def develop_command(
else:
if env or secret:
rich.print(
"[yellow]Warning:[/] --env and --secret are ignored when attaching to an existing deployment"
"[yellow]Warning:[/] --env and --secret are ignored when attaching to an existing codespace"
)
deployment = t.cast(Deployment, chosen)
deployment.watch(bento_dir)
Expand Down

0 comments on commit 7178b2f

Please sign in to comment.