Skip to content

Commit

Permalink
Implement configurable debug_query_state parameter for AthenaCursor
Browse files Browse the repository at this point in the history
  • Loading branch information
svdimchenko committed Sep 4, 2023
1 parent fda4895 commit 7a91ec6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ A dbt profile can be configured to run against AWS Athena using the following co
| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` |
| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` |
| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` |
| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` |
| aws_access_key_id | Access key ID of the user performing requests. | Optional | `AKIAIOSFODNN7EXAMPLE` |
| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` |
| aws_profile_name | Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
Expand Down
7 changes: 5 additions & 2 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AthenaCredentials(Credentials):
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
poll_interval: float = 1.0
debug_query_state: bool = False
_ALIASES = {"catalog": "database"}
num_retries: Optional[int] = 5
s3_data_dir: Optional[str] = None
Expand Down Expand Up @@ -81,7 +82,7 @@ def _connection_keys(self) -> Tuple[str, ...]:
"endpoint_url",
"s3_data_dir",
"s3_data_naming",
"lf_tags",
"debug_query_state",
)


Expand Down Expand Up @@ -122,7 +123,8 @@ def __poll(self, query_id: str) -> AthenaQueryExecution:
]:
return query_execution
else:
logger.debug(f"Query state is: {query_execution.state}. Sleeping for {self._poll_interval}...")
if self.connection.cursor_kwargs.get("debug_query_state", False):
logger.debug(f"Query state is: {query_execution.state}. Sleeping for {self._poll_interval}...")
time.sleep(self._poll_interval)

def execute( # type: ignore
Expand Down Expand Up @@ -215,6 +217,7 @@ def open(cls, connection: Connection) -> Connection:
schema_name=creds.schema,
work_group=creds.work_group,
cursor_class=AthenaCursor,
cursor_kwargs={"debug_query_state": creds.debug_query_state},
formatter=AthenaParameterFormatter(),
poll_interval=creds.poll_interval,
session=get_boto3_session(connection),
Expand Down

0 comments on commit 7a91ec6

Please sign in to comment.