Skip to content

Commit

Permalink
Merge pull request #499 from latchbio/ayush/latch-attach
Browse files Browse the repository at this point in the history
attach to nf workdir
  • Loading branch information
ayushkamat authored Oct 18, 2024
2 parents 6faa03f + a8b33d2 commit 586eb2d
Show file tree
Hide file tree
Showing 7 changed files with 268 additions and 84 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ Types of changes

# Latch SDK Changelog

## 2.53.8 - 2024-10-18

### Added

* Nextflow
- add `latch nextflow attach` command to attach to a nextflow work directory

## 2.53.7 - 2024-10-16

### Added
Expand Down
15 changes: 14 additions & 1 deletion latch_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ def execute(
):
"""Drops the user into an interactive shell from within a task."""

from latch_cli.services.execute.main import exec
from latch_cli.services.k8s.execute import exec

exec(execution_id=execution_id, egn_id=egn_id, container_index=container_index)

Expand Down Expand Up @@ -999,6 +999,19 @@ def generate_entrypoint(
)


@nextflow.command("attach")
@click.option(
    "--execution-id", "-e", type=str, help="Optional execution ID to inspect."
)
@requires_login
def attach(execution_id: Optional[str]):
    """Drops the user into an interactive shell to inspect the workdir of a nextflow execution."""

    # Imported at call time, like the other commands in this file. The alias
    # keeps the service function distinct from this command, which shares its
    # name.
    from latch_cli.services.k8s.attach import attach as attach_to_workdir

    attach_to_workdir(execution_id)


"""
POD COMMANDS
"""
Expand Down
File renamed without changes.
77 changes: 77 additions & 0 deletions latch_cli/services/k8s/attach.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import asyncio
import json
import secrets
import sys
from typing import Optional
from urllib.parse import urljoin, urlparse

import click
import websockets.client as websockets
import websockets.exceptions as ws_exceptions
from latch_sdk_config.latch import NUCLEUS_URL

from latch_cli.utils import get_auth_header

from .utils import get_pvc_info
from .ws_utils import forward_stdio


async def connect(execution_id: str, session_id: str):
    """Open the attach websocket, perform the handshake, then forward stdio.

    The endpoint is ``/workflows/cli/attach-nf-workdir`` on ``NUCLEUS_URL``
    with the scheme replaced by ``wss``. After connecting, a single JSON
    request is sent and the first reply is inspected for an ``"error"`` key
    before the socket is handed to ``forward_stdio``.

    Args:
        execution_id: ID of the execution whose workdir should be attached.
            It is sent to the server as an integer, so it must parse as one.
        session_id: Caller-generated identifier sent along with the request.

    Raises:
        RuntimeError: If ``execution_id`` is not an integer, if the first
            reply is not valid JSON, or if the reply carries a non-empty
            ``"error"`` value.
    """
    # Validate before opening the socket: callers only handle RuntimeError, so
    # a bare ValueError from int() would surface as an unhandled traceback.
    try:
        numeric_execution_id = int(execution_id)
    except (TypeError, ValueError):
        raise RuntimeError(
            f"Invalid execution ID {execution_id!r} - expected an integer."
        ) from None

    async with websockets.connect(
        urlparse(urljoin(NUCLEUS_URL, "/workflows/cli/attach-nf-workdir"))
        ._replace(scheme="wss")
        .geturl(),
        close_timeout=0,
        extra_headers={"Authorization": get_auth_header()},
    ) as ws:
        request = {"execution_id": numeric_execution_id, "session_id": session_id}

        await ws.send(json.dumps(request))
        data = await ws.recv()

        try:
            res = json.loads(data)
        except json.JSONDecodeError:
            raise RuntimeError("Unable to connect to pod - internal error.") from None

        if "error" in res:
            msg = str(res["error"])
            # An empty error string is treated as "no error", as before.
            if msg != "":
                raise RuntimeError(msg)

        await forward_stdio(ws)


def get_session_id():
    """Return a new random session ID: 8 random bytes as 16 hex characters."""
    return secrets.token_hex(8)


def attach(execution_id: Optional[str] = None):
    """Attach an interactive session to a Nextflow execution's workdir.

    Resolves the execution through ``get_pvc_info`` (which returns the given
    ID unchanged, or asks the user to pick one), switches stdin to raw mode,
    and runs ``connect`` until the session ends. The original terminal
    settings are always restored, and any failure message is printed in red.

    Args:
        execution_id: Execution to attach to, or ``None`` to select one.
    """
    execution_id = get_pvc_info(execution_id)
    session_id = get_session_id()

    click.secho(
        "Attaching to workdir - this may take a few seconds...", dim=True, italic=True
    )

    import termios
    import tty

    old_settings_stdin = termios.tcgetattr(sys.stdin.fileno())
    tty.setraw(sys.stdin)

    msg = ""
    try:
        asyncio.run(connect(execution_id, session_id))
    except ws_exceptions.ConnectionClosedError as e:
        # The close reason is expected to be a JSON object with an "error"
        # key, but it can also be empty or free-form text. Parsing it blindly
        # would raise inside this handler and hide the real failure.
        try:
            msg = json.loads(e.reason)["error"]
        except (json.JSONDecodeError, TypeError, KeyError):
            msg = e.reason or "Connection to the workdir closed unexpectedly."
    except RuntimeError as e:
        msg = str(e)
    finally:
        # Leave the user's terminal as it was found, whatever happened above.
        termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, old_settings_stdin)

    if msg != "":
        click.secho(msg, fg="red")
78 changes: 78 additions & 0 deletions latch_cli/services/k8s/execute.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
import json
import sys
from typing import Optional
from urllib.parse import urljoin, urlparse

import websockets.client as websockets
from latch_sdk_config.latch import NUCLEUS_URL

from latch_cli.services.k8s.utils import (
ContainerNode,
EGNNode,
ExecutionInfoNode,
get_container_info,
get_egn_info,
get_execution_info,
)
from latch_cli.utils import get_auth_header

from .ws_utils import forward_stdio


async def connect(egn_info: EGNNode, container_info: Optional[ContainerNode]):
    """Open the shell websocket, perform the handshake, then forward stdio.

    Connects to ``/workflows/cli/shell`` on ``NUCLEUS_URL`` (scheme replaced
    by ``wss``), sends the EGN ID and optional container index as JSON, and
    checks the first reply before handing the socket to ``forward_stdio``.

    Raises:
        RuntimeError: If the first reply is not valid JSON, or if it carries
            a non-empty ``"error"`` value.
    """
    async with websockets.connect(
        urlparse(urljoin(NUCLEUS_URL, "/workflows/cli/shell"))
        ._replace(scheme="wss")
        .geturl(),
        close_timeout=0,
        extra_headers={"Authorization": get_auth_header()},
    ) as ws:
        # No container selected means the index is sent as null.
        container_index = None if container_info is None else container_info["index"]
        handshake = {"egn_id": egn_info["id"], "container_index": container_index}

        await ws.send(json.dumps(handshake))
        reply = await ws.recv()

        try:
            parsed = json.loads(reply)
        except json.JSONDecodeError:
            failure = "Unable to connect to pod - internal error."
        else:
            failure = str(parsed["error"]) if "error" in parsed else ""

        # An empty failure string means the handshake succeeded.
        if failure != "":
            raise RuntimeError(failure)

        await forward_stdio(ws)


def exec(
    execution_id: Optional[str] = None,
    egn_id: Optional[str] = None,
    container_index: Optional[int] = None,
):
    """Open an interactive shell inside a task container.

    Resolves the target EGN and container through the ``get_*_info`` helpers,
    switches stdin to raw mode, and runs ``connect`` until the session ends.
    The original terminal settings are restored afterwards, even on error.

    Args:
        execution_id: Execution to look up; only used when ``egn_id`` is
            ``None``.
        egn_id: ID passed to ``get_egn_info``; when given, the execution
            lookup is skipped.
        container_index: Index passed to ``get_container_info``.
    """
    # The execution lookup is only needed when no EGN was named explicitly.
    execution_info: Optional[ExecutionInfoNode] = (
        get_execution_info(execution_id) if egn_id is None else None
    )

    egn_info = get_egn_info(execution_info, egn_id)
    container_info = get_container_info(egn_info, container_index)

    import termios
    import tty

    saved_tty_attrs = termios.tcgetattr(sys.stdin.fileno())
    tty.setraw(sys.stdin)

    try:
        asyncio.run(connect(egn_info, container_info))
    finally:
        # Always put the terminal back the way it was found.
        termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, saved_tty_attrs)
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from latch_cli.click_utils import bold, color
from latch_cli.menus import select_tui
from latch_cli.utils import current_workspace


