Skip to content

Commit

Permalink
[dagster-pipes] add on_launched hook to PipesMessageReader
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Oct 1, 2024
1 parent 784252e commit f45836f
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion python_modules/dagster/dagster/_core/pipes/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Sequence
from typing import TYPE_CHECKING, Any, Iterator, List, Mapping, Optional, Sequence, TypedDict

from dagster_pipes import (
DagsterPipesError,
Expand Down Expand Up @@ -143,7 +143,18 @@ def no_messages_debug_text(self) -> str:
"""


class PipesLaunchedData(TypedDict):
"""Payload generated on the Client side after the external process startup
containing arbitrary information about the external process.
"""

extras: Mapping[str, Any]


class PipesMessageReader(ABC):
launched_payload: Optional[PipesLaunchedData]
opened_payload: Optional[PipesOpenedData]

@abstractmethod
@contextmanager
def read_messages(self, handler: "PipesMessageHandler") -> Iterator[PipesParams]:
Expand All @@ -170,6 +181,14 @@ def on_opened(self, opened_payload: PipesOpenedData) -> None:
that can only be obtained from the external process.
"""

def on_launched(self, params: Mapping[str, Any]) -> None:
"""Can be called manually to submit extra information about the launched process
to the message reader.
By default this is a no-op. Specific message readers can override this to action information
that can only be obtained on the client side after the external process has been launched.
"""

@abstractmethod
def no_messages_debug_text(self) -> str:
"""A message to be displayed when no messages are received from the external process to aid with
Expand Down

0 comments on commit f45836f

Please sign in to comment.