Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-tableau] Implement refreshable workbooks #24862

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 151 additions & 16 deletions python_modules/libraries/dagster-tableau/dagster_tableau/resources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import datetime
import json
import logging
import time
import uuid
from abc import abstractmethod
from contextlib import contextmanager
Expand All @@ -13,9 +15,12 @@
AssetsDefinition,
ConfigurableResource,
Definitions,
Failure,
ObserveResult,
Output,
_check as check,
external_assets_from_specs,
get_dagster_logger,
multi_asset,
)
from dagster._annotations import experimental
Expand All @@ -34,6 +39,9 @@
TableauWorkspaceData,
)

DEFAULT_POLL_INTERVAL_SECONDS = 10
DEFAULT_POLL_TIMEOUT = 600


@experimental
class BaseTableauClient:
Expand All @@ -58,6 +66,11 @@ def __init__(
def base_url(self) -> str:
raise NotImplementedError()

@property
@cached_method
def _log(self) -> logging.Logger:
return get_dagster_logger()

@cached_method
def get_workbooks(self) -> Mapping[str, object]:
"""Fetches a list of all Tableau workbooks in the workspace."""
Expand All @@ -82,6 +95,75 @@ def get_view(
self._server.views.get_request(f"{self._server.views.baseurl}/{view_id}")
)

def get_job(
self,
job_id: str,
) -> TSC.JobItem:
"""Fetches information for a given job."""
return self._server.jobs.get_by_id(job_id)

def cancel_job(
self,
job_id: str,
) -> Mapping[str, object]:
"""Fetches information for a given job."""
return self._response_to_dict(self._server.jobs.cancel(job_id))

def refresh_workbook(self, workbook_id) -> TSC.JobItem:
"""Refreshes all extracts for a given workbook and return the JobItem object."""
return self._server.workbooks.refresh(workbook_id)

def refresh_and_poll(
self,
workbook_id: str,
poll_interval: Optional[float] = None,
poll_timeout: Optional[float] = None,
) -> Optional[str]:
job = self.refresh_workbook(workbook_id)

if not poll_interval:
poll_interval = DEFAULT_POLL_INTERVAL_SECONDS
if not poll_timeout:
poll_timeout = DEFAULT_POLL_TIMEOUT

self._log.info(f"Job {job.id} initialized for workbook_id={workbook_id}.")
start = time.monotonic()

try:
while True:
if poll_timeout and start + poll_timeout < time.monotonic():
raise Failure(
f"Timeout: Tableau job {job.id} is not ready after the timeout"
f" {poll_timeout} seconds"
)
time.sleep(poll_interval)
job = self.get_job(job_id=job.id)

if job.finish_code == -1:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be nice to stick these codes in enum values or constants so it's clearer what they represent

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 1ddbbaa

# -1 is the default value for JobItem.finish_code, when the job is in progress
continue
elif job.finish_code == TSC.JobItem.FinishCode.Success:
break
elif job.finish_code == TSC.JobItem.FinishCode.Failed:
raise Failure(f"Job failed: {job.id}")
elif job.finish_code == TSC.JobItem.FinishCode.Cancelled:
raise Failure(f"Job was cancelled: {job.id}")
else:
raise Failure(
f"Encountered unexpected finish code `{job.finish_code}` for job {job.id}"
)
finally:
# if Tableau sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if job.finish_code not in (
TSC.JobItem.FinishCode.Success,
TSC.JobItem.FinishCode.Failed,
TSC.JobItem.FinishCode.Cancelled,
):
self.cancel_job(job.id)

return job.workbook_id

