Skip to content

Commit

Permalink
chore:use impl cls contributed upstream to dlt, pin to commit for now
Browse files Browse the repository at this point in the history
  • Loading branch information
z3z1ma committed Apr 8, 2024
1 parent 9927ce4 commit e5029cd
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 27 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ authors = [
]
dependencies = [
"sqlmesh",
"dlt",
"dlt[duckdb] @ git+https://github.com/dlt-hub/dlt.git@48cdfa1eb7d7b0fb37411d61cc0eb747fba814a7",
"typer[all]>=0.8.0",
"harness-featureflags>=1.2.0",
"python-dotenv>=1.0.0",
Expand Down
30 changes: 4 additions & 26 deletions src/cdf/core/runtime/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,26 +264,6 @@ def run(
schema_contract=schema_contract,
)

@classmethod
def from_pipeline(cls, pipeline: Pipeline) -> "RuntimePipeline":
"""Creates a RuntimePipeline from a dlt Pipeline object."""
return cls(
pipeline.pipeline_name,
pipelines_dir=pipeline.pipelines_dir,
pipeline_salt=pipeline.config.pipeline_salt, # type: ignore[arg-type]
destination=pipeline.destination,
staging=pipeline.staging,
dataset_name=pipeline.dataset_name,
credentials=None,
import_schema_path=pipeline._schema_storage_config.import_schema_path, # type: ignore[arg-type]
export_schema_path=pipeline._schema_storage_config.export_schema_path, # type: ignore[arg-type]
full_refresh=pipeline.full_refresh,
progress=pipeline.collector,
must_attach_to_local_pipeline=False,
config=pipeline.config,
runtime=pipeline.runtime_config,
)


def pipeline_factory() -> RuntimePipeline:
"""Creates a cdf pipeline. This is used in lieu of dlt.pipeline. in user code.
Expand All @@ -292,16 +272,14 @@ def pipeline_factory() -> RuntimePipeline:
from the runtime context. Raises a ValueError if the runtime context is not set.
"""
runtime = CONTEXT.get()
options = dict(
# TODO: contribute a PR to expose an _impl_cls argument in dlt.pipeline
# https://github.com/dlt-hub/dlt/pull/1176
return dlt.pipeline(
pipeline_name=runtime.pipeline_name,
destination=runtime.destination,
staging=runtime.staging if runtime.enable_stage else None,
dataset_name=runtime.dataset_name,
)
# TODO: contribute a PR to expose an _impl_cls argument in dlt.pipeline
# https://github.com/dlt-hub/dlt/pull/1176
return RuntimePipeline.from_pipeline(
pipeline=dlt.pipeline(**options),
_impl_cls=RuntimePipeline,
)


Expand Down

0 comments on commit e5029cd

Please sign in to comment.