Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fivetran] add set_kind_tag_for_service toggle #25080

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def build_asset_defn_metadata(
extra_metadata={
"connector_id": self.connector_id,
"io_manager_key": io_manager_key,
"storage_kind": self.service,
"service": self.service,
},
)

Expand All @@ -373,11 +373,12 @@ def _build_fivetran_assets_from_metadata(
poll_interval: float,
poll_timeout: Optional[float],
fetch_column_metadata: bool,
set_kind_tag_for_service: bool,
) -> AssetsDefinition:
metadata = cast(Mapping[str, Any], assets_defn_meta.extra_metadata)
connector_id = cast(str, metadata["connector_id"])
io_manager_key = cast(Optional[str], metadata["io_manager_key"])
storage_kind = cast(Optional[str], metadata.get("storage_kind"))
service = cast(Optional[str], metadata.get("service"))

return _build_fivetran_assets(
connector_id=connector_id,
Expand All @@ -396,7 +397,7 @@ def _build_fivetran_assets_from_metadata(
group_name=assets_defn_meta.group_name,
poll_interval=poll_interval,
poll_timeout=poll_timeout,
asset_tags=build_kind_tag(storage_kind) if storage_kind else None,
asset_tags=build_kind_tag(service) if set_kind_tag_for_service and service else None,
fetch_column_metadata=fetch_column_metadata,
infer_missing_tables=False,
op_tags=None,
Expand All @@ -416,6 +417,7 @@ def __init__(
poll_interval: float,
poll_timeout: Optional[float],
fetch_column_metadata: bool,
set_kind_tag_for_service: bool,
):
self._fivetran_resource_def = fivetran_resource_def
if isinstance(fivetran_resource_def, FivetranResource):
Expand Down Expand Up @@ -445,6 +447,7 @@ def __init__(
self._poll_interval = poll_interval
self._poll_timeout = poll_timeout
self._fetch_column_metadata = fetch_column_metadata
self._set_kind_tag_for_service = set_kind_tag_for_service

contents = hashlib.sha1()
contents.update(",".join(key_prefix).encode("utf-8"))
Expand Down Expand Up @@ -535,6 +538,7 @@ def build_definitions(
poll_interval=self._poll_interval,
poll_timeout=self._poll_timeout,
fetch_column_metadata=self._fetch_column_metadata,
set_kind_tag_for_service=self._set_kind_tag_for_service,
)
for meta in data
]
Expand All @@ -559,6 +563,7 @@ def load_assets_from_fivetran_instance(
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
fetch_column_metadata: bool = True,
set_kind_tag_for_service: bool = True,
) -> CacheableAssetsDefinition:
"""Loads Fivetran connector assets from a configured FivetranResource instance. This fetches information
about defined connectors at initialization time, and will error on workspace load if the Fivetran
Expand Down Expand Up @@ -588,6 +593,7 @@ def load_assets_from_fivetran_instance(
timed out. By default, this will never time out.
fetch_column_metadata (bool): If True, will fetch column schema information for each table in the connector.
This will induce additional API calls.
set_kind_tag_for_service (bool): If True, will set a kind tag on the asset for the destination service.

**Examples:**

Expand Down Expand Up @@ -644,4 +650,5 @@ def load_assets_from_fivetran_instance(
poll_interval=poll_interval,
poll_timeout=poll_timeout,
fetch_column_metadata=fetch_column_metadata,
set_kind_tag_for_service=set_kind_tag_for_service,
)
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@
)
@pytest.mark.parametrize("multiple_connectors", [True, False])
@pytest.mark.parametrize("destination_ids", [None, [], ["some_group"]])
@pytest.mark.parametrize("set_kind_tag_for_service", [True, False])
def test_load_from_instance(
connector_to_group_fn,
filter_connector,
connector_to_asset_key_fn,
multiple_connectors,
destination_ids,
set_kind_tag_for_service,
) -> None:
with environ({"FIVETRAN_API_KEY": "some_key", "FIVETRAN_API_SECRET": "some_secret"}):
load_calls = []
Expand Down Expand Up @@ -132,6 +134,7 @@ def load_input(self, context: InputContext) -> Any:
connector_to_io_manager_key_fn=(lambda _: "test_io_manager"),
poll_interval=10,
poll_timeout=600,
set_kind_tag_for_service=set_kind_tag_for_service,
)
else:
ft_cacheable_assets = load_assets_from_fivetran_instance(
Expand All @@ -141,6 +144,7 @@ def load_input(self, context: InputContext) -> Any:
io_manager_key="test_io_manager",
poll_interval=10,
poll_timeout=600,
set_kind_tag_for_service=set_kind_tag_for_service,
)
ft_assets = ft_cacheable_assets.build_definitions(
ft_cacheable_assets.compute_cacheable_data()
Expand Down Expand Up @@ -216,7 +220,10 @@ def downstream_asset(xyz):
assert metadata.get("dagster/relation_identifier") == (
"example_database." + ".".join(key.path[-2:])
)
assert has_kind(assets_def.tags_by_key[key], "snowflake")
if set_kind_tag_for_service:
assert has_kind(assets_def.tags_by_key[key], "snowflake")
else:
assert not has_kind(assets_def.tags_by_key[key], "snowflake")

assert ft_assets[0].keys == tables
assert all(
Expand Down