def sign_in(self) -> Auth.contextmgr:
"""Sign in to the site in Tableau."""
jwt_token = jwt.encode(
Expand All @@ -91,7 +173,7 @@ def sign_in(self) -> Auth.contextmgr:
"jti": str(uuid.uuid4()),
"aud": "tableau",
"sub": self.username,
"scp": ["tableau:content:read"],
"scp": ["tableau:content:read", "tableau:tasks:run"],
},
self.connected_app_secret_value,
algorithm="HS256",
Expand Down Expand Up @@ -303,36 +385,66 @@ def fetch_tableau_workspace_data(

def build_assets(
self,
refreshable_workbook_ids: Sequence[str],
dagster_tableau_translator: Type[DagsterTableauTranslator],
) -> Sequence[CacheableAssetsDefinition]:
"""Returns a set of CacheableAssetsDefinition which will load Tableau content from
the workspace and translates it into AssetSpecs, using the provided translator.

Args:
refreshable_workbook_ids (Sequence[str]): A list of workbook IDs. The workbooks provided must
have extracts as data sources and be refreshable in Tableau.

When materializing your Tableau assets, the workbooks provided are refreshed,
refreshing their sheets and dashboards before pulling their data in Dagster.

This feature is equivalent to selecting Refreshing Extracts for a workbook in Tableau UI
and only works for workbooks for which the data sources are extracts.
See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now
for documentation.
dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use
to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator.

Returns:
Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
will load the Tableau content.
"""
return [TableauCacheableAssetsDefinition(self, dagster_tableau_translator)]
return [
TableauCacheableAssetsDefinition(
self, refreshable_workbook_ids, dagster_tableau_translator
)
]

def build_defs(
self, dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator
self,
refreshable_workbook_ids: Optional[Sequence[str]] = None,
dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator,
) -> Definitions:
"""Returns a Definitions object which will load Tableau content from
the workspace and translate it into assets, using the provided translator.

Args:
refreshable_workbook_ids (Optional[Sequence[str]]): A list of workbook IDs. The workbooks provided must
have extracts as data sources and be refreshable in Tableau.

When materializing your Tableau assets, the workbooks provided are refreshed,
refreshing their sheets and dashboards before pulling their data in Dagster.

This feature is equivalent to selecting Refreshing Extracts for a workbook in Tableau UI
and only works for workbooks for which the data sources are extracts.
See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now
for documentation.
dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use
to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator.

Returns:
Definitions: A Definitions object which will build and return the Power BI content.
"""
defs = Definitions(
assets=self.build_assets(dagster_tableau_translator=dagster_tableau_translator)
assets=self.build_assets(
refreshable_workbook_ids=refreshable_workbook_ids or [],
dagster_tableau_translator=dagster_tableau_translator,
),
)
return defs

Expand Down Expand Up @@ -376,8 +488,14 @@ def build_client(self) -> None:


class TableauCacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(self, workspace: BaseTableauWorkspace, translator: Type[DagsterTableauTranslator]):
def __init__(
self,
workspace: BaseTableauWorkspace,
refreshable_workbook_ids: Sequence[str],
translator: Type[DagsterTableauTranslator],
):
self._workspace = workspace
self._refreshable_workbook_ids = refreshable_workbook_ids
self._translator_cls = translator
super().__init__(unique_id=self._workspace.site_name)

Expand Down Expand Up @@ -440,6 +558,9 @@ def _build_tableau_assets_from_workspace_data(
)
def _assets(tableau: BaseTableauWorkspace):
with tableau.get_client() as client:
refreshed_workbooks = set()
for refreshable_workbook_id in self._refreshable_workbook_ids:
refreshed_workbooks.add(client.refresh_and_poll(refreshable_workbook_id))
for view_id, view_content_data in [
*workspace_data.sheets_by_id.items(),
*workspace_data.dashboards_by_id.items(),
Expand All @@ -451,16 +572,30 @@ def _assets(tableau: BaseTableauWorkspace):
asset_key = translator.get_dashboard_asset_key(view_content_data)
else:
check.assert_never(view_content_data.content_type)
yield ObserveResult(
asset_key=asset_key,
metadata={
"workbook_id": data["workbook"]["id"],
"owner_id": data["owner"]["id"],
"name": data["name"],
"contentUrl": data["contentUrl"],
"createdAt": data["createdAt"],
"updatedAt": data["updatedAt"],
},
)
if view_content_data.properties["workbook"]["luid"] in refreshed_workbooks:
yield Output(
value=None,
output_name="__".join(asset_key.path),
metadata={
"workbook_id": data["workbook"]["id"],
"owner_id": data["owner"]["id"],
"name": data["name"],
"contentUrl": data["contentUrl"],
"createdAt": data["createdAt"],
"updatedAt": data["updatedAt"],
},
)
else:
yield ObserveResult(
asset_key=asset_key,
metadata={
"workbook_id": data["workbook"]["id"],
"owner_id": data["owner"]["id"],
"name": data["name"],
"contentUrl": data["contentUrl"],
"createdAt": data["createdAt"],
"updatedAt": data["updatedAt"],
},
)

return [_assets]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ruff: noqa: SLF001

import uuid
from unittest.mock import patch
from unittest.mock import PropertyMock, patch

import pytest
from dagster_tableau.translator import TableauContentData, TableauContentType, TableauWorkspaceData
Expand Down Expand Up @@ -129,6 +129,11 @@ def dashboard_id_fixture() -> str:
return "c9bf8403-5daf-427a-b3d6-2ce9bed7798f"


@pytest.fixture(name="job_id")
def job_id_fixture() -> str:
return uuid.uuid4().hex


@pytest.fixture(name="sign_in", autouse=True)
def sign_in_fixture():
with patch("dagster_tableau.resources.BaseTableauClient.sign_in") as mocked_function:
Expand Down Expand Up @@ -156,6 +161,30 @@ def get_view_fixture():
yield mocked_function


@pytest.fixture(name="get_job", autouse=True)
def get_job_fixture(workbook_id, job_id):
with patch("dagster_tableau.resources.BaseTableauClient.get_job") as mocked_function:
type(mocked_function.return_value).id = PropertyMock(return_value=job_id)
type(mocked_function.return_value).finish_code = PropertyMock(return_value=0)
type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id)
yield mocked_function


@pytest.fixture(name="refresh_workbook", autouse=True)
def refresh_workbook_fixture(workbook_id, job_id):
with patch("dagster_tableau.resources.BaseTableauClient.refresh_workbook") as mocked_function:
type(mocked_function.return_value).id = PropertyMock(return_value=job_id)
type(mocked_function.return_value).finish_code = PropertyMock(return_value=-1)
type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id)
yield mocked_function


