Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sla] POC #28062

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import uniq from 'lodash/uniq';
import React, {useCallback, useMemo, useRef} from 'react';

import {AssetBaseData, __resetForJest as __resetBaseData} from './AssetBaseDataProvider';
import {AssetSLAData} from './AssetSLADataProvider';
import {
AssetStaleStatusData,
__resetForJest as __resetStaleData,
Expand Down Expand Up @@ -155,7 +156,9 @@ export const AssetLiveDataProvider = ({children}: {children: React.ReactNode}) =

return (
<AssetBaseData.LiveDataProvider>
<AssetStaleStatusData.LiveDataProvider>{children}</AssetStaleStatusData.LiveDataProvider>
<AssetStaleStatusData.LiveDataProvider>
<AssetSLAData.LiveDataProvider>{children}</AssetSLAData.LiveDataProvider>
</AssetStaleStatusData.LiveDataProvider>
</AssetBaseData.LiveDataProvider>
);
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import {useMemo} from 'react';
import {getAssetSLAQueryResponse} from 'shared/asset-data/getAssetSLAQueryResponse.oss';

import {ApolloClient, useApolloClient} from '../apollo-client';
import {tokenForAssetKey} from '../asset-graph/Utils';
import {AssetKeyInput} from '../graphql/types';
import {liveDataFactory} from '../live-data-provider/Factory';
import {LiveDataThreadID} from '../live-data-provider/LiveDataThread';
import {useBlockTraceUntilTrue} from '../performance/TraceContext';

export function init() {
return liveDataFactory(
() => {
return useApolloClient();
},
async (keys, client: ApolloClient<any>) => {
return getAssetSLAQueryResponse(keys, client);
},
);
}

export const AssetSLAData = init();

export function useAssetSLAData(assetKey: AssetKeyInput, thread: LiveDataThreadID = 'default') {
const result = AssetSLAData.useLiveDataSingle(tokenForAssetKey(assetKey), thread);
useBlockTraceUntilTrue('useAssetStaleData', !!result.liveData);
return result;
}

export function useAssetsSLAData(assetKeys: AssetKeyInput[], thread: LiveDataThreadID = 'default') {
const result = AssetSLAData.useLiveData(
useMemo(() => assetKeys.map((key) => tokenForAssetKey(key)), [assetKeys]),
thread,
);
useBlockTraceUntilTrue(
'useAssetsSLAData',
!!(Object.keys(result.liveDataByNode).length === assetKeys.length),
);
return result;
}

export function AssetSLADataRefreshButton() {
return <AssetSLAData.LiveDataRefresh />;
}

// For tests
export function __resetForJest() {
Object.assign(AssetSLAData, init());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {ApolloClient} from '../apollo-client';

export const getAssetSLAQueryResponse = (_assetKeys: string[], _client: ApolloClient<any>) => {
return {};
};
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import {Box, Colors, FontFamily, Icon, Tooltip} from '@dagster-io/ui-components'
import isEqual from 'lodash/isEqual';
import * as React from 'react';
import {Link} from 'react-router-dom';
import {AssetNodeSLARow} from 'shared/asset-graph/AssetNodeSLARow.oss';
import styled, {CSSObject} from 'styled-components';

import {AssetNodeMenuProps, useAssetNodeMenu} from './AssetNodeMenu';
import {AssetNodeRowBox} from './AssetNodeRowBox';
import {buildAssetNodeStatusContent} from './AssetNodeStatusContent';
import {ContextMenuWrapper} from './ContextMenuWrapper';
import {LiveDataForNode} from './Utils';
Expand Down Expand Up @@ -70,6 +72,7 @@ export const AssetNode = React.memo(

<AssetNodeStatusRow definition={definition} liveData={liveData} />
{hasChecks && <AssetNodeChecksRow definition={definition} liveData={liveData} />}
<AssetNodeSLARow assetKey={definition.assetKey} />
</AssetNodeBox>
<Box
style={{minHeight: ASSET_NODE_TAGS_HEIGHT}}
Expand Down Expand Up @@ -114,20 +117,6 @@ export const AssetNameRow = ({definition}: {definition: AssetNodeFragment}) => {
);
};

const AssetNodeRowBox = styled(Box)`
white-space: nowrap;
line-height: 12px;
font-size: 12px;
height: 24px;
a:hover {
text-decoration: none;
}
&:last-child {
border-bottom-left-radius: 8px;
border-bottom-right-radius: 8px;
}
`;

interface StatusRowProps {
definition: AssetNodeFragment;
liveData: LiveDataForNode | undefined;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import {Box} from '@dagster-io/ui-components';
import styled from 'styled-components';

export const AssetNodeRowBox = styled(Box)`
white-space: nowrap;
line-height: 12px;
font-size: 12px;
height: 24px;
a:hover {
text-decoration: none;
}
&:last-child {
border-bottom-left-radius: 8px;
border-bottom-right-radius: 8px;
}
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {AssetKey} from '../assets/types';

export interface AssetNodeSLARowProps {
assetKey: AssetKey;
}

export const AssetNodeSLARow = (_props: AssetNodeSLARowProps) => null;
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import React, {useMemo} from 'react';
import {Link} from 'react-router-dom';
import {AssetAlertsSection} from 'shared/assets/AssetAlertsSection.oss';
import {AssetNodeOverviewSLASummary} from 'shared/assets/overview/AssetNodeOverviewSLASummary.oss';

import {AssetEventMetadataEntriesTable} from '../AssetEventMetadataEntriesTable';
import {metadataForAssetNode} from '../AssetMetadata';
Expand Down Expand Up @@ -115,7 +116,7 @@ export const AssetNodeOverview = ({

const renderStatusSection = () => (
<Box flex={{direction: 'column', gap: 16}}>
<Box style={{display: 'grid', gridTemplateColumns: 'repeat(3, minmax(0, 1fr))'}}>
<Box style={{display: 'grid', gridTemplateColumns: 'repeat(4, minmax(0, 1fr))'}}>
<Box flex={{direction: 'column', gap: 6}}>
<Subtitle2>
Latest {assetNode?.isObservable ? 'observation' : 'materialization'}
Expand Down Expand Up @@ -144,6 +145,7 @@ export const AssetNodeOverview = ({
/>
</Box>
) : undefined}
<AssetNodeOverviewSLASummary assetKey={cachedOrLiveAssetNode.assetKey} />
{rowCountMeta?.intValue ? (
<Box flex={{direction: 'column', gap: 6}}>
<Subtitle2>Row count</Subtitle2>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import {AssetKey} from '../types';

export type AssetNodeOverviewSLASummaryProps = {
assetKey: AssetKey;
};

export const AssetNodeOverviewSLASummary = ({
assetKey: _assetKey,
}: AssetNodeOverviewSLASummaryProps) => null;
20 changes: 20 additions & 0 deletions python_modules/dagster-test/dagster_test/toys/basic_assets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from dagster import AssetSelection, MetadataValue, asset, define_asset_job
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)


@asset(group_name="basic_assets")
Expand All @@ -17,6 +21,22 @@ def basic_asset_3(basic_asset_1): ...
def basic_asset_4(basic_asset_2, basic_asset_3): ...


@asset(
automation_condition=AutomationCondition.on_cron("* * * * *"),
group_name="SLA_demo",
check_specs=[
AssetCheckSpec(
name="my_favorite_check", asset="my_cron_asset", description="This is my favorite check"
)
],
)
def my_cron_asset(): ...


@asset(group_name="SLA_demo")
def my_sensored_asset(): ...


basic_assets_job = define_asset_job(
"basic_assets_job",
selection=AssetSelection.groups("basic_assets"),
Expand Down
67 changes: 67 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/sla.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# SLA definition stuff
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Union

from dagster._core.definitions.events import AssetKey
from dagster._record import record
from dagster._serdes import whitelist_for_serdes

if TYPE_CHECKING:
from dagster._core.events import DagsterEventType


@whitelist_for_serdes
@record
class SlaViolating:
sla_name: str
asset_key: AssetKey
# The timestamp at which the asset began violating its SLA.
violating_since: float
last_event_timestamp: float
last_event_storage_id: int
reason_md: str
metadata: Mapping[str, Any]

@classmethod
def dagster_event_type(cls) -> "DagsterEventType":
from dagster._core.events import DagsterEventType

return DagsterEventType.SLA_VIOLATING


@whitelist_for_serdes
@record
class SlaPassing:
sla_name: str
asset_key: AssetKey
# The associated event log entry ID which led to the asset passing its SLA.
last_event_storage_id: int
# Time at which the passing event occurred
last_event_timestamp: float
reason_md: str
metadata: Mapping[str, Any]

@classmethod
def dagster_event_type(cls) -> "DagsterEventType":
from dagster._core.events import DagsterEventType

return DagsterEventType.SLA_PASSING


# If the asset is missing and therefore the SLA cannot be evaluated.
@whitelist_for_serdes
@record
class SlaUnknown:
sla_name: str
asset_key: AssetKey
reason_md: str
metadata: Mapping[str, Any]

@classmethod
def dagster_event_type(cls) -> "DagsterEventType":
from dagster._core.events import DagsterEventType

return DagsterEventType.SLA_UNKNOWN


SlaEvaluationResult = Union[SlaPassing, SlaViolating, SlaUnknown]
8 changes: 8 additions & 0 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

if TYPE_CHECKING:
from dagster._core.definitions.events import ObjectStoreOperation
from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating
from dagster._core.execution.plan.plan import ExecutionPlan
from dagster._core.execution.plan.step import StepKind

Expand All @@ -80,6 +81,9 @@
"AssetMaterializationPlannedData",
"AssetCheckEvaluation",
"AssetCheckEvaluationPlanned",
"SlaViolating",
"SlaPassing",
"SlaUnknown",
]


Expand Down Expand Up @@ -155,6 +159,10 @@ class DagsterEventType(str, Enum):

LOGS_CAPTURED = "LOGS_CAPTURED"

SLA_PASSING = "SLA_PASSING"
SLA_VIOLATING = "SLA_VIOLATING"
SLA_UNKNOWN = "SLA_UNKNOWN"


EVENT_TYPE_TO_DISPLAY_STRING = {
DagsterEventType.PIPELINE_ENQUEUED: "RUN_ENQUEUED",
Expand Down
13 changes: 13 additions & 0 deletions python_modules/dagster/dagster/_core/events/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
from dagster._core.definitions.events import AssetMaterialization, AssetObservation
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating
from dagster._core.events import DagsterEvent, DagsterEventType
from dagster._core.utils import coerce_valid_log_level
from dagster._serdes.serdes import deserialize_value, serialize_value, whitelist_for_serdes
Expand Down Expand Up @@ -141,6 +142,18 @@ def asset_materialization(self) -> Optional[AssetMaterialization]:

return None

@property
def sla_violating(self) -> Optional[SlaViolating]:
pass

@property
def sla_passing(self) -> Optional[SlaPassing]:
pass

@property
def sla_unknown(self) -> Optional[SlaUnknown]:
pass

@property
def asset_observation(self) -> Optional[AssetObservation]:
if (
Expand Down
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
RepositoryLoadData,
)
from dagster._core.definitions.run_request import InstigatorType
from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating
from dagster._core.event_api import (
AssetRecordsFilter,
EventHandlerFn,
Expand Down Expand Up @@ -3278,9 +3279,17 @@ def get_latest_materialization_code_versions(
@public
def report_runless_asset_event(
self,
asset_event: Union["AssetMaterialization", "AssetObservation", "AssetCheckEvaluation"],
asset_event: Union[
"AssetMaterialization",
"AssetObservation",
"AssetCheckEvaluation",
"SlaViolating",
"SlaPassing",
"SlaUnknown",
],
):
"""Record an event log entry related to assets that does not belong to a Dagster run."""
from dagster._core.definitions.sla import SlaPassing, SlaUnknown, SlaViolating
from dagster._core.events import (
AssetMaterialization,
AssetObservationData,
Expand All @@ -3298,6 +3307,9 @@ def report_runless_asset_event(
elif isinstance(asset_event, AssetObservation):
event_type_value = DagsterEventType.ASSET_OBSERVATION.value
data_payload = AssetObservationData(asset_event)
elif isinstance(asset_event, (SlaViolating, SlaPassing, SlaUnknown)):
event_type_value = asset_event.dagster_event_type().value
data_payload = asset_event
else:
raise DagsterInvariantViolationError(
f"Received unexpected asset event type {asset_event}, expected"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
DAGSTER_EVENT_TYPES = [DagsterEventType.RUN_FAILURE, DagsterEventType.RUN_SUCCESS]


# Should rename this the eventlogrunconsumer or something
class EventLogConsumerDaemon(IntervalDaemon):
def __init__(
self,
Expand Down
Loading