diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py index ced07f4b22d50..de6b5992bfa4f 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau/resources.py @@ -1,5 +1,7 @@ import datetime import json +import logging +import time import uuid from abc import abstractmethod from contextlib import contextmanager @@ -13,9 +15,12 @@ AssetsDefinition, ConfigurableResource, Definitions, + Failure, ObserveResult, + Output, _check as check, external_assets_from_specs, + get_dagster_logger, multi_asset, ) from dagster._annotations import experimental @@ -34,6 +39,9 @@ TableauWorkspaceData, ) +DEFAULT_POLL_INTERVAL_SECONDS = 10 +DEFAULT_POLL_TIMEOUT = 600 + @experimental class BaseTableauClient: @@ -58,6 +66,11 @@ def __init__( def base_url(self) -> str: raise NotImplementedError() + @property + @cached_method + def _log(self) -> logging.Logger: + return get_dagster_logger() + @cached_method def get_workbooks(self) -> Mapping[str, object]: """Fetches a list of all Tableau workbooks in the workspace.""" @@ -82,6 +95,75 @@ def get_view( self._server.views.get_request(f"{self._server.views.baseurl}/{view_id}") ) + def get_job( + self, + job_id: str, + ) -> TSC.JobItem: + """Fetches information for a given job.""" + return self._server.jobs.get_by_id(job_id) + + def cancel_job( + self, + job_id: str, + ) -> Mapping[str, object]: + """Fetches information for a given job.""" + return self._response_to_dict(self._server.jobs.cancel(job_id)) + + def refresh_workbook(self, workbook_id) -> TSC.JobItem: + """Refreshes all extracts for a given workbook and return the JobItem object.""" + return self._server.workbooks.refresh(workbook_id) + + def refresh_and_poll( + self, + workbook_id: str, + poll_interval: Optional[float] = None, + poll_timeout: Optional[float] = None, + ) -> Optional[str]: + job = self.refresh_workbook(workbook_id) + + if not poll_interval: + poll_interval = DEFAULT_POLL_INTERVAL_SECONDS + if not poll_timeout: + poll_timeout = DEFAULT_POLL_TIMEOUT + + self._log.info(f"Job {job.id} initialized for workbook_id={workbook_id}.") + start = time.monotonic() + + try: + while True: + if poll_timeout and start + poll_timeout < time.monotonic(): + raise Failure( + f"Timeout: Tableau job {job.id} is not ready after the timeout" + f" {poll_timeout} seconds" + ) + time.sleep(poll_interval) + job = self.get_job(job_id=job.id) + + if job.finish_code == -1: + # -1 is the default value for JobItem.finish_code, when the job is in progress + continue + elif job.finish_code == TSC.JobItem.FinishCode.Success: + break + elif job.finish_code == TSC.JobItem.FinishCode.Failed: + raise Failure(f"Job failed: {job.id}") + elif job.finish_code == TSC.JobItem.FinishCode.Cancelled: + raise Failure(f"Job was cancelled: {job.id}") + else: + raise Failure( + f"Encountered unexpected finish code `{job.finish_code}` for job {job.id}" + ) + finally: + # if Tableau sync has not completed, make sure to cancel it so that it doesn't outlive + # the python process + if job.finish_code not in ( + TSC.JobItem.FinishCode.Success, + TSC.JobItem.FinishCode.Failed, + TSC.JobItem.FinishCode.Cancelled, + ): + self.cancel_job(job.id) + + return job.workbook_id + def sign_in(self) -> Auth.contextmgr: """Sign in to the site in Tableau.""" jwt_token = jwt.encode( @@ -91,7 +173,7 @@ def sign_in(self) -> Auth.contextmgr: "jti": str(uuid.uuid4()), "aud": "tableau", "sub": self.username, - "scp": ["tableau:content:read"], + "scp": ["tableau:content:read", "tableau:tasks:run"], }, self.connected_app_secret_value, algorithm="HS256", @@ -303,12 +385,23 @@ def fetch_tableau_workspace_data( def build_assets( self, + refreshable_workbook_ids: Sequence[str], dagster_tableau_translator: Type[DagsterTableauTranslator], ) -> Sequence[CacheableAssetsDefinition]: """Returns a set of CacheableAssetsDefinition which will load Tableau content from the workspace and translates it into AssetSpecs, using the provided translator. Args: + refreshable_workbook_ids (Sequence[str]): A list of workbook IDs. The workbooks provided must + have extracts as data sources and be refreshable in Tableau. + + When materializing your Tableau assets, the workbooks provided are refreshed, + refreshing their sheets and dashboards before pulling their data in Dagster. + + This feature is equivalent to selecting Refreshing Extracts for a workbook in Tableau UI + and only works for workbooks for which the data sources are extracts. + See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now + for documentation. dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator. @@ -316,15 +409,31 @@ def build_assets( Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which will load the Tableau content. """ - return [TableauCacheableAssetsDefinition(self, dagster_tableau_translator)] + return [ + TableauCacheableAssetsDefinition( + self, refreshable_workbook_ids, dagster_tableau_translator + ) + ] def build_defs( - self, dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator + self, + refreshable_workbook_ids: Optional[Sequence[str]] = None, + dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator, ) -> Definitions: """Returns a Definitions object which will load Tableau content from the workspace and translate it into assets, using the provided translator. Args: + refreshable_workbook_ids (Optional[Sequence[str]]): A list of workbook IDs. The workbooks provided must + have extracts as data sources and be refreshable in Tableau. + + When materializing your Tableau assets, the workbooks provided are refreshed, + refreshing their sheets and dashboards before pulling their data in Dagster. + + This feature is equivalent to selecting Refreshing Extracts for a workbook in Tableau UI + and only works for workbooks for which the data sources are extracts. + See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now + for documentation. dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator. @@ -332,7 +441,10 @@ def build_defs( Definitions: A Definitions object which will build and return the Power BI content. """ defs = Definitions( - assets=self.build_assets(dagster_tableau_translator=dagster_tableau_translator) + assets=self.build_assets( + refreshable_workbook_ids=refreshable_workbook_ids or [], + dagster_tableau_translator=dagster_tableau_translator, + ), ) return defs @@ -376,8 +488,14 @@ def build_client(self) -> None: class TableauCacheableAssetsDefinition(CacheableAssetsDefinition): - def __init__(self, workspace: BaseTableauWorkspace, translator: Type[DagsterTableauTranslator]): + def __init__( + self, + workspace: BaseTableauWorkspace, + refreshable_workbook_ids: Sequence[str], + translator: Type[DagsterTableauTranslator], + ): self._workspace = workspace + self._refreshable_workbook_ids = refreshable_workbook_ids self._translator_cls = translator super().__init__(unique_id=self._workspace.site_name) @@ -440,6 +558,9 @@ def _build_tableau_assets_from_workspace_data( ) def _assets(tableau: BaseTableauWorkspace): with tableau.get_client() as client: + refreshed_workbooks = set() + for refreshable_workbook_id in self._refreshable_workbook_ids: + refreshed_workbooks.add(client.refresh_and_poll(refreshable_workbook_id)) for view_id, view_content_data in [ *workspace_data.sheets_by_id.items(), *workspace_data.dashboards_by_id.items(), @@ -451,16 +572,30 @@ def _assets(tableau: BaseTableauWorkspace): asset_key = translator.get_dashboard_asset_key(view_content_data) else: check.assert_never(view_content_data.content_type) - yield ObserveResult( - asset_key=asset_key, - metadata={ - "workbook_id": data["workbook"]["id"], - "owner_id": data["owner"]["id"], - "name": data["name"], - "contentUrl": data["contentUrl"], - "createdAt": data["createdAt"], - "updatedAt": data["updatedAt"], - }, - ) + if view_content_data.properties["workbook"]["luid"] in refreshed_workbooks: + yield Output( + value=None, + output_name="__".join(asset_key.path), + metadata={ + "workbook_id": data["workbook"]["id"], + "owner_id": data["owner"]["id"], + "name": data["name"], + "contentUrl": data["contentUrl"], + "createdAt": data["createdAt"], + "updatedAt": data["updatedAt"], + }, + ) + else: + yield ObserveResult( + asset_key=asset_key, + metadata={ + "workbook_id": data["workbook"]["id"], + "owner_id": data["owner"]["id"], + "name": data["name"], + "contentUrl": data["contentUrl"], + "createdAt": data["createdAt"], + "updatedAt": data["updatedAt"], + }, + ) return [_assets] diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py index 5c874316c6b5e..ec0f00730e12d 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/conftest.py @@ -1,7 +1,7 @@ # ruff: noqa: SLF001 import uuid -from unittest.mock import patch +from unittest.mock import PropertyMock, patch import pytest from dagster_tableau.translator import TableauContentData, TableauContentType, TableauWorkspaceData @@ -129,6 +129,11 @@ def dashboard_id_fixture() -> str: return "c9bf8403-5daf-427a-b3d6-2ce9bed7798f" +@pytest.fixture(name="job_id") +def job_id_fixture() -> str: + return uuid.uuid4().hex + + @pytest.fixture(name="sign_in", autouse=True) def sign_in_fixture(): with patch("dagster_tableau.resources.BaseTableauClient.sign_in") as mocked_function: @@ -156,6 +161,30 @@ def get_view_fixture(): yield mocked_function +@pytest.fixture(name="get_job", autouse=True) +def get_job_fixture(workbook_id, job_id): + with patch("dagster_tableau.resources.BaseTableauClient.get_job") as mocked_function: + type(mocked_function.return_value).id = PropertyMock(return_value=job_id) + type(mocked_function.return_value).finish_code = PropertyMock(return_value=0) + type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id) + yield mocked_function + + +@pytest.fixture(name="refresh_workbook", autouse=True) +def refresh_workbook_fixture(workbook_id, job_id): + with patch("dagster_tableau.resources.BaseTableauClient.refresh_workbook") as mocked_function: + type(mocked_function.return_value).id = PropertyMock(return_value=job_id) + type(mocked_function.return_value).finish_code = PropertyMock(return_value=-1) + type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id) + yield mocked_function + + +@pytest.fixture(name="cancel_job", autouse=True) +def cancel_job_fixture(): + with patch("dagster_tableau.resources.BaseTableauClient.cancel_job") as mocked_function: + yield mocked_function + + @pytest.fixture( name="workspace_data", ) diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/__init__.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/pending_repo.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo.py similarity index 100% rename from python_modules/libraries/dagster-tableau/dagster_tableau_tests/pending_repo.py rename to python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo.py diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refreshable_workbook_ids.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refreshable_workbook_ids.py new file mode 100644 index 0000000000000..595a3a3cacd31 --- /dev/null +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/repos/pending_repo_with_refreshable_workbook_ids.py @@ -0,0 +1,42 @@ +from typing import cast + +from dagster import asset, define_asset_job +from dagster._core.definitions.definitions_class import Definitions +from dagster._core.definitions.repository_definition.repository_definition import ( + PendingRepositoryDefinition, +) +from dagster_tableau import TableauCloudWorkspace + +from dagster_tableau_tests.conftest import ( + FAKE_CONNECTED_APP_CLIENT_ID, + FAKE_CONNECTED_APP_SECRET_ID, + FAKE_CONNECTED_APP_SECRET_VALUE, + FAKE_POD_NAME, + FAKE_SITE_NAME, + FAKE_USERNAME, +) + +resource = TableauCloudWorkspace( + connected_app_client_id=FAKE_CONNECTED_APP_CLIENT_ID, + connected_app_secret_id=FAKE_CONNECTED_APP_SECRET_ID, + connected_app_secret_value=FAKE_CONNECTED_APP_SECRET_VALUE, + username=FAKE_USERNAME, + site_name=FAKE_SITE_NAME, + pod_name=FAKE_POD_NAME, +) + +pbi_defs = resource.build_defs(refreshable_workbook_ids=["b75fc023-a7ca-4115-857b-4342028640d0"]) + + +@asset +def my_materializable_asset(): + pass + + +pending_repo_from_cached_asset_metadata = cast( + PendingRepositoryDefinition, + Definitions.merge( + Definitions(assets=[my_materializable_asset], jobs=[define_asset_job("all_asset_job")]), + pbi_defs, + ).get_inner_repository(), +) diff --git a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py index d97db6121e0de..964cc4958d9e8 100644 --- a/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py +++ b/python_modules/libraries/dagster-tableau/dagster_tableau_tests/test_pending_repo.py @@ -14,7 +14,7 @@ def test_using_cached_asset_data( get_view: MagicMock, ) -> None: with instance_for_test() as instance: - from dagster_tableau_tests.pending_repo import pending_repo_from_cached_asset_metadata + from dagster_tableau_tests.repos.pending_repo import pending_repo_from_cached_asset_metadata assert sign_in.call_count == 0 assert get_workbooks.call_count == 0 @@ -36,7 +36,7 @@ def test_using_cached_asset_data( repository_load_data = repository_def.repository_load_data recon_repo = ReconstructableRepository.for_file( - file_relative_path(__file__, "pending_repo.py"), + file_relative_path(__file__, "repos/pending_repo.py"), fn_name="pending_repo_from_cached_asset_metadata", ) recon_job = ReconstructableJob(repository=recon_repo, job_name="all_asset_job") @@ -62,3 +62,76 @@ def test_using_cached_asset_data( assert get_workbooks.call_count == 1 assert get_workbook.call_count == 1 assert get_view.call_count == 2 + + +def test_using_cached_asset_data_with_refresh_request( + sign_in: MagicMock, + get_workbooks: MagicMock, + get_workbook: MagicMock, + get_view: MagicMock, + get_job: MagicMock, + refresh_workbook: MagicMock, + cancel_job: MagicMock, +) -> None: + with instance_for_test() as instance: + from dagster_tableau_tests.repos.pending_repo_with_refreshable_workbook_ids import ( + pending_repo_from_cached_asset_metadata, + ) + + assert sign_in.call_count == 0 + assert get_workbooks.call_count == 0 + assert get_workbook.call_count == 0 + assert get_view.call_count == 0 + assert refresh_workbook.call_count == 0 + assert get_job.call_count == 0 + assert cancel_job.call_count == 0 + + # first, we resolve the repository to generate our cached metadata + repository_def = pending_repo_from_cached_asset_metadata.compute_repository_definition() + # 3 calls to creates the defs + assert sign_in.call_count == 1 + assert get_workbooks.call_count == 1 + assert get_workbook.call_count == 1 + assert get_view.call_count == 0 + assert refresh_workbook.call_count == 0 + assert get_job.call_count == 0 + assert cancel_job.call_count == 0 + + # 1 Tableau external assets, 2 Tableau materializable asset and 1 Dagster materializable asset + assert len(repository_def.assets_defs_by_key) == 1 + 2 + 1 + + job_def = repository_def.get_job("all_asset_job") + repository_load_data = repository_def.repository_load_data + + recon_repo = ReconstructableRepository.for_file( + file_relative_path(__file__, "repos/pending_repo_with_refreshable_workbook_ids.py"), + fn_name="pending_repo_from_cached_asset_metadata", + ) + recon_job = ReconstructableJob(repository=recon_repo, job_name="all_asset_job") + + execution_plan = create_execution_plan(recon_job, repository_load_data=repository_load_data) + + run = instance.create_run_for_job(job_def=job_def, execution_plan=execution_plan) + + events = execute_plan( + execution_plan=execution_plan, + job=recon_job, + dagster_run=run, + instance=instance, + ) + + assert ( + len([event for event in events if event.event_type == DagsterEventType.STEP_SUCCESS]) + == 2 + ), "Expected two successful steps" + + # 3 calls to create the defs + 5 calls to materialize the Tableau assets + # with 1 workbook to refresh, 1 sheet and 1 dashboard + assert sign_in.call_count == 2 + assert get_workbooks.call_count == 1 + assert get_workbook.call_count == 1 + assert get_view.call_count == 2 + assert refresh_workbook.call_count == 1 + assert get_job.call_count == 1 + # The finish_code of the mocked get_job is 0, so no cancel_job is not called + assert cancel_job.call_count == 0