@pytest.fixture(name="cancel_job", autouse=True)
def cancel_job_fixture():
with patch("dagster_tableau.resources.BaseTableauClient.cancel_job") as mocked_function:
yield mocked_function


@pytest.fixture(
name="workspace_data",
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import cast

from dagster import asset, define_asset_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.repository_definition.repository_definition import (
PendingRepositoryDefinition,
)
from dagster_tableau import TableauCloudWorkspace

from dagster_tableau_tests.conftest import (
FAKE_CONNECTED_APP_CLIENT_ID,
FAKE_CONNECTED_APP_SECRET_ID,
FAKE_CONNECTED_APP_SECRET_VALUE,
FAKE_POD_NAME,
FAKE_SITE_NAME,
FAKE_USERNAME,
)

resource = TableauCloudWorkspace(
connected_app_client_id=FAKE_CONNECTED_APP_CLIENT_ID,
connected_app_secret_id=FAKE_CONNECTED_APP_SECRET_ID,
connected_app_secret_value=FAKE_CONNECTED_APP_SECRET_VALUE,
username=FAKE_USERNAME,
site_name=FAKE_SITE_NAME,
pod_name=FAKE_POD_NAME,
)

pbi_defs = resource.build_defs(refreshable_workbook_ids=["b75fc023-a7ca-4115-857b-4342028640d0"])


@asset
def my_materializable_asset():
pass


pending_repo_from_cached_asset_metadata = cast(
PendingRepositoryDefinition,
Definitions.merge(
Definitions(assets=[my_materializable_asset], jobs=[define_asset_job("all_asset_job")]),
pbi_defs,
).get_inner_repository(),
)
Loading