From e5029cdee1a8dda2b63b3d1e8defc4fa24566afb Mon Sep 17 00:00:00 2001 From: z3z1ma Date: Sun, 7 Apr 2024 21:26:13 -0700 Subject: [PATCH] chore:use impl cls contributed upstream to dlt, pin to commit for now --- pyproject.toml | 2 +- src/cdf/core/runtime/pipeline.py | 30 ++++-------------------------- 2 files changed, 5 insertions(+), 27 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index a11e451..50a112e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ authors = [ ] dependencies = [ "sqlmesh", - "dlt", + "dlt[duckdb] @ git+https://github.com/dlt-hub/dlt.git@48cdfa1eb7d7b0fb37411d61cc0eb747fba814a7", "typer[all]>=0.8.0", "harness-featureflags>=1.2.0", "python-dotenv>=1.0.0", diff --git a/src/cdf/core/runtime/pipeline.py b/src/cdf/core/runtime/pipeline.py index 048f094..2377f94 100644 --- a/src/cdf/core/runtime/pipeline.py +++ b/src/cdf/core/runtime/pipeline.py @@ -264,26 +264,6 @@ def run( schema_contract=schema_contract, ) - @classmethod - def from_pipeline(cls, pipeline: Pipeline) -> "RuntimePipeline": - """Creates a RuntimePipeline from a dlt Pipeline object.""" - return cls( - pipeline.pipeline_name, - pipelines_dir=pipeline.pipelines_dir, - pipeline_salt=pipeline.config.pipeline_salt, # type: ignore[arg-type] - destination=pipeline.destination, - staging=pipeline.staging, - dataset_name=pipeline.dataset_name, - credentials=None, - import_schema_path=pipeline._schema_storage_config.import_schema_path, # type: ignore[arg-type] - export_schema_path=pipeline._schema_storage_config.export_schema_path, # type: ignore[arg-type] - full_refresh=pipeline.full_refresh, - progress=pipeline.collector, - must_attach_to_local_pipeline=False, - config=pipeline.config, - runtime=pipeline.runtime_config, - ) - def pipeline_factory() -> RuntimePipeline: """Creates a cdf pipeline. This is used in lieu of dlt.pipeline. in user code. @@ -292,16 +272,14 @@ def pipeline_factory() -> RuntimePipeline: from the runtime context. 
Raises a ValueError if the runtime context is not set. """ runtime = CONTEXT.get() - options = dict( + # TODO: unpin dlt from the git commit once a release ships the _impl_cls argument + # https://github.com/dlt-hub/dlt/pull/1176 + return dlt.pipeline( pipeline_name=runtime.pipeline_name, destination=runtime.destination, staging=runtime.staging if runtime.enable_stage else None, dataset_name=runtime.dataset_name, - ) - # TODO: contribute a PR to expose an _impl_cls argument in dlt.pipeline - # https://github.com/dlt-hub/dlt/pull/1176 - return RuntimePipeline.from_pipeline( - pipeline=dlt.pipeline(**options), + _impl_cls=RuntimePipeline, )