Skip to content

Commit

Permalink
feat: optimize retry if needed (#532)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jrmyy authored Jan 5, 2024
1 parent 60c3182 commit 78dee30
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
30 changes: 24 additions & 6 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.column import AthenaColumn
from dbt.adapters.athena.config import get_boto3_config
from dbt.adapters.athena.connections import AthenaCursor
from dbt.adapters.athena.constants import LOGGER
from dbt.adapters.athena.exceptions import (
S3LocationException,
Expand Down Expand Up @@ -1247,14 +1248,9 @@ def _get_table_input(table: TableTypeDef) -> TableInputTypeDef:

@available
def run_query_with_partitions_limit_catching(self, sql: str) -> str:
query = self.connections._add_query_comment(sql)
conn = self.connections.get_thread_connection()
cursor = conn.handle.cursor()
LOGGER.debug(f"Running Athena query:\n{query}")
try:
cursor.execute(query, catch_partitions_limit=True)
cursor = self._run_query(sql, catch_partitions_limit=True)
except OperationalError as e:
LOGGER.debug(f"CAUGHT EXCEPTION: {e}")
if "TOO_MANY_OPEN_PARTITIONS" in str(e):
return "TOO_MANY_OPEN_PARTITIONS"
raise e
Expand Down Expand Up @@ -1314,3 +1310,25 @@ def format_value_for_partition(self, value: Any, column_type: str) -> Tuple[str,
else:
# Raise an error for unsupported column types
raise ValueError(f"Unsupported column type: {column_type}")

@available
def run_optimize_with_partition_limit_catching(self, optimize_query: str) -> None:
while True:
try:
self._run_query(optimize_query, catch_partitions_limit=False)
break
except OperationalError as e:
if "ICEBERG_OPTIMIZE_MORE_RUNS_NEEDED" not in str(e):
raise e

def _run_query(self, sql: str, catch_partitions_limit: bool) -> AthenaCursor:
query = self.connections._add_query_comment(sql)
conn = self.connections.get_thread_connection()
cursor: AthenaCursor = conn.handle.cursor()
LOGGER.debug(f"Running Athena query:\n{query}")
try:
cursor.execute(query, catch_partitions_limit=catch_partitions_limit)
except OperationalError as e:
LOGGER.debug(f"CAUGHT EXCEPTION: {e}")
raise e
return cursor
15 changes: 15 additions & 0 deletions dbt/include/athena/macros/materializations/hooks.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{% macro run_hooks(hooks, inside_transaction=True) %}
{% set re = modules.re %}
{% for hook in hooks %}
{% set rendered = render(hook.get('sql')) | trim %}
{% if (rendered | length) > 0 %}
{%- if re.match("optimize\W+\w+\W+rewrite data using bin_pack", rendered.lower(), re.MULTILINE) -%}
{%- do adapter.run_optimize_with_partition_limit_catching(rendered) -%}
{%- else -%}
{% call statement(auto_begin=inside_transaction) %}
{{ rendered }}
{% endcall %}
{%- endif -%}
{% endif %}
{% endfor %}
{% endmacro %}

0 comments on commit 78dee30

Please sign in to comment.