From e9402e9285f6adeeafc01826113d5af8ec1f3db1 Mon Sep 17 00:00:00 2001
From: Daniel Gafni
Date: Fri, 4 Oct 2024 16:34:46 +0200
Subject: [PATCH] [dagster-pipes] mutable log_readers in PipesThreadedMessageReader

---
 .../dagster/dagster/_core/pipes/utils.py      | 108 +++++++++++++-----
 .../dagster_databricks/pipes.py               |   3 +
 2 files changed, 80 insertions(+), 31 deletions(-)

diff --git a/python_modules/dagster/dagster/_core/pipes/utils.py b/python_modules/dagster/dagster/_core/pipes/utils.py
index bfc05d3f42cc4..18744e0623e40 100644
--- a/python_modules/dagster/dagster/_core/pipes/utils.py
+++ b/python_modules/dagster/dagster/_core/pipes/utils.py
@@ -8,7 +8,7 @@
 from abc import ABC, abstractmethod
 from contextlib import contextmanager
 from threading import Event, Thread
-from typing import Any, Iterator, Optional, Sequence, TextIO, Tuple, TypeVar
+from typing import IO, Any, Dict, Iterator, Mapping, Optional, Sequence, Tuple, TypeVar
 
 from dagster_pipes import (
     PIPES_PROTOCOL_VERSION_FIELD,
@@ -245,21 +245,36 @@ def no_messages_debug_text(self) -> str:
 class PipesThreadedMessageReader(PipesMessageReader):
     interval: float
     counter: int
-    log_readers: Sequence["PipesLogReader"]
     opened_payload: Optional[PipesOpenedData]
     launched_payload: Optional[PipesLaunchedData]
+    log_readers: Dict[str, "PipesLogReader"]
 
     def __init__(
         self,
         interval: float = 10,
-        log_readers: Optional[Sequence["PipesLogReader"]] = None,
+        log_readers: Optional[Mapping[str, "PipesLogReader"]] = None,
     ):
         self.interval = interval
-        self.counter = 1
+
         self.log_readers = (
-            check.opt_sequence_param(log_readers, "log_readers", of_type=PipesLogReader) or []
+            # copy the Mapping into a Dict: self.log_readers has to be mutable
+            # so that extra readers can be added after construction
+            dict(
+                check.opt_mapping_param(
+                    log_readers,
+                    "log_readers",
+                    key_type=str,
+                    value_type=PipesLogReader,
+                )
+            )
         )
+        self.extra_log_readers: Dict[str, "PipesLogReader"] = {}
         self.opened_payload = None
+        self.launched_payload = None
+
+    @abstractmethod
+    def can_start(self, params: PipesParams) -> bool: ...
 
     @contextmanager
     def read_messages(
@@ -307,8 +322,13 @@ def on_opened(self, opened_payload: PipesOpenedData) -> None:
     def on_launched(self, launched_payload: PipesLaunchedData) -> None:
         self.launched_payload = launched_payload
 
-    @abstractmethod
-    def can_start(self, params: PipesParams) -> bool: ...
+    def add_log_reader(self, name: str, log_reader: "PipesLogReader") -> None:
+        """Attach an extra log reader to the message reader.
+
+        Typically called when the target for reading logs is not known until after the external
+        process has started (for example, when the target depends on an external job_id).
+        The log reader will eventually be started by the PipesThreadedMessageReader once its
+        can_start check passes.
+        """
+        self.extra_log_readers[name] = log_reader
 
     @abstractmethod
     @contextmanager
@@ -434,18 +454,26 @@ def _logs_thread(
         # payload.
         log_params = {**params, **self.opened_payload}
 
+        wait_for_logs_start = None
+
         # Loop over all log readers and start them if the target is readable, which typically means
         # a file exists at the target location. Different execution environments may write logs at
         # different times (e.g., some may write logs periodically during execution, while others may
         # only write logs after the process has completed).
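+        # Readers may also be registered while this thread is running via add_log_reader();
+        # they are drained from extra_log_readers at the top of each polling iteration below
+        # and started once their can_start() check passes.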
         try:
-            unstarted_log_readers = list(self.log_readers)
-            wait_for_logs_start = None
-            while unstarted_log_readers:
-                # iterate in reverse so we can pop off elements as we go
-                for i in reversed(range(len(unstarted_log_readers))):
-                    if unstarted_log_readers[i].target_is_readable(log_params):
-                        reader = unstarted_log_readers.pop(i)
+            unstarted_log_readers = {**self.log_readers, **self.extra_log_readers}
+
+            while True:
+                # drain extra_log_readers on every iteration: new readers may have been
+                # added via add_log_reader() after the external process started
+                for key in list(self.extra_log_readers.keys()):
+                    new_log_reader = self.extra_log_readers.pop(key)
+                    unstarted_log_readers[key] = new_log_reader
+                    self.log_readers[key] = new_log_reader
+
+                for key in list(unstarted_log_readers.keys()):
+                    if unstarted_log_readers[key].can_start(log_params):
+                        reader = unstarted_log_readers.pop(key)
                         reader.start(log_params, is_session_closed)
 
                 # In some cases logs might not be written out until after the external process has
@@ -456,19 +484,23 @@
                 if is_session_closed.is_set():
                     if wait_for_logs_start is None:
                         wait_for_logs_start = datetime.datetime.now()
-                    if (
-                        datetime.datetime.now() - wait_for_logs_start
-                    ).seconds > WAIT_FOR_LOGS_TIMEOUT:
-                        for log_reader in unstarted_log_readers:
+
+                    if not unstarted_log_readers:
+                        return
+                    elif (
+                        datetime.datetime.now() - wait_for_logs_start
+                    ).total_seconds() > WAIT_FOR_LOGS_TIMEOUT:
+                        for key, log_reader in unstarted_log_readers.items():
                             warnings.warn(
-                                f"Attempted to read log for reader {log_reader.name} but log was"
-                                " still not written {WAIT_FOR_LOGS_TIMEOUT} seconds after session close. Abandoning log."
+                                f"[pipes] Attempted to read logs for reader {key}:{log_reader.name}, but the log was"
+                                f" still not written {WAIT_FOR_LOGS_TIMEOUT} seconds after session close. Abandoning reader {key}."
                             )
-                        break
-            time.sleep(DEFAULT_SLEEP_INTERVAL)
-        # Wait for the external process to complete
-        is_session_closed.wait()
+
+                        return
+
+                time.sleep(DEFAULT_SLEEP_INTERVAL)
         except:
             handler.report_pipes_framework_exception(
                 f"{self.__class__.__name__} logs thread",
@@ -545,7 +577,7 @@ def stop(self) -> None: ...
     def is_running(self) -> bool: ...
 
     @abstractmethod
-    def target_is_readable(self, params: PipesParams) -> bool: ...
+    def can_start(self, params: PipesParams) -> bool: ...
 
     @property
     def name(self) -> str:
@@ -558,10 +590,10 @@ class PipesChunkedLogReader(PipesLogReader):
 
     Args:
         interval (float): interval in seconds between attempts to download a chunk.
-        target_stream (TextIO): The stream to which to write the logs. Typcially `sys.stdout` or `sys.stderr`.
+        target_stream (IO[str]): The stream to which to write the logs. Typically `sys.stdout` or `sys.stderr`.
""" - def __init__(self, *, interval: float = 10, target_stream: TextIO): + def __init__(self, *, interval: float = 10, target_stream: IO[str]): self.interval = interval self.target_stream = target_stream self.thread: Optional[Thread] = None @@ -618,7 +650,21 @@ def _join_thread(thread: Thread, thread_name: str) -> None: raise DagsterPipesExecutionError(f"Timed out waiting for {thread_name} thread to finish.") -def extract_message_or_forward_to_file(handler: "PipesMessageHandler", log_line: str, file: TextIO): +def forward_only_logs_to_file(log_line: str, file: IO[str]): + """Will write the log line to the file if it is not a Pipes message.""" + try: + message = json.loads(log_line) + if PIPES_PROTOCOL_VERSION_FIELD in message.keys(): + return + else: + file.writelines((log_line, "\n")) + except Exception: + file.writelines((log_line, "\n")) + + +def extract_message_or_forward_to_file( + handler: "PipesMessageHandler", log_line: str, file: IO[str] +): # exceptions as control flow, you love to see it try: message = json.loads(log_line) @@ -627,7 +673,7 @@ def extract_message_or_forward_to_file(handler: "PipesMessageHandler", log_line: else: file.writelines((log_line, "\n")) except Exception: - # move non-message logs in to stdout for compute log capture + # move non-message logs in to file for compute log capture file.writelines((log_line, "\n")) @@ -718,7 +764,7 @@ def ext_asset(context: OpExecutionContext): ) finally: if not message_handler.received_opened_message: - context.log.warn( + context.log.warning( "[pipes] did not receive any messages from external process. Check stdout / stderr" " logs from the external process if" f" possible.\n{context_injector.__class__.__name__}:" @@ -726,7 +772,7 @@ def ext_asset(context: OpExecutionContext): f" {message_reader.no_messages_debug_text()}\n" ) elif not message_handler.received_closed_message: - context.log.warn( + context.log.warning( "[pipes] did not receive closed message from external process. Buffered messages" " may have been discarded without being delivered. Use `open_dagster_pipes` as a" " context manager (a with block) to ensure that cleanup is successfully completed." diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py b/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py index e73b3fe5c5857..3b79ba5c0d673 100644 --- a/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py +++ b/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py @@ -403,6 +403,9 @@ def __init__( self.log_modification_time = None self.log_path = None + def can_start(self, params: PipesParams) -> bool: + return self._get_log_path(params) is not None + def download_log_chunk(self, params: PipesParams) -> Optional[str]: log_path = self._get_log_path(params) if log_path is None: