From 3b77780fa072c6dfc994431ce90e1c8ac8a90e83 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 6 Feb 2024 13:24:15 -0800 Subject: [PATCH] feat(ingest/dbt): speed up test result only ingestion --- .../datahub/ingestion/source/dbt/dbt_common.py | 17 +++++++++++++---- .../tests/unit/test_dbt_source.py | 4 ++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 985c9118f3422..2fbaec5448455 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -184,17 +184,18 @@ def process_only_directive(cls, values): return values - def can_emit_node_type(self, node_type: str) -> bool: + def _node_type_allow_map(self): # Node type comes from dbt's node types. - - node_type_allow_map = { + return { "model": self.models, "source": self.sources, "seed": self.seeds, "snapshot": self.snapshots, "test": self.test_definitions, } - allowed = node_type_allow_map.get(node_type) + + def can_emit_node_type(self, node_type: str) -> bool: + allowed = self._node_type_allow_map().get(node_type) if allowed is None: return False @@ -204,6 +205,11 @@ def can_emit_node_type(self, node_type: str) -> bool: def can_emit_test_results(self) -> bool: return self.test_results == EmitDirective.YES + def is_only_test_results(self) -> bool: + return self.test_results == EmitDirective.YES and all( + v == EmitDirective.NO for v in self._node_type_allow_map().values() + ) + class DBTCommonConfig( StatefulIngestionConfigBase, DatasetSourceConfigMixin, LineageConfig @@ -877,6 +883,9 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No 5. If we haven't already added the node's schema to the schema resolver, do that. """ + if self.config.entities_enabled.is_only_test_results(): + # If we're not emitting any other entities, so there's no need to infer schemas. + return if not self.config.infer_dbt_schemas: if self.config.include_column_lineage: raise ConfigurationError( diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index 737cf6aca33cc..6e8c08d5bdf35 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -256,6 +256,7 @@ def test_dbt_entity_emission_configuration_helpers(): assert not config.entities_enabled.can_emit_node_type("source") assert not config.entities_enabled.can_emit_node_type("test") assert not config.entities_enabled.can_emit_test_results + assert not config.entities_enabled.is_only_test_results() config_dict = { "manifest_path": "dummy_path", @@ -267,6 +268,7 @@ def test_dbt_entity_emission_configuration_helpers(): assert config.entities_enabled.can_emit_node_type("source") assert config.entities_enabled.can_emit_node_type("test") assert config.entities_enabled.can_emit_test_results + assert not config.entities_enabled.is_only_test_results() config_dict = { "manifest_path": "dummy_path", @@ -281,6 +283,7 @@ def test_dbt_entity_emission_configuration_helpers(): assert not config.entities_enabled.can_emit_node_type("source") assert not config.entities_enabled.can_emit_node_type("test") assert config.entities_enabled.can_emit_test_results + assert config.entities_enabled.is_only_test_results() config_dict = { "manifest_path": "dummy_path", @@ -298,6 +301,7 @@ def test_dbt_entity_emission_configuration_helpers(): assert not config.entities_enabled.can_emit_node_type("source") assert config.entities_enabled.can_emit_node_type("test") assert config.entities_enabled.can_emit_test_results + assert not config.entities_enabled.is_only_test_results() def test_dbt_cloud_config_access_url():