diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx index 5e2fb14eb3c7d..b77d677d51c1f 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetLiveDataProvider.tsx @@ -2,6 +2,7 @@ import uniq from 'lodash/uniq'; import React, {useCallback, useMemo, useRef} from 'react'; import {AssetBaseData, __resetForJest as __resetBaseData} from './AssetBaseDataProvider'; +import {AssetSLAData} from './AssetSLADataProvider'; import { AssetStaleStatusData, __resetForJest as __resetStaleData, @@ -155,7 +156,9 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) = return ( - {children} + + {children} + ); }; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetSLADataProvider.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetSLADataProvider.tsx new file mode 100644 index 0000000000000..e1df5bfbe8cb9 --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/AssetSLADataProvider.tsx @@ -0,0 +1,49 @@ +import {useMemo} from 'react'; +import {getAssetSLAQueryResponse} from 'shared/asset-data/getAssetSLAQueryResponse.oss'; + +import {ApolloClient, useApolloClient} from '../apollo-client'; +import {tokenForAssetKey} from '../asset-graph/Utils'; +import {AssetKeyInput} from '../graphql/types'; +import {liveDataFactory} from '../live-data-provider/Factory'; +import {LiveDataThreadID} from '../live-data-provider/LiveDataThread'; +import {useBlockTraceUntilTrue} from '../performance/TraceContext'; + +export function init() { + return liveDataFactory( + () => { + return useApolloClient(); + }, + async (keys, client: ApolloClient) => { + return getAssetSLAQueryResponse(keys, client); + }, + ); +} + +export const AssetSLAData = init(); + +export function useAssetSLAData(assetKey: AssetKeyInput, thread: LiveDataThreadID = 'default') { + const result = AssetSLAData.useLiveDataSingle(tokenForAssetKey(assetKey), thread); + useBlockTraceUntilTrue('useAssetStaleData', !!result.liveData); + return result; +} + +export function useAssetsSLAData(assetKeys: AssetKeyInput[], thread: LiveDataThreadID = 'default') { + const result = AssetSLAData.useLiveData( + useMemo(() => assetKeys.map((key) => tokenForAssetKey(key)), [assetKeys]), + thread, + ); + useBlockTraceUntilTrue( + 'useAssetsSLAData', + !!(Object.keys(result.liveDataByNode).length === assetKeys.length), + ); + return result; +} + +export function AssetSLADataRefreshButton() { + return ; +} + +// For tests +export function __resetForJest() { + Object.assign(AssetSLAData, init()); +} diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-data/getAssetSLAQueryResponse.oss.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-data/getAssetSLAQueryResponse.oss.tsx new file mode 100644 index 0000000000000..3b4c457752894 --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-data/getAssetSLAQueryResponse.oss.tsx @@ -0,0 +1,5 @@ +import {ApolloClient} from '../apollo-client'; + +export const getAssetSLAQueryResponse = (_assetKeys: string[], _client: ApolloClient) => { + return {}; +}; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNode.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNode.tsx index 0fa63fd63d975..c24c42beedbc2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNode.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNode.tsx @@ -2,9 +2,11 @@ import {Box, Colors, FontFamily, Icon, Tooltip} from '@dagster-io/ui-components' import isEqual from 'lodash/isEqual'; import * as React from 'react'; import {Link} from 'react-router-dom'; +import {AssetNodeSLARow} from 'shared/asset-graph/AssetNodeSLARow.oss'; import styled, {CSSObject} from 'styled-components'; import {AssetNodeMenuProps, useAssetNodeMenu} from './AssetNodeMenu'; +import {AssetNodeRowBox} from './AssetNodeRowBox'; import {buildAssetNodeStatusContent} from './AssetNodeStatusContent'; import {ContextMenuWrapper} from './ContextMenuWrapper'; import {LiveDataForNode} from './Utils'; @@ -70,6 +72,7 @@ export const AssetNode = React.memo( {hasChecks && } + { ); }; -const AssetNodeRowBox = styled(Box)` - white-space: nowrap; - line-height: 12px; - font-size: 12px; - height: 24px; - a:hover { - text-decoration: none; - } - &:last-child { - border-bottom-left-radius: 8px; - border-bottom-right-radius: 8px; - } -`; - interface StatusRowProps { definition: AssetNodeFragment; liveData: LiveDataForNode | undefined; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNodeRowBox.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNodeRowBox.tsx new file mode 100644 index 0000000000000..683534bcbbbed --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNodeRowBox.tsx @@ -0,0 +1,16 @@ +import {Box} from '@dagster-io/ui-components'; +import styled from 'styled-components'; + +export const AssetNodeRowBox = styled(Box)` + white-space: nowrap; + line-height: 12px; + font-size: 12px; + height: 24px; + a:hover { + text-decoration: none; + } + &:last-child { + border-bottom-left-radius: 8px; + border-bottom-right-radius: 8px; + } +`; diff --git a/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNodeSLARow.oss.tsx b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNodeSLARow.oss.tsx new file mode 100644 index 0000000000000..cb8882cd7dc98 --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/asset-graph/AssetNodeSLARow.oss.tsx @@ -0,0 +1,7 @@ +import {AssetKey} from '../assets/types'; + +export interface AssetNodeSLARowProps { + assetKey: AssetKey; +} + +export const AssetNodeSLARow = (_props: AssetNodeSLARowProps) => null; diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/overview/AssetNodeOverview.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/overview/AssetNodeOverview.tsx index e5f19dd5fc35b..38889e2737bd1 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/overview/AssetNodeOverview.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/overview/AssetNodeOverview.tsx @@ -10,6 +10,7 @@ import { import React, {useMemo} from 'react'; import {Link} from 'react-router-dom'; import {AssetAlertsSection} from 'shared/assets/AssetAlertsSection.oss'; +import {AssetNodeOverviewSLASummary} from 'shared/assets/overview/AssetNodeOverviewSLASummary.oss'; import {AssetEventMetadataEntriesTable} from '../AssetEventMetadataEntriesTable'; import {metadataForAssetNode} from '../AssetMetadata'; @@ -115,7 +116,7 @@ export const AssetNodeOverview = ({ const renderStatusSection = () => ( - + Latest {assetNode?.isObservable ? 'observation' : 'materialization'} @@ -144,6 +145,7 @@ export const AssetNodeOverview = ({ /> ) : undefined} + {rowCountMeta?.intValue ? ( Row count diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/overview/AssetNodeOverviewSLASummary.oss.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/overview/AssetNodeOverviewSLASummary.oss.tsx new file mode 100644 index 0000000000000..2790b62b70f62 --- /dev/null +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/overview/AssetNodeOverviewSLASummary.oss.tsx @@ -0,0 +1,9 @@ +import {AssetKey} from '../types'; + +export type AssetNodeOverviewSLASummaryProps = { + assetKey: AssetKey; +}; + +export const AssetNodeOverviewSLASummary = ({ + assetKey: _assetKey, +}: AssetNodeOverviewSLASummaryProps) => null; diff --git a/python_modules/dagster-test/dagster_test/toys/basic_assets.py b/python_modules/dagster-test/dagster_test/toys/basic_assets.py index a99245aebee37..07fe38d207be5 100644 --- a/python_modules/dagster-test/dagster_test/toys/basic_assets.py +++ b/python_modules/dagster-test/dagster_test/toys/basic_assets.py @@ -1,4 +1,8 @@ from dagster import AssetSelection, MetadataValue, asset, define_asset_job +from dagster._core.definitions.asset_check_spec import AssetCheckSpec +from dagster._core.definitions.declarative_automation.automation_condition import ( + AutomationCondition, +) @asset(group_name="basic_assets") @@ -17,6 +21,22 @@ def basic_asset_3(basic_asset_1): ... def basic_asset_4(basic_asset_2, basic_asset_3): ... +@asset( + automation_condition=AutomationCondition.on_cron("* * * * *"), + group_name="SLA_demo", + check_specs=[ + AssetCheckSpec( + name="my_favorite_check", asset="my_cron_asset", description="This is my favorite check" + ) + ], +) +def my_cron_asset(): ... + + +@asset(group_name="SLA_demo") +def my_sensored_asset(): ... + + basic_assets_job = define_asset_job( "basic_assets_job", selection=AssetSelection.groups("basic_assets"), diff --git a/python_modules/dagster/dagster/_core/definitions/sla.py b/python_modules/dagster/dagster/_core/definitions/sla.py new file mode 100644 index 0000000000000..ac4f0adbc560d --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/sla.py @@ -0,0 +1,67 @@ +# SLA definition stuff +from collections.abc import Mapping +from typing import TYPE_CHECKING, Any, Union + +from dagster._core.definitions.events import AssetKey +from dagster._record import record +from dagster._serdes import whitelist_for_serdes + +if TYPE_CHECKING: + from dagster._core.events import DagsterEventType + + +@whitelist_for_serdes +@record +class SlaViolating: + sla_name: str + asset_key: AssetKey + # The timestamp at which the asset began violating its SLA. + violating_since: float + last_event_timestamp: float + last_event_storage_id: int + reason_md: str + metadata: Mapping[str, Any] + + @classmethod + def dagster_event_type(cls) -> "DagsterEventType": + from dagster._core.events import DagsterEventType + + return DagsterEventType.SLA_VIOLATING + + +@whitelist_for_serdes +@record +class SlaPassing: + sla_name: str + asset_key: AssetKey + # The associated event log entry ID which led to the asset passing its SLA. + last_event_storage_id: int + # Time at which the passing event occurred + last_event_timestamp: float + reason_md: str + metadata: Mapping[str, Any] + + @classmethod + def dagster_event_type(cls) -> "DagsterEventType": + from dagster._core.events import DagsterEventType + + return DagsterEventType.SLA_PASSING + + +# If the asset is missing and therefore the SLA cannot be evaluated. +@whitelist_for_serdes +@record +class SlaUnknown: + sla_name: str + asset_key: AssetKey + reason_md: str + metadata: Mapping[str, Any] + + @classmethod + def dagster_event_type(cls) -> "DagsterEventType": + from dagster._core.events import DagsterEventType + + return DagsterEventType.SLA_UNKNOWN + + +SlaEvaluationResult = Union[SlaPassing, SlaViolating, SlaUnknown] diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index 1243209ba3720..70d5ebfe4b624 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -56,6 +56,7 @@ if TYPE_CHECKING: from dagster._core.definitions.events import ObjectStoreOperation + from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating from dagster._core.execution.plan.plan import ExecutionPlan from dagster._core.execution.plan.step import StepKind @@ -80,6 +81,9 @@ "AssetMaterializationPlannedData", "AssetCheckEvaluation", "AssetCheckEvaluationPlanned", + "SlaViolating", + "SlaPassing", + "SlaUnknown", ] @@ -155,6 +159,10 @@ class DagsterEventType(str, Enum): LOGS_CAPTURED = "LOGS_CAPTURED" + SLA_PASSING = "SLA_PASSING" + SLA_VIOLATING = "SLA_VIOLATING" + SLA_UNKNOWN = "SLA_UNKNOWN" + EVENT_TYPE_TO_DISPLAY_STRING = { DagsterEventType.PIPELINE_ENQUEUED: "RUN_ENQUEUED", diff --git a/python_modules/dagster/dagster/_core/events/log.py b/python_modules/dagster/dagster/_core/events/log.py index ef79ec68d2814..e781d0071a9f6 100644 --- a/python_modules/dagster/dagster/_core/events/log.py +++ b/python_modules/dagster/dagster/_core/events/log.py @@ -6,6 +6,7 @@ from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation from dagster._core.definitions.events import AssetMaterialization, AssetObservation from dagster._core.definitions.logger_definition import LoggerDefinition +from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating from dagster._core.events import DagsterEvent, DagsterEventType from dagster._core.utils import coerce_valid_log_level from dagster._serdes.serdes import deserialize_value, serialize_value, whitelist_for_serdes @@ -141,6 +142,18 @@ def asset_materialization(self) -> Optional[AssetMaterialization]: return None + @property + def sla_violating(self) -> Optional[SlaViolating]: + pass + + @property + def sla_passing(self) -> Optional[SlaPassing]: + pass + + @property + def sla_unknown(self) -> Optional[SlaUnknown]: + pass + @property def asset_observation(self) -> Optional[AssetObservation]: if ( diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index a2f39a7c367e9..e852a4dec6e02 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -112,6 +112,7 @@ RepositoryLoadData, ) from dagster._core.definitions.run_request import InstigatorType + from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating from dagster._core.event_api import ( AssetRecordsFilter, EventHandlerFn, @@ -3278,9 +3279,17 @@ def get_latest_materialization_code_versions( @public def report_runless_asset_event( self, - asset_event: Union["AssetMaterialization", "AssetObservation", "AssetCheckEvaluation"], + asset_event: Union[ + "AssetMaterialization", + "AssetObservation", + "AssetCheckEvaluation", + "SlaViolating", + "SlaPassing", + "SlaUnknown", + ], ): """Record an event log entry related to assets that does not belong to a Dagster run.""" + from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating from dagster._core.events import ( AssetMaterialization, AssetObservationData, @@ -3298,6 +3307,9 @@ def report_runless_asset_event( elif isinstance(asset_event, AssetObservation): event_type_value = DagsterEventType.ASSET_OBSERVATION.value data_payload = AssetObservationData(asset_event) + elif isinstance(asset_event, (SlaViolating, SlaPassing, SlaUnknown)): + event_type_value = asset_event.dagster_event_type().value + data_payload = asset_event else: raise DagsterInvariantViolationError( f"Received unexpected asset event type {asset_event}, expected" diff --git a/python_modules/dagster/dagster/_daemon/auto_run_reexecution/event_log_consumer.py b/python_modules/dagster/dagster/_daemon/auto_run_reexecution/event_log_consumer.py index b9cf735371a19..89af75162e191 100644 --- a/python_modules/dagster/dagster/_daemon/auto_run_reexecution/event_log_consumer.py +++ b/python_modules/dagster/dagster/_daemon/auto_run_reexecution/event_log_consumer.py @@ -24,6 +24,7 @@ DAGSTER_EVENT_TYPES = [DagsterEventType.RUN_FAILURE, DagsterEventType.RUN_SUCCESS] +# Should rename this the eventlogrunconsumer or something class EventLogConsumerDaemon(IntervalDaemon): def __init__( self, diff --git a/python_modules/dagster/dagster/_time/__init__.py b/python_modules/dagster/dagster/_time/__init__.py index 49ede7ebe4722..8162181cd2667 100644 --- a/python_modules/dagster/dagster/_time/__init__.py +++ b/python_modules/dagster/dagster/_time/__init__.py @@ -1,6 +1,7 @@ +import os import time from datetime import datetime, timedelta, timezone, tzinfo -from typing import Union +from typing import Optional, Union import dagster._check as check from dagster._vendored.dateutil import parser @@ -12,8 +13,23 @@ from dagster._vendored.dateutil.tz import gettz as _timezone_from_string +def _get_frozen_timestamp_from_fs() -> Optional[float]: + # If DAGSTER_FROZEN_TIME_PATH is set, timestamp will be read from there instead of + # current time. This allows you to effectively freeze time across all dagster processes. + timestamp_file_path = os.getenv("DAGSTER_FROZEN_TIME_PATH") + if timestamp_file_path and os.path.exists(timestamp_file_path): + with open(timestamp_file_path) as f: + timestamp_str = f.read().strip() + return float(timestamp_str) + else: + return None + + def _mockable_get_current_datetime() -> datetime: # Can be mocked in tests by freeze_time() + fs_timestamp = _get_frozen_timestamp_from_fs() + if fs_timestamp: + return datetime_from_timestamp(fs_timestamp, tz=timezone.utc) return datetime.now(tz=timezone.utc) diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py index 8abbb491c92eb..222edc177a54e 100644 --- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py +++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py @@ -188,6 +188,9 @@ def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult: context.log.info( f"************Exiting sensor for {airflow_data.airflow_instance.name}***********" ) + context.log.info( + f"Asset events: {len(updated_asset_events)}, Check keys: {len(all_check_keys)}" + ) return SensorResult( asset_events=sorted_asset_events(updated_asset_events, repository_def), run_requests=[RunRequest(asset_check_keys=list(all_check_keys))] diff --git a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/minute_dag.py b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/minute_dag.py new file mode 100644 index 0000000000000..68e348db25edc --- /dev/null +++ b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/minute_dag.py @@ -0,0 +1,33 @@ +from pathlib import Path + +from airflow import DAG +from airflow.operators.python import PythonOperator +from dagster._time import get_current_datetime_midnight +from dagster_airlift.in_airflow import proxying_to_dagster +from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml + + +def print_hello() -> None: + print("Hello") # noqa: T201 + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "retries": 0, +} + +with DAG( + dag_id="minute_dag", + default_args=default_args, + schedule="* * * * *", + start_date=get_current_datetime_midnight(), + is_paused_upon_creation=True, +) as minute_dag: + PythonOperator(task_id="task", python_callable=print_hello) + + +proxying_to_dagster( + proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), + global_vars=globals(), +) diff --git a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_defs/clean_defs.py b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_defs/clean_defs.py new file mode 100644 index 0000000000000..4198f16b15cba --- /dev/null +++ b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_defs/clean_defs.py @@ -0,0 +1,34 @@ +from dagster import Definitions, multi_asset +from dagster._core.execution.context.asset_execution_context import AssetExecutionContext +from dagster_airlift.core import build_defs_from_airflow_instance, load_airflow_dag_asset_specs + +from kitchen_sink.airflow_instance import local_airflow_instance + + +def build_mapped_defs() -> Definitions: + return build_defs_from_airflow_instance( + airflow_instance=local_airflow_instance(), + dag_selector_fn=lambda dag: not dag.dag_id.startswith("unmapped"), + sensor_minimum_interval_seconds=1, + ) + + +unmapped_specs = load_airflow_dag_asset_specs( + airflow_instance=local_airflow_instance(), + dag_selector_fn=lambda dag: dag.dag_id.startswith("unmapped"), +) + + +@multi_asset(specs=unmapped_specs) +def materialize_dags(context: AssetExecutionContext): + for spec in unmapped_specs: + af_instance = local_airflow_instance() + dag_id = spec.metadata["Dag ID"] + dag_run_id = af_instance.trigger_dag(dag_id=dag_id) + af_instance.wait_for_run_completion(dag_id=dag_id, run_id=dag_run_id) + state = af_instance.get_run_state(dag_id=dag_id, run_id=dag_run_id) + if state != "success": + raise Exception(f"Failed to materialize {dag_id} with state {state}") + + +defs = Definitions.merge(build_mapped_defs(), Definitions([materialize_dags])) diff --git a/scripts/update_frozen_time_fs.py b/scripts/update_frozen_time_fs.py new file mode 100644 index 0000000000000..6ee6bcc833b52 --- /dev/null +++ b/scripts/update_frozen_time_fs.py @@ -0,0 +1,76 @@ +import argparse +import os +import re +from datetime import datetime + + +def parse_time_string(time_str): + """Parse a human-readable time string into seconds. + Examples: "1 hour 30 minutes", "2 days", "45 seconds" + """ + total_seconds = 0 + # Extract all number + unit pairs + patterns = [ + (r"(\d+)\s*(?:hour|hours|hr|hrs)", 3600), # hours to seconds + (r"(\d+)\s*(?:minute|minutes|min|mins)", 60), # minutes to seconds + (r"(\d+)\s*(?:second|seconds|sec|secs)", 1), # seconds + (r"(\d+)\s*(?:day|days)", 86400), # days to seconds + (r"(\d+)\s*(?:week|weeks)", 604800), # weeks to seconds + ] + + for pattern, multiplier in patterns: + matches = re.findall(pattern, time_str, re.IGNORECASE) + for match in matches: + total_seconds += int(match) * multiplier + + return total_seconds + + +def update_timestamp(delta_str, path=None): + """Add a time delta to the timestamp stored in the file.""" + if path is None: + path = os.environ.get("DAGSTER_FROZEN_TIME_PATH") + if not path: + raise ValueError("DAGSTER_FROZEN_TIME_PATH environment variable is not set") + + # Parse the delta string + delta_seconds = parse_time_string(delta_str) + + # Read the current timestamp + try: + with open(path) as f: + current_timestamp = float(f.read().strip()) + except FileNotFoundError: + print(f"File not found: {path}") + return + except ValueError: + print(f"Invalid timestamp in file: {path}") + return + + # Update the timestamp + new_timestamp = current_timestamp + delta_seconds + + # Write the new timestamp back to the file + with open(path, "w") as f: + f.write(str(new_timestamp)) + + # Print information about the update + current_time = datetime.fromtimestamp(current_timestamp) + new_time = datetime.fromtimestamp(new_timestamp) + print("Updated timestamp:") + print(f" From: {current_timestamp} ({current_time})") + print(f" To: {new_timestamp} ({new_time})") + print(f" Delta: {delta_str} ({delta_seconds} seconds)") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Update a timestamp by adding a time delta") + parser.add_argument( + "delta", help="Time delta in human readable format (e.g., '1 hour 30 minutes')" + ) + parser.add_argument( + "--path", help="Path to the timestamp file (defaults to DAGSTER_FROZEN_TIME_PATH env var)" + ) + + args = parser.parse_args() + update_timestamp(args.delta, args.path)