feat(ingest/tableau): add retry on timeout #10995

Merged · 6 commits · Jul 31, 2024
Changes from all commits
metadata-ingestion/setup.py (2 changes: 1 addition & 1 deletion)

```diff
@@ -454,7 +454,7 @@
     },
     # FIXME: I don't think tableau uses sqllineage anymore so we should be able
     # to remove that dependency.
-    "tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib,
+    "tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib,
     "teradata": sql_common
     | usage_common
     | sqlglot_lib
```
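The `|` here is ordinary Python set union: each extra is a `set` of requirement strings, so shared groups like `sqllineage_lib` and `sqlglot_lib` can be composed per plugin. A minimal sketch of the pattern (the pins below are placeholders, not the real ones from setup.py):

```python
# Illustrative only: compose per-plugin requirements from shared dependency
# sets, as metadata-ingestion/setup.py does. These pins are placeholders.
sqllineage_lib = {"sqllineage==1.4.9"}
sqlglot_lib = {"sqlglot[rs]==25.0.0"}

tableau = {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib
print(sorted(tableau))
# ['sqlglot[rs]==25.0.0', 'sqllineage==1.4.9', 'tableauserverclient>=0.24.0']
```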
metadata-ingestion/src/datahub/ingestion/api/source.py (5 changes: 4 additions & 1 deletion)

```diff
@@ -137,7 +137,10 @@ def report_log(
                 )

             # Add the simple exception details to the context.
-            context = f"{context}: {exc}"
+            if context:
+                context = f"{context} {type(exc)}: {exc}"
+            else:
+                context = f"{type(exc)}: {exc}"
         elif log:
             logger.log(level=level.value, msg=log_content, stacklevel=stacklevel)
```
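Besides adding the exception class to the message, the new branching avoids a literal `None: ...` prefix when no context string was supplied. A quick sketch of the resulting strings (the exception and context values are made up):

```python
# Illustrative only: what the new formatting produces for a report entry.
exc = ValueError("API response was not XML")

context = "fetching workbooks page 3"
print(f"{context} {type(exc)}: {exc}" if context else f"{type(exc)}: {exc}")
# fetching workbooks page 3 <class 'ValueError'>: API response was not XML

context = None
print(f"{context} {type(exc)}: {exc}" if context else f"{type(exc)}: {exc}")
# <class 'ValueError'>: API response was not XML
```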
```diff
@@ -298,7 +298,7 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:

         if copy_previous_state_and_fail:
             logger.info(
-                f"Copying urns from last state (size {last_checkpoint_state.urns}) to current state (size {cur_checkpoint_state.urns}) "
+                f"Copying urns from last state (size {len(last_checkpoint_state.urns)}) to current state (size {len(cur_checkpoint_state.urns)}) "
                 "to ensure stale entities from previous runs are deleted on the next successful run."
             )
             for urn in last_checkpoint_state.urns:
```
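The `len()` call is the whole fix: the old f-string interpolated the entire urn set into the log line instead of its size. A tiny illustration:

```python
# Illustrative only: logging the set itself vs. its size.
urns = {
    "urn:li:dataset:(urn:li:dataPlatform:tableau,a,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:tableau,b,PROD)",
}

print(f"size {urns}")       # dumps every urn into the log message
print(f"size {len(urns)}")  # size 2
```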
metadata-ingestion/src/datahub/ingestion/source/tableau.py (48 changes: 47 additions & 1 deletion)

```diff
@@ -1,5 +1,6 @@
 import logging
 import re
+import time
 from collections import OrderedDict
 from dataclasses import dataclass
 from datetime import datetime
@@ -13,6 +14,7 @@
     Optional,
     Set,
     Tuple,
+    Type,
     Union,
     cast,
 )
```
```diff
@@ -158,6 +160,21 @@
 from datahub.utilities import config_clean
 from datahub.utilities.urns.dataset_urn import DatasetUrn

+try:
+    # On earlier versions of the tableauserverclient, the NonXMLResponseError
+    # was thrown when reauthentication was needed. We'll keep both exceptions
+    # around for now, but can remove this in the future.
+    from tableauserverclient.server.endpoint.exceptions import (  # type: ignore
+        NotSignedInError,
+    )
+
+    REAUTHENTICATE_ERRORS: Tuple[Type[Exception], ...] = (
+        NotSignedInError,
+        NonXMLResponseError,
+    )
+except ImportError:
+    REAUTHENTICATE_ERRORS = (NonXMLResponseError,)
+
 logger: logging.Logger = logging.getLogger(__name__)

 # Replace / with |
```
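Because `except` accepts a tuple of exception classes, building `REAUTHENTICATE_ERRORS` at import time lets a single `except` clause cover both old and new tableauserverclient versions. A minimal sketch of the same pattern, using a hypothetical library:

```python
# Illustrative only: version-tolerant exception handling. `newlib` and
# NewAuthError are hypothetical stand-ins for a real dependency.
from typing import Tuple, Type

try:
    from newlib.errors import NewAuthError  # only exists on newer versions
    AUTH_ERRORS: Tuple[Type[Exception], ...] = (NewAuthError, PermissionError)
except ImportError:
    AUTH_ERRORS = (PermissionError,)

def call_api() -> None:
    try:
        ...  # talk to the server
    except AUTH_ERRORS:
        ...  # re-authenticate and retry once
```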
```diff
@@ -965,7 +982,7 @@ def get_connection_object_page(
             query_data = query_metadata(
                 self.server, query, connection_type, count, offset, query_filter
             )
-        except NonXMLResponseError:
+        except REAUTHENTICATE_ERRORS:
             if not retry_on_auth_error:
                 raise
```
```diff
@@ -1038,6 +1055,35 @@ def get_connection_object_page(
                 )

             else:
+                # As of Tableau Server 2024.2, the metadata API sporadically returns a 30 second
+                # timeout error. It doesn't reliably happen, so retrying a couple times makes sense.
+                if all(
+                    error.get("message")
+                    == "Execution canceled because timeout of 30000 millis was reached"
+                    for error in errors
+                ):
+                    # If it was only a timeout error, we can retry.
+                    if retries_remaining <= 0:
+                        raise
+
+                    # This is a pretty dumb backoff mechanism, but it's good enough for now.
+                    backoff_time = min(
+                        (self.config.max_retries - retries_remaining + 1) ** 2, 60
+                    )
+                    logger.info(
+                        f"Query {connection_type} received a 30 second timeout error - will retry in {backoff_time} seconds. "
+                        f"Retries remaining: {retries_remaining}"
+                    )
+                    time.sleep(backoff_time)
+                    return self.get_connection_object_page(
+                        query,
+                        connection_type,
+                        query_filter,
+                        count,
+                        offset,
+                        retry_on_auth_error=False,
+                        retries_remaining=retries_remaining - 1,
+                    )
                 raise RuntimeError(f"Query {connection_type} error: {errors}")

         connection_object = query_data.get(c.DATA, {}).get(connection_type, {})
```
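The backoff grows quadratically with the attempt number and is capped at 60 seconds; note the recursive call also passes `retry_on_auth_error=False`, so a timeout retry will not re-enter the re-authentication path. A quick sketch of the resulting delays (the `max_retries` value below is just an example):

```python
# Illustrative only: the PR's backoff formula for an assumed max_retries=5.
max_retries = 5  # example value; the real one comes from self.config.max_retries

for retries_remaining in range(max_retries, 0, -1):
    backoff_time = min((max_retries - retries_remaining + 1) ** 2, 60)
    print(f"retries_remaining={retries_remaining}: sleep {backoff_time}s")

# retries_remaining=5: sleep 1s
# retries_remaining=4: sleep 4s
# retries_remaining=3: sleep 9s
# retries_remaining=2: sleep 16s
# retries_remaining=1: sleep 25s
```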