# run_results_v4.py
# Forked from yu-iskw/dbt-artifacts-parser.
# generated by datamodel-codegen:
# filename: run-results_v4.json
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from pydantic import ConfigDict
from collate_dbt_artifacts_parser.parsers.base import BaseParserModel
class BaseArtifactMetadata(BaseParserModel):
    """Model for the ``metadata`` field of :class:`RunResultsV4`.

    Only ``dbt_schema_version`` is required; every other field has a default.
    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    dbt_schema_version: str
    dbt_version: Optional[str] = "1.0.0b2"
    # Same instant as the generated "2021-11-02T20:18:06.799863Z" literal, but
    # stored as an aware datetime so the default matches the field annotation
    # (a plain string default is not coerced unless defaults are validated).
    generated_at: Optional[datetime] = datetime.fromisoformat(
        "2021-11-02T20:18:06.799863+00:00"
    )
    invocation_id: Optional[str] = None
    env: Optional[Dict[str, str]] = {}
class Status(Enum):
    """First of the three status enums accepted by ``RunResultOutput.status``."""

    success = "success"
    error = "error"
    skipped = "skipped"
class Status1(Enum):
    """Second of the three status enums accepted by ``RunResultOutput.status``."""

    # Trailing underscore because ``pass`` is a Python keyword; the value
    # itself is still the bare string "pass".
    pass_ = "pass"
    error = "error"
    fail = "fail"
    warn = "warn"
    skipped = "skipped"
class Status2(Enum):
    """Third of the three status enums accepted by ``RunResultOutput.status``."""

    # Trailing underscore because ``pass`` is a Python keyword.
    pass_ = "pass"
    warn = "warn"
    error = "error"
    # The value contains a space, unlike the member name.
    runtime_error = "runtime error"
class TimingInfo(BaseParserModel):
    """Named timing entry with optional start and completion timestamps.

    ``name`` is required; both timestamps default to ``None``.
    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    name: str
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
class FreshnessMetadata(BaseParserModel):
    """Metadata model in which every field, including the schema URL, has a default.

    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    dbt_schema_version: Optional[str] = "https://schemas.getdbt.com/dbt/sources/v3.json"
    dbt_version: Optional[str] = "1.0.0b2"
    # Same instant as the generated "2021-11-02T20:18:06.796684Z" literal, but
    # stored as an aware datetime so the default matches the field annotation
    # (a plain string default is not coerced unless defaults are validated).
    generated_at: Optional[datetime] = datetime.fromisoformat(
        "2021-11-02T20:18:06.796684+00:00"
    )
    invocation_id: Optional[str] = None
    env: Optional[Dict[str, str]] = {}
class Status3(Enum):
    """Single-member status enum used by ``SourceFreshnessRuntimeError.status``."""

    # The value contains a space, unlike the member name.
    runtime_error = "runtime error"
class SourceFreshnessRuntimeError(BaseParserModel):
    """Record pairing a ``unique_id`` with an optional error and a ``Status3`` status.

    ``error`` may be a string or an integer and defaults to ``None``.
    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    unique_id: str
    error: Optional[Union[str, int]] = None
    status: Status3
class Status4(Enum):
    """Status enum used by ``SourceFreshnessOutput.status``."""

    # Trailing underscore because ``pass`` is a Python keyword.
    pass_ = "pass"
    warn = "warn"
    error = "error"
    # The value contains a space, unlike the member name.
    runtime_error = "runtime error"
class Period(Enum):
    """Unit names accepted by ``Time.period``."""

    minute = "minute"
    hour = "hour"
    day = "day"
class Time(BaseParserModel):
    """An optional integer ``count`` paired with an optional ``Period`` unit.

    Both fields default to ``None``.
    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    count: Optional[int] = None
    period: Optional[Period] = None
class RunResultOutput(BaseParserModel):
    """One entry of ``RunResultsV4.results``.

    ``status`` accepts a member of ``Status``, ``Status1`` or ``Status2``.
    ``message`` and ``failures`` are optional; every other field is required.
    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    status: Union[Status, Status1, Status2]
    timing: List[TimingInfo]
    thread_id: str
    execution_time: float
    adapter_response: Dict[str, Any]
    message: Optional[str] = None
    failures: Optional[int] = None
    unique_id: str
class FreshnessThreshold(BaseParserModel):
    """Pair of optional ``Time`` thresholds plus an optional ``filter`` string.

    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    # The generated defaults were raw dicts ({"count": None, "period": None}),
    # which do not match the ``Time`` annotation unless defaults are validated.
    # ``Time()`` carries the same values (both fields default to None) as a
    # real model instance, so attribute access works on the default.
    warn_after: Optional[Time] = Time()
    error_after: Optional[Time] = Time()
    filter: Optional[str] = None
class RunResultsV4(BaseParserModel):
    """Top-level model generated from ``run-results_v4.json``.

    Holds the metadata, the list of ``RunResultOutput`` entries, the elapsed
    time, and an optional ``args`` mapping that defaults to an empty dict.
    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    metadata: BaseArtifactMetadata
    results: List[RunResultOutput]
    elapsed_time: float
    args: Optional[Dict[str, Any]] = {}
class SourceFreshnessOutput(BaseParserModel):
    """Freshness record for one ``unique_id``; every field is required.

    ``status`` is a ``Status4`` member and ``criteria`` is a
    ``FreshnessThreshold``.
    ``extra="forbid"`` asks pydantic to reject keys that are not declared here.
    """

    model_config = ConfigDict(
        extra="forbid",
    )
    unique_id: str
    max_loaded_at: datetime
    snapshotted_at: datetime
    # NOTE(review): the "_in_s" suffix suggests seconds — confirm against the schema.
    max_loaded_at_time_ago_in_s: float
    status: Status4
    criteria: FreshnessThreshold
    adapter_response: Dict[str, Any]
    timing: List[TimingInfo]
    thread_id: str
    execution_time: float