
Commit 7415aca
feat: return a handle to the dlt pipeline after executing a spec
z3z1ma committed May 15, 2024
1 parent 30d9e4b commit 7415aca
Showing 2 changed files with 19 additions and 39 deletions.
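
This commit changes the return shape of execute_pipeline_specification: instead of a dict of exports (real run) or a set of tracked sources (dry run), it now always resolves to a PipelineResult that carries both the exports and the RuntimePipeline handle. A minimal caller-side sketch of the new pattern, not part of this diff, assuming the import path mirrors the file layout (cdf.core.runtime.pipeline) and that spec is a PipelineSpecification already resolved from a workspace (as cli.py does with workspace.get_pipeline(...).unwrap()):

    # Sketch only; `spec` and the import path are assumptions, not part of this commit.
    from cdf.core.runtime.pipeline import execute_pipeline_specification

    # Dry run: the result handle exposes the dlt pipeline, so tracked sources
    # are reached via .pipeline.tracked_sources rather than being returned directly.
    sources = (
        execute_pipeline_specification(spec, "dummy", dry_run=True, quiet=True)
        .map(lambda rv: rv.pipeline.tracked_sources)
        .unwrap()
    )
    for source in sources:
        print(source.name)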
src/cdf/cli.py (9 changes: 6 additions & 3 deletions)
@@ -201,7 +201,9 @@ def discover(
     for i, source in enumerate(
         execute_pipeline_specification(
             spec, "dummy", dry_run=True, quiet=not no_quiet
-        ).unwrap()
+        )
+        .map(lambda rv: rv.pipeline.tracked_sources)
+        .unwrap()
     ):
         console.print(f"{i}: {source.name}")
         for j, resource in enumerate(source.resources.values(), 1):
@@ -243,7 +245,9 @@ def head(
                 resource
                 for src in execute_pipeline_specification(
                     spec, "dummy", dry_run=True, quiet=True
-                ).unwrap()
+                )
+                .map(lambda rv: rv.pipeline.tracked_sources)
+                .unwrap()
                 for resource in src.resources.values()
             ),
         ),
@@ -478,7 +482,6 @@ def export_schema(
         .unwrap_or((destination, None))
     )
     spec = workspace.get_pipeline(source).unwrap()
-
     for src in execute_pipeline_specification(
         spec, sink, dry_run=True, quiet=True
     ).unwrap():
src/cdf/core/runtime/pipeline.py (49 changes: 13 additions & 36 deletions)
@@ -137,6 +137,11 @@ def source_hooks(
         """The source hooks for the pipeline."""
         return self._source_hooks
 
+    @property
+    def tracked_sources(self) -> t.Set[dlt.sources.DltSource]:
+        """The sources tracked by the pipeline."""
+        return self._tracked_sources
+
     def extract(
         self,
         data: t.Any,
@@ -248,36 +253,11 @@ def run(
         )
 
 
-@t.overload
-def execute_pipeline_specification(
-    spec: PipelineSpecification,
-    destination: TDestinationReferenceArg,
-    staging: t.Optional[TDestinationReferenceArg] = None,
-    select: t.Optional[t.List[str]] = None,
-    exclude: t.Optional[t.List[str]] = None,
-    force_replace: bool = False,
-    dry_run: t.Literal[False] = False,
-    enable_stage: bool = True,
-    quiet: bool = False,
-    metric_container: t.Optional[t.Dict[str, t.Any]] = None,
-    **pipeline_options: t.Any,
-) -> M.Result[t.Dict[str, t.Any], Exception]: ...
+class PipelineResult(t.NamedTuple):
+    """The result of executing a pipeline specification."""
 
-
-@t.overload
-def execute_pipeline_specification(
-    spec: PipelineSpecification,
-    destination: TDestinationReferenceArg,
-    staging: t.Optional[TDestinationReferenceArg] = None,
-    select: t.Optional[t.List[str]] = None,
-    exclude: t.Optional[t.List[str]] = None,
-    force_replace: bool = False,
-    dry_run: t.Literal[True] = True,
-    enable_stage: bool = True,
-    quiet: bool = False,
-    metric_container: t.Optional[t.Dict[str, t.Any]] = None,
-    **pipeline_options: t.Any,
-) -> M.Result[t.Set[dlt.sources.DltSource], Exception]: ...
+    exports: t.Dict[str, t.Any]
+    pipeline: RuntimePipeline
 
 
 def execute_pipeline_specification(
@@ -292,10 +272,7 @@ def execute_pipeline_specification(
     quiet: bool = False,
     metric_container: t.Optional[t.MutableMapping[str, t.Any]] = None,
     **pipeline_options: t.Any,
-) -> t.Union[
-    M.Result[t.Dict[str, t.Any], Exception],
-    M.Result[t.Set[dlt.sources.DltSource], Exception],
-]:
+) -> M.Result[PipelineResult, Exception]:
     """Executes a pipeline specification."""
 
     metric_container = metric_container or {}
@@ -325,9 +302,9 @@ def execute_pipeline_specification(
     maybe_redirect = redirect_stdout(null) if quiet else nullcontext()
     try:
         with maybe_redirect:
-            exports = spec()
+            result = PipelineResult(exports=spec(), pipeline=pipe_reference)
             if dry_run:
-                return M.ok(pipe_reference._tracked_sources)
+                return M.ok(result)
         with (
             suppress(KeyError),
             pipe_reference.sql_client() as client,
@@ -339,7 +316,7 @@ def execute_pipeline_specification(
                 f"Cleaning up staging dataset {client_staging.dataset_name}"
             )
             client_staging.drop_dataset()
-        return M.ok(exports)
+        return M.ok(result)
     except Exception as e:
         return M.error(e)
     finally:

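Dropping the two @t.overload declarations means callers no longer get a different return type depending on dry_run; both paths now resolve to the same PipelineResult. A hedged sketch of a full (non-dry) run under that assumption, not taken from the repository, with "duckdb" standing in for whatever destination the project actually configures:

    # Sketch only; the destination name and import path are assumptions.
    from cdf.core.runtime.pipeline import execute_pipeline_specification

    result = execute_pipeline_specification(spec, "duckdb", quiet=True).unwrap_or(None)
    if result is not None:
        exports = result.exports    # whatever the spec callable returned
        pipeline = result.pipeline  # RuntimePipeline handle, e.g. pipeline.tracked_sources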