Skip to content

Commit

Permalink
Merge branch 'main' into feature/add_iceberg_sync_column_types_support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jrmyy authored Jun 9, 2023
2 parents 92c7faf + 12b4a0c commit bcc23be
Show file tree
Hide file tree
Showing 17 changed files with 213 additions and 153 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @jessedobbelaere @Jrmyy @mattiamatrix @nicor88 @thenaturalist
* @jessedobbelaere @Jrmyy @mattiamatrix @nicor88 @svdimchenko @thenaturalist
51 changes: 51 additions & 0 deletions .github/workflows/functional-tests-workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# reusable workflow to be called from the main workflow
name: functional-tests-workflow

on:
workflow_call:
inputs:
checkout-ref:
required: true
type: string
checkout-repository:
required: true
type: string
aws-region:
required: true
type: string

env:
DBT_TEST_ATHENA_DATABASE: awsdatacatalog
DBT_TEST_ATHENA_SCHEMA: dbt-tests
DBT_TEST_ATHENA_WORK_GROUP: athena-dbt-tests
DBT_TEST_ATHENA_THREADS: 16
DBT_TEST_ATHENA_POLL_INTERVAL: 0.5

jobs:
functional-tests:
name: Functional Tests
runs-on: ubuntu-latest
env:
DBT_TEST_ATHENA_S3_STAGING_DIR: s3://dbt-athena-query-results-${{ inputs.aws-region }}
DBT_TEST_ATHENA_REGION_NAME: ${{ inputs.aws-region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
ref: ${{ inputs.checkout-ref }}
repository: ${{ inputs.checkout-repository }}
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
make install_deps
- name: Configure AWS credentials from Test account
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.ASSUMABLE_ROLE_NAME }}
aws-region: ${{ inputs.aws-region }}
- name: Functional Test
run: |
pytest -n 8 tests/functional
84 changes: 21 additions & 63 deletions .github/workflows/functional-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,44 @@ name: functional-tests

on:
# we use pull_request_target to run the CI also for forks
pull_request_target: # Please read https://securitylab.github.com/research/github-actions-preventing-pwn-requests/ before using
pull_request_target:
types: [opened, reopened, synchronize, labeled]
push:
branches: [main]

env:
DBT_TEST_ATHENA_DATABASE: awsdatacatalog
DBT_TEST_ATHENA_SCHEMA: dbt-tests
DBT_TEST_ATHENA_WORK_GROUP: athena-dbt-tests
DBT_TEST_ATHENA_THREADS: 16
DBT_TEST_ATHENA_POLL_INTERVAL: 0.5

jobs:
# workflow that is invoked when for PRs with labels 'enable-functional-tests'
functional-tests-pr:
name: Functional Test - PR
# trigger on PRs with label 'enable-ci'
name: Functional Tests - PR
if: contains(github.event.pull_request.labels.*.name, 'enable-functional-tests')
runs-on: ubuntu-latest
uses: ./.github/workflows/functional-tests-workflow.yml
secrets: inherit
strategy:
matrix:
aws-region: ['us-east-1', 'eu-west-1', 'eu-west-2', 'eu-central-1']
aws-region: ['us-east-1', 'eu-central-1']
permissions:
id-token: write
contents: read
env:
DBT_TEST_ATHENA_S3_STAGING_DIR: s3://dbt-athena-query-results-${{ matrix.aws-region }}
DBT_TEST_ATHENA_REGION_NAME: ${{ matrix.aws-region }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
# this is needed to checkout the PR branch
ref: ${{ github.event.pull_request.head.ref }}
repository: ${{ github.event.pull_request.head.repo.full_name }}
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
make install_deps
- name: Configure AWS credentials from Test account
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.ASSUMABLE_ROLE_NAME }}
aws-region: ${{ matrix.aws-region }}
- name: Functional Test
run: |
pytest -n 8 tests/functional
with:
# this allows to pick the branch from the PR
checkout-ref: ${{ github.event.pull_request.head.ref }}
# this allows to work on fork
checkout-repository: ${{ github.event.pull_request.head.repo.full_name }}
aws-region: ${{ matrix.aws-region }}

# TODO: this is a workaround for now, we should use the same job for PR and main branch
# workflow that is invoked when a push to main happens
functional-tests-main:
name: Functional Test - main
# trigger push to main branch
name: Functional Tests - main
if: github.event_name == 'push' && github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
uses: ./.github/workflows/functional-tests-workflow.yml
secrets: inherit
strategy:
matrix:
aws-region: [ 'us-east-1', 'eu-west-1', 'eu-west-2', 'eu-central-1' ]
aws-region: ['us-east-1', 'eu-central-1']
permissions:
id-token: write
contents: read
env:
DBT_TEST_ATHENA_S3_STAGING_DIR: s3://dbt-athena-query-results-${{ matrix.aws-region }}
DBT_TEST_ATHENA_REGION_NAME: ${{ matrix.aws-region }}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
make install_deps
- name: Configure AWS credentials from Test account
uses: aws-actions/configure-aws-credentials@v2
with:
role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.ASSUMABLE_ROLE_NAME }}
aws-region: ${{ matrix.aws-region }}
- name: Functional Test
run: |
pytest -n 8 tests/functional
with:
checkout-ref: ${{ github.ref }}
checkout-repository: ${{ github.repository }}
aws-region: ${{ matrix.aws-region }}
13 changes: 13 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,16 @@ repos:
- 'Flake8-pyproject~=1.1'
args:
- '.'
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.3.0
hooks:
- id: mypy
args:
- --strict
- --ignore-missing-imports
- --install-types
- --allow-subclassing-any
- --allow-untyped-decorators
additional_dependencies:
- types-setuptools==67.8.0.0
exclude: ^tests/
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<a href="https://pypi.org/project/dbt-athena-community/"><img src="https://badge.fury.io/py/dbt-athena-community.svg" /></a>
<a href="https://pycqa.github.io/isort/"><img src="https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336" /></a>
<a href="https://github.com/psf/black"><img src="https://img.shields.io/badge/code%20style-black-000000.svg" /></a>
<a href="https://github.com/python/mypy"><img src="https://www.mypy-lang.org/static/mypy_badge.svg" /></a>
<a href="https://pepy.tech/project/dbt-athena-community"><img src="https://pepy.tech/badge/dbt-athena-community/month" /></a>
</p>

