Skip to content

Commit

Permalink
[fivetran] add set_kind_tag_for_service toggle
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Oct 4, 2024
1 parent cb76b4e commit 7e13925
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def build_asset_defn_metadata(
extra_metadata={
"connector_id": self.connector_id,
"io_manager_key": io_manager_key,
"storage_kind": self.service,
"service": self.service,
},
)

Expand All @@ -373,11 +373,12 @@ def _build_fivetran_assets_from_metadata(
poll_interval: float,
poll_timeout: Optional[float],
fetch_column_metadata: bool,
set_kind_tag_for_service: bool,
) -> AssetsDefinition:
metadata = cast(Mapping[str, Any], assets_defn_meta.extra_metadata)
connector_id = cast(str, metadata["connector_id"])
io_manager_key = cast(Optional[str], metadata["io_manager_key"])
storage_kind = cast(Optional[str], metadata.get("storage_kind"))
service = cast(Optional[str], metadata.get("service"))

return _build_fivetran_assets(
connector_id=connector_id,
Expand All @@ -396,7 +397,7 @@ def _build_fivetran_assets_from_metadata(
group_name=assets_defn_meta.group_name,
poll_interval=poll_interval,
poll_timeout=poll_timeout,
asset_tags=build_kind_tag(storage_kind) if storage_kind else None,
asset_tags=build_kind_tag(service) if set_kind_tag_for_service and service else None,
fetch_column_metadata=fetch_column_metadata,
infer_missing_tables=False,
op_tags=None,
Expand All @@ -416,6 +417,7 @@ def __init__(
poll_interval: float,
poll_timeout: Optional[float],
fetch_column_metadata: bool,
set_kind_tag_for_service: bool,
):
self._fivetran_resource_def = fivetran_resource_def
if isinstance(fivetran_resource_def, FivetranResource):
Expand Down Expand Up @@ -445,6 +447,7 @@ def __init__(
self._poll_interval = poll_interval
self._poll_timeout = poll_timeout
self._fetch_column_metadata = fetch_column_metadata
self._set_kind_tag_for_service = set_kind_tag_for_service

contents = hashlib.sha1()
contents.update(",".join(key_prefix).encode("utf-8"))
Expand Down Expand Up @@ -535,6 +538,7 @@ def build_definitions(
poll_interval=self._poll_interval,
poll_timeout=self._poll_timeout,
fetch_column_metadata=self._fetch_column_metadata,
set_kind_tag_for_service=self._set_kind_tag_for_service,
)
for meta in data
]
Expand All @@ -559,6 +563,7 @@ def load_assets_from_fivetran_instance(
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
fetch_column_metadata: bool = True,
set_kind_tag_for_service: bool = True,
) -> CacheableAssetsDefinition:
"""Loads Fivetran connector assets from a configured FivetranResource instance. This fetches information
about defined connectors at initialization time, and will error on workspace load if the Fivetran
Expand Down Expand Up @@ -588,6 +593,7 @@ def load_assets_from_fivetran_instance(
timed out. By default, this will never time out.
fetch_column_metadata (bool): If True, will fetch column schema information for each table in the connector.
This will induce additional API calls.
set_kind_tag_for_service (bool): If True, will set a kind tag on the asset for the destination service.
**Examples:**
Expand Down Expand Up @@ -644,4 +650,5 @@ def load_assets_from_fivetran_instance(
poll_interval=poll_interval,
poll_timeout=poll_timeout,
fetch_column_metadata=fetch_column_metadata,
set_kind_tag_for_service=set_kind_tag_for_service,
)

0 comments on commit 7e13925

Please sign in to comment.