Fix: terminate hanging jobs #187

Open · wants to merge 3 commits into base: main
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

The changelog lists most feature changes between each release.

## Upcoming release
- Fix: on startup, terminate runs still in started/starting state, as Dagster doesn't terminate them cleanly on shutdown.
- Fix: enable run monitoring to terminate jobs hanging on startup/cancellation (after 180s) or running for more than 6h.

## 2025-01-28
- Fix: [create primary key if missing](https://github.com/mobidata-bw/ipl-dagster-pipeline/pull/182)
4 changes: 4 additions & 0 deletions dagster.Dockerfile
@@ -27,6 +27,10 @@ LABEL org.opencontainers.image.licenses="(EUPL-1.2)"

EXPOSE 3000

COPY scripts/ /opt/dagster/app/scripts/
Member commented:

(See the comment about background jobs below.)

Suggested change
COPY scripts/ /opt/dagster/app/scripts/
ARG TARGETARCH
ENV TINI_VERSION=v0.19.0
ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${TARGETARCH} /tini
RUN chmod +x /tini && /tini --version
COPY scripts/ /opt/dagster/app/scripts/
ENTRYPOINT [ "/tini", "--", "/opt/dagster/app/scripts/start_runs_termination_script_in_background.sh" ]


ENTRYPOINT ["/opt/dagster/app/scripts/start_runs_termination_script_in_background.sh"]

CMD ["dagster-webserver", "-h", "0.0.0.0", "-p", "3000", "-w", "workspace.yaml"]

FROM base AS daemon
11 changes: 11 additions & 0 deletions dagster.docker.yaml
@@ -13,6 +13,17 @@ run_coordinator:
    dequeue_use_threads: true
    dequeue_interval_seconds: 1

run_monitoring:
  enabled: true
  start_timeout_seconds: 180
  cancel_timeout_seconds: 180
  max_resume_run_attempts: 3
  poll_interval_seconds: 120
  # max_runtime_seconds 21600s = 6h
  # To override this for individual jobs (a sketch follows after this diff), see
  # https://docs.dagster.io/guides/deploy/execution/run-monitoring#general-run-timeouts
  max_runtime_seconds: 21600

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
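
The config comment above links to the run-monitoring docs for per-job overrides of the global 6h limit. As a rough sketch (not part of this PR), assuming the documented dagster/max_runtime run tag (value in seconds), an individual job could raise or lower its own cap; the job and op names below are placeholders:

from dagster import job, op


@op
def refresh_feed():
    ...


# Hypothetical example: cap this particular job at 1 hour instead of the global 6 h limit.
@job(tags={'dagster/max_runtime': 60 * 60})
def long_refresh_job():
    refresh_feed()
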
10 changes: 10 additions & 0 deletions scripts/start_runs_termination_script_in_background.sh
@@ -0,0 +1,10 @@
#!/bin/bash

set -eo pipefail
set -x

echo "Starting terminate_starting_and_started_runs in background to terminate orphaned ones."
python /opt/dagster/app/scripts/terminate_starting_and_started_runs.py &
Member commented:

The background job (subprocess) started here might fail for any reason, including /opt/dagster/app/scripts/terminate_starting_and_started_runs.py missing. I'm afraid this will cause further headaches down the road, at some inconvenient moment, in production. I strongly suggest we change the script to crash the container (exit with a non-zero status) if this happens.

It seems very hard to get the following behaviour solely with a Bash script:

  1. If the background job fails, also quit the main process.
  2. If the main process exits, don't leave the background job running.
  3. If both run through cleanly, exit as usual.

After experimenting for too long (with many variants of 1, 2 & 3), I propose just using tini in combination with kill "-$$" as a workaround.

#!/bin/bash

set -eo pipefail

echo 'Running terminate_starting_and_started_runs.py as a background job.' >&2

set -x

# In case the background job fails, we kill the entire shell ($$ has its PID) and all its children (by negating the PID).
# This *does not* work if `$$` evaluates to 1 (our shell is the init process), so we *must* run this script with an "external" init command.
python /opt/dagster/app/scripts/terminate_starting_and_started_runs.py \
	|| kill -TERM -- "-$$" &

exec "$@"


# Hand off to the CMD
exec "$@"
59 changes: 59 additions & 0 deletions scripts/terminate_starting_and_started_runs.py
@@ -0,0 +1,59 @@
import logging
import socket
import time

from dagster import DagsterRunStatus
from dagster_graphql import DagsterGraphQLClient, DagsterGraphQLClientError
from requests.exceptions import ConnectionError

logging.basicConfig()
logger = logging.getLogger(__file__.rsplit('/', 1)[1])
logger.setLevel(logging.INFO)

client = DagsterGraphQLClient('localhost', port_number=3000)

GET_RUNS_QUERY = '''
query RunsQuery ($filter: RunsFilter) {
  runsOrError(
    filter: $filter
  ) {
    __typename
    ... on Runs {
      results {
        runId
        jobName
        status
        runConfigYaml
        startTime
        endTime
      }
    }
  }
}
'''


def get_run_ids_of_runs(status: list[str], timeout: int = 20) -> list[str]:
Member commented:

I think it's likely that we'll want to change this soon, so let's make it configurable via an environment variable (a sketch follows after this diff).

    variables = {'filter': {'statuses': status}}

    start_time = time.perf_counter()
    while True:
        try:
            response = client._execute(GET_RUNS_QUERY, variables)
            return [run['runId'] for run in response['runsOrError']['results']]
        except DagsterGraphQLClientError as ex:
            if isinstance(ex.__cause__, ConnectionError):
                time.sleep(0.1)
                if time.perf_counter() - start_time >= timeout:
                    raise TimeoutError('Waited too long for the Dagster Webserver startup') from ex
            else:
                raise


run_ids = get_run_ids_of_runs(['STARTED', 'STARTING'])

if len(run_ids) > 0:
    logger.info(f'Terminating runs {run_ids}')
    client.terminate_runs(run_ids)
else:
    logger.info('No run in state STARTED or STARTING')
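
Following up on the review comment about the startup timeout above: a minimal sketch of reading the default from an environment variable instead of the hard-coded 20s. The variable name is made up, and the function body would stay as in this PR:

import os

# Hypothetical env var; falls back to the current hard-coded default of 20 seconds.
STARTUP_TIMEOUT_SECONDS = int(os.environ.get('TERMINATE_RUNS_STARTUP_TIMEOUT_SECONDS', '20'))


def get_run_ids_of_runs(status: list[str], timeout: int = STARTUP_TIMEOUT_SECONDS) -> list[str]:
    ...  # unchanged body from the script above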