Expand Down
10 changes: 4 additions & 6 deletions dbt/adapters/athena/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,17 @@ def timestamp_type(self) -> str:
def string_size(self) -> int:
if not self.is_string():
raise DbtRuntimeError("Called string_size() on non-string field!")
if not self.char_size:
# Handle error: '>' not supported between instances of 'NoneType' and 'NoneType' for union relations macro
return 0
return self.char_size
# Handle error: '>' not supported between instances of 'NoneType' and 'NoneType' for union relations macro
return self.char_size or 0

@property
def data_type(self) -> str:
if self.is_string():
return self.string_type(self.string_size())
elif self.is_numeric():
return self.numeric_type(self.dtype, self.numeric_precision, self.numeric_scale)
return self.numeric_type(self.dtype, self.numeric_precision, self.numeric_scale) # type: ignore
elif self.is_binary():
return self.binary_type()
elif self.is_timestamp():
return self.timestamp_type()
return self.dtype
return self.dtype # type: ignore
28 changes: 14 additions & 14 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from copy import deepcopy
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, ContextManager, Dict, List, Optional, Tuple, Union
from typing import Any, ContextManager, Dict, List, Optional, Tuple

import tenacity
from pyathena.connection import Connection as AthenaConnection
Expand Down Expand Up @@ -56,7 +56,7 @@ def type(self) -> str:
return "athena"

@property
def unique_field(self):
def unique_field(self) -> str:
return f"athena-{hashlib.md5(self.s3_staging_dir.encode()).hexdigest()}"

def _connection_keys(self) -> Tuple[str, ...]:
Expand All @@ -78,7 +78,7 @@ def _connection_keys(self) -> Tuple[str, ...]:


class AthenaCursor(Cursor):
def __init__(self, **kwargs):
def __init__(self, **kwargs) -> None: # type: ignore
super().__init__(**kwargs)
self._executor = ThreadPoolExecutor()

Expand All @@ -92,7 +92,7 @@ def _collect_result_set(self, query_id: str) -> AthenaResultSet:
retry_config=self._retry_config,
)

def execute(
def execute( # type: ignore
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
Expand All @@ -103,7 +103,7 @@ def execute(
cache_expiration_time: int = 0,
**kwargs,
):
def inner():
def inner() -> AthenaCursor:
query_id = self._execute(
operation,
parameters=parameters,
Expand Down Expand Up @@ -143,7 +143,7 @@ class AthenaConnectionManager(SQLConnectionManager):
TYPE = "athena"

@classmethod
def data_type_code_to_name(cls, type_code: Union[int, str]) -> str:
def data_type_code_to_name(cls, type_code: str) -> str:
"""
Get the string representation of the data type from the Athena metadata. Dbt performs a
query to retrieve the types of the columns in the SQL query. Then these types are compared
Expand All @@ -152,8 +152,8 @@ def data_type_code_to_name(cls, type_code: Union[int, str]) -> str:
"""
return type_code.split("(")[0].split("<")[0].upper()

@contextmanager
def exception_handler(self, sql: str) -> ContextManager:
@contextmanager # type: ignore
def exception_handler(self, sql: str) -> ContextManager: # type: ignore
try:
yield
except Exception as e:
Expand Down Expand Up @@ -201,23 +201,23 @@ def open(cls, connection: Connection) -> Connection:
return connection

@classmethod
def get_response(cls, cursor) -> AdapterResponse:
def get_response(cls, cursor: AthenaCursor) -> AdapterResponse:
code = "OK" if cursor.state == AthenaQueryExecution.STATE_SUCCEEDED else "ERROR"
return AdapterResponse(_message=f"{code} {cursor.rowcount}", rows_affected=cursor.rowcount, code=code)

def cancel(self, connection: Connection):
def cancel(self, connection: Connection) -> None:
connection.handle.cancel()

def add_begin_query(self):
def add_begin_query(self) -> None:
pass

def add_commit_query(self):
def add_commit_query(self) -> None:
pass

def begin(self):
def begin(self) -> None:
pass

def commit(self):
def commit(self) -> None:
pass


Expand Down
Loading

0 comments on commit bcc23be

Please sign in to comment.