Skip to content

Commit

Permalink
Expose a global 'max_runtime_seconds' field on run monitoring (#24653)
Browse files Browse the repository at this point in the history
## Summary & Motivation
Allows you to set a global max across all runs in the deployment instead
of tagging individual runs

## How I Tested These Changes
BK

## Changelog
- [x] `NEW` Added a `max_runtime_seconds` configuration option to run
monitoring, allowing you to specify that any run in your Dagster
deployment should terminate if it exceeds a certain runtime. Prevoiusly,
jobs had to be individually tagged with a `dagster/max_runtime` tag in
order to take advantage of this feature. Jobs and runs can still be
tagged in order to override this value for an individual run.
  • Loading branch information
gibsondan authored Sep 24, 2024
1 parent 60e0ceb commit a993cd9
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ run_queue:
run_monitoring:
start_timeout_seconds: 1200
cancel_timeout_seconds: 1200
max_runtime_seconds: 7200

run_retries:
max_retries: 0
Expand Down Expand Up @@ -122,6 +123,7 @@ The `run_monitoring` settings allow you to define how long Dagster+ should wait
run_monitoring:
start_timeout_seconds: 1200
cancel_timeout_seconds: 1200
max_runtime_seconds: 7200
```

<ReferenceTable>
Expand All @@ -136,6 +138,29 @@ run_monitoring:
</li>
</ul>
</ReferenceTableItem>
<ReferenceTableItem propertyName="run_monitoring.cancel_timeout_seconds">
The number of seconds that Dagster+ will wait after a run termination is
initiated for the process or container to terminate. After the timeout, the
run will move into a CANCELED state. This prevents runs from hanging in{" "}
<code>CANCELING</code> indefinitely when the process or container doesn't
terminate cleanly.
<ul>
<li>
<strong>Default</strong> - <code>1200</code> (20 minutes)
</li>
</ul>
</ReferenceTableItem>
<ReferenceTableItem propertyName="run_monitoring.max_runtime_seconds">
The number of seconds that Dagster+ will wait after a run is moved into a
STARTED state for the run to complete. After the timeout, the run will be
terminated and moved into a FAILURE state. This prevents runs from hanging
in <code>STARTED</code> indefinitely if the process is hanging.
<ul>
<li>
<strong>Default</strong> - <code>No limit</code>
</li>
</ul>
</ReferenceTableItem>
</ReferenceTable>

### Run retries (run_retries)
Expand Down
19 changes: 17 additions & 2 deletions docs/content/deployment/run-monitoring.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ run_monitoring:
```
<Note>
In Dagster+ Run Monitoring is enabled by default and can be configured in{" "}
In Dagster+ Run Monitoring is always enabled and can be configured in{" "}
<a href="https://docs.dagster.io/dagster-plus/managing-deployments/deployment-settings-reference">
deployment settings
</a>
Expand All @@ -39,7 +39,22 @@ When Dagster terminates a run, the run moves into CANCELING status and sends a t

## General run timeouts

After a run is marked as STARTED, it may hang indefinitely for various reasons (user API errors, network issues, etc.). The `dagster/max_runtime` tag can be used to set a timeout in seconds on a per-run basis. If the run exceeds this timeout, and run monitoring is enabled, it will be marked as failed.
After a run is marked as STARTED, it may hang indefinitely for various reasons (user API errors, network issues, etc.). You can configure a maximum runtime for every run in a deployment by setting the `run_monitoring.max_runtime_seconds` field in your dagster.yaml or (Dagster+ deployment settings)\[dagster-plus/managing-deployments/deployment-settings-reference] to the maximum runtime in seconds. If a run exceeds this timeout and run monitoring is enabled, it will be marked as failed. The `dagster/max_runtime` tag can also be used to set a timeout in seconds on a per-run basis.

For example, to configure a maximum of 2 hours for every run in your deployment:

```yaml
run_monitoring:
enabled: true
max_runtime_seconds: 7200
```

or in Dagster+, add the following to your [deployment settings](/dagster-plus/managing-deployments/deployment-settings-reference):

```yaml
run_monitoring:
max_runtime_seconds: 7200
```

The below code example shows how to set a run timeout of 10 seconds on a per-job basis:

Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,10 @@ def run_monitoring_start_timeout_seconds(self) -> int:
def run_monitoring_cancel_timeout_seconds(self) -> int:
return self.run_monitoring_settings.get("cancel_timeout_seconds", 180)

@property
def run_monitoring_max_runtime_seconds(self) -> int:
return self.run_monitoring_settings.get("max_runtime_seconds", 0)

@property
def code_server_settings(self) -> Any:
return self.get_settings("code_servers")
Expand Down
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_core/instance/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ def dagster_instance_config_schema() -> Mapping[str, Field]:
"enabled": Field(Bool, is_required=False),
"start_timeout_seconds": Field(int, is_required=False),
"cancel_timeout_seconds": Field(int, is_required=False),
"max_runtime_seconds": Field(int, is_required=False),
"max_resume_run_attempts": Field(int, is_required=False),
"poll_interval_seconds": Field(int, is_required=False),
"cancellation_thread_poll_interval_seconds": Field(int, is_required=False),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ def monitor_started_run(
instance.report_run_failed(run, msg)
# Return rather than immediately checking for a timeout, since we just failed
return
check_run_timeout(instance, run_record, logger)
check_run_timeout(
instance, run_record, logger, float(instance.run_monitoring_max_runtime_seconds)
)


def execute_run_monitoring_iteration(
Expand Down Expand Up @@ -208,15 +210,22 @@ def execute_run_monitoring_iteration(


def check_run_timeout(
instance: DagsterInstance, run_record: RunRecord, logger: logging.Logger
instance: DagsterInstance,
run_record: RunRecord,
logger: logging.Logger,
default_timeout_seconds: float,
) -> None:
# Also allow dagster/max_runtime_seconds to match the global setting
max_time_str = run_record.dagster_run.tags.get(
MAX_RUNTIME_SECONDS_TAG,
MAX_RUNTIME_SECONDS_TAG, run_record.dagster_run.tags.get("dagster/max_runtime_seconds")
)
if not max_time_str:
return
if max_time_str:
max_time = float(max_time_str)
else:
max_time = default_timeout_seconds

max_time = float(max_time_str)
if not max_time:
return

if (
run_record.start_time is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ def instance():
"module": "dagster_tests.daemon_tests.test_monitoring_daemon",
"class": "TestRunLauncher",
},
"run_monitoring": {"enabled": True, "max_resume_run_attempts": 3},
"run_monitoring": {
"enabled": True,
"max_resume_run_attempts": 3,
"max_runtime_seconds": 750,
},
},
) as instance:
yield instance
Expand Down Expand Up @@ -295,6 +299,12 @@ def test_long_running_termination(
status=DagsterRunStatus.STARTING,
tags={MAX_RUNTIME_SECONDS_TAG: "500"},
)
too_long_run_other_tag_value = create_run_for_test(
instance,
job_name="foo",
status=DagsterRunStatus.STARTING,
tags={"dagster/max_runtime_seconds": "500"},
)
okay_run = create_run_for_test(
instance,
job_name="foo",
Expand All @@ -307,6 +317,7 @@ def test_long_running_termination(
started_time = initial + datetime.timedelta(seconds=1)
with freeze_time(started_time):
report_started_event(instance, too_long_run, started_time.timestamp())
report_started_event(instance, too_long_run_other_tag_value, started_time.timestamp())
report_started_event(instance, okay_run, started_time.timestamp())
report_started_event(instance, run_no_tag, started_time.timestamp())

Expand All @@ -315,6 +326,13 @@ def test_long_running_termination(
assert too_long_record.dagster_run.status == DagsterRunStatus.STARTED
assert too_long_record.start_time == started_time.timestamp()

too_long_other_tag_value_record = instance.get_run_record_by_id(
too_long_run_other_tag_value.run_id
)
assert too_long_other_tag_value_record is not None
assert too_long_other_tag_value_record.dagster_run.status == DagsterRunStatus.STARTED
assert too_long_other_tag_value_record.start_time == started_time.timestamp()

okay_record = instance.get_run_record_by_id(okay_run.run_id)
assert okay_record is not None
assert okay_record.dagster_run.status == DagsterRunStatus.STARTED
Expand Down Expand Up @@ -362,6 +380,41 @@ def test_long_running_termination(
assert event
assert event.message == "Exceeded maximum runtime of 500 seconds."

monitor_started_run(instance, workspace, too_long_other_tag_value_record, logger)
run = instance.get_run_by_id(too_long_other_tag_value_record.dagster_run.run_id)
assert run
assert len(run_launcher.termination_calls) == 2
run = instance.get_run_by_id(too_long_other_tag_value_record.dagster_run.run_id)
assert run
assert run.status == DagsterRunStatus.FAILURE

run_failure_events = instance.all_logs(
too_long_other_tag_value_record.dagster_run.run_id,
of_type=DagsterEventType.RUN_FAILURE,
)
assert len(run_failure_events) == 1
event = run_failure_events[0].dagster_event
assert event
assert event.message == "Exceeded maximum runtime of 500 seconds."

# Wait long enough for the instance default to kick in
eval_time = started_time + datetime.timedelta(seconds=751)
with freeze_time(eval_time):
# Still overridden to 1000 so no problem
monitor_started_run(instance, workspace, okay_record, logger)
run = instance.get_run_by_id(okay_record.dagster_run.run_id)
assert run
# no new termination calls
assert len(run_launcher.termination_calls) == 2

monitor_started_run(instance, workspace, no_tag_record, logger)
run = instance.get_run_by_id(no_tag_record.dagster_run.run_id)
assert run
assert len(run_launcher.termination_calls) == 3
run = instance.get_run_by_id(no_tag_record.dagster_run.run_id)
assert run
assert run.status == DagsterRunStatus.FAILURE


@pytest.mark.parametrize("failure_case", ["fail_termination", "termination_exception"])
def test_long_running_termination_failure(
Expand Down

1 comment on commit a993cd9

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-ci10uf8ul-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit a993cd9.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.