feat(ingest/tableau): add retry on timeout #10995

Merged
merged 6 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
5 changes: 4 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -137,7 +137,10 @@ def report_log(
             )

             # Add the simple exception details to the context.
-            context = f"{context}: {exc}"
+            if context:
+                context = f"{context} {type(exc)}: {exc}"
+            else:
+                context = f"{type(exc)}: {exc}"
         elif log:
             logger.log(level=level.value, msg=log_content, stacklevel=stacklevel)
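To make the effect of this change concrete, here is a minimal standalone sketch, using a hypothetical exception and context string (not taken from the PR), of how the reported context differs before and after:

exc = ValueError("connection dropped")  # hypothetical exception for illustration
context = "fetching workbook"           # hypothetical context string

old = f"{context}: {exc}"  # before: "fetching workbook: connection dropped"
new = f"{context} {type(exc)}: {exc}" if context else f"{type(exc)}: {exc}"
print(old)
print(new)  # after: "fetching workbook <class 'ValueError'>: connection dropped"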

@@ -298,7 +298,7 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:

         if copy_previous_state_and_fail:
             logger.info(
-                f"Copying urns from last state (size {last_checkpoint_state.urns}) to current state (size {cur_checkpoint_state.urns}) "
+                f"Copying urns from last state (size {len(last_checkpoint_state.urns)}) to current state (size {len(cur_checkpoint_state.urns)}) "
                 "to ensure stale entities from previous runs are deleted on the next successful run."
             )
             for urn in last_checkpoint_state.urns:
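For context, the old log line interpolated the URN sets themselves rather than their sizes. A tiny sketch with made-up URNs shows the difference:

urns = {"urn:li:dataset:a", "urn:li:dataset:b"}  # made-up URNs for illustration
print(f"size {urns}")       # before (set order may vary): size {'urn:li:dataset:a', 'urn:li:dataset:b'}
print(f"size {len(urns)}")  # after: size 2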
25 changes: 25 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -1,5 +1,6 @@
 import logging
 import re
+import time
 from collections import OrderedDict
 from dataclasses import dataclass
 from datetime import datetime
@@ -1038,6 +1039,30 @@ def get_connection_object_page(
                 )

             else:
+                # As of Tableau Server 2024.2, the metadata API sporadically returns a 30 second
+                # timeout error. It doesn't reliably happen, so retrying a couple times makes sense.
+                if all(
+                    error.get("message")
+                    == "Execution canceled because timeout of 30000 millis was reached"
+                    for error in errors
+                ):
+                    # If it was only a timeout error, we can retry.
+                    if retries_remaining <= 0:
+                        raise
+                    logger.info(
+                        f"Query {connection_type} received a 30 second timeout error - will retry in a few seconds"
+                    )
+                    # This is a pretty dumb backoff mechanism, but it's good enough for now.
+                    time.sleep(60 / retries_remaining)
+                    return self.get_connection_object_page(
+                        query,
+                        connection_type,
+                        query_filter,
+                        count,
+                        offset,
+                        retry_on_auth_error=False,
+                        retries_remaining=retries_remaining - 1,
+                    )
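Note that this wait is not a classic backoff: time.sleep(60 / retries_remaining) sleeps longer as retries run out. A small sketch, assuming the call starts with three retries remaining (the actual default is not visible in this hunk), shows the schedule:

# Assumed starting value; the real default for retries_remaining is not shown in this diff.
for retries_remaining in (3, 2, 1):
    print(f"{retries_remaining} retries left -> sleep {60 / retries_remaining:.0f}s")
# 3 retries left -> sleep 20s
# 2 retries left -> sleep 30s
# 1 retries left -> sleep 60s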
Review comment from a Contributor:

Implement exponential backoff for retries.

The current retry mechanism uses a simple sleep function that spaces out retries based on the remaining attempts. An exponential backoff strategy would be more effective in handling transient errors and reducing load on the server.

-                    time.sleep(60 / retries_remaining)
+                    backoff_time = (self.config.max_retries - retries_remaining + 1) ** 2
+                    time.sleep(backoff_time)
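As a rough, self-contained illustration of the reviewer's suggestion (the proposed formula is quadratic in the attempt number rather than strictly exponential), here is a sketch of the delay schedule. The max_retries setting is an assumption: the suggestion references self.config.max_retries, which is not defined anywhere in this diff.

def quadratic_backoff(max_retries: int, retries_remaining: int) -> float:
    # First retry waits 1s, second 4s, third 9s, ...
    attempt = max_retries - retries_remaining + 1
    return float(attempt**2)

for remaining in (3, 2, 1):
    delay = quadratic_backoff(max_retries=3, retries_remaining=remaining)
    print(f"{remaining} retries left -> sleep {delay:.0f}s")
    # A real implementation would call time.sleep(delay) here.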

                 raise RuntimeError(f"Query {connection_type} error: {errors}")

         connection_object = query_data.get(c.DATA, {}).get(connection_type, {})