From 51f6a048def634858f5818d8dee5fd8dd5af6a4c Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 27 Sep 2024 17:49:07 -0400 Subject: [PATCH 1/4] [dagster-tableau] Implement StartWorkbookRefreshRequest --- .../dagster_tableau/__init__.py | 5 +- .../dagster_tableau/resources.py | 153 ++++++++++++++++-- .../dagster_tableau/translator.py | 5 + .../dagster_tableau_tests/conftest.py | 31 +++- .../dagster_tableau_tests/repos/__init__.py | 0 .../{ => repos}/pending_repo.py | 0 .../pending_repo_with_refresh_requests.py | 46 ++++++ .../test_pending_repo.py | 84 +++++++++- 8 files changed, 304 insertions(+), 20 deletions(-) create mode 100644 python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/__init__.py rename python_modules/libraries/dagster-tableau/dagster_tableau_tests/{ => repos}/pending_repo.py (100%) create mode 100644 python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refresh_requests.py diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py b/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py index 6d23dec168c55..5ac447897b157 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py @@ -4,7 +4,10 @@ TableauCloudWorkspace as TableauCloudWorkspace, TableauServerWorkspace as TableauServerWorkspace, ) -from dagster_tableau.translator import DagsterTableauTranslator as DagsterTableauTranslator +from dagster_tableau.translator import ( + DagsterTableauTranslator as DagsterTableauTranslator, + StartWorkbookRefreshRequest as StartWorkbookRefreshRequest, +) # Move back to version.py and edit setup.py once we are ready to publish. 
__version__ = "1!0+dev" diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py index ced07f4b22d50..51248f67ff785 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py @@ -1,5 +1,7 @@ import datetime import json +import logging +import time import uuid from abc import abstractmethod from contextlib import contextmanager @@ -13,9 +15,12 @@ AssetsDefinition, ConfigurableResource, Definitions, + Failure, ObserveResult, + Output, _check as check, external_assets_from_specs, + get_dagster_logger, multi_asset, ) from dagster._annotations import experimental @@ -29,11 +34,15 @@ from dagster_tableau.translator import ( DagsterTableauTranslator, + StartWorkbookRefreshRequest, TableauContentData, TableauContentType, TableauWorkspaceData, ) +DEFAULT_POLL_INTERVAL_SECONDS = 10 +DEFAULT_POLL_TIMEOUT = 600 + @experimental class BaseTableauClient: @@ -58,6 +67,11 @@ def __init__( def base_url(self) -> str: raise NotImplementedError() + @property + @cached_method + def _log(self) -> logging.Logger: + return get_dagster_logger() + @cached_method def get_workbooks(self) -> Mapping[str, object]: """Fetches a list of all Tableau workbooks in the workspace.""" @@ -82,6 +96,70 @@ def get_view( self._server.views.get_request(f"{self._server.views.baseurl}/{view_id}") ) + def get_job( + self, + job_id: str, + ) -> TSC.JobItem: + """Fetches information for a given job.""" + return self._server.jobs.get_by_id(job_id) + + def cancel_job( + self, + job_id: str, + ) -> Mapping[str, object]: + """Cancels a given job.""" + return self._response_to_dict(self._server.jobs.cancel(job_id)) + + def refresh_workbook(self, workbook_id) -> TSC.JobItem: + """Refreshes all extracts for a given workbook and returns the JobItem object.""" + return self._server.workbooks.refresh(workbook_id) 
+ + def refresh_and_poll( + self, + workbook_id: str, + poll_interval: Optional[float] = None, + poll_timeout: Optional[float] = None, + ) -> str: + job = self.refresh_workbook(workbook_id) + + if not poll_interval: + poll_interval = DEFAULT_POLL_INTERVAL_SECONDS + if not poll_timeout: + poll_timeout = DEFAULT_POLL_TIMEOUT + + self._log.info(f"Job {job.id} initialized for workbook_id={workbook_id}.") + start = time.monotonic() + + try: + while True: + if poll_timeout and start + poll_timeout < time.monotonic(): + raise Failure( + f"Timeout: Tableau job {job.id} is not ready after the timeout" + f" {poll_timeout} seconds" + ) + time.sleep(poll_interval) + job = self.get_job(job_id=job.id) + + if job.finish_code == -1: + continue + elif job.finish_code == 0: + break + elif job.finish_code == 1: + raise Failure(f"Job failed: {job.id}") + elif job.finish_code == 2: + raise Failure(f"Job was cancelled: {job.id}") + else: + raise Failure( + f"Encountered unexpected finish code `{job.finish_code}` for job {job.id}" + ) + finally: + # if Tableau sync has not completed, make sure to cancel it so that it doesn't outlive + # the python process + if job.finish_code not in (0, 1, 2): + self.cancel_job(job.id) + + return job.workbook_id + def sign_in(self) -> Auth.contextmgr: """Sign in to the site in Tableau.""" jwt_token = jwt.encode( @@ -91,7 +169,7 @@ def sign_in(self) -> Auth.contextmgr: "jti": str(uuid.uuid4()), "aud": "tableau", "sub": self.username, - "scp": ["tableau:content:read"], + "scp": ["tableau:content:read", "tableau:tasks:run"], }, self.connected_app_secret_value, algorithm="HS256", @@ -303,12 +381,18 @@ def fetch_tableau_workspace_data( def build_assets( self, + start_workbook_refresh_requests: Sequence[StartWorkbookRefreshRequest], dagster_tableau_translator: Type[DagsterTableauTranslator], ) -> Sequence[CacheableAssetsDefinition]: """Returns a set of CacheableAssetsDefinition which will load Tableau content from the workspace and translates it into 
AssetSpecs, using the provided translator. Args: + start_workbook_refresh_requests (Sequence[StartWorkbookRefreshRequest]): A list of + requests to start workbook refreshes. This feature is equivalent to selecting Refreshing Extracts + for a workbook in Tableau UI and only works for workbooks for which the data sources are extracts. + See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now + for documentation. dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator. @@ -316,15 +400,26 @@ def build_assets( Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which will load the Tableau content. """ - return [TableauCacheableAssetsDefinition(self, dagster_tableau_translator)] + return [ + TableauCacheableAssetsDefinition( + self, start_workbook_refresh_requests, dagster_tableau_translator + ) + ] def build_defs( - self, dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator + self, + start_workbook_refresh_requests: Optional[Sequence[StartWorkbookRefreshRequest]] = None, + dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator, ) -> Definitions: """Returns a Definitions object which will load Tableau content from the workspace and translate it into assets, using the provided translator. Args: + start_workbook_refresh_requests (Optional[Sequence[StartWorkbookRefreshRequest]]): A list of + requests to start workbook refreshes. This feature is equivalent to selecting Refreshing Extracts + for a workbook in Tableau UI and only works for workbooks for which the data sources are extracts. + See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now + for documentation. 
dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator. @@ -332,7 +427,10 @@ def build_defs( Definitions: A Definitions object which will build and return the Power BI content. """ defs = Definitions( - assets=self.build_assets(dagster_tableau_translator=dagster_tableau_translator) + assets=self.build_assets( + start_workbook_refresh_requests=start_workbook_refresh_requests or [], + dagster_tableau_translator=dagster_tableau_translator, + ), ) return defs @@ -376,8 +474,14 @@ def build_client(self) -> None: class TableauCacheableAssetsDefinition(CacheableAssetsDefinition): - def __init__(self, workspace: BaseTableauWorkspace, translator: Type[DagsterTableauTranslator]): + def __init__( + self, + workspace: BaseTableauWorkspace, + refresh_requests: Sequence[StartWorkbookRefreshRequest], + translator: Type[DagsterTableauTranslator], + ): self._workspace = workspace + self._refresh_requests = refresh_requests self._translator_cls = translator super().__init__(unique_id=self._workspace.site_name) @@ -440,6 +544,9 @@ def _build_tableau_assets_from_workspace_data( ) def _assets(tableau: BaseTableauWorkspace): with tableau.get_client() as client: + refreshed_workbooks = set() + for refresh_request in self._refresh_requests: + refreshed_workbooks.add(client.refresh_and_poll(refresh_request.workbook_id)) for view_id, view_content_data in [ *workspace_data.sheets_by_id.items(), *workspace_data.dashboards_by_id.items(), @@ -451,16 +558,30 @@ def _assets(tableau: BaseTableauWorkspace): asset_key = translator.get_dashboard_asset_key(view_content_data) else: check.assert_never(view_content_data.content_type) - yield ObserveResult( - asset_key=asset_key, - metadata={ - "workbook_id": data["workbook"]["id"], - "owner_id": data["owner"]["id"], - "name": data["name"], - "contentUrl": data["contentUrl"], - "createdAt": data["createdAt"], - "updatedAt": data["updatedAt"], - }, 
- ) + if view_content_data.properties["workbook"]["luid"] in refreshed_workbooks: + yield Output( + value=None, + output_name="__".join(asset_key.path), + metadata={ + "workbook_id": data["workbook"]["id"], + "owner_id": data["owner"]["id"], + "name": data["name"], + "contentUrl": data["contentUrl"], + "createdAt": data["createdAt"], + "updatedAt": data["updatedAt"], + }, + ) + else: + yield ObserveResult( + asset_key=asset_key, + metadata={ + "workbook_id": data["workbook"]["id"], + "owner_id": data["owner"]["id"], + "name": data["name"], + "contentUrl": data["contentUrl"], + "createdAt": data["createdAt"], + "updatedAt": data["updatedAt"], + }, + ) return [_assets] diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py b/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py index 35f176a80ab32..f95cecdaeaf3f 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py @@ -15,6 +15,11 @@ def _clean_asset_name(name: str) -> str: return re.sub(r"[^a-z0-9A-Z.]+", "_", name).lower() +@record +class StartWorkbookRefreshRequest: + workbook_id: str + + class TableauContentType(Enum): """Enum representing each object in Tableau's ontology.""" diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py index 5c874316c6b5e..f1ccb66bb6b46 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py @@ -1,7 +1,7 @@ # ruff: noqa: SLF001 import uuid -from unittest.mock import patch +from unittest.mock import patch, PropertyMock import pytest from dagster_tableau.translator import TableauContentData, TableauContentType, TableauWorkspaceData @@ -129,6 +129,11 @@ def dashboard_id_fixture() -> str: return 
"c9bf8403-5daf-427a-b3d6-2ce9bed7798f" +@pytest.fixture(name="job_id") +def job_id_fixture() -> str: + return uuid.uuid4().hex + + @pytest.fixture(name="sign_in", autouse=True) def sign_in_fixture(): with patch("dagster_tableau.resources.BaseTableauClient.sign_in") as mocked_function: @@ -156,6 +161,30 @@ def get_view_fixture(): yield mocked_function +@pytest.fixture(name="get_job", autouse=True) +def get_job_fixture(workbook_id, job_id): + with patch("dagster_tableau.resources.BaseTableauClient.get_job") as mocked_function: + type(mocked_function.return_value).id = PropertyMock(return_value=job_id) + type(mocked_function.return_value).finish_code = PropertyMock(return_value=0) + type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id) + yield mocked_function + + +@pytest.fixture(name="refresh_workbook", autouse=True) +def refresh_workbook_fixture(workbook_id, job_id): + with patch("dagster_tableau.resources.BaseTableauClient.refresh_workbook") as mocked_function: + type(mocked_function.return_value).id = PropertyMock(return_value=job_id) + type(mocked_function.return_value).finish_code = PropertyMock(return_value=-1) + type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id) + yield mocked_function + + +@pytest.fixture(name="cancel_job", autouse=True) +def cancel_job_fixture(): + with patch("dagster_tableau.resources.BaseTableauClient.cancel_job") as mocked_function: + yield mocked_function + + @pytest.fixture( name="workspace_data", ) diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/__init__.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/pending_repo.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo.py similarity index 100% rename from 
python_modules/libraries/dagster-tableau/dagster_tableau_tests/pending_repo.py rename to python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo.py diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refresh_requests.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refresh_requests.py new file mode 100644 index 0000000000000..d81472b659cf6 --- /dev/null +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refresh_requests.py @@ -0,0 +1,46 @@ +from typing import cast + +from dagster import asset, define_asset_job +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.repository_definition.repository_definition import ( + PendingRepositoryDefinition, +) +from dagster_tableau import StartWorkbookRefreshRequest, TableauCloudWorkspace + +from dagster_tableau_tests.conftest import ( + FAKE_CONNECTED_APP_CLIENT_ID, + FAKE_CONNECTED_APP_SECRET_ID, + FAKE_CONNECTED_APP_SECRET_VALUE, + FAKE_POD_NAME, + FAKE_SITE_NAME, + FAKE_USERNAME, +) + +resource = TableauCloudWorkspace( + connected_app_client_id=FAKE_CONNECTED_APP_CLIENT_ID, + connected_app_secret_id=FAKE_CONNECTED_APP_SECRET_ID, + connected_app_secret_value=FAKE_CONNECTED_APP_SECRET_VALUE, + username=FAKE_USERNAME, + site_name=FAKE_SITE_NAME, + pod_name=FAKE_POD_NAME, +) + +pbi_defs = resource.build_defs( + start_workbook_refresh_requests=[ + StartWorkbookRefreshRequest(workbook_id="b75fc023-a7ca-4115-857b-4342028640d0") + ] +) + + +@asset +def my_materializable_asset(): + pass + + +pending_repo_from_cached_asset_metadata = cast( + PendingRepositoryDefinition, + Definitions.merge( + Definitions(assets=[my_materializable_asset], jobs=[define_asset_job("all_asset_job")]), + pbi_defs, + ).get_inner_repository(), +) diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py 
b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py index d97db6121e0de..2cafb4be01968 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py @@ -14,7 +14,7 @@ def test_using_cached_asset_data( get_view: MagicMock, ) -> None: with instance_for_test() as instance: - from dagster_tableau_tests.pending_repo import pending_repo_from_cached_asset_metadata + from dagster_tableau_tests.repos.pending_repo import pending_repo_from_cached_asset_metadata assert sign_in.call_count == 0 assert get_workbooks.call_count == 0 @@ -36,7 +36,7 @@ def test_using_cached_asset_data( repository_load_data = repository_def.repository_load_data recon_repo = ReconstructableRepository.for_file( - file_relative_path(__file__, "pending_repo.py"), + file_relative_path(__file__, "repos/pending_repo.py"), fn_name="pending_repo_from_cached_asset_metadata", ) recon_job = ReconstructableJob(repository=recon_repo, job_name="all_asset_job") @@ -62,3 +62,83 @@ def test_using_cached_asset_data( assert get_workbooks.call_count == 1 assert get_workbook.call_count == 1 assert get_view.call_count == 2 + + +from unittest.mock import MagicMock + +from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository +from dagster._core.events import DagsterEventType +from dagster._core.execution.api import create_execution_plan, execute_plan +from dagster._core.instance_for_test import instance_for_test +from dagster._utils import file_relative_path + + +def test_using_cached_asset_data_with_refresh_request( + sign_in: MagicMock, + get_workbooks: MagicMock, + get_workbook: MagicMock, + get_view: MagicMock, + get_job: MagicMock, + refresh_workbook: MagicMock, + cancel_job: MagicMock, +) -> None: + with instance_for_test() as instance: + from dagster_tableau_tests.repos.pending_repo_with_refresh_requests import 
pending_repo_from_cached_asset_metadata + + assert sign_in.call_count == 0 + assert get_workbooks.call_count == 0 + assert get_workbook.call_count == 0 + assert get_view.call_count == 0 + assert refresh_workbook.call_count == 0 + assert get_job.call_count == 0 + assert cancel_job.call_count == 0 + + # first, we resolve the repository to generate our cached metadata + repository_def = pending_repo_from_cached_asset_metadata.compute_repository_definition() + # 3 calls to create the defs + assert sign_in.call_count == 1 + assert get_workbooks.call_count == 1 + assert get_workbook.call_count == 1 + assert get_view.call_count == 0 + assert refresh_workbook.call_count == 0 + assert get_job.call_count == 0 + assert cancel_job.call_count == 0 + + # 1 Tableau external asset, 2 Tableau materializable assets and 1 Dagster materializable asset + assert len(repository_def.assets_defs_by_key) == 1 + 2 + 1 + + job_def = repository_def.get_job("all_asset_job") + repository_load_data = repository_def.repository_load_data + + recon_repo = ReconstructableRepository.for_file( + file_relative_path(__file__, "repos/pending_repo_with_refresh_requests.py"), + fn_name="pending_repo_from_cached_asset_metadata", + ) + recon_job = ReconstructableJob(repository=recon_repo, job_name="all_asset_job") + + execution_plan = create_execution_plan(recon_job, repository_load_data=repository_load_data) + + run = instance.create_run_for_job(job_def=job_def, execution_plan=execution_plan) + + events = execute_plan( + execution_plan=execution_plan, + job=recon_job, + dagster_run=run, + instance=instance, + ) + + assert ( + len([event for event in events if event.event_type == DagsterEventType.STEP_SUCCESS]) + == 2 + ), "Expected two successful steps" + + # 3 calls to create the defs + 5 calls to materialize the Tableau assets + # with 1 workbook to refresh, 1 sheet and 1 dashboard + assert sign_in.call_count == 2 + assert get_workbooks.call_count == 1 + assert get_workbook.call_count == 1 + assert 
get_view.call_count == 2 + assert refresh_workbook.call_count == 1 + assert get_job.call_count == 1 + # The finish_code of the mocked get_job is 0, so cancel_job is not called + assert cancel_job.call_count == 0 From 0c2cbd3c800260ecf6f2904fc1cac74bb138f6e2 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 3 Oct 2024 13:55:59 -0400 Subject: [PATCH 2/4] Use workbook ids instead of StartWorkbookRefreshRequest --- .../dagster_tableau/__init__.py | 5 +-- .../dagster_tableau/resources.py | 41 +++++++++++-------- .../dagster_tableau/translator.py | 5 --- .../dagster_tableau_tests/conftest.py | 2 +- ...ing_repo_with_refreshable_workbook_ids.py} | 8 +--- .../test_pending_repo.py | 19 +++------ 6 files changed, 35 insertions(+), 45 deletions(-) rename python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/{pending_repo_with_refresh_requests.py => pending_repo_with_refreshable_workbook_ids.py} (81%) diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py b/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py index 5ac447897b157..6d23dec168c55 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/__init__.py @@ -4,10 +4,7 @@ TableauCloudWorkspace as TableauCloudWorkspace, TableauServerWorkspace as TableauServerWorkspace, ) -from dagster_tableau.translator import ( - DagsterTableauTranslator as DagsterTableauTranslator, - StartWorkbookRefreshRequest as StartWorkbookRefreshRequest, -) +from dagster_tableau.translator import DagsterTableauTranslator as DagsterTableauTranslator # Move back to version.py and edit setup.py once we are ready to publish. 
__version__ = "1!0+dev" diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py index 51248f67ff785..d7357b9773297 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py @@ -34,7 +34,6 @@ from dagster_tableau.translator import ( DagsterTableauTranslator, - StartWorkbookRefreshRequest, TableauContentData, TableauContentType, TableauWorkspaceData, @@ -119,7 +118,7 @@ def refresh_and_poll( workbook_id: str, poll_interval: Optional[float] = None, poll_timeout: Optional[float] = None, - ) -> str: + ) -> Optional[str]: job = self.refresh_workbook(workbook_id) if not poll_interval: @@ -381,16 +380,21 @@ def fetch_tableau_workspace_data( def build_assets( self, - start_workbook_refresh_requests: Sequence[StartWorkbookRefreshRequest], + refreshable_workbook_ids: Sequence[str], dagster_tableau_translator: Type[DagsterTableauTranslator], ) -> Sequence[CacheableAssetsDefinition]: """Returns a set of CacheableAssetsDefinition which will load Tableau content from the workspace and translates it into AssetSpecs, using the provided translator. Args: - start_workbook_refresh_requests (Sequence[StartWorkbookRefreshRequest]): A list of - requests to start workbook refreshes. This feature is equivalent to selecting Refreshing Extracts - for a workbook in Tableau UI and only works for workbooks for which the data sources are extracts. + refreshable_workbook_ids (Sequence[str]): A list of workbook IDs. The workbooks provided must + have extracts as data sources and be refreshable in Tableau. + + When materializing your Tableau assets, the workbooks provided are refreshed, + refreshing their sheets and dashboards before pulling their data in Dagster. 
+ + This feature is equivalent to selecting Refreshing Extracts for a workbook in Tableau UI + and only works for workbooks for which the data sources are extracts. See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now for documentation. dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use @@ -402,22 +406,27 @@ def build_assets( """ return [ TableauCacheableAssetsDefinition( - self, start_workbook_refresh_requests, dagster_tableau_translator + self, refreshable_workbook_ids, dagster_tableau_translator ) ] def build_defs( self, - start_workbook_refresh_requests: Optional[Sequence[StartWorkbookRefreshRequest]] = None, + refreshable_workbook_ids: Optional[Sequence[str]] = None, dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator, ) -> Definitions: """Returns a Definitions object which will load Tableau content from the workspace and translate it into assets, using the provided translator. Args: - start_workbook_refresh_requests (Optional[Sequence[StartWorkbookRefreshRequest]]): A list of - requests to start workbook refreshes. This feature is equivalent to selecting Refreshing Extracts - for a workbook in Tableau UI and only works for workbooks for which the data sources are extracts. + refreshable_workbook_ids (Optional[Sequence[str]]): A list of workbook IDs. The workbooks provided must + have extracts as data sources and be refreshable in Tableau. + + When materializing your Tableau assets, the workbooks provided are refreshed, + refreshing their sheets and dashboards before pulling their data in Dagster. + + This feature is equivalent to selecting Refreshing Extracts for a workbook in Tableau UI + and only works for workbooks for which the data sources are extracts. See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now for documentation. 
dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use @@ -428,7 +437,7 @@ def build_defs( """ defs = Definitions( assets=self.build_assets( - start_workbook_refresh_requests=start_workbook_refresh_requests or [], + refreshable_workbook_ids=refreshable_workbook_ids or [], dagster_tableau_translator=dagster_tableau_translator, ), ) @@ -477,11 +486,11 @@ class TableauCacheableAssetsDefinition(CacheableAssetsDefinition): def __init__( self, workspace: BaseTableauWorkspace, - refresh_requests: Sequence[StartWorkbookRefreshRequest], + refreshable_workbook_ids: Sequence[str], translator: Type[DagsterTableauTranslator], ): self._workspace = workspace - self._refresh_requests = refresh_requests + self._refreshable_workbook_ids = refreshable_workbook_ids self._translator_cls = translator super().__init__(unique_id=self._workspace.site_name) @@ -545,8 +554,8 @@ def _build_tableau_assets_from_workspace_data( def _assets(tableau: BaseTableauWorkspace): with tableau.get_client() as client: refreshed_workbooks = set() - for refresh_request in self._refresh_requests: - refreshed_workbooks.add(client.refresh_and_poll(refresh_request.workbook_id)) + for refreshable_workbook_id in self._refreshable_workbook_ids: + refreshed_workbooks.add(client.refresh_and_poll(refreshable_workbook_id)) for view_id, view_content_data in [ *workspace_data.sheets_by_id.items(), *workspace_data.dashboards_by_id.items(), diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py b/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py index f95cecdaeaf3f..35f176a80ab32 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/translator.py @@ -15,11 +15,6 @@ def _clean_asset_name(name: str) -> str: return re.sub(r"[^a-z0-9A-Z.]+", "_", name).lower() -@record -class StartWorkbookRefreshRequest: - workbook_id: str - - class TableauContentType(Enum): 
"""Enum representing each object in Tableau's ontology.""" diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py index f1ccb66bb6b46..ec0f00730e12d 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py @@ -1,7 +1,7 @@ # ruff: noqa: SLF001 import uuid -from unittest.mock import patch, PropertyMock +from unittest.mock import PropertyMock, patch import pytest from dagster_tableau.translator import TableauContentData, TableauContentType, TableauWorkspaceData diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refresh_requests.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refreshable_workbook_ids.py similarity index 81% rename from python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refresh_requests.py rename to python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refreshable_workbook_ids.py index d81472b659cf6..595a3a3cacd31 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refresh_requests.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refreshable_workbook_ids.py @@ -5,7 +5,7 @@ from dagster._core.definitions.repository_definition.repository_definition import ( PendingRepositoryDefinition, ) -from dagster_tableau import StartWorkbookRefreshRequest, TableauCloudWorkspace +from dagster_tableau import TableauCloudWorkspace from dagster_tableau_tests.conftest import ( FAKE_CONNECTED_APP_CLIENT_ID, @@ -25,11 +25,7 @@ pod_name=FAKE_POD_NAME, ) -pbi_defs = resource.build_defs( - start_workbook_refresh_requests=[ - StartWorkbookRefreshRequest(workbook_id="b75fc023-a7ca-4115-857b-4342028640d0") - ] 
-) +pbi_defs = resource.build_defs(refreshable_workbook_ids=["b75fc023-a7ca-4115-857b-4342028640d0"]) @asset diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py index 2cafb4be01968..964cc4958d9e8 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py @@ -64,15 +64,6 @@ def test_using_cached_asset_data( assert get_view.call_count == 2 -from unittest.mock import MagicMock - -from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository -from dagster._core.events import DagsterEventType -from dagster._core.execution.api import create_execution_plan, execute_plan -from dagster._core.instance_for_test import instance_for_test -from dagster._utils import file_relative_path - - def test_using_cached_asset_data_with_refresh_request( sign_in: MagicMock, get_workbooks: MagicMock, @@ -83,7 +74,9 @@ def test_using_cached_asset_data_with_refresh_request( cancel_job: MagicMock, ) -> None: with instance_for_test() as instance: - from dagster_tableau_tests.repos.pending_repo_with_refresh_requests import pending_repo_from_cached_asset_metadata + from dagster_tableau_tests.repos.pending_repo_with_refreshable_workbook_ids import ( + pending_repo_from_cached_asset_metadata, + ) assert sign_in.call_count == 0 assert get_workbooks.call_count == 0 @@ -111,7 +104,7 @@ def test_using_cached_asset_data_with_refresh_request( repository_load_data = repository_def.repository_load_data recon_repo = ReconstructableRepository.for_file( - file_relative_path(__file__, "repos/pending_repo_with_refresh_requests.py"), + file_relative_path(__file__, "repos/pending_repo_with_refreshable_workbook_ids.py"), fn_name="pending_repo_from_cached_asset_metadata", ) recon_job = ReconstructableJob(repository=recon_repo, 
job_name="all_asset_job") @@ -128,8 +121,8 @@ def test_using_cached_asset_data_with_refresh_request( ) assert ( - len([event for event in events if event.event_type == DagsterEventType.STEP_SUCCESS]) - == 2 + len([event for event in events if event.event_type == DagsterEventType.STEP_SUCCESS]) + == 2 ), "Expected two successful steps" # 3 calls to create the defs + 5 calls to materialize the Tableau assets From cf5a7f061b76aab3d6c933a393e45c595d0ac58e Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 3 Oct 2024 14:32:46 -0400 Subject: [PATCH 3/4] Use FinishCode enum --- .../dagster-tableau/dagster_tableau/resources.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py index d7357b9773297..e15b8ec1e6e8e 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py @@ -140,12 +140,13 @@ def refresh_and_poll( job = self.get_job(job_id=job.id) if job.finish_code == -1: + # -1 is the default value for JobItem.finish_code, when the job is in progress continue - elif job.finish_code == 0: + elif job.finish_code == TSC.JobItem.FinishCode.Success: break - elif job.finish_code == 1: + elif job.finish_code == TSC.JobItem.FinishCode.Failed: raise Failure(f"Job failed: {job.id}") - elif job.finish_code == 2: + elif job.finish_code == TSC.JobItem.FinishCode.Cancelled: raise Failure(f"Job was cancelled: {job.id}") else: raise Failure( @@ -154,7 +155,7 @@ def refresh_and_poll( finally: # if Tableau sync has not completed, make sure to cancel it so that it doesn't outlive # the python process - if job.finish_code not in (0, 1, 2): + if job.finish_code not in (TSC.JobItem.FinishCode.Success, TSC.JobItem.FinishCode.Failed, TSC.JobItem.FinishCode.Cancelled): self.cancel_job(job.id) return job.workbook_id From 
f0bb01009c89ae23d844e72914cae09090f00dc1 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 3 Oct 2024 14:34:34 -0400 Subject: [PATCH 4/4] Lint --- .../libraries/dagster-tableau/dagster_tableau/resources.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py index e15b8ec1e6e8e..de6b5992bfa4f 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py @@ -155,7 +155,11 @@ def refresh_and_poll( finally: # if Tableau sync has not completed, make sure to cancel it so that it doesn't outlive # the python process - if job.finish_code not in (TSC.JobItem.FinishCode.Success, TSC.JobItem.FinishCode.Failed, TSC.JobItem.FinishCode.Cancelled): + if job.finish_code not in ( + TSC.JobItem.FinishCode.Success, + TSC.JobItem.FinishCode.Failed, + TSC.JobItem.FinishCode.Cancelled, + ): self.cancel_job(job.id) return job.workbook_id