Skip to content

Commit

Permalink
[dagster-tableau] Implement StartWorkbookRefreshRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Oct 2, 2024
1 parent 4d5429f commit 7cefd11
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
TableauCloudWorkspace as TableauCloudWorkspace,
TableauServerWorkspace as TableauServerWorkspace,
)
from dagster_tableau.translator import DagsterTableauTranslator as DagsterTableauTranslator
from dagster_tableau.translator import (
DagsterTableauTranslator as DagsterTableauTranslator,
StartWorkbookRefreshRequest as StartWorkbookRefreshRequest,
)

# Move back to version.py and edit setup.py once we are ready to publish.
__version__ = "1!0+dev"
Expand Down
153 changes: 137 additions & 16 deletions python_modules/libraries/dagster-tableau/dagster_tableau/resources.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import datetime
import json
import logging
import time
import uuid
from abc import abstractmethod
from contextlib import contextmanager
Expand All @@ -13,9 +15,12 @@
AssetsDefinition,
ConfigurableResource,
Definitions,
Failure,
ObserveResult,
Output,
_check as check,
external_assets_from_specs,
get_dagster_logger,
multi_asset,
)
from dagster._annotations import experimental
Expand All @@ -29,11 +34,15 @@

from dagster_tableau.translator import (
DagsterTableauTranslator,
StartWorkbookRefreshRequest,
TableauContentData,
TableauContentType,
TableauWorkspaceData,
)

DEFAULT_POLL_INTERVAL_SECONDS = 10
DEFAULT_POLL_TIMEOUT = 600


