Skip to content

Commit

Permalink
[dagster-pipes] mutable log_readers in ThreadedPipesMessageReader
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Oct 4, 2024
1 parent 4bcb615 commit e9402e9
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 31 deletions.
108 changes: 77 additions & 31 deletions python_modules/dagster/dagster/_core/pipes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from threading import Event, Thread
from typing import Any, Iterator, Optional, Sequence, TextIO, Tuple, TypeVar
from typing import IO, Any, Dict, Iterator, Mapping, Optional, Sequence, Tuple, TypeVar

from dagster_pipes import (
PIPES_PROTOCOL_VERSION_FIELD,
Expand Down Expand Up @@ -245,21 +245,36 @@ def no_messages_debug_text(self) -> str:
class PipesThreadedMessageReader(PipesMessageReader):
interval: float
counter: int
log_readers: Sequence["PipesLogReader"]
opened_payload: Optional[PipesOpenedData]
launched_payload: Optional[PipesLaunchedData]
log_readers: Dict[str, "PipesLogReader"]

def __init__(
self,
interval: float = 10,
log_readers: Optional[Sequence["PipesLogReader"]] = None,
log_readers: Optional[Mapping[str, "PipesLogReader"]] = None,
):
self.interval = interval
self.counter = 1

self.log_readers = (
check.opt_sequence_param(log_readers, "log_readers", of_type=PipesLogReader) or []
{ # this converts a Mapping to Dict (self.log_readers has to be mutable in order to add extra readers)
name: reader
for name, reader in check.opt_mapping_param(
log_readers,
"log_readers",
key_type=str,
value_type=PipesLogReader,
).items()
}
if log_readers
else {}
)
self.extra_log_readers = {}
self.opened_payload = None
self.launched_payload = None

@abstractmethod
def can_start(self, params: PipesParams) -> bool: ...

@contextmanager
def read_messages(
Expand Down Expand Up @@ -307,8 +322,13 @@ def on_opened(self, opened_payload: PipesOpenedData) -> None:
def on_launched(self, launched_payload: PipesLaunchedData) -> None:
self.launched_payload = launched_payload

@abstractmethod
def can_start(self, params: PipesParams) -> bool: ...
def add_log_reader(self, name: str, log_reader: "PipesLogReader") -> None:
    """Register an additional log reader under the given name.

    Useful when the target for reading logs only becomes known after the
    external process has started (for example, when it depends on an external
    job_id). Readers registered here are picked up and started later by the
    PipesThreadedMessageReader.
    """
    self.extra_log_readers.update({name: log_reader})

@abstractmethod
@contextmanager
Expand Down Expand Up @@ -434,18 +454,26 @@ def _logs_thread(
# payload.
log_params = {**params, **self.opened_payload}

wait_for_logs_start = None

# Loop over all log readers and start them if the target is readable, which typically means
# a file exists at the target location. Different execution environments may write logs at
# different times (e.g., some may write logs periodically during execution, while others may
# only write logs after the process has completed).
try:
unstarted_log_readers = list(self.log_readers)
wait_for_logs_start = None
while unstarted_log_readers:
# iterate in reverse so we can pop off elements as we go
for i in reversed(range(len(unstarted_log_readers))):
if unstarted_log_readers[i].target_is_readable(log_params):
reader = unstarted_log_readers.pop(i)
unstarted_log_readers = {**self.log_readers, **self.extra_log_readers}

while True:
# periodically check extra log readers for new readers which may be added after the
# external process has started and add them to the unstarted log readers
for key in list(self.extra_log_readers.keys()).copy():
new_log_reader = self.extra_log_readers.pop(key)
unstarted_log_readers[key] = new_log_reader
self.log_readers[key] = new_log_reader

for key in list(unstarted_log_readers.keys()).copy():
if unstarted_log_readers[key].can_start(log_params):
reader = unstarted_log_readers.pop(key)
reader.start(log_params, is_session_closed)

# In some cases logs might not be written out until after the external process has
Expand All @@ -456,19 +484,23 @@ def _logs_thread(
if is_session_closed.is_set():
if wait_for_logs_start is None:
wait_for_logs_start = datetime.datetime.now()
if (
datetime.datetime.now() - wait_for_logs_start
).seconds > WAIT_FOR_LOGS_TIMEOUT:
for log_reader in unstarted_log_readers:

if not unstarted_log_readers:
return
elif (
unstarted_log_readers
and (datetime.datetime.now() - wait_for_logs_start).seconds
> WAIT_FOR_LOGS_TIMEOUT
):
for key, log_reader in unstarted_log_readers.items():
warnings.warn(
f"Attempted to read log for reader {log_reader.name} but log was"
" still not written {WAIT_FOR_LOGS_TIMEOUT} seconds after session close. Abandoning log."
f"[pipes] Attempted to read log for reader {key}:{log_reader.name} but log was"
f" still not written {WAIT_FOR_LOGS_TIMEOUT} seconds after session close. Abandoning reader {key}."
)
break
time.sleep(DEFAULT_SLEEP_INTERVAL)

# Wait for the external process to complete
is_session_closed.wait()
return

time.sleep(DEFAULT_SLEEP_INTERVAL)
except:
handler.report_pipes_framework_exception(
f"{self.__class__.__name__} logs thread",
Expand Down Expand Up @@ -545,7 +577,7 @@ def stop(self) -> None: ...
def is_running(self) -> bool: ...

@abstractmethod
def target_is_readable(self, params: PipesParams) -> bool: ...
def can_start(self, params: PipesParams) -> bool: ...

@property
def name(self) -> str:
Expand All @@ -558,10 +590,10 @@ class PipesChunkedLogReader(PipesLogReader):
Args:
interval (float): interval in seconds between attempts to download a chunk.
target_stream (TextIO): The stream to which to write the logs. Typcially `sys.stdout` or `sys.stderr`.
target_stream (IO[str]): The stream to which to write the logs. Typically `sys.stdout` or `sys.stderr`.
"""

def __init__(self, *, interval: float = 10, target_stream: TextIO):
def __init__(self, *, interval: float = 10, target_stream: IO[str]):
self.interval = interval
self.target_stream = target_stream
self.thread: Optional[Thread] = None
Expand Down Expand Up @@ -618,7 +650,21 @@ def _join_thread(thread: Thread, thread_name: str) -> None:
raise DagsterPipesExecutionError(f"Timed out waiting for {thread_name} thread to finish.")


def extract_message_or_forward_to_file(handler: "PipesMessageHandler", log_line: str, file: TextIO):
def forward_only_logs_to_file(log_line: str, file: IO[str]) -> None:
    """Forward ``log_line`` to ``file``, dropping Pipes control messages.

    A line counts as a Pipes message only when it parses as a JSON *object*
    containing the Pipes protocol version field; every other line (plain
    text, malformed JSON, or non-object JSON such as a list) is written out.

    Args:
        log_line: One line of external-process output, without a trailing newline.
        file: Destination stream for non-message lines.
    """
    try:
        message = json.loads(log_line)
        # Only a JSON object can be a Pipes message; the isinstance guard
        # avoids relying on an AttributeError for lists/numbers/strings,
        # and plain `in` is the idiomatic membership test (no `.keys()`).
        if isinstance(message, dict) and PIPES_PROTOCOL_VERSION_FIELD in message:
            return
    except Exception:
        # Not valid JSON -- treat as an ordinary log line.
        pass
    file.writelines((log_line, "\n"))


def extract_message_or_forward_to_file(
handler: "PipesMessageHandler", log_line: str, file: IO[str]
):
# exceptions as control flow, you love to see it
try:
message = json.loads(log_line)
Expand All @@ -627,7 +673,7 @@ def extract_message_or_forward_to_file(handler: "PipesMessageHandler", log_line:
else:
file.writelines((log_line, "\n"))
except Exception:
# move non-message logs in to stdout for compute log capture
# move non-message logs in to file for compute log capture
file.writelines((log_line, "\n"))


Expand Down Expand Up @@ -718,15 +764,15 @@ def ext_asset(context: OpExecutionContext):
)
finally:
if not message_handler.received_opened_message:
context.log.warn(
context.log.warning(
"[pipes] did not receive any messages from external process. Check stdout / stderr"
" logs from the external process if"
f" possible.\n{context_injector.__class__.__name__}:"
f" {context_injector.no_messages_debug_text()}\n{message_reader.__class__.__name__}:"
f" {message_reader.no_messages_debug_text()}\n"
)
elif not message_handler.received_closed_message:
context.log.warn(
context.log.warning(
"[pipes] did not receive closed message from external process. Buffered messages"
" may have been discarded without being delivered. Use `open_dagster_pipes` as a"
" context manager (a with block) to ensure that cleanup is successfully completed."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ def __init__(
self.log_modification_time = None
self.log_path = None

def can_start(self, params: PipesParams) -> bool:
    """The reader may start as soon as a log path is resolvable for these params."""
    resolved_path = self._get_log_path(params)
    return resolved_path is not None

def download_log_chunk(self, params: PipesParams) -> Optional[str]:
log_path = self._get_log_path(params)
if log_path is None:
Expand Down

0 comments on commit e9402e9

Please sign in to comment.