From 093c327f851ff6f65949867c2205bd8a0f495306 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=A9d=C3=A9ric=20Collonval?= Date: Thu, 19 Dec 2024 20:50:14 +0100 Subject: [PATCH] Use jupyter-kernel-client for the console (#32) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Frédéric Collonval --- .gitignore | 1 + datalayer_core/cli/base.py | 150 ++- datalayer_core/kernels/client.py | 1090 ------------------ datalayer_core/kernels/console/consoleapp.py | 207 +--- datalayer_core/kernels/manager.py | 206 +--- pyproject.toml | 6 + 6 files changed, 183 insertions(+), 1477 deletions(-) delete mode 100644 datalayer_core/kernels/client.py diff --git a/.gitignore b/.gitignore index d617374..fd084a4 100644 --- a/.gitignore +++ b/.gitignore @@ -129,6 +129,7 @@ dmypy.json *ystore.db +.yarn # Include !**/.*ignore diff --git a/datalayer_core/cli/base.py b/datalayer_core/cli/base.py index 1171d52..562436b 100644 --- a/datalayer_core/cli/base.py +++ b/datalayer_core/cli/base.py @@ -15,7 +15,8 @@ from rich.console import Console -from traitlets import Bool, Unicode +from traitlets import Bool, Unicode, default +from traitlets.config import LoggingConfigurable from datalayer_core.application import DatalayerApp, base_aliases, base_flags from datalayer_core.authn.http_server import get_token, USE_JUPYTER_SERVER_FOR_LOGIN @@ -43,93 +44,36 @@ ) -class DatalayerCLIBaseApp(DatalayerApp): - name = "datalayer_core" - - version = __version__ - - aliases = datalayer_aliases - - flags = datalayer_flags - +class DatalayerAuthMixin(LoggingConfigurable): user_handle = None - run_url = Unicode( - None, - allow_none=False, - config=True, - help="Datalayer RUN URL." - ) + run_url = Unicode("", allow_none=False, config=True, help="Datalayer RUN URL.") + + @default("run_url") def _run_url_default(self): return os.environ.get("DATALAYER_RUN_URL", "https://prod1.datalayer.run") - token = Unicode( - None, - allow_none=True, - config=True, - help="User access token." - ) + token = Unicode(None, allow_none=True, config=True, help="User access token.") + + @default("token") def _token_default(self): return os.environ.get("DATALAYER_TOKEN", None) - external_token = Unicode( - None, - allow_none=True, - config=True, - help="External token." - ) + external_token = Unicode(None, allow_none=True, config=True, help="External token.") + + @default("external_token") def _external_token_default(self): return os.environ.get("DATALAYER_EXTERNAL_TOKEN", None) no_browser = Bool( - False, - config=True, - help="If true, prompt for login on the CLI only." + False, config=True, help="If true, prompt for login on the CLI only." ) - _is_initialized = False - - _requires_auth = True - - - def initialize(self, argv=None): - super().initialize(argv) - - if self.token is None: - self.user_handle = None - - if not getattr(self, "_dispatching", False): - super().initialize(argv) - - if DatalayerCLIBaseApp._is_initialized: - return - - DatalayerCLIBaseApp._is_initialized = True - - # Log the user. - if self._requires_auth: - self._log_in() - - self.log.debug( - "Datalayer - Version %s - Connected as %s on %s", - self.version, - self.user_handle, - self.run_url, - ) - console = Console() - console.print() - console.print(f"Datalayer - Version [bold cyan]{self.version}[/bold cyan] - Connected as [bold yellow]{self.user_handle}[/bold yellow] on [i]{self.run_url}[/i]") - console.print() - - def _fetch(self, request: str, **kwargs: t.Any) -> requests.Response: """Fetch a network resource as a context manager.""" try: return fetch( - request, - token=self.token, - external_token=self.external_token, - **kwargs + request, token=self.token, external_token=self.external_token, **kwargs ) except requests.exceptions.Timeout as e: raise e @@ -140,7 +84,6 @@ def _fetch(self, request: str, **kwargs: t.Any) -> requests.Response: f"Failed to request the URL {request if isinstance(request, str) else request.url!s}" ) from e - def _log_in(self) -> None: """Login the application with the Identity Provider.""" @@ -174,16 +117,17 @@ def _log_in(self) -> None: self.token = token try: import keyring + keyring.set_password(self.run_url, "access_token", self.token) self.log.debug("Store token with keyring %s", token) except ImportError as e: self.log.debug("Unable to import keyring.", exc_info=e) - if self.token is None: # Look for cached value. try: import keyring + stored_token = keyring.get_password(self.run_url, "access_token") if stored_token: content = {} @@ -200,11 +144,15 @@ def _log_in(self) -> None: ) content = response.json() except requests.exceptions.Timeout as error: - self.log.warning("Request to get the user profile timed out.", exc_info=error) + self.log.warning( + "Request to get the user profile timed out.", exc_info=error + ) except requests.exceptions.HTTPError as error: if error.response.status_code == 401: # Invalidate the stored token. - self.log.debug(f"Delete invalid cached token for {self.run_url}") + self.log.debug( + f"Delete invalid cached token for {self.run_url}" + ) self._log_out() else: self.log.warning( @@ -252,8 +200,9 @@ def _log_in(self) -> None: port = find_http_port() if USE_JUPYTER_SERVER_FOR_LOGIN == False: self.__launch_browser(port) - # Do we need to clear the instanch while using raw http server? - self.clear_instance() + # Do we need to clear the instance while using raw http server? + if hasattr(self, "clear_instance"): + self.clear_instance() ans = get_token(self.run_url, port, self.log) if ans is None: @@ -270,12 +219,12 @@ def _log_in(self) -> None: self.token = token try: import keyring + keyring.set_password(self.run_url, "access_token", self.token) self.log.debug("Store token with keyring %s", token) except ImportError as e: self.log.debug("Unable to import keyring.", exc_info=e) - def _ask_credentials(self) -> dict: questions = [ { @@ -315,13 +264,13 @@ def _ask_credentials(self) -> dict: ] return questionary.prompt(questions) - def _log_out(self) -> None: """Log out from the Identity Provider.""" self.token = None self.user_handle = None try: import keyring + if keyring.get_credential(self.run_url, "access_token") is not None: keyring.delete_password(self.run_url, "access_token") except ImportError as e: @@ -354,3 +303,48 @@ def target(): browser.open(address) threading.Thread(target=target).start() + + +class DatalayerCLIBaseApp(DatalayerApp, DatalayerAuthMixin): + name = "datalayer_core" + + version = __version__ + + aliases = datalayer_aliases + + flags = datalayer_flags + + _is_initialized = False + + _requires_auth = True + + def initialize(self, argv=None): + super().initialize(argv) + + if self.token is None: + self.user_handle = None + + if not getattr(self, "_dispatching", False): + super().initialize(argv) + + if DatalayerCLIBaseApp._is_initialized: + return + + DatalayerCLIBaseApp._is_initialized = True + + # Log the user. + if self._requires_auth: + self._log_in() + + self.log.debug( + "Datalayer - Version %s - Connected as %s on %s", + self.version, + self.user_handle, + self.run_url, + ) + console = Console() + console.print() + console.print( + f"Datalayer - Version [bold cyan]{self.version}[/bold cyan] - Connected as [bold yellow]{self.user_handle}[/bold yellow] on [i]{self.run_url}[/i]" + ) + console.print() diff --git a/datalayer_core/kernels/client.py b/datalayer_core/kernels/client.py deleted file mode 100644 index 628c3a9..0000000 --- a/datalayer_core/kernels/client.py +++ /dev/null @@ -1,1090 +0,0 @@ -# Copyright (c) Datalayer Development Team. -# Distributed under the terms of the Modified BSD License. - -"""A remote kernel client.""" - -from __future__ import annotations - -import logging -import os -import queue -import pprint -import signal -import sys -import time -from threading import Event, Thread -import typing as t -from getpass import getpass -from urllib.parse import urlencode -import websocket - -from jupyter_client.adapter import adapt -from jupyter_client.channelsabc import ChannelABC, HBChannelABC -from jupyter_client.client import validate_string_dict -from jupyter_client.clientabc import KernelClientABC -from jupyter_client.session import Message, Session -from jupyter_client.jsonutil import extract_dates -from jupyter_server.services.kernels.connection.base import ( - deserialize_msg_from_ws_v1, - serialize_msg_to_ws_v1, -) -from traitlets.log import get_logger - -logger_ = logging.getLogger(__name__) -# websocket.enableTrace(True) - -REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", 10)) - - -class WSSession(Session): - def serialize(self, msg: dict[str, t.Any]) -> list[bytes]: - """Serialize the message components to bytes. - - This is roughly the inverse of deserialize. The serialize/deserialize - methods work with full message lists, whereas pack/unpack work with - the individual message parts in the message list. - - Parameters - ---------- - msg : dict or Message - The next message dict as returned by the self.msg method. - - Returns - ------- - msg_list : list - The list of bytes objects to be sent with the format:: - - [p_header, p_parent, - p_metadata, p_content, buffer1, buffer2, ...] - - In this list, the ``p_*`` entities are the packed or serialized - versions, so if JSON is used, these are utf8 encoded JSON strings. - """ - content = msg.get("content", {}) - if content is None: - content = self.none - elif isinstance(content, dict): - content = self.pack(content) - elif isinstance(content, bytes): - # content is already packed, as in a relayed message - pass - elif isinstance(content, str): - # should be bytes, but JSON often spits out unicode - content = content.encode("utf8") - else: - raise TypeError("Content incorrect type: %s" % type(content)) - - real_message = [ - self.pack(msg["header"]), - self.pack(msg["parent_header"]), - self.pack(msg["metadata"]), - content, - ] - - to_send = [] - - to_send.extend(real_message) - - return to_send - - def deserialize( - self, - msg_list: list[bytes], - content: bool = True, - ) -> dict[str, t.Any]: - """Unserialize a msg_list to a nested message dict. - - This is roughly the inverse of serialize. The serialize/deserialize - methods work with full message lists, whereas pack/unpack work with - the individual message parts in the message list. - - Parameters - ---------- - msg_list : list of bytes - The list of message parts of the form [p_header,p_parent, - p_metadata,p_content,buffer1,buffer2,...]. - content : bool (True) - Whether to unpack the content dict (True), or leave it packed - (False). - - Returns - ------- - msg : dict - The nested message dict with top-level keys [header, parent_header, - content, buffers]. The buffers are returned as memoryviews. - """ - minlen = 4 - message = {} - msg_list = t.cast(t.List[bytes], msg_list) - if not len(msg_list) >= minlen: - msg = "malformed message, must have at least %i elements" % minlen - raise TypeError(msg) - header = self.unpack(msg_list[0]) - message["header"] = extract_dates(header) - message["msg_id"] = header["msg_id"] - message["msg_type"] = header["msg_type"] - message["parent_header"] = extract_dates(self.unpack(msg_list[1])) - message["metadata"] = self.unpack(msg_list[2]) - if content: - message["content"] = self.unpack(msg_list[3]) - else: - message["content"] = msg_list[3] - buffers = [memoryview(b) for b in msg_list[4:]] - message["buffers"] = buffers - if self.debug: - pprint.pprint("WSSession.deserialize") - pprint.pprint(message) # noqa - # adapt to the current version - return adapt(message) - - def send( - self, - stream: websocket.WebSocketApp, - channel: str, - msg_or_type: dict[str, t.Any] | str, - content: dict[str, t.Any] | None = None, - parent: dict[str, t.Any] | None = None, - buffers: list[bytes] | None = None, - header: dict[str, t.Any] | None = None, - metadata: dict[str, t.Any] | None = None, - ) -> dict[str, t.Any] | None: - """Build and send a message via stream or socket. - - The message format used by this function internally is as follows: - - [p_header,p_parent,p_content, - buffer1,buffer2,...] - - The serialize/deserialize methods convert the nested message dict into this - format. - - Parameters - ---------- - - stream : websocket.WebSocketApp - The websocket object used to send the data. - channel : str - Channel name - msg_or_type : str or Message/dict - Normally, msg_or_type will be a msg_type unless a message is being - sent more than once. If a header is supplied, this can be set to - None and the msg_type will be pulled from the header. - - content : dict or None - The content of the message (ignored if msg_or_type is a message). - header : dict or None - The header dict for the message (ignored if msg_to_type is a message). - parent : Message or dict or None - The parent or parent header describing the parent of this message - (ignored if msg_or_type is a message). - metadata : dict or None - The metadata describing the message - buffers : list or None - The already-serialized buffers to be appended to the message. - - - Returns - ------- - msg : dict - The constructed message. - """ - - if isinstance(msg_or_type, (Message, dict)): - # We got a Message or message dict, not a msg_type so don't - # build a new Message. - msg = msg_or_type - buffers = buffers or msg.get("buffers", []) - else: - msg = self.msg( - msg_or_type, - content=content, - parent=parent, - header=header, - metadata=metadata, - ) - if self.check_pid and os.getpid() != self.pid: - get_logger().warning( - "WARNING: attempted to send message from fork\n%s", msg - ) - return None - buffers = [] if buffers is None else buffers - for idx, buf in enumerate(buffers): - if isinstance(buf, memoryview): - view = buf - else: - try: - # check to see if buf supports the buffer protocol. - view = memoryview(buf) - except TypeError as e: - emsg = "Buffer objects must support the buffer protocol." - raise TypeError(emsg) from e - # memoryview.contiguous is new in 3.3, - # just skip the check on Python 2 - if hasattr(view, "contiguous") and not view.contiguous: - # zmq requires memoryviews to be contiguous - raise ValueError("Buffer %i (%r) is not contiguous" % (idx, buf)) - - if self.adapt_version: - msg = adapt(msg, self.adapt_version) - to_send = self.serialize(msg) - to_send.extend(buffers) - - stream.send_bytes(serialize_msg_to_ws_v1(to_send, channel)) - - if self.debug: - pprint.pprint("WSSession.send") - pprint.pprint(msg) # noqa - pprint.pprint(to_send) # noqa - pprint.pprint(buffers) # noqa - - return msg - - def send_raw( - self, - stream: websocket.WebSocketApp, - msg_list: list, - flags: int = 0, - copy: bool = True, - ident: bytes | list[bytes] | None = None, - ) -> None: - """Send a raw message via ident path. - - This method is used to send a already serialized message. - - Parameters - ---------- - stream : websocket.WebSocketApp - The websocket to use for sending the message. - msg_list : list - The serialized list of messages to send. This only includes the - [p_header,p_parent,p_metadata,p_content,buffer1,buffer2,...] portion of - the message. - ident : ident or list - A single ident or a list of idents to use in sending. - """ - raise NotImplementedError("WSSession.send_raw should not be needed") - - def recv( - self, - socket: websocket.WebSocketApp, - mode: int = 0, # FIXME - content: bool = True, - copy: bool = True, - ) -> tuple[list[bytes] | None, dict[str, t.Any] | None]: - """Receive and unpack a message. - - Parameters - ---------- - socket : ZMQStream or Socket - The socket or stream to use in receiving. - - Returns - ------- - [idents], msg - [idents] is a list of idents and msg is a nested message dict of - same format as self.msg returns. - """ - raise NotImplementedError("WSSession.recv should not be needed") - - -class WSChannel(ChannelABC): - def __init__( - self, - channel_name: str, - socket: websocket.WebSocketApp, - session: WSSession, - messages_queue: queue.Queue, - log: logging.Logger, - ) -> None: - """Create a channel. - - Parameters - ---------- - channel_name : str - Channel name - socket : :class:`websocket.WebSocketApp` - The websocket connection. - session : :class:`session.Session` - The session to use. - """ - self.channel_name = channel_name - self.socket: websocket.WebSocketApp = socket - self.session = session - self._messages = messages_queue - self.log = log - - def start(self) -> None: - """Start the channel.""" - pass - - def stop(self) -> None: - """Stop the channel.""" - if not self._messages.empty(): - # If unprocessed messages are detected, drain the queue collecting non-status - # messages. If any remain that are not 'shutdown_reply' and this is not iopub - # go ahead and issue a warning. - msgs = [] - while self._messages.qsize(): - msg = self._messages.get_nowait() - if msg["msg_type"] != "status": - msgs.append(msg["msg_type"]) - if self.channel_name == "iopub" and "shutdown_reply" in msgs: - return - if len(msgs): - self.log.warning( - f"Stopping channel '{self.channel_name}' with {len(msgs)} unprocessed non-status messages: {msgs}." - ) - - def is_alive(self) -> bool: - """Test whether the channel is alive.""" - return True # FIXME - - def send(self, msg: t.Dict[str, t.Any]) -> None: - """Pass a message to the websocket to send""" - self.log.debug( - "Sending message on channel: {channel}, msg_id: {msg_id}, msg_type: {msg_type}".format( - channel=self.channel_name, - **msg, - ) - ) - self.session.send(self.socket, self.channel_name, msg) - - def get_msg(self, timeout: float | None = None) -> dict: - return self._messages.get(timeout=timeout) - - def get_msgs(self) -> list[dict[str, t.Any]]: - msgs = [] - while not self._messages.empty(): - msgs.append(self._messages.get_nowait()) - - return msgs - - def msg_ready(self) -> bool: - """Is there a message that has been received?""" - return not self._messages.empty() - - -class HBWSChannel(HBChannelABC, WSChannel): - # FIXME implement an heartbeat - - @property - def time_to_dead(self) -> float: - return 1.0 # Taken from jupyter_client.channels.HBChannel - - def pause(self) -> None: - """Pause the heartbeat channel.""" - pass # FIXME - - def unpause(self) -> None: - """Unpause the heartbeat channel.""" - pass # FIXME - - def is_beating(self) -> bool: - """Test whether the channel is beating.""" - return True # FIXME - - -class KernelClient(KernelClientABC): - """A kernel client class.""" - - DEFAULT_INTERRUPT_WAIT = 1 - - def __init__( - self, - kernel_ws_endpoint: str, - token: str | None = None, - username: str | None = None, - timeout=REQUEST_TIMEOUT, - log=None, - **kwargs, - ): - """Initialize the client.""" - self.allow_stdin = False # Will change when the stdin channel opens - self.shutting_down = False - self.kernel_ws_endpoint = kernel_ws_endpoint - self.token = token - self.log = log or logger_ - self.kernel_socket: websocket.WebSocketApp | None = None - self.connection_thread: Thread | None = None - self.connection_ready = Event() - self._message_received = Event() - self.interrupt_thread = None - self.timeout = timeout - self.session = WSSession() - self.session.username = username or "" - # self.session.debug = True - - self._shell_channel: WSChannel | None = None - self._iopub_channel: WSChannel | None = None - self._stdin_channel: WSChannel | None = None - self._hb_channel: HBWSChannel | None = None - self._control_channel: WSChannel | None = None - self._shell_msg_queue = queue.Queue() - self._iopub_msg_queue = queue.Queue() - self._stdin_msg_queue = queue.Queue() - self._hb_msg_queue = queue.Queue() - self._control_msg_queue = queue.Queue() - - def __del__(self): - if not self.shutting_down: - self.stop_channels() - - @property - def kernel(self) -> t.Any: - # FIXME this is a strange abstract property - # because it does not exists on jupyter_client.client.KernelClient - return None - - @property - def shell_channel_class(self) -> type[WSChannel]: - return WSChannel - - @property - def iopub_channel_class(self) -> type[WSChannel]: - return WSChannel - - @property - def hb_channel_class(self) -> type[HBWSChannel]: - return HBWSChannel - - @property - def stdin_channel_class(self) -> type[WSChannel]: - return WSChannel - - @property - def control_channel_class(self) -> type[WSChannel]: - return WSChannel - - def start_channels( - self, - shell: bool = True, - iopub: bool = True, - stdin: bool = True, - hb: bool = True, - control: bool = True, - ) -> None: - """Start the channels for the client.""" - self.log.debug(f"Connecting kernel client to {self.kernel_ws_endpoint}") - - url = f"{self.kernel_ws_endpoint}" - if self.token is not None: - url += "?" + urlencode( - {"session_id": self.session.session, "token": self.token} - ) - self.kernel_socket = websocket.WebSocketApp( - url, - header=["User-Agent: Datalayer Console"], - # Use the new optimized protocol - subprotocols=["v1.kernel.websocket.jupyter.org"], - on_close=self._on_close, - on_open=self._on_open, - on_message=self._on_message, - ) - self.connection_thread = Thread(target=self._run_websocket) - self.connection_thread.start() - - self.connection_ready.wait(timeout=self.timeout) - if iopub: - self.iopub_channel.start() - if shell: - self.shell_channel.start() - if stdin: - self.stdin_channel.start() - self.allow_stdin = True - else: - self.allow_stdin = False - if hb: - self.hb_channel.start() - if control: - self.control_channel.start() - - def stop_channels(self) -> None: - """Stop the channels for the client.""" - # Terminate thread, close socket and clear queues. - self.shutting_down = True - - if self.kernel_socket: - self.kernel_socket.close() - self.kernel_socket = None - - if self.connection_thread is not None and self.connection_thread.is_alive(): - self.connection_thread.join(self.timeout) - if self.connection_thread.is_alive(): - self.log.warning("Failed to stop websocket connection thread.") - self.connection_thread = None - - @property - def channels_running(self) -> bool: - """Are any of the channels created and running?""" - return self.connection_ready.is_set() - - @property - def shell_channel(self) -> WSChannel: - """Get the shell channel object for this kernel.""" - if self._shell_channel is None: - assert self.kernel_socket is not None - self._shell_channel = self.shell_channel_class( - "shell", - self.kernel_socket, - self.session, - self._shell_msg_queue, - self.log, - ) - return self._shell_channel - - @property - def iopub_channel(self) -> WSChannel: - if self._iopub_channel is None: - assert self.kernel_socket is not None - self._iopub_channel = self.iopub_channel_class( - "iopub", - self.kernel_socket, - self.session, - self._iopub_msg_queue, - self.log, - ) - return self._iopub_channel - - @property - def stdin_channel(self) -> WSChannel: - if self._stdin_channel is None: - assert self.kernel_socket is not None - self._stdin_channel = self.stdin_channel_class( - "stdin", - self.kernel_socket, - self.session, - self._stdin_msg_queue, - self.log, - ) - return self._stdin_channel - - @property - def hb_channel(self) -> HBWSChannel: - if self._hb_channel is None: - assert self.kernel_socket is not None - self._hb_channel = self.hb_channel_class( - "hb", self.kernel_socket, self.session, self._hb_msg_queue, self.log - ) - return self._hb_channel - - @property - def control_channel(self) -> WSChannel: - if self._control_channel is None: - assert self.kernel_socket is not None - self._control_channel = self.control_channel_class( - "control", - self.kernel_socket, - self.session, - self._control_msg_queue, - self.log, - ) - return self._control_channel - - def get_shell_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]: - """Get a message from the shell channel""" - return self.shell_channel.get_msg(*args, **kwargs) - - def get_iopub_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]: - """Get a message from the iopub channel""" - return self.iopub_channel.get_msg(*args, **kwargs) - - def get_stdin_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]: - """Get a message from the stdin channel""" - return self.stdin_channel.get_msg(*args, **kwargs) - - def get_control_msg(self, *args: t.Any, **kwargs: t.Any) -> t.Dict[str, t.Any]: - """Get a message from the control channel""" - return self.control_channel.get_msg(*args, **kwargs) - - def complete(self, code: str, cursor_pos: t.Optional[int] = None) -> str: - """Tab complete text in the kernel's namespace. - - Parameters - ---------- - code : str - The context in which completion is requested. - Can be anything between a variable name and an entire cell. - cursor_pos : int, optional - The position of the cursor in the block of code where the completion was requested. - Default: ``len(code)`` - - Returns - ------- - The msg_id of the message sent. - """ - if cursor_pos is None: - cursor_pos = len(code) - content = {"code": code, "cursor_pos": cursor_pos} - msg = self.session.msg("complete_request", content) - self.shell_channel.send(msg) - return msg["header"]["msg_id"] - - def inspect( - self, code: str, cursor_pos: t.Optional[int] = None, detail_level: int = 0 - ) -> str: - """Get metadata information about an object in the kernel's namespace. - - It is up to the kernel to determine the appropriate object to inspect. - - Parameters - ---------- - code : str - The context in which info is requested. - Can be anything between a variable name and an entire cell. - cursor_pos : int, optional - The position of the cursor in the block of code where the info was requested. - Default: ``len(code)`` - detail_level : int, optional - The level of detail for the introspection (0-2) - - Returns - ------- - The msg_id of the message sent. - """ - if cursor_pos is None: - cursor_pos = len(code) - content = { - "code": code, - "cursor_pos": cursor_pos, - "detail_level": detail_level, - } - msg = self.session.msg("inspect_request", content) - self.shell_channel.send(msg) - return msg["header"]["msg_id"] - - def history( - self, - raw: bool = True, - output: bool = False, - hist_access_type: str = "range", - **kwargs: t.Any, - ) -> str: - """Get entries from the kernel's history list. - - Parameters - ---------- - raw : bool - If True, return the raw input. - output : bool - If True, then return the output as well. - hist_access_type : str - 'range' (fill in session, start and stop params), 'tail' (fill in n) - or 'search' (fill in pattern param). - - session : int - For a range request, the session from which to get lines. Session - numbers are positive integers; negative ones count back from the - current session. - start : int - The first line number of a history range. - stop : int - The final (excluded) line number of a history range. - - n : int - The number of lines of history to get for a tail request. - - pattern : str - The glob-syntax pattern for a search request. - - Returns - ------- - The ID of the message sent. - """ - if hist_access_type == "range": - kwargs.setdefault("session", 0) - kwargs.setdefault("start", 0) - content = dict( - raw=raw, output=output, hist_access_type=hist_access_type, **kwargs - ) - msg = self.session.msg("history_request", content) - self.shell_channel.send(msg) - return msg["header"]["msg_id"] - - def kernel_info(self) -> str: - """Request kernel info - - Returns - ------- - The msg_id of the message sent - """ - msg = self.session.msg("kernel_info_request") - self.shell_channel.send(msg) - return msg["header"]["msg_id"] - - def comm_info(self, target_name: t.Optional[str] = None) -> str: - """Request comm info - - Returns - ------- - The msg_id of the message sent - """ - content = {} if target_name is None else {"target_name": target_name} - msg = self.session.msg("comm_info_request", content) - self.shell_channel.send(msg) - return msg["header"]["msg_id"] - - def execute( - self, - code: str, - silent: bool = False, - store_history: bool = True, - user_expressions: t.Optional[t.Dict[str, t.Any]] = None, - allow_stdin: t.Optional[bool] = None, - stop_on_error: bool = True, - ) -> str: - """Execute code in the kernel. - - Parameters - ---------- - code : str - A string of code in the kernel's language. - - silent : bool, optional (default False) - If set, the kernel will execute the code as quietly possible, and - will force store_history to be False. - - store_history : bool, optional (default True) - If set, the kernel will store command history. This is forced - to be False if silent is True. - - user_expressions : dict, optional - A dict mapping names to expressions to be evaluated in the user's - dict. The expression values are returned as strings formatted using - :func:`repr`. - - allow_stdin : bool, optional (default self.allow_stdin) - Flag for whether the kernel can send stdin requests to frontends. - - Some frontends (e.g. the Notebook) do not support stdin requests. - If raw_input is called from code executed from such a frontend, a - StdinNotImplementedError will be raised. - - stop_on_error: bool, optional (default True) - Flag whether to abort the execution queue, if an exception is encountered. - - Returns - ------- - The msg_id of the message sent. - """ - if user_expressions is None: - user_expressions = {} - if allow_stdin is None: - allow_stdin = self.allow_stdin - - # Don't waste network traffic if inputs are invalid - if not isinstance(code, str): - raise ValueError("code %r must be a string" % code) - validate_string_dict(user_expressions) - - # Create class for content/msg creation. Related to, but possibly - # not in Session. - content = { - "code": code, - "silent": silent, - "store_history": store_history, - "user_expressions": user_expressions, - "allow_stdin": allow_stdin, - "stop_on_error": stop_on_error, - } - msg = self.session.msg("execute_request", content) - self.shell_channel.send(msg) - return msg["header"]["msg_id"] - - def execute_interactive( - self, - code: str, - silent: bool = False, - store_history: bool = True, - user_expressions: t.Optional[t.Dict[str, t.Any]] = None, - allow_stdin: t.Optional[bool] = None, - stop_on_error: bool = True, - timeout: t.Optional[float] = None, - output_hook: t.Optional[t.Callable] = None, - stdin_hook: t.Optional[t.Callable] = None, - ) -> t.Dict[str, t.Any]: - """Execute code in the kernel interactively - - Output will be redisplayed, and stdin prompts will be relayed as well. - - You can pass a custom output_hook callable that will be called - with every IOPub message that is produced instead of the default redisplay. - - Parameters - ---------- - code : str - A string of code in the kernel's language. - - silent : bool, optional (default False) - If set, the kernel will execute the code as quietly possible, and - will force store_history to be False. - - store_history : bool, optional (default True) - If set, the kernel will store command history. This is forced - to be False if silent is True. - - user_expressions : dict, optional - A dict mapping names to expressions to be evaluated in the user's - dict. The expression values are returned as strings formatted using - :func:`repr`. - - allow_stdin : bool, optional (default self.allow_stdin) - Flag for whether the kernel can send stdin requests to frontends. - - stop_on_error: bool, optional (default True) - Flag whether to abort the execution queue, if an exception is encountered. - - timeout: float or None (default: None) - Timeout to use when waiting for a reply - - output_hook: callable(msg) - Function to be called with output messages. - If not specified, output will be redisplayed. - - stdin_hook: callable(msg) - Function to be called with stdin_request messages. - If not specified, input/getpass will be called. - - Returns - ------- - reply: dict - The reply message for this request - """ - if not self.iopub_channel.is_alive(): - emsg = "IOPub channel must be running to receive output" - raise RuntimeError(emsg) - if allow_stdin is None: - allow_stdin = self.allow_stdin - if allow_stdin and not self.stdin_channel.is_alive(): - emsg = "stdin channel must be running to allow input" - raise RuntimeError(emsg) - - if stdin_hook is None: - stdin_hook = self._stdin_hook_default - if output_hook is None: - # default: redisplay plain-text outputs - output_hook = self._output_hook_default - - # set deadline based on timeout - if timeout is not None: - deadline = time.monotonic() + timeout - - self._message_received.clear() - if self.iopub_channel.msg_ready(): - # Flush the message - self.iopub_channel.get_msgs() - msg_id = self.execute( - code, - silent=silent, - store_history=store_history, - user_expressions=user_expressions, - allow_stdin=allow_stdin, - stop_on_error=stop_on_error, - ) - - # wait for output and redisplay it - while True: - if not self.connection_ready.is_set(): - raise RuntimeError("Connection was lost.") - - if timeout is not None: - timeout = max(0, deadline - time.monotonic()) - - self._message_received.wait(timeout=timeout) - - if allow_stdin: - try: - req = self.stdin_channel.get_msg(timeout=0) - stdin_hook(req) - continue - except (queue.Empty, TimeoutError): - ... - - try: - msg = self.iopub_channel.get_msg(timeout=0) - except (queue.Empty, TimeoutError): - if not self.iopub_channel.msg_ready() and ( - not allow_stdin or not self.stdin_channel.msg_ready() - ): - self._message_received.clear() - continue - - if msg["parent_header"].get("msg_id") != msg_id: - self.log.debug(f"Ignoring message not from request: {msg!s}") - continue - output_hook(msg) - - # stop on idle - if ( - msg["header"]["msg_type"] == "status" - and msg["content"]["execution_state"] == "idle" - ): - break - - # output is done, get the reply - if timeout is not None: - timeout = max(0, deadline - time.monotonic()) - return self._recv_reply(msg_id, timeout=timeout) - - def is_alive(self) -> bool: - """Is the kernel process still running?""" - if self._hb_channel is not None: - # We don't have access to the KernelManager, - # so we use the heartbeat. - return self._hb_channel.is_beating() - # no heartbeat and not local, we can't tell if it's running, - # so naively return True - return True - - def is_complete(self, code: str) -> str: - """Ask the kernel whether some code is complete and ready to execute. - - Returns - ------- - The ID of the message sent. - """ - msg = self.session.msg("is_complete_request", {"code": code}) - self.shell_channel.send(msg) - return msg["header"]["msg_id"] - - def input(self, string: str) -> None: - """Send a string of raw input to the kernel. - - This should only be called in response to the kernel sending an - ``input_request`` message on the stdin channel. - - Returns - ------- - The ID of the message sent. - """ - content = {"value": string} - msg = self.session.msg("input_reply", content) - self.stdin_channel.send(msg) - - def shutdown(self, restart: bool = False) -> str: - """Request an immediate kernel shutdown on the control channel. - - Upon receipt of the (empty) reply, client code can safely assume that - the kernel has shut down and it's safe to forcefully terminate it if - it's still alive. - - The kernel will send the reply via a function registered with Python's - atexit module, ensuring it's truly done as the kernel is done with all - normal operation. - - Returns - ------- - The msg_id of the message sent - """ - # Send quit message to kernel. Once we implement kernel-side setattr, - # this should probably be done that way, but for now this will do. - msg = self.session.msg("shutdown_request", {"restart": restart}) - self.control_channel.send(msg) - return msg["header"]["msg_id"] - - def _on_open(self, _: websocket.WebSocketApp): - self.log.debug("Websocket connection is ready.") - self.connection_ready.set() - - def _on_close(self, _: websocket.WebSocketApp, close_status_code, close_msg): - msg = "Websocket connection is closed" - if close_status_code or close_msg: - self.log.info("%s: %s %s", msg, close_status_code, close_msg) - else: - self.log.debug(msg) - self.connection_ready.clear() - - def _on_message(self, _: websocket.WebSocketApp, message: bytes) -> None: - channel, msg_list = deserialize_msg_from_ws_v1(message) - deserialize_msg = self.session.deserialize(msg_list) - self.log.debug( - "Received message on channel: {channel}, msg_id: {msg_id}, msg_type: {msg_type}".format( - channel=channel, - **(deserialize_msg or {"msg_id": "null", "msg_type": "null"}), - ) - ) - - getattr(self, f"_{channel}_msg_queue").put_nowait(deserialize_msg) - self._message_received.set() - - def _run_websocket(self) -> None: - try: - self.kernel_socket.run_forever(ping_interval=60, reconnect=5) - except ValueError as e: - self.log.error( - "Unable to open websocket connection with %s", - self.kernel_socket.url, - exc_info=e, - ) - except BaseException as e: - self.log.error("Websocket listener thread stopped.", exc_info=e) - - def _output_hook_default(self, msg: t.Dict[str, t.Any]) -> None: - """Default hook for redisplaying plain-text output""" - sys.stdout.flush() - sys.stderr.flush() - msg_type = msg["header"]["msg_type"] - content = msg["content"] - if msg_type == "stream": - stream = getattr(sys, content["name"]) - stream.write(content["text"]) - elif msg_type in ("display_data", "execute_result"): - sys.stdout.write(content["data"].get("text/plain", "")) - sys.stdout.write("\n") - sys.stdout.flush() - elif msg_type == "error": - sys.stderr.write("\n".join(content["traceback"])) - - def _stdin_hook_default(self, msg: t.Dict[str, t.Any]) -> None: - """Handle an input request""" - content = msg["content"] - prompt = getpass if content.get("password", False) else input - - # wrap SIGINT handler - real_handler = signal.getsignal(signal.SIGINT) - - def double_int(sig, frame): - # call real handler (forwards sigint to kernel), - # then raise local interrupt, stopping local raw_input - real_handler(sig, frame) - raise KeyboardInterrupt - - signal.signal(signal.SIGINT, double_int) - - try: - raw_data = prompt(content["prompt"]) # type:ignore[operator] - except EOFError: - # turn EOFError into EOF character - raw_data = "\x04" - except KeyboardInterrupt: - sys.stdout.write("\n") - return - finally: - # restore SIGINT handler - signal.signal(signal.SIGINT, real_handler) - - # only send stdin reply if there *was not* another request - # or execution finished while we were reading. - if not (self.stdin_channel.msg_ready() or self.shell_channel.msg_ready()): - self.input(raw_data) - - def _recv_reply( - self, msg_id: str, timeout: t.Optional[float] = None, channel: str = "shell" - ) -> t.Dict[str, t.Any]: - """Receive and return the reply for a given request""" - if timeout is not None: - deadline = time.monotonic() + timeout - while True: - if timeout is not None: - timeout = max(0, deadline - time.monotonic()) - try: - if channel == "control": - reply = self.control_channel.get_msg(timeout=timeout) - else: - reply = self.shell_channel.get_msg(timeout=timeout) - except queue.Empty as e: - msg = "Timeout waiting for reply" - raise TimeoutError(msg) from e - if reply["parent_header"].get("msg_id") != msg_id: - self.log.debug(f"Ignoring message not from request: {msg!s}") - continue - return reply diff --git a/datalayer_core/kernels/console/consoleapp.py b/datalayer_core/kernels/console/consoleapp.py index d3ae6d4..97e86fe 100644 --- a/datalayer_core/kernels/console/consoleapp.py +++ b/datalayer_core/kernels/console/consoleapp.py @@ -1,188 +1,105 @@ # Copyright (c) Datalayer Development Team. # Distributed under the terms of the Modified BSD License. -import signal -import sys +import typing as t -from traitlets import CBool, Dict, Type, Unicode -from traitlets.config import catch_config_error, boolean_flag - - -from datalayer_core._version import __version__ -from datalayer_core.cli.base import ( - DatalayerCLIBaseApp, - datalayer_aliases, - datalayer_flags, +from jupyter_core.application import JupyterApp +from jupyter_kernel_client.konsoleapp import ( + KonsoleApp, + aliases as base_aliases, + flags as base_flags, ) +from traitlets import default +from traitlets.config import catch_config_error + +from ..._version import __version__ +from ...cli.base import DatalayerAuthMixin from ..manager import KernelManager -from .shell import WSTerminalInteractiveShell - - -# ----------------------------------------------------------------------------- -# Globals -# ----------------------------------------------------------------------------- - -_examples = """ -jupyter kernels console # start the WS-based console -""" - -# ----------------------------------------------------------------------------- -# Flags and Aliases -# ----------------------------------------------------------------------------- - -# copy flags from mixin: -flags = dict(datalayer_flags) -flags.update( - boolean_flag( - "simple-prompt", - "WSTerminalInteractiveShell.simple_prompt", - "Force simple minimal prompt using `raw_input`", - "Use a rich interactive prompt with prompt_toolkit", - ) -) -flags.update( - boolean_flag( - "confirm-exit", - "KernelConsoleApp.confirm_exit", - """Set to display confirmation dialog on exit. You can always use 'exit' or - 'quit', to force a direct exit without any confirmation. This can also - be set in the config file by setting - `c.KernelConsoleApp.confirm_exit`. - """, - """Don't prompt the user when exiting. This will terminate the kernel - if it is owned by the frontend, and leave it alive if it is external. - This can also be set in the config file by setting - `c.KernelConsoleApp.confirm_exit`. - """, - ) -) -# copy flags from mixin -aliases = dict(datalayer_aliases) -aliases.update( +datalayer_aliases = dict(base_aliases) +datalayer_aliases["run-url"] = "DatalayerAuthMixin.run_url" +datalayer_aliases["token"] = "DatalayerAuthMixin.token" +datalayer_aliases["external-token"] = "DatalayerAuthMixin.external_token" + +datalayer_flags = dict(base_flags) +datalayer_flags.update( { - "kernel": "KernelsConsoleApp.kernel_name", + "no-browser": ( + {"DatalayerAuthMixin": {"no_browser": True}}, + "Will prompt for user and password on the CLI.", + ) } ) -# ----------------------------------------------------------------------------- -# Classes -# ----------------------------------------------------------------------------- - -class KernelsConsoleApp(DatalayerCLIBaseApp): - """Start a terminal frontend to a remote kernel.""" +class KernelsConsoleApp(DatalayerAuthMixin, KonsoleApp): + """Console for Datalayer remote kernels.""" - name = "jupyter-kernels-console" + name = "datalayer-console" version = __version__ - description = """ - The Jupyter Kernels terminal-based Console. - - This launches a Console application inside a terminal. - """ - examples = _examples - - classes = [WSTerminalInteractiveShell] - flags = Dict(flags) - aliases = Dict(aliases) - - subcommands = Dict() - - kernel_manager_class = Type( - default_value=KernelManager, - config=True, - help="The kernel manager class to use.", - ) + aliases = datalayer_aliases - kernel_name = Unicode( - "", config=True, help="""The name of the kernel to connect to.""" - ) + flags = datalayer_flags - confirm_exit = CBool( - True, - config=True, - help=""" - Set to display confirmation dialog on exit. You can always use 'exit' or 'quit', - to force a direct exit without any confirmation.""", - ) + @default("kernel_manager_class") + def _kernel_manager_class_default(self) -> type: + return KernelManager - force_interact = True - - def init_shell(self): - # relay sigint to kernel - signal.signal(signal.SIGINT, self.handle_sigint) - self.shell = WSTerminalInteractiveShell.instance( - parent=self, - client=self.kernel_client, # FIXME - confirm_exit=self.confirm_exit, - ) - self.shell.own_kernel = False # not self.existing # FIXME we always assume there is a remote kernel - - def handle_sigint(self, *args): - if self.shell._executing: - if self.kernel_manager: - self.kernel_manager.interrupt_kernel() - else: - print( - "ERROR: Cannot interrupt kernels we didn't start.", file=sys.stderr - ) - else: - # raise the KeyboardInterrupt if we aren't waiting for execution, - # so that the interact loop advances, and prompt is redrawn, etc. - raise KeyboardInterrupt + @default("kernel_name") + def _kernel_name_default(self) -> str: + # Don't set a default kernel name + return "" @catch_config_error - def initialize(self, argv=None): + def initialize(self, argv: t.Any = None) -> None: """Do actions after construct, but before starting the app.""" + super(JupyterApp, self).initialize(argv) + + if self.token is None: + self.user_handle = None + if getattr(self, "_dispatching", False): return - DatalayerCLIBaseApp.initialize(self) - self.kernel_manager = None + self._log_in() + self.kernel_client = None self.shell = None self.init_kernel_manager() self.init_kernel_client() - if self.kernel_client.channels_running: + if self.kernel_client.client.channels_running: # create the shell self.init_shell() # and draw the banner self.init_banner() - def init_banner(self): - """Optionally display the banner""" - self.shell.show_banner() - def init_kernel_manager(self) -> None: - # Create a KernelManager. - self.kernel_manager = self.kernel_manager_class( - run_url=self.run_url, - token=self.token, - username=self.user_handle, + """Initialize the kernel manager.""" + # Create a KernelManager and start a kernel. + self.kernel_client = self.kernel_manager_class( parent=self, + run_url=self.run_url, + token=self.token or "", + username=self.user_handle or "", ) - def init_kernel_client(self) -> None: - """Initialize the kernel client.""" - self.kernel_manager.start_kernel(kernel_name=self.kernel_name) - self.kernel_client = self.kernel_manager.client() - - self.kernel_client.start_channels() - - def start(self): - # JupyterApp.start dispatches on NoStart - super(KernelsConsoleApp, self).start() - try: - if self.shell is None: - return - self.log.debug("Starting the jupyter console mainloop...") - self.shell.mainloop() - finally: - self.kernel_client.stop_channels() + if not self.existing: + self.kernel_client.start_kernel( + name=self.kernel_name, path=self.kernel_path + ) + elif self.kernel_client.kernel is None: + msg = f"Unable to connect to kernel with ID {self.existing}." + raise RuntimeError(msg) + + def init_shell(self): + super().init_shell() + # Force `own_kernel` to False to prevent shutting down the kernel + # on exit + self.shell.own_kernel = False main = launch_new_instance = KernelsConsoleApp.launch_instance diff --git a/datalayer_core/kernels/manager.py b/datalayer_core/kernels/manager.py index 0aa4cb2..f1ac42d 100644 --- a/datalayer_core/kernels/manager.py +++ b/datalayer_core/kernels/manager.py @@ -3,121 +3,70 @@ from __future__ import annotations -import datetime import re -import typing as t +from jupyter_kernel_client.manager import REQUEST_TIMEOUT, KernelHttpManager from jupyter_server.utils import url_path_join -from jupyter_server._tz import UTC, utcnow -from requests.exceptions import HTTPError -from traitlets import DottedObjectName, Type -from traitlets.config import LoggingConfigurable from datalayer_core.kernels.utils import _timestamp_to_local_date from datalayer_core.utils.utils import fetch - HTTP_PROTOCOL_REGEXP = re.compile(r"^http") -class KernelManager(LoggingConfigurable): +class KernelManager(KernelHttpManager): """Manages a single kernel remotely.""" def __init__(self, run_url: str, token: str, username: str, **kwargs): """Initialize the gateway kernel manager.""" - super().__init__(**kwargs) + _ = kwargs.pop("kernel_id", None) # kernel_id not supported + super().__init__(server_url="", token="", username=username, **kwargs) + self._kernel_id = "" self.run_url = run_url - self.token = token + self.run_token = token self.username = username - self.kernel_url: str = "" - self.kernel_id: str = "" - self.kernel: dict | None = None - self.kernel_token: str = "" - # simulate busy/activity markers - self.execution_state = "starting" - self.last_activity = utcnow() @property - def has_kernel(self): - """Has a kernel been started that we are managing.""" - return self.kernel is not None - - client_class = DottedObjectName("datalayer_core.kernels.client.KernelClient") - client_factory = Type(klass="datalayer_core.kernels.client.KernelClient") - - # -------------------------------------------------------------------------- - # create a Client connected to our Kernel - # -------------------------------------------------------------------------- - - def client(self, **kwargs): - """Create a client configured to connect to our kernel""" - base_ws_url = HTTP_PROTOCOL_REGEXP.sub("ws", self.kernel_url, 1) - - kw: dict[str, t.Any] = {} - kw.update( - { - "kernel_ws_endpoint": url_path_join(base_ws_url, "channels"), - "token": self.kernel_token, - "username": self.username, - "log": self.log, - "parent": self, - } - ) - - # add kwargs last, for manual overrides - kw.update(kwargs) - return self.client_factory(**kw) - - def refresh_model(self, model=None): - """Refresh the kernel model. - - Parameters - ---------- - model : dict - The model from which to refresh the kernel. If None, the kernel - model is fetched from the Gateway server. - """ - if model is None: - self.log.debug("Request kernel at: %s" % self.kernel_url) - try: - response = fetch(self.kernel_url, token=self.kernel_token, method="GET") - except HTTPError as error: - if error.response.status_code == 404: - self.log.warning("Kernel not found at: %s", self.kernel_url) - model = None - else: - raise - else: - model = response.json() - self.log.debug("Kernel retrieved: %s" % model) - - if model: # Update activity markers - self.last_activity = datetime.datetime.strptime( - model["last_activity"], "%Y-%m-%dT%H:%M:%S.%fZ" - ).replace(tzinfo=UTC) - self.execution_state = model["execution_state"] - - self.kernel = model - return model + def kernel_url(self) -> str | None: + if self.kernel: + kernel_id = self.kernel["id"] + return url_path_join(self.server_url, "api/kernels", kernel_id) + elif self._kernel_id: + return url_path_join(self.server_url, "api/kernels", self._kernel_id) + else: + return None # -------------------------------------------------------------------------- # Kernel management # -------------------------------------------------------------------------- - def start_kernel(self, **kwargs): - """Starts a kernel via HTTP in an asynchronous manner. + def start_kernel( + self, name: str = "", path: str | None = None, timeout: float = REQUEST_TIMEOUT + ): + """Starts a kernel on Datalayer cloud. Parameters ---------- - `**kwargs` : optional - keyword arguments that are passed down to build the kernel_cmd - and launching the kernel (e.g. Popen kwargs). + name : str + Kernel name + path : str + [optional] API path from root to the cwd of the kernel + timeout : float + Request timeout + Returns + ------- + The kernel model """ - kernel_name = kwargs.get("kernel_name") + if self.has_kernel: + raise RuntimeError( + "A kernel is already started. Shutdown it before starting a new one." + ) + + kernel_name = name kernel = None if kernel_name: response = fetch( "{}/api/jupyter/v1/kernels/{}".format(self.run_url, kernel_name), - token=self.token, + token=self.run_token, ) kernel = response.json().get("kernel") else: @@ -126,7 +75,7 @@ def start_kernel(self, **kwargs): ) response = fetch( "{}/api/jupyter/v1/kernels".format(self.run_url), - token=self.token, + token=self.run_token, ) kernels = response.json().get("kernels", []) if len(kernels) == 0: @@ -139,90 +88,19 @@ def start_kernel(self, **kwargs): if kernel is None: raise RuntimeError("Unable to find a remote kernel.") - base_url = kernel["ingress"] - # base_ws_url = HTTP_PROTOCOL_REGEXP.sub("ws", base_url, 1) - self.kernel_token = kernel.get("token", "") + self.server_url = kernel["ingress"] + self.token = kernel.get("token", "") - response = fetch(f"{base_url}/api/kernels", token=self.kernel_token) + # Trick to set the kernel_url without the ability to set self.__kernel + response = fetch(f"{self.server_url}/api/kernels", token=self.token) kernels = response.json() - kernel_id = kernels[0]["id"] + self._kernel_id = kernels[0]["id"] - self.kernel_id = kernel_id - self.kernel_url = url_path_join(base_url, "api/kernels", self.kernel_id) - self.kernel = self.refresh_model() + kernel_model = self.refresh_model() msg = f"KernelManager using existing jupyter kernel {kernel_name}" expired_at = kernel.get("expired_at") if expired_at is not None: msg += f" expiring at {_timestamp_to_local_date(expired_at)}" self.log.info(msg) - def shutdown_kernel(self, now=False, restart=False): - """Attempts to stop the kernel process cleanly via HTTP.""" - - if self.has_kernel: - self.log.debug("Request shutdown kernel at: %s", self.kernel_url) - try: - response = fetch( - self.kernel_url, token=self.kernel_token, method="DELETE" - ) - self.log.debug( - "Shutdown kernel response: %d %s", - response.status_code, - response.reason, - ) - except HTTPError as error: - if error.response.status_code == 404: - self.log.debug( - "Shutdown kernel response: kernel not found (ignored)" - ) - else: - raise - - def restart_kernel(self, **kw): - """Restarts a kernel via HTTP.""" - if self.has_kernel: - assert self.kernel_url is not None - kernel_url = self.kernel_url + "/restart" - self.log.debug("Request restart kernel at: %s", kernel_url) - response = fetch( - kernel_url, - token=self.kernel_token, - method="POST", - json={}, - ) - self.log.debug( - "Restart kernel response: %d %s", response.status_code, response.reason - ) - - def interrupt_kernel(self): - """Interrupts the kernel via an HTTP request.""" - if self.has_kernel: - assert self.kernel_url is not None - kernel_url = self.kernel_url + "/interrupt" - self.log.debug("Request interrupt kernel at: %s", kernel_url) - response = fetch( - kernel_url, - token=self.kernel_token, - method="POST", - json={}, - ) - self.log.debug( - "Interrupt kernel response: %d %s", - response.status_code, - response.reason, - ) - - def is_alive(self): - """Is the kernel process still running?""" - if self.has_kernel: - # Go ahead and issue a request to get the kernel - self.kernel = self.refresh_model() - self.log.debug(f"The kernel: {self.kernel} is alive.") - return True - else: # we don't have a kernel - self.log.debug(f"The kernel: {self.kernel} no longer exists.") - return False - - def cleanup_resources(self, restart=False): - """Clean up resources when the kernel is shut down""" - ... + return kernel_model diff --git a/pyproject.toml b/pyproject.toml index 6e45b29..e8f80cf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,8 +24,14 @@ classifiers = [ "Programming Language :: Python :: 3.11", ] dependencies = [ + "jupyter-kernel-client", + "jupyter-console", "jupyter_server>=2,<3", + "keyring", + "questionary", + "requests", "rich", + "traitlets", ] dynamic = ["version", "description", "authors", "urls", "keywords"]