diff --git a/docs/content/dagster-plus/managing-deployments/deployment-settings-reference.mdx b/docs/content/dagster-plus/managing-deployments/deployment-settings-reference.mdx index d5a1785d454a8..6233d47a49be0 100644 --- a/docs/content/dagster-plus/managing-deployments/deployment-settings-reference.mdx +++ b/docs/content/dagster-plus/managing-deployments/deployment-settings-reference.mdx @@ -28,6 +28,7 @@ run_queue: run_monitoring: start_timeout_seconds: 1200 cancel_timeout_seconds: 1200 + max_runtime_seconds: 7200 run_retries: max_retries: 0 @@ -122,6 +123,7 @@ The `run_monitoring` settings allow you to define how long Dagster+ should wait run_monitoring: start_timeout_seconds: 1200 cancel_timeout_seconds: 1200 + max_runtime_seconds: 7200 ``` @@ -136,6 +138,29 @@ run_monitoring: + + The number of seconds that Dagster+ will wait after a run termination is + initiated for the process or container to terminate. After the timeout, the + run will move into a CANCELED state. This prevents runs from hanging in{" "} + CANCELING indefinitely when the process or container doesn't + terminate cleanly. + + + + The number of seconds that Dagster+ will wait after a run is moved into a + STARTED state for the run to complete. After the timeout, the run will be + terminated and moved into a FAILURE state. This prevents runs from hanging + in STARTED indefinitely if the process is hanging. + + ### Run retries (run_retries) diff --git a/docs/content/deployment/run-monitoring.mdx b/docs/content/deployment/run-monitoring.mdx index a874c233d1089..5bafc1acdd697 100644 --- a/docs/content/deployment/run-monitoring.mdx +++ b/docs/content/deployment/run-monitoring.mdx @@ -22,7 +22,7 @@ run_monitoring: ``` - In Dagster+ Run Monitoring is enabled by default and can be configured in{" "} + In Dagster+ Run Monitoring is always enabled and can be configured in{" "} deployment settings @@ -39,7 +39,22 @@ When Dagster terminates a run, the run moves into CANCELING status and sends a t ## General run timeouts -After a run is marked as STARTED, it may hang indefinitely for various reasons (user API errors, network issues, etc.). The `dagster/max_runtime` tag can be used to set a timeout in seconds on a per-run basis. If the run exceeds this timeout, and run monitoring is enabled, it will be marked as failed. +After a run is marked as STARTED, it may hang indefinitely for various reasons (user API errors, network issues, etc.). You can configure a maximum runtime for every run in a deployment by setting the `run_monitoring.max_runtime_seconds` field in your dagster.yaml or (Dagster+ deployment settings)\[dagster-plus/managing-deployments/deployment-settings-reference] to the maximum runtime in seconds. If a run exceeds this timeout and run monitoring is enabled, it will be marked as failed. The `dagster/max_runtime` tag can also be used to set a timeout in seconds on a per-run basis. + +For example, to configure a maximum of 2 hours for every run in your deployment: + +```yaml +run_monitoring: + enabled: true + max_runtime_seconds: 7200 +``` + +or in Dagster+, add the following to your [deployment settings](/dagster-plus/managing-deployments/deployment-settings-reference): + +```yaml +run_monitoring: + max_runtime_seconds: 7200 +``` The below code example shows how to set a run timeout of 10 seconds on a per-job basis: diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index dcd0e7d48185a..96646cdfab80e 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -886,6 +886,10 @@ def run_monitoring_start_timeout_seconds(self) -> int: def run_monitoring_cancel_timeout_seconds(self) -> int: return self.run_monitoring_settings.get("cancel_timeout_seconds", 180) + @property + def run_monitoring_max_runtime_seconds(self) -> int: + return self.run_monitoring_settings.get("max_runtime_seconds", 0) + @property def code_server_settings(self) -> Any: return self.get_settings("code_servers") diff --git a/python_modules/dagster/dagster/_core/instance/config.py b/python_modules/dagster/dagster/_core/instance/config.py index 59aa81a293d13..24dcd46c01ad1 100644 --- a/python_modules/dagster/dagster/_core/instance/config.py +++ b/python_modules/dagster/dagster/_core/instance/config.py @@ -355,6 +355,7 @@ def dagster_instance_config_schema() -> Mapping[str, Field]: "enabled": Field(Bool, is_required=False), "start_timeout_seconds": Field(int, is_required=False), "cancel_timeout_seconds": Field(int, is_required=False), + "max_runtime_seconds": Field(int, is_required=False), "max_resume_run_attempts": Field(int, is_required=False), "poll_interval_seconds": Field(int, is_required=False), "cancellation_thread_poll_interval_seconds": Field(int, is_required=False), diff --git a/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py b/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py index 7715420f8ead3..766eabac88cda 100644 --- a/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py +++ b/python_modules/dagster/dagster/_daemon/monitoring/run_monitoring.py @@ -156,7 +156,9 @@ def monitor_started_run( instance.report_run_failed(run, msg) # Return rather than immediately checking for a timeout, since we just failed return - check_run_timeout(instance, run_record, logger) + check_run_timeout( + instance, run_record, logger, float(instance.run_monitoring_max_runtime_seconds) + ) def execute_run_monitoring_iteration( @@ -208,15 +210,22 @@ def execute_run_monitoring_iteration( def check_run_timeout( - instance: DagsterInstance, run_record: RunRecord, logger: logging.Logger + instance: DagsterInstance, + run_record: RunRecord, + logger: logging.Logger, + default_timeout_seconds: float, ) -> None: + # Also allow dagster/max_runtime_seconds to match the global setting max_time_str = run_record.dagster_run.tags.get( - MAX_RUNTIME_SECONDS_TAG, + MAX_RUNTIME_SECONDS_TAG, run_record.dagster_run.tags.get("dagster/max_runtime_seconds") ) - if not max_time_str: - return + if max_time_str: + max_time = float(max_time_str) + else: + max_time = default_timeout_seconds - max_time = float(max_time_str) + if not max_time: + return if ( run_record.start_time is not None diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py b/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py index 613853f35cd49..da0dc67843d79 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_monitoring_daemon.py @@ -99,7 +99,11 @@ def instance(): "module": "dagster_tests.daemon_tests.test_monitoring_daemon", "class": "TestRunLauncher", }, - "run_monitoring": {"enabled": True, "max_resume_run_attempts": 3}, + "run_monitoring": { + "enabled": True, + "max_resume_run_attempts": 3, + "max_runtime_seconds": 750, + }, }, ) as instance: yield instance @@ -295,6 +299,12 @@ def test_long_running_termination( status=DagsterRunStatus.STARTING, tags={MAX_RUNTIME_SECONDS_TAG: "500"}, ) + too_long_run_other_tag_value = create_run_for_test( + instance, + job_name="foo", + status=DagsterRunStatus.STARTING, + tags={"dagster/max_runtime_seconds": "500"}, + ) okay_run = create_run_for_test( instance, job_name="foo", @@ -307,6 +317,7 @@ def test_long_running_termination( started_time = initial + datetime.timedelta(seconds=1) with freeze_time(started_time): report_started_event(instance, too_long_run, started_time.timestamp()) + report_started_event(instance, too_long_run_other_tag_value, started_time.timestamp()) report_started_event(instance, okay_run, started_time.timestamp()) report_started_event(instance, run_no_tag, started_time.timestamp()) @@ -315,6 +326,13 @@ def test_long_running_termination( assert too_long_record.dagster_run.status == DagsterRunStatus.STARTED assert too_long_record.start_time == started_time.timestamp() + too_long_other_tag_value_record = instance.get_run_record_by_id( + too_long_run_other_tag_value.run_id + ) + assert too_long_other_tag_value_record is not None + assert too_long_other_tag_value_record.dagster_run.status == DagsterRunStatus.STARTED + assert too_long_other_tag_value_record.start_time == started_time.timestamp() + okay_record = instance.get_run_record_by_id(okay_run.run_id) assert okay_record is not None assert okay_record.dagster_run.status == DagsterRunStatus.STARTED @@ -362,6 +380,41 @@ def test_long_running_termination( assert event assert event.message == "Exceeded maximum runtime of 500 seconds." + monitor_started_run(instance, workspace, too_long_other_tag_value_record, logger) + run = instance.get_run_by_id(too_long_other_tag_value_record.dagster_run.run_id) + assert run + assert len(run_launcher.termination_calls) == 2 + run = instance.get_run_by_id(too_long_other_tag_value_record.dagster_run.run_id) + assert run + assert run.status == DagsterRunStatus.FAILURE + + run_failure_events = instance.all_logs( + too_long_other_tag_value_record.dagster_run.run_id, + of_type=DagsterEventType.RUN_FAILURE, + ) + assert len(run_failure_events) == 1 + event = run_failure_events[0].dagster_event + assert event + assert event.message == "Exceeded maximum runtime of 500 seconds." + + # Wait long enough for the instance default to kick in + eval_time = started_time + datetime.timedelta(seconds=751) + with freeze_time(eval_time): + # Still overridden to 1000 so no problem + monitor_started_run(instance, workspace, okay_record, logger) + run = instance.get_run_by_id(okay_record.dagster_run.run_id) + assert run + # no new termination calls + assert len(run_launcher.termination_calls) == 2 + + monitor_started_run(instance, workspace, no_tag_record, logger) + run = instance.get_run_by_id(no_tag_record.dagster_run.run_id) + assert run + assert len(run_launcher.termination_calls) == 3 + run = instance.get_run_by_id(no_tag_record.dagster_run.run_id) + assert run + assert run.status == DagsterRunStatus.FAILURE + @pytest.mark.parametrize("failure_case", ["fail_termination", "termination_exception"]) def test_long_running_termination_failure(