@experimental
class BaseTableauClient:
Expand All @@ -58,6 +67,11 @@ def __init__(
def base_url(self) -> str:
raise NotImplementedError()

@property
@cached_method
def _log(self) -> logging.Logger:
return get_dagster_logger()

@cached_method
def get_workbooks(self) -> Mapping[str, object]:
"""Fetches a list of all Tableau workbooks in the workspace."""
Expand All @@ -82,6 +96,70 @@ def get_view(
self._server.views.get_request(f"{self._server.views.baseurl}/{view_id}")
)

def get_job(
self,
job_id: str,
) -> TSC.JobItem:
"""Fetches information for a given job."""
return self._server.jobs.get_by_id(job_id)

def cancel_job(
self,
job_id: str,
) -> Mapping[str, object]:
"""Fetches information for a given job."""
return self._response_to_dict(self._server.jobs.cancel(job_id))

def refresh_workbook(self, workbook_id) -> TSC.JobItem:
"""Refreshes all extracts for a given workbook and return the JobItem object."""
return self._server.workbooks.refresh(workbook_id)

def refresh_and_poll(
self,
workbook_id: str,
poll_interval: Optional[float] = None,
poll_timeout: Optional[float] = None,
) -> str:
job = self.refresh_workbook(workbook_id)

if not poll_interval:
poll_interval = DEFAULT_POLL_INTERVAL_SECONDS
if not poll_timeout:
poll_timeout = DEFAULT_POLL_TIMEOUT

self._log.info(f"Job {job.id} initialized for workbook_id={workbook_id}.")
start = time.monotonic()

try:
while True:
if poll_timeout and start + poll_timeout < time.monotonic():
raise Failure(
f"Timeout: Tableau job {job.id} is not ready after the timeout"
f" {poll_timeout} seconds"
)
time.sleep(poll_interval)
job = self.get_job(job_id=job.id)

if job.finish_code == -1:
continue
elif job.finish_code == 0:
break
elif job.finish_code == 1:
raise Failure(f"Job failed: {job.id}")
elif job.finish_code == 2:
raise Failure(f"Job was cancelled: {job.id}")
else:
raise Failure(
f"Encountered unexpected finish code `{job.finish_code}` for job {job.id}"
)
finally:
# if Tableau sync has not completed, make sure to cancel it so that it doesn't outlive
# the python process
if job.finish_code not in (0, 1, 2):
self.cancel_job(job.id)

return job.workbook_id

def sign_in(self) -> Auth.contextmgr:
"""Sign in to the site in Tableau."""
jwt_token = jwt.encode(
Expand All @@ -91,7 +169,7 @@ def sign_in(self) -> Auth.contextmgr:
"jti": str(uuid.uuid4()),
"aud": "tableau",
"sub": self.username,
"scp": ["tableau:content:read"],
"scp": ["tableau:content:read", "tableau:tasks:run"],
},
self.connected_app_secret_value,
algorithm="HS256",
Expand Down Expand Up @@ -303,36 +381,56 @@ def fetch_tableau_workspace_data(

def build_assets(
self,
start_workbook_refresh_requests: Sequence[StartWorkbookRefreshRequest],
dagster_tableau_translator: Type[DagsterTableauTranslator],
) -> Sequence[CacheableAssetsDefinition]:
"""Returns a set of CacheableAssetsDefinition which will load Tableau content from
the workspace and translates it into AssetSpecs, using the provided translator.
Args:
start_workbook_refresh_requests (Sequence[StartWorkbookRefreshRequest]): A list of
requests to start workbook refreshes. This feature is equivalent to selecting Refreshing Extracts
for a workbook in Tableau UI and only works for workbooks for which the data sources are extracts.
See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now
for documentation.
dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use
to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator.
Returns:
Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
will load the Tableau content.
"""
return [TableauCacheableAssetsDefinition(self, dagster_tableau_translator)]
return [
TableauCacheableAssetsDefinition(
self, start_workbook_refresh_requests, dagster_tableau_translator
)
]

def build_defs(
self, dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator
self,
start_workbook_refresh_requests: Optional[Sequence[StartWorkbookRefreshRequest]] = None,
dagster_tableau_translator: Type[DagsterTableauTranslator] = DagsterTableauTranslator,
) -> Definitions:
"""Returns a Definitions object which will load Tableau content from
the workspace and translate it into assets, using the provided translator.
Args:
start_workbook_refresh_requests (Optional[Sequence[StartWorkbookRefreshRequest]]): A list of
requests to start workbook refreshes. This feature is equivalent to selecting Refreshing Extracts
for a workbook in Tableau UI and only works for workbooks for which the data sources are extracts.
See https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_ref_workbooks_and_views.htm#update_workbook_now
for documentation.
dagster_tableau_translator (Type[DagsterTableauTranslator]): The translator to use
to convert Tableau content into AssetSpecs. Defaults to DagsterTableauTranslator.
Returns:
Definitions: A Definitions object which will build and return the Power BI content.
"""
defs = Definitions(
assets=self.build_assets(dagster_tableau_translator=dagster_tableau_translator)
assets=self.build_assets(
start_workbook_refresh_requests=start_workbook_refresh_requests or [],
dagster_tableau_translator=dagster_tableau_translator,
),
)
return defs

Expand Down Expand Up @@ -376,8 +474,14 @@ def build_client(self) -> None:


class TableauCacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(self, workspace: BaseTableauWorkspace, translator: Type[DagsterTableauTranslator]):
def __init__(
self,
workspace: BaseTableauWorkspace,
refresh_requests: Sequence[StartWorkbookRefreshRequest],
translator: Type[DagsterTableauTranslator],
):
self._workspace = workspace
self._refresh_requests = refresh_requests
self._translator_cls = translator
super().__init__(unique_id=self._workspace.site_name)

Expand Down Expand Up @@ -440,6 +544,9 @@ def _build_tableau_assets_from_workspace_data(
)
def _assets(tableau: BaseTableauWorkspace):
with tableau.get_client() as client:
refreshed_workbooks = set()
for refresh_request in self._refresh_requests:
refreshed_workbooks.add(client.refresh_and_poll(refresh_request.workbook_id))
for view_id, view_content_data in [
*workspace_data.sheets_by_id.items(),
*workspace_data.dashboards_by_id.items(),
Expand All @@ -451,16 +558,30 @@ def _assets(tableau: BaseTableauWorkspace):
asset_key = translator.get_dashboard_asset_key(view_content_data)
else:
check.assert_never(view_content_data.content_type)
yield ObserveResult(
asset_key=asset_key,
metadata={
"workbook_id": data["workbook"]["id"],
"owner_id": data["owner"]["id"],
"name": data["name"],
"contentUrl": data["contentUrl"],
"createdAt": data["createdAt"],
"updatedAt": data["updatedAt"],
},
)
if view_content_data.properties["workbook"]["luid"] in refreshed_workbooks:
yield Output(
value=None,
output_name="__".join(asset_key.path),
metadata={
"workbook_id": data["workbook"]["id"],
"owner_id": data["owner"]["id"],
"name": data["name"],
"contentUrl": data["contentUrl"],
"createdAt": data["createdAt"],
"updatedAt": data["updatedAt"],
},
)
else:
yield ObserveResult(
asset_key=asset_key,
metadata={
"workbook_id": data["workbook"]["id"],
"owner_id": data["owner"]["id"],
"name": data["name"],
"contentUrl": data["contentUrl"],
"createdAt": data["createdAt"],
"updatedAt": data["updatedAt"],
},
)

return [_assets]
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def _clean_asset_name(name: str) -> str:
return re.sub(r"[^a-z0-9A-Z.]+", "_", name).lower()


@record
class StartWorkbookRefreshRequest:
workbook_id: str


class TableauContentType(Enum):
"""Enum representing each object in Tableau's ontology."""

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# ruff: noqa: SLF001

import uuid
from unittest.mock import patch
from unittest.mock import patch, PropertyMock

import pytest
from dagster_tableau.translator import TableauContentData, TableauContentType, TableauWorkspaceData
Expand Down Expand Up @@ -129,6 +129,11 @@ def dashboard_id_fixture() -> str:
return "c9bf8403-5daf-427a-b3d6-2ce9bed7798f"


@pytest.fixture(name="job_id")
def job_id_fixture() -> str:
return uuid.uuid4().hex


@pytest.fixture(name="sign_in", autouse=True)
def sign_in_fixture():
with patch("dagster_tableau.resources.BaseTableauClient.sign_in") as mocked_function:
Expand Down Expand Up @@ -156,6 +161,30 @@ def get_view_fixture():
yield mocked_function


@pytest.fixture(name="get_job", autouse=True)
def get_job_fixture(workbook_id, job_id):
with patch("dagster_tableau.resources.BaseTableauClient.get_job") as mocked_function:
type(mocked_function.return_value).id = PropertyMock(return_value=job_id)
type(mocked_function.return_value).finish_code = PropertyMock(return_value=0)
type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id)
yield mocked_function


@pytest.fixture(name="refresh_workbook", autouse=True)
def refresh_workbook_fixture(workbook_id, job_id):
with patch("dagster_tableau.resources.BaseTableauClient.refresh_workbook") as mocked_function:
type(mocked_function.return_value).id = PropertyMock(return_value=job_id)
type(mocked_function.return_value).finish_code = PropertyMock(return_value=-1)
type(mocked_function.return_value).workbook_id = PropertyMock(return_value=workbook_id)
yield mocked_function


@pytest.fixture(name="cancel_job", autouse=True)
def cancel_job_fixture():
with patch("dagster_tableau.resources.BaseTableauClient.cancel_job") as mocked_function:
yield mocked_function


@pytest.fixture(
name="workspace_data",
)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import cast

from dagster import asset, define_asset_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.repository_definition.repository_definition import (
PendingRepositoryDefinition,
)
from dagster_tableau import StartWorkbookRefreshRequest, TableauCloudWorkspace

from dagster_tableau_tests.conftest import (
FAKE_CONNECTED_APP_CLIENT_ID,
FAKE_CONNECTED_APP_SECRET_ID,
FAKE_CONNECTED_APP_SECRET_VALUE,
FAKE_POD_NAME,
FAKE_SITE_NAME,
FAKE_USERNAME,
)

resource = TableauCloudWorkspace(
connected_app_client_id=FAKE_CONNECTED_APP_CLIENT_ID,
connected_app_secret_id=FAKE_CONNECTED_APP_SECRET_ID,
connected_app_secret_value=FAKE_CONNECTED_APP_SECRET_VALUE,
username=FAKE_USERNAME,
site_name=FAKE_SITE_NAME,
pod_name=FAKE_POD_NAME,
)

pbi_defs = resource.build_defs(
start_workbook_refresh_requests=[
StartWorkbookRefreshRequest(workbook_id="b75fc023-a7ca-4115-857b-4342028640d0")
]
)


@asset
def my_materializable_asset():
pass


pending_repo_from_cached_asset_metadata = cast(
PendingRepositoryDefinition,
Definitions.merge(
Definitions(assets=[my_materializable_asset], jobs=[define_asset_job("all_asset_job")]),
pbi_defs,
).get_inner_repository(),
)
Loading

0 comments on commit 7cefd11

Please sign in to comment.