# todo(ayush): put this into latch_sdk_gql
Expand Down Expand Up @@ -179,35 +178,39 @@ def get_execution_info(execution_id: Optional[str]) -> ExecutionInfoNode:
"""),
*fragments,
),
{"createdBy": current_workspace()},
{"createdBy": user_config.workspace_id},
)["runningExecutions"]

if len(res["nodes"]) == 0:
click.secho("You have no executions currently running.", dim=True)
nodes = res["nodes"]

if len(nodes) == 0:
click.secho(
f"You have no executions currently running.",
dim=True,
)
raise click.exceptions.Exit(0)

if len(res["nodes"]) == 1:
execution = res["nodes"][0]
if len(nodes) == 1:
execution = nodes[0]
click.secho(
"Selecting execution"
f" {color(execution['displayName'])} as it is"
" the only"
" one currently running in Workspace"
" the only one currently running in Workspace"
f" {color(workspace_str)}.",
)

return execution

selected_execution = select_tui(
"You have multiple executions running in this workspace"
f" ({color(workspace_str)}). Which"
" execution would you like to inspect?",
"You have multiple executions running in"
f" this workspace ({color(workspace_str)}). Which execution would you like to"
" inspect?",
[
{
"display_name": f'{x["displayName"]} ({x["workflow"]["displayName"]})',
"value": x,
}
for x in res["nodes"]
for x in nodes
],
clear_terminal=False,
)
Expand Down Expand Up @@ -368,3 +371,61 @@ def get_container_info(
raise click.exceptions.Exit(0)

return selected_container_info


class Node(TypedDict):
    """One entry of the ``nodes`` list selected by the NFWorkdirs query."""

    # Returned by get_pvc_info as the ID of the execution to attach to.
    id: str
    # Label shown to the user when choosing between available workdirs.
    displayName: str


class nfAvailablePvcs(TypedDict):
    """Shape of the ``nfAvailablePvcs`` field returned by the NFWorkdirs query."""

    # Entries that get_pvc_info offers as executions to attach to.
    nodes: List[Node]


def get_pvc_info(execution_id: Optional[str]) -> str:
    """Resolve which execution's workdir to attach to.

    An explicit ``execution_id`` is returned unchanged. Otherwise the
    ``nfAvailablePvcs`` query is run for the current workspace: a single
    result is selected automatically, and several results are offered to the
    user through ``select_tui``.

    Args:
        execution_id: ID supplied by the caller, or ``None`` to look one up.

    Returns:
        The ID of the chosen execution.

    Raises:
        click.exceptions.Exit: With code 0 when the query returns no entries
            or the user makes no selection.
    """
    if execution_id is not None:
        return execution_id

    workspace_str: str = user_config.workspace_name or user_config.workspace_id

    response: nfAvailablePvcs = execute(
        gql.gql("""
            query NFWorkdirs($wsId: BigInt!) {
                nfAvailablePvcs(argWsId: $wsId) {
                    nodes {
                        id
                        displayName
                    }
                }
            }
        """),
        {"wsId": user_config.workspace_id},
    )["nfAvailablePvcs"]

    nodes = response["nodes"]
    available = len(nodes)

    if available == 0:
        click.secho("You have no available workdirs (all have expired).", dim=True)
        raise click.exceptions.Exit(0)

    if available == 1:
        # Nothing to choose between, so skip the prompt.
        only = nodes[0]
        click.secho(
            f"Selecting execution {color(only['displayName'])} as it is the only"
            f" one without an expired workDir in Workspace {color(workspace_str)}.",
        )
        return only["id"]

    prompt = (
        "You have multiple available workDirs in this workspace"
        f" ({color(workspace_str)}). Which execution would you like to attach to?"
    )
    choices = [
        {"display_name": node["displayName"], "value": node["id"]} for node in nodes
    ]

    picked = select_tui(prompt, options=choices)
    if picked is None:
        click.secho("No execution selected. Exiting.", dim=True)
        raise click.exceptions.Exit(0)

    return picked
Loading

0 comments on commit 586eb2d

Please sign in to comment.