From 4ffdfa6f3329064c93e76e387994eda252c9af01 Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Sun, 23 Feb 2025 12:23:46 +0100 Subject: [PATCH 1/3] fix: terminate started and starting runs on webserver startup --- dagster.Dockerfile | 4 ++ ...t_runs_termination_script_in_background.sh | 10 ++++ .../terminate_starting_and_started_runs.py | 59 +++++++++++++++++++ 3 files changed, 73 insertions(+) create mode 100755 scripts/start_runs_termination_script_in_background.sh create mode 100644 scripts/terminate_starting_and_started_runs.py diff --git a/dagster.Dockerfile b/dagster.Dockerfile index e9d722a..4362158 100644 --- a/dagster.Dockerfile +++ b/dagster.Dockerfile @@ -27,6 +27,10 @@ LABEL org.opencontainers.image.licenses="(EUPL-1.2)" EXPOSE 3000 +COPY scripts/ /opt/dagster/app/scripts/ + +ENTRYPOINT ["/opt/dagster/app/scripts/start_runs_termination_script_in_background.sh"] + CMD ["dagster-webserver", "-h", "0.0.0.0", "-p", "3000", "-w", "workspace.yaml"] FROM base AS daemon diff --git a/scripts/start_runs_termination_script_in_background.sh b/scripts/start_runs_termination_script_in_background.sh new file mode 100755 index 0000000..06b7cc0 --- /dev/null +++ b/scripts/start_runs_termination_script_in_background.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +set -eo pipefail +set -x + +echo "Starting terminate_starting_and_started_runs in background to terminate orphaned ones." +python /opt/dagster/app/scripts/terminate_starting_and_started_runs.py & + +# Hand off to the CMD +exec "$@" diff --git a/scripts/terminate_starting_and_started_runs.py b/scripts/terminate_starting_and_started_runs.py new file mode 100644 index 0000000..ed03d3b --- /dev/null +++ b/scripts/terminate_starting_and_started_runs.py @@ -0,0 +1,59 @@ +import logging +import socket +import time + +from dagster import DagsterRunStatus +from dagster_graphql import DagsterGraphQLClient, DagsterGraphQLClientError +from requests.exceptions import ConnectionError + +logging.basicConfig() +logger = logging.getLogger(__file__.rsplit('/', 1)[1]) +logger.setLevel(logging.INFO) + +client = DagsterGraphQLClient('localhost', port_number=3000) + +GET_RUNS_QUERY = ''' +query RunsQuery ($filter: RunsFilter) { + runsOrError( + filter: $filter + ) { + __typename + ... on Runs { + results { + runId + jobName + status + runConfigYaml + startTime + endTime + } + } + } +} +''' + + +def get_run_ids_of_runs(status: list[str], timeout: int = 20) -> list[str]: + variables = {'filter': {'statuses': status}} + + start_time = time.perf_counter() + while True: + try: + response = client._execute(GET_RUNS_QUERY, variables) + return [run['runId'] for run in response['runsOrError']['results']] + except DagsterGraphQLClientError as ex: + if isinstance(ex.__cause__, ConnectionError): + time.sleep(0.1) + if time.perf_counter() - start_time >= timeout: + raise TimeoutError('Waited too long for the Dagster Webserver startup') from ex + else: + raise + + +run_ids = get_run_ids_of_runs(['STARTED', 'STARTING']) + +if len(run_ids) > 0: + logger.info(f'Terminating runs {run_ids}') + client.terminate_runs(run_ids) +else: + logger.info('No run in state STARTED or STARTING') From 1061375525bc3dc1df52eee26c79f105390197f9 Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Sun, 23 Feb 2025 12:30:52 +0100 Subject: [PATCH 2/3] fix: enable run_monitoring to terminate hanging jobs --- dagster.docker.yaml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/dagster.docker.yaml b/dagster.docker.yaml index 2b4f0f0..1391c97 100644 --- a/dagster.docker.yaml +++ b/dagster.docker.yaml @@ -13,6 +13,17 @@ run_coordinator: dequeue_use_threads: true dequeue_interval_seconds: 1 +run_monitoring: + enabled: true + start_timeout_seconds: 180 + cancel_timeout_seconds: 180 + max_resume_run_attempts: 3 + poll_interval_seconds: 120 + # max_runtime_seconds 21600s = 6h + # to overwrite this for individual jobs, see + # https://docs.dagster.io/guides/deploy/execution/run-monitoring#general-run-timeouts + max_runtime_seconds: 21600 + run_storage: module: dagster_postgres.run_storage class: PostgresRunStorage From dab3b0394a2b8eb57d43bc2ced100d16b3ee1a19 Mon Sep 17 00:00:00 2001 From: Holger Bruch Date: Sun, 23 Feb 2025 12:36:54 +0100 Subject: [PATCH 3/3] doc: add changes changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 52044b2..e59c793 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ The changelog lists most feature changes between each release. +## Upcoming release +* Fix: on startup, terminate runs still in started/starting state, as dagster doesn't terminate them cleanly on shutdown. +* Fix: enable run monitoring to terminate jobs hanging on startup/cancellation (after 180s) or running for more than 6h ## 2025-01-28 - Fix: [create primary key if missing](https://github.com/mobidata-bw/ipl-dagster-pipeline/pull/182)