From a9df50f1e316d6f250f8ef14ff320a17718785fc Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 30 Sep 2024 20:55:49 +0200 Subject: [PATCH 1/6] first pass: replace os env with project flag --- core/dbt/context/providers.py | 2 +- core/dbt/contracts/project.py | 2 ++ core/dbt/parser/manifest.py | 2 +- core/dbt/task/run.py | 5 ++-- .../functional/microbatch/test_microbatch.py | 24 +++++++------------ .../test_microbatch_config_validation.py | 21 ++++++++++++---- tests/unit/context/test_providers.py | 12 +++++++--- 7 files changed, 39 insertions(+), 29 deletions(-) diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index a0e3751587a..54d88a95c27 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -238,7 +238,7 @@ def resolve_limit(self) -> Optional[int]: def resolve_event_time_filter(self, target: ManifestNode) -> Optional[EventTimeFilter]: event_time_filter = None if ( - os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + get_flags().require_builtin_microbatch_strategy and (isinstance(target.config, NodeConfig) or isinstance(target.config, SourceConfig)) and target.config.event_time and self.model.config.materialized == "incremental" diff --git a/core/dbt/contracts/project.py b/core/dbt/contracts/project.py index f5a4ec605ec..4a722eb66d7 100644 --- a/core/dbt/contracts/project.py +++ b/core/dbt/contracts/project.py @@ -338,6 +338,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): write_json: Optional[bool] = None # legacy behaviors - https://github.com/dbt-labs/dbt-core/blob/main/docs/guides/behavior-change-flags.md + require_builtin_microbatch_strategy: bool = False require_explicit_package_overrides_for_builtin_materializations: bool = True require_resource_names_without_spaces: bool = False source_freshness_run_project_hooks: bool = False @@ -348,6 +349,7 @@ class ProjectFlags(ExtensibleDbtClassMixin): @property def project_only_flags(self) -> Dict[str, Any]: return { + "require_builtin_microbatch_strategy": self.require_builtin_microbatch_strategy, "require_explicit_package_overrides_for_builtin_materializations": self.require_explicit_package_overrides_for_builtin_materializations, "require_resource_names_without_spaces": self.require_resource_names_without_spaces, "source_freshness_run_project_hooks": self.source_freshness_run_project_hooks, diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7ffd00febc5..0e7d6a3dea9 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1383,7 +1383,7 @@ def check_valid_snapshot_config(self): node.config.final_validate() def check_valid_microbatch_config(self): - if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + if get_flags().require_builtin_microbatch_strategy: for node in self.manifest.nodes.values(): if ( node.config.materialized == "incremental" diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 673a400ec02..187b29b4e36 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -1,5 +1,4 @@ import functools -import os import threading from datetime import datetime from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -31,6 +30,7 @@ RunningOperationCaughtError, ) from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError +from dbt.flags import get_flags from dbt.graph import ResourceTypeSelector from dbt.hooks import get_hook_dict from dbt.materializations.incremental.microbatch import MicrobatchBuilder @@ -447,9 +447,8 @@ def execute(self, model, manifest): ) hook_ctx = self.adapter.pre_model_hook(context_config) - if ( - os.environ.get("DBT_EXPERIMENTAL_MICROBATCH") + get_flags().require_builtin_microbatch_strategy and model.config.materialized == "incremental" and model.config.incremental_strategy == "microbatch" ): diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c233657f180..9058fba2d93 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -1,4 +1,3 @@ -import os from unittest import mock import pytest @@ -111,6 +110,14 @@ def models(self): def macros(self): return {"microbatch.sql": custom_microbatch_strategy} + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + class TestMicrobatchCustomUserStrategyDefault(BaseMicrobatchCustomUserStrategy): def test_use_custom_microbatch_strategy_by_default(self, project): @@ -126,7 +133,6 @@ def test_use_custom_microbatch_strategy_by_default(self, project): class TestMicrobatchCustomUserStrategyEnvVarTrueValid(BaseMicrobatchCustomUserStrategy): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( self, project ): @@ -143,10 +149,7 @@ def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strateg assert "custom microbatch strategy" in logs -# TODO: Consider a behaviour flag here if DBT_EXPERIMENTAL_MICROBATCH is removed -# Since this causes an exception prior to using an override class TestMicrobatchCustomUserStrategyEnvVarTrueInvalid(BaseMicrobatchCustomUserStrategy): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_use_custom_microbatch_strategy_env_var_true_invalid_incremental_strategy( self, project ): @@ -183,7 +186,6 @@ def assert_row_count(self, project, relation_name: str, expected_row_count: int) class TestMicrobatchCLI(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run without --event-time-start or --event-time-end - 3 expected rows in output with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -214,7 +216,6 @@ def test_run_with_event_time(self, project): class TestMicroBatchBoundsDefault(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -266,7 +267,6 @@ def models(self): "seeds.yml": seeds_yaml, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # ensure seed is created for source run_dbt(["seed"]) @@ -314,7 +314,6 @@ def models(self): "microbatch_model.sql": microbatch_model_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -342,7 +341,6 @@ def test_run_with_event_time(self, project): class TestMicrobatchUsingRefRenderSkipsFilter(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # initial run -- backfills all data with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -395,7 +393,6 @@ def models(self): "microbatch_model.sql": microbatch_model_context_vars, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time_logs(self, project): with patch_microbatch_end_time("2020-01-03 13:57:00"): _, logs = run_dbt_and_capture(["run", "--event-time-start", "2020-01-01"]) @@ -428,7 +425,6 @@ def models(self): "downstream_model.sql": downstream_model_of_microbatch_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -453,7 +449,6 @@ def models(self): "microbatch_model.sql": microbatch_model_failing_incremental_partition_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -502,7 +497,6 @@ def models(self): "microbatch_model.sql": microbatch_model_first_partition_failing_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -511,7 +505,6 @@ def test_run_with_event_time(self, project): class TestMicrobatchCompiledRunPaths(BaseMicrobatchTest): - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): @@ -594,7 +587,6 @@ def models(self): "downstream_model.sql": downstream_model_of_microbatch_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from 2020-01-02 to spoofed "now" - 2 expected rows in output with patch_microbatch_end_time("2020-01-03 13:57:00"): diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index cdebd3a791b..f97e85a8f8a 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -1,6 +1,3 @@ -import os -from unittest import mock - import pytest from dbt.exceptions import ParsingError @@ -86,7 +83,14 @@ class BaseMicrobatchTestParseError: def models(self): return {} - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def test_parsing_error_raised(self, project): with pytest.raises(ParsingError): run_dbt(["parse"]) @@ -97,7 +101,14 @@ class BaseMicrobatchTestNoError: def models(self): return {} - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def test_parsing_error_not_raised(self, project): run_dbt(["parse"]) diff --git a/tests/unit/context/test_providers.py b/tests/unit/context/test_providers.py index 46c29254a9a..daa8d1d1bda 100644 --- a/tests/unit/context/test_providers.py +++ b/tests/unit/context/test_providers.py @@ -1,4 +1,4 @@ -import os +from argparse import Namespace from unittest import mock import pytest @@ -13,6 +13,7 @@ RuntimeRefResolver, RuntimeSourceResolver, ) +from dbt.flags import set_from_args class TestBaseResolver: @@ -56,8 +57,9 @@ def test_resolve_event_time_filter( incremental_strategy: str, expect_filter: bool, ) -> None: - if dbt_experimental_microbatch: - mocker.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + set_from_args( + Namespace(require_builtin_microbatch_strategy=dbt_experimental_microbatch), None + ) # Target mocking target = mock.Mock() @@ -117,6 +119,8 @@ def test_create_relation_with_empty(self, resolver, empty, is_ephemeral_model, e mock_node.is_ephemeral_model = is_ephemeral_model mock_node.defer_relation = None + set_from_args(Namespace(require_builtin_microbatch_strategy=False), None) + # create limited relation with mock.patch("dbt.contracts.graph.nodes.ParsedNode", new=mock.Mock): relation = resolver.create_relation(mock_node) @@ -156,6 +160,8 @@ def test_create_relation_with_empty(self, resolver, empty, expected_limit): mock_source.quoting_dict = {} resolver.manifest.resolve_source.return_value = mock_source + set_from_args(Namespace(require_builtin_microbatch_strategy=False), None) + # create limited relation relation = resolver.resolve("test", "test") assert relation.limit == expected_limit From 704494f38b9956cda7a50ef40ca0c6158257e644 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 15:57:02 -0500 Subject: [PATCH 2/6] Fix `TestMicrobatchMultipleRetries` to not use `os.env` --- tests/functional/microbatch/test_microbatch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 37182c135d7..308ecd0bb72 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -488,7 +488,6 @@ def models(self): "microbatch_model.sql": microbatch_model_failing_incremental_partition_sql, } - @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_run_with_event_time(self, project): # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): From bfecde1b024b5f7478ce75b113335c5ceafde01c Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 15:58:03 -0500 Subject: [PATCH 3/6] Turn off microbatch project flag for `TestMicrobatchCustomUserStrategyDefault` as it was prior to a9df50f --- tests/functional/microbatch/test_microbatch.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 308ecd0bb72..8552e27afa0 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -120,6 +120,14 @@ def project_config_update(self): class TestMicrobatchCustomUserStrategyDefault(BaseMicrobatchCustomUserStrategy): + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": False, + } + } + def test_use_custom_microbatch_strategy_by_default(self, project): with mock.patch.object( type(project.adapter), "valid_incremental_strategies", lambda _: [] From d690decd17148146bc5a15e58b692cda60244570 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 16:00:30 -0500 Subject: [PATCH 4/6] Update `BaseMicrobatchTest` to turn on microbatch via project flags --- tests/functional/microbatch/test_microbatch.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 8552e27afa0..1cf07222f17 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -182,6 +182,14 @@ def models(self): "microbatch_model.sql": microbatch_model_sql, } + @pytest.fixture(scope="class") + def project_config_update(self): + return { + "flags": { + "require_builtin_microbatch_strategy": True, + } + } + def assert_row_count(self, project, relation_name: str, expected_row_count: int): relation = relation_from_name(project.adapter, relation_name) result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one") From 5b9d7f7b14ee73d39c77b20557505f7480d16633 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 16:01:25 -0500 Subject: [PATCH 5/6] [REVERT BEFORE MERGE] Update dbt-adapters dev req to point to branch with behavior flag changes This is needed to get the tests to pass. This is only necessary until the changes in dbt-adapters are merged. Said another way: we need to merge the changes to dbt-adapters first, and then revert these dependency changes before merging these changes. --- dev-requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev-requirements.txt b/dev-requirements.txt index 20605e632b8..83b2911011b 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ -git+https://github.com/dbt-labs/dbt-adapters.git@main -git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter +git+https://github.com/dbt-labs/dbt-adapters.git@microbatch-behavior-flag +git+https://github.com/dbt-labs/dbt-adapters.git@microbatch-behavior-flag#subdirectory=dbt-tests-adapter git+https://github.com/dbt-labs/dbt-common.git@main git+https://github.com/dbt-labs/dbt-postgres.git@main # black must match what's in .pre-commit-config.yaml to be sure local env matches CI From 1105e42fa530a6cdcdbcc09aee32eb6c46b902b5 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Tue, 1 Oct 2024 16:14:43 -0500 Subject: [PATCH 6/6] Add changie doc --- .changes/unreleased/Features-20241001-161422.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20241001-161422.yaml diff --git a/.changes/unreleased/Features-20241001-161422.yaml b/.changes/unreleased/Features-20241001-161422.yaml new file mode 100644 index 00000000000..60f10d8d667 --- /dev/null +++ b/.changes/unreleased/Features-20241001-161422.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Change gating of microbatch feature to be behind project flag / behavior flag +time: 2024-10-01T16:14:22.267253-05:00 +custom: + Author: MichelleArk QMalcolm + Issue: "10798"