-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: terminate hanging jobs #187
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
#!/bin/bash | ||
|
||
set -eo pipefail | ||
set -x | ||
|
||
echo "Starting terminate_starting_and_started_runs in background to terminate orphaned ones." | ||
derhuerst marked this conversation as resolved.
Show resolved
Hide resolved
|
||
python /opt/dagster/app/scripts/terminate_starting_and_started_runs.py & | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The background job (sub process) started here might fail for any reason, including It seems very hard to get the following behaviour solely with a Bash script:
After experimenting for too long (with many variants of 1, 2 & 3), I propose just using #!/bin/bash
set -eo pipefail
echo 'Running terminate_starting_and_started_runs.py as a background job.' >&2
set -x
# In case the background job fails, we kill the entire shell ($$ has its PID) and all its children (by negating the PID).
# This *does not* work if `$$` evaluates to 1 (our shell is the init process), so we *must* run this script with an "external" init command.
python /opt/dagster/app/scripts/terminate_starting_and_started_runs.py \
|| kill -TERM -- "-$$" &
exec "$@"
`` |
||
|
||
# Hand off to the CMD | ||
exec "$@" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
import logging | ||
import socket | ||
import time | ||
|
||
from dagster import DagsterRunStatus | ||
from dagster_graphql import DagsterGraphQLClient, DagsterGraphQLClientError | ||
from requests.exceptions import ConnectionError | ||
|
||
logging.basicConfig() | ||
logger = logging.getLogger(__file__.rsplit('/', 1)[1]) | ||
logger.setLevel(logging.INFO) | ||
|
||
client = DagsterGraphQLClient('localhost', port_number=3000) | ||
|
||
GET_RUNS_QUERY = ''' | ||
query RunsQuery ($filter: RunsFilter) { | ||
runsOrError( | ||
filter: $filter | ||
) { | ||
__typename | ||
... on Runs { | ||
results { | ||
runId | ||
jobName | ||
status | ||
runConfigYaml | ||
startTime | ||
endTime | ||
} | ||
} | ||
} | ||
} | ||
''' | ||
|
||
|
||
def get_run_ids_of_runs(status: list[str], timeout: int = 20) -> list[str]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's likely that we'll want to change this soon, so let's make it configurable via an environment variable. |
||
variables = {'filter': {'statuses': status}} | ||
|
||
start_time = time.perf_counter() | ||
while True: | ||
try: | ||
response = client._execute(GET_RUNS_QUERY, variables) | ||
return [run['runId'] for run in response['runsOrError']['results']] | ||
except DagsterGraphQLClientError as ex: | ||
if isinstance(ex.__cause__, ConnectionError): | ||
time.sleep(0.1) | ||
if time.perf_counter() - start_time >= timeout: | ||
raise TimeoutError('Waited too long for the Dagster Webserver startup') from ex | ||
else: | ||
raise | ||
|
||
|
||
run_ids = get_run_ids_of_runs(['STARTED', 'STARTING']) | ||
|
||
if len(run_ids) > 0: | ||
logger.info(f'Terminating runs {run_ids}') | ||
client.terminate_runs(run_ids) | ||
else: | ||
logger.info('No run in state STARTED or STARTING') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(See the comment about background jobs below.)