From 6af06336612b2e513e4d1dae5b2e76814f81fa57 Mon Sep 17 00:00:00 2001
From: bossenti
Date: Fri, 26 May 2023 10:48:48 +0200
Subject: [PATCH 01/13] chore(deps): remove internal method reference & soften
 PyAthena version requirements
Co-authored-by: dnks23
---
 metadata-ingestion/setup.py                         | 5 +++--
 .../src/datahub/ingestion/source/sql/athena.py      | 4 +---
 metadata-ingestion/tests/unit/test_athena_source.py | 2 +-
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 6f5f5080299d6..4b907fd0c8145 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -270,8 +270,9 @@ def get_long_description():
     },
     "great-expectations": sql_common | sqllineage_lib,
     # Source plugins
-    # PyAthena is pinned with exact version because we use private method in PyAthena
-    "athena": sql_common | {"PyAthena[SQLAlchemy]==2.4.1"},
+    # sqlalchemy-bigquery is included here since it provides an implementation of
+    # a SQLAlchemy-compliant STRUCT type definition
+    "athena": sql_common | {"PyAthena[SQLAlchemy]>=2.6.0,<3.0.0", "sqlalchemy-bigquery>=1.4.1"},
     "azure-ad": set(),
     "bigquery": sql_common
     | bigquery_common
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
index 46f9fd240db04..af65e32472e0f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
@@ -139,9 +139,7 @@ def get_table_properties(
         self.cursor = cast(BaseCursor, inspector.engine.raw_connection().cursor())
         assert self.cursor
 
-        # Unfortunately properties can be only get through private methods as those are not exposed
-        # https://github.com/laughingman7743/PyAthena/blob/9e42752b0cc7145a87c3a743bb2634fe125adfa7/pyathena/model.py#L201
-        metadata: AthenaTableMetadata = self.cursor._get_table_metadata(
+        metadata: AthenaTableMetadata = self.cursor.get_table_metadata(
             table_name=table, schema_name=schema
         )
         description = metadata.comment
diff --git a/metadata-ingestion/tests/unit/test_athena_source.py b/metadata-ingestion/tests/unit/test_athena_source.py
index 2558f6a46715e..cffc0fe7e4056 100644
--- a/metadata-ingestion/tests/unit/test_athena_source.py
+++ b/metadata-ingestion/tests/unit/test_athena_source.py
@@ -104,7 +104,7 @@ def test_athena_get_table_properties():
     mock_cursor = mock.MagicMock()
     mock_inspector = mock.MagicMock()
     mock_inspector.engine.raw_connection().cursor.return_value = mock_cursor
-    mock_cursor._get_table_metadata.return_value = AthenaTableMetadata(
+    mock_cursor.get_table_metadata.return_value = AthenaTableMetadata(
         response=table_metadata
     )

From cf595d1bb668932eaa3007877acb8b1be76ac536 Mon Sep 17 00:00:00 2001
From: bossenti
Date: Fri, 26 May 2023 10:53:06 +0200
Subject: [PATCH 02/13] feature(ingest/athena): implement custom SQLalchemy
 dialect for Athena to detect complex data types correctly
Co-authored-by: dnks23
---
 .../datahub/ingestion/source/sql/athena.py    | 148 ++++++++++++++++++
 .../ingestion/source/sql/sql_common.py        |   6 +
 .../datahub/ingestion/source/sql/sql_types.py |  11 +-
 .../tests/unit/test_athena_source.py          |  84 +++++++++-
 4 files changed, 246 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
index af65e32472e0f..ae9319121a953 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
@@ -1,12 +1,17 @@
 import json
 import logging
+import re
 import typing
 from typing import Any, Dict, Iterable, List, Optional, Tuple, cast
 
 import pydantic
 from pyathena.common import BaseCursor
 from pyathena.model import AthenaTableMetadata
+from pyathena.sqlalchemy_athena import AthenaRestDialect
+from sqlalchemy import create_engine, inspect, types
 from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.types import TypeEngine
+from sqlalchemy_bigquery import STRUCT
 
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
 from datahub.emitter.mcp_builder import PlatformKey
@@ -26,11 +31,154 @@
     SQLAlchemyConfig,
     make_sqlalchemy_uri,
 )
+from datahub.ingestion.source.sql.sql_types import MapType
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
     gen_database_container,
     gen_database_key,
 )
+from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
+from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
+from datahub.utilities.sqlalchemy_type_converter import (
+    get_schema_fields_for_sqlalchemy_column,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class CustomAthenaRestDialect(AthenaRestDialect):
+    """Custom definition of the Athena dialect.
+
+    Custom implementation that allows extending/modifying the behavior of the SQLalchemy
+    dialect that is used by PyAthena (which is the library that is used by DataHub
+    to extract metadata from Athena).
+    This dialect can then be used by the inspector (see get_inspectors()).
+
+    """
+
+    # regex to identify complex types in DDL strings which are embedded in `<>`.
+    _complex_type_pattern = re.compile(r"(<.+>)")
+
+    @typing.no_type_check
+    def _get_column_type(self, type_: str | dict) -> TypeEngine:  # noqa: C901
+        """Derives the data type of the Athena column.
+
+        This method is overwritten to extend the behavior of PyAthena.
+        Pyathena is not capable of detecting complex data types, e.g.,
+        arrays, maps, or structs (as of version 2.25.2).
+        The custom implementation extends the functionality to support the above-mentioned data types.
+ """ + + # Originally, this method only handles `type_` as a string + # With the workaround used below to parse DDL strings for structs, + # `type` might also be a dictionary + if isinstance(type_, str): + match = self._pattern_column_type.match(type_) + if match: + type_name = match.group(1).lower() + type_meta_information = match.group(2) + else: + type_name = type_.lower() + type_meta_information = None + elif isinstance(type_, dict): + # this occurs only when a type parsed as part of a STRUCT is passed + # in such case type_ is a dictionary whose type can be retrieved from the attribute + type_name = type_.get("type", None) + type_meta_information = None + else: + raise RuntimeError(f"Unsupported type definition: {type_}") + + args = [] + + if type_name in ["array"]: + detected_col_type = types.ARRAY + + # here we need to account again for two options how `type_` is passed to this method + # first, the simple array definition as a DDL string (something like array) + # this is always the case when the array is not part of a complex data type (mainly STRUCT) + # second, the array definition can also be passed in form of dictionary + # this is the case when the array is part of a complex data type + if isinstance(type_, str): + # retrieve the raw name of the data type as a string + array_type_raw = self._complex_type_pattern.findall(type_)[0][ + 1:-1 + ] # array type without enclosing <> + # convert the string name of the data type into a SQLalchemy type (expected return) + array_type = self._get_column_type(array_type_raw) + elif isinstance(type_, dict): + # retrieve the data type of the array items and + # transform it into a SQLalchemy type + array_type = self._get_column_type(type_["items"]) + else: + raise RuntimeError(f"Unsupported array definition: {type_}") + + args = [array_type] + + elif type_name in ["struct", "record"]: + # STRUCT is not part of the SQLalchemy types selection + # but is provided by another official SQLalchemy library and + # compatible with the other SQLalchemy types + detected_col_type = STRUCT + + if isinstance(type_, dict): + # in case a struct as part of another struct is passed + # it is provided in form of a dictionary and + # can simply be used for the further processing + struct_type = type_ + else: + # this is the case when the type definition of the struct is passed as a DDL string + # therefore, it is required to parse the DDL string + # here a method provided in another Datahub source is used so that the parsing + # doesn't need to be implemented twice + # `get_avro_schema_for_hive_column` accepts a DDL description as column type and + # returns the parsed data types in form of a dictionary + schema = get_avro_schema_for_hive_column( + hive_column_name=type_name, hive_column_type=type_ + ) + + # the actual type description needs to be extracted + struct_type = schema["fields"][0]["type"] + + # A STRUCT consist of multiple attributes which are expected to be passed as + # a list of tuples consisting of name data type pairs. 
+            # e.g., `('age', Integer())`
+            # See the reference:
+            # https://github.com/googleapis/python-bigquery-sqlalchemy/blob/main/sqlalchemy_bigquery/_struct.py#L53
+            #
+            # To extract all of them, we simply iterate over all detected fields and
+            # convert them to SQLalchemy types
+            struct_args = []
+            for field in struct_type["fields"]:
+                struct_args.append(
+                    (
+                        field["name"],
+                        self._get_column_type(field["type"]["type"])
+                        if field["type"]["type"] not in ["record", "array"]
+                        else self._get_column_type(field["type"]),
+                    )
+                )
+
+            args = struct_args
+
+        elif type_name in ["map"]:
+            # Instead of SQLalchemy's TupleType the custom MapType is used here
+            # which is just a simple wrapper around TupleType
+            detected_col_type = MapType
+
+            # the type definition for maps is a comma-separated pair of key and value type (e.g., string,string)
+            key_type_raw, value_type_raw = type_meta_information.split(",")
+
+            # convert both type names to actual SQLalchemy types
+            args = [
+                self._get_column_type(key_type_raw),
+                self._get_column_type(value_type_raw),
+            ]
+        # by using get_avro_schema_for_hive_column() for parsing STRUCTs the data type `long`
+        # can also be returned, so we need to extend the handling here as well
+        elif type_name in ["bigint", "long"]:
+            detected_col_type = types.BIGINT
+        else:
+            return super()._get_column_type(type_name)
+        return detected_col_type(*args)
 
 
 class AthenaConfig(SQLAlchemyConfig):
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 288b4bf1e78d0..96f6476931ae0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -23,6 +23,7 @@
 from sqlalchemy.exc import ProgrammingError
 from sqlalchemy.sql import sqltypes as types
 from sqlalchemy.types import TypeDecorator, TypeEngine
+from sqlalchemy_bigquery import STRUCT
 
 from datahub.emitter.mce_builder import (
     make_data_platform_urn,
@@ -39,6 +40,7 @@
     DatasetSubTypes,
 )
 from datahub.ingestion.source.sql.sql_config import SQLAlchemyConfig
+from datahub.ingestion.source.sql.sql_types import MapType
 from datahub.ingestion.source.sql.sql_utils import (
     add_table_to_schema_container,
     gen_database_container,
@@ -80,6 +82,7 @@
     DatasetLineageTypeClass,
     DatasetPropertiesClass,
     GlobalTagsClass,
+    MapTypeClass,
     SubTypesClass,
     TagAssociationClass,
     UpstreamClass,
@@ -200,6 +203,9 @@ class SqlWorkUnit(MetadataWorkUnit):
     types.DATETIME: TimeTypeClass,
     types.TIMESTAMP: TimeTypeClass,
     types.JSON: RecordTypeClass,
+    # additional type definitions that are used by the Athena source
+    MapType: MapTypeClass,  # type: ignore
+    STRUCT: RecordTypeClass,
     # Because the postgresql dialect is used internally by many other dialects,
     # we add some postgres types here. This is ok to do because the postgresql
     # dialect is built-in to sqlalchemy.
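The `_complex_type_pattern` regex introduced above does the heavy lifting for nested DDL strings. A minimal, self-contained sketch of the extraction step (only the regex itself is taken from the patch; the driver code around it is illustrative):

    import re

    # same pattern as in CustomAthenaRestDialect: greedily captures everything
    # between the outermost angle brackets of a complex DDL type string
    complex_type_pattern = re.compile(r"(<.+>)")

    ddl = "array<struct<id:string,label:string>>"

    # findall() yields ["<struct<id:string,label:string>>"]; stripping the first
    # and last character leaves the element type of the array, which
    # _get_column_type() then resolves recursively
    inner = complex_type_pattern.findall(ddl)[0][1:-1]
    print(inner)  # struct<id:string,label:string>

Because the pattern is greedy, it always spans to the last `>`, which is exactly what the recursive descent in `_get_column_type` relies on for arbitrarily nested types.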
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
index 66c51cfe87ed1..c43bc32795c30 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py
@@ -1,13 +1,15 @@
 import re
 from typing import Any, Dict, ValuesView
 
+from sqlalchemy import types
+
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     ArrayType,
     BooleanType,
     BytesType,
     DateType,
     EnumType,
-    MapType,
+    MapType as MapTypeAvro,
     NullType,
     NumberType,
     RecordType,
@@ -349,7 +351,7 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
     "time": TimeType,
     "timestamp": TimeType,
     "row": RecordType,
-    "map": MapType,
+    "map": MapTypeAvro,
     "array": ArrayType,
 }
 
@@ -392,3 +394,8 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
     "geography": None,
     "uuid": StringType,
 }
+
+
+class MapType(types.TupleType):
+    # Wrapper class around SQLalchemy's TupleType to increase compatibility with DataHub
+    pass
diff --git a/metadata-ingestion/tests/unit/test_athena_source.py b/metadata-ingestion/tests/unit/test_athena_source.py
index cffc0fe7e4056..9aee079186f8b 100644
--- a/metadata-ingestion/tests/unit/test_athena_source.py
+++ b/metadata-ingestion/tests/unit/test_athena_source.py
@@ -3,9 +3,13 @@
 
 import pytest
 from freezegun import freeze_time
+from sqlalchemy import types
+from sqlalchemy_bigquery import STRUCT
 
 from datahub.ingestion.api.common import PipelineContext
-from src.datahub.ingestion.source.aws.s3_util import make_s3_urn
+from datahub.ingestion.source.aws.s3_util import make_s3_urn
+from datahub.ingestion.source.sql.athena import CustomAthenaRestDialect
+from datahub.ingestion.source.sql.sql_types import MapType
 
 FROZEN_TIME = "2020-04-14 07:00:00"
 
@@ -126,3 +130,81 @@ def test_athena_get_table_properties():
     }
 
     assert location == make_s3_urn("s3://testLocation", "PROD")
+
+
+def test_get_column_type_simple_types():
+    assert isinstance(
+        CustomAthenaRestDialect()._get_column_type(type_="int"), types.Integer
+    )
+    assert isinstance(
+        CustomAthenaRestDialect()._get_column_type(type_="string"), types.String
+    )
+    assert isinstance(
+        CustomAthenaRestDialect()._get_column_type(type_="boolean"), types.BOOLEAN
+    )
+    assert isinstance(
+        CustomAthenaRestDialect()._get_column_type(type_="long"), types.BIGINT
+    )
+    assert isinstance(
+        CustomAthenaRestDialect()._get_column_type(type_="double"), types.FLOAT
+    )
+
+
+def test_get_column_type_array():
+    result = CustomAthenaRestDialect()._get_column_type(type_="array<string>")
+
+    assert isinstance(result, types.ARRAY)
+    assert isinstance(result.item_type, types.String)
+
+
+def test_get_column_type_map():
+    result = CustomAthenaRestDialect()._get_column_type(type_="map<string,int>")
+
+    assert isinstance(result, MapType)
+    assert isinstance(result.types[0], types.String)
+    assert isinstance(result.types[1], types.Integer)
+
+
+def test_column_type_struct():
+
+    result = CustomAthenaRestDialect()._get_column_type(type_="struct<test:string>")
+
+    assert isinstance(result, STRUCT)
+    assert isinstance(result._STRUCT_fields[0], tuple)
+    assert result._STRUCT_fields[0][0] == "test"
+    assert isinstance(result._STRUCT_fields[0][1], types.String)
+
+
+def test_column_type_complex_combination():
+
+    result = CustomAthenaRestDialect()._get_column_type(
+        type_="struct<id:string,name:string,choices:array<struct<id:string,label:string>>>"
+    )
+
+    assert isinstance(result, STRUCT)
+
+    assert isinstance(result._STRUCT_fields[0], tuple)
+    assert result._STRUCT_fields[0][0] == "id"
+    assert isinstance(result._STRUCT_fields[0][1], types.String)
+
+    assert isinstance(result._STRUCT_fields[1], tuple)
+    assert result._STRUCT_fields[1][0] == "name"
+    assert isinstance(result._STRUCT_fields[1][1], types.String)
+
+    assert isinstance(result._STRUCT_fields[2], tuple)
+    assert result._STRUCT_fields[2][0] == "choices"
+    assert isinstance(result._STRUCT_fields[2][1], types.ARRAY)
+
+    assert isinstance(result._STRUCT_fields[2][1].item_type, STRUCT)
+
+    assert isinstance(result._STRUCT_fields[2][1].item_type._STRUCT_fields[0], tuple)
+    assert result._STRUCT_fields[2][1].item_type._STRUCT_fields[0][0] == "id"
+    assert isinstance(
+        result._STRUCT_fields[2][1].item_type._STRUCT_fields[0][1], types.String
+    )
+
+    assert isinstance(result._STRUCT_fields[2][1].item_type._STRUCT_fields[1], tuple)
+    assert result._STRUCT_fields[2][1].item_type._STRUCT_fields[1][0] == "label"
+    assert isinstance(
+        result._STRUCT_fields[2][1].item_type._STRUCT_fields[1][1], types.String
+    )

From 9c0b6aba43db23f658b90220bc2d8b1a00a1e322 Mon Sep 17 00:00:00 2001
From: bossenti
Date: Fri, 26 May 2023 10:53:52 +0200
Subject: [PATCH 03/13] feature(ingest/athena): implement schema field
 generation supporting nested fields for SQLalchemy types and enable Athena
 source
Co-authored-by: dnks23
---
 .../datahub/ingestion/source/sql/athena.py    |  36 ++++
 .../utilities/sqlalchemy_type_converter.py    | 201 ++++++++++++++++++
 .../test_sqlalchemy_type_converter.py         |  94 ++++++++
 3 files changed, 331 insertions(+)
 create mode 100644 metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
 create mode 100644 metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
index ae9319121a953..b6c4ce5851b5d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
@@ -280,6 +280,18 @@ def create(cls, config_dict, ctx):
         config = AthenaConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
+    # overwrite this method to allow specifying the use of a custom dialect
+    def get_inspectors(self) -> Iterable[Inspector]:
+        url = self.config.get_sql_alchemy_url()
+        logger.debug(f"sql_alchemy_url={url}")
+        engine = create_engine(url, **self.config.options)
+
+        # set custom dialect to be used by the inspector
+        engine.dialect = CustomAthenaRestDialect()
+        with engine.connect() as conn:
+            inspector = inspect(conn)
+            yield inspector
+
     def get_table_properties(
         self, inspector: Inspector, schema: str, table: str
     ) -> Tuple[Optional[str], Dict[str, str], Optional[str]]:
@@ -393,6 +405,30 @@ def get_schema_names(self, inspector: Inspector) -> List[str]:
             return [schema for schema in schemas if schema == athena_config.database]
         return schemas
 
+    # Overwrite to modify the creation of schema fields
+    def get_schema_fields_for_column(
+        self,
+        dataset_name: str,
+        column: dict,
+        pk_constraints: Optional[dict] = None,
+        tags: Optional[List[str]] = None,
+    ) -> List[SchemaField]:
+        fields = get_schema_fields_for_sqlalchemy_column(
+            column_name=column["name"],
+            column_type=column["type"],
+            description=column.get("comment", None),
+            nullable=column.get("nullable", True),
+            is_part_of_key=True
+            if (
+                pk_constraints is not None
+                and isinstance(pk_constraints, dict)
+                and column["name"] in pk_constraints.get("constrained_columns", [])
+            )
+            else False,
+        )
+
+        return fields
+
     def close(self):
         if self.cursor:
             self.cursor.close()
diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
new file mode 100644
index 0000000000000..4142988cd859a
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
@@ -0,0 +1,201 @@
+import json
+import logging
+import uuid
+from typing import Union
+
+from sqlalchemy import types
+from sqlalchemy_bigquery import STRUCT
+
+from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
+from datahub.ingestion.source.sql.sql_types import MapType
+from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
+from datahub.metadata.schema_classes import NullTypeClass, SchemaFieldDataTypeClass
+
+logger = logging.getLogger(__name__)
+
+
+class SqlAlchemyColumnToAvroConverter:
+    """Helper class that collects some methods to convert SQLalchemy columns to Avro schema."""
+
+    # tuple of complex data types that require special handling
+    _COMPLEX_TYPES = (STRUCT, types.ARRAY, MapType)
+
+    # mapping of primitive SQLalchemy data types to AVRO schema data types
+    PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE: dict[type[types.TypeEngine], str] = {
+        types.String: "string",
+        types.BINARY: "string",
+        types.BOOLEAN: "boolean",
+        types.FLOAT: "float",
+        types.INTEGER: "int",
+        types.BIGINT: "long",
+        types.VARCHAR: "string",
+        types.CHAR: "string",
+    }
+
+    @classmethod
+    def get_avro_type(
+        cls, column_type: Union[types.TypeEngine, STRUCT, MapType], nullable: bool
+    ) -> dict[str, object]:
+        """Determines the concrete AVRO schema type for a SQLalchemy-typed column"""
+
+        if type(column_type) in cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys():
+            return {
+                "type": cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE[type(column_type)],
+                "native_data_type": str(column_type),
+                "_nullable": nullable,
+            }
+        if type(column_type) == types.DECIMAL:
+            return {
+                "type": "bytes",
+                "logicalType": "decimal",
+                "precision": int(column_type.precision),
+                "scale": int(column_type.scale),
+                "native_data_type": str(column_type),
+                "_nullable": nullable,
+            }
+        if type(column_type) == types.DATE:
+            return {
+                "type": "int",
+                "logicalType": "date",
+                "native_data_type": str(column_type),
+                "_nullable": nullable,
+            }
+        if type(column_type) == types.TIMESTAMP:
+            return {
+                "type": "long",
+                "logicalType": "timestamp-millis",
+                "native_data_type": str(column_type),
+                "_nullable": nullable,
+            }
+        if isinstance(column_type, types.ARRAY):
+            array_type = column_type.item_type
+            return {
+                "type": "array",
+                "items": cls.get_avro_type(column_type=array_type, nullable=nullable),
+                "native_data_type": f"array<{str(column_type.item_type)}>",
+            }
+        if isinstance(column_type, MapType):
+            key_type = column_type.types[0]
+            value_type = column_type.types[1]
+            return {
+                "type": "map",
+                "values": cls.get_avro_type(column_type=value_type, nullable=nullable),
+                "native_data_type": str(column_type),
+                "key_type": cls.get_avro_type(column_type=key_type, nullable=nullable),
+                "key_native_data_type": str(key_type),
+            }
+        if isinstance(column_type, STRUCT):
+
+            fields = []
+            for field_def in column_type._STRUCT_fields:
+                field_name, field_type = field_def
+                fields.append(
+                    {
+                        "name": field_name,
+                        "type": cls.get_avro_type(
+                            column_type=field_type, nullable=nullable
+                        ),
+                    }
+                )
+            struct_name = f"__struct_{str(uuid.uuid4()).replace('-', '')}"
+
+            return {
+                "type": "record",
+                "name": struct_name,
+                "fields": fields,
+                "native_data_type": str(column_type),
"_nullable": nullable, + } + + return { + "type": "null", + "native_data_type": str(column_type), + "_nullable": nullable, + } + + @classmethod + def get_avro_for_sqlalchemy_column( + cls, + column_name: str, + column_type: types.TypeEngine, + nullable: bool, + ) -> object | dict[str, object]: + """Returns the AVRO schema representation of a SQLalchemy column.""" + if isinstance(column_type, cls._COMPLEX_TYPES): + return { + "type": "record", + "name": "__struct_", + "fields": [ + { + "name": column_name, + "type": cls.get_avro_type( + column_type=column_type, nullable=nullable + ), + } + ], + } + return cls.get_avro_type(column_type=column_type, nullable=nullable) + + +def get_schema_fields_for_sqlalchemy_column( + column_name: str, + column_type: types.TypeEngine, + description: str | None = None, + nullable: bool | None = True, + is_part_of_key: bool | None = False, +) -> list[SchemaField]: + """Creates SchemaFields from a given SQLalchemy column. + + This function is analogous to `get_schema_fields_for_hive_column` from datahub.utilities.hive_schema_to_avro. + The main purpose of implementing it this way, is to make it ready/compatible for second field path generation, + which allows to explore nested structures within the UI. + """ + + if nullable is None: + nullable = True + + try: + # as a first step, the column is converted to AVRO JSON which can then be used by an existing function + avro_schema_json = ( + SqlAlchemyColumnToAvroConverter.get_avro_for_sqlalchemy_column( + column_name=column_name, + column_type=column_type, + nullable=nullable, + ) + ) + # retrieve schema field definitions from the above generated AVRO JSON structure + schema_fields = avro_schema_to_mce_fields( + avro_schema_string=json.dumps(avro_schema_json), + default_nullable=nullable, + swallow_exceptions=False, + ) + except Exception as e: + logger.warning( + f"Unable to parse column {column_name} and type {column_type} the error was: {e}" + ) + + # fallback description in case any exception occurred + schema_fields = [ + SchemaField( + fieldPath=column_name, + type=SchemaFieldDataTypeClass(type=NullTypeClass()), + nativeDataType=str(column_type), + ) + ] + + # for all non-nested data types an additional modification of the `fieldPath` property is required + if type(column_type) in ( + *SqlAlchemyColumnToAvroConverter.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys(), + types.TIMESTAMP, + types.DATE, + types.DECIMAL, + ): + schema_fields[0].fieldPath += f".{column_name}" + + if description: + schema_fields[0].description = description + schema_fields[0].isPartOfKey = ( + is_part_of_key if is_part_of_key is not None else False + ) + + return schema_fields diff --git a/metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py b/metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py new file mode 100644 index 0000000000000..959da0987a825 --- /dev/null +++ b/metadata-ingestion/tests/unit/utilities/test_sqlalchemy_type_converter.py @@ -0,0 +1,93 @@ +from typing import no_type_check + +from sqlalchemy import types +from sqlalchemy_bigquery import STRUCT + +from datahub.ingestion.source.sql.sql_types import MapType +from datahub.metadata.schema_classes import ( + ArrayTypeClass, + MapTypeClass, + NullTypeClass, + NumberTypeClass, + RecordTypeClass, +) +from datahub.utilities.sqlalchemy_type_converter import ( + get_schema_fields_for_sqlalchemy_column, +) + + +def test_get_avro_schema_for_sqlalchemy_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column( + 
column_name="test", column_type=types.INTEGER() + ) + assert len(schema_fields) == 1 + assert schema_fields[0].fieldPath == "[version=2.0].[type=int].test" + assert schema_fields[0].type.type == NumberTypeClass() + assert schema_fields[0].nativeDataType == "INTEGER" + assert schema_fields[0].nullable is True + + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=types.String(), nullable=False + ) + assert len(schema_fields) == 1 + assert schema_fields[0].fieldPath == "[version=2.0].[type=string].test" + assert schema_fields[0].type.type == NumberTypeClass() + assert schema_fields[0].nativeDataType == "VARCHAR" + assert schema_fields[0].nullable is False + + +def test_get_avro_schema_for_sqlalchemy_array_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=types.ARRAY(types.FLOAT()) + ) + assert len(schema_fields) == 1 + assert ( + schema_fields[0].fieldPath + == "[version=2.0].[type=struct].[type=array].[type=float].test" + ) + assert schema_fields[0].type.type == ArrayTypeClass(nestedType=["float"]) + assert schema_fields[0].nativeDataType == "array" + + +def test_get_avro_schema_for_sqlalchemy_map_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=MapType(types.String(), types.BOOLEAN()) + ) + assert len(schema_fields) == 1 + assert ( + schema_fields[0].fieldPath + == "[version=2.0].[type=struct].[type=map].[type=boolean].test" + ) + assert schema_fields[0].type.type == MapTypeClass( + keyType="string", valueType="boolean" + ) + assert schema_fields[0].nativeDataType == "MapType(String(), BOOLEAN())" + + +def test_get_avro_schema_for_sqlalchemy_struct_column() -> None: + + schema_fields = get_schema_fields_for_sqlalchemy_column( + column_name="test", column_type=STRUCT(("test", types.INTEGER())) + ) + assert len(schema_fields) == 2 + assert ( + schema_fields[0].fieldPath == "[version=2.0].[type=struct].[type=struct].test" + ) + assert schema_fields[0].type.type == RecordTypeClass() + assert schema_fields[0].nativeDataType == "STRUCT" + + assert ( + schema_fields[1].fieldPath + == "[version=2.0].[type=struct].[type=struct].test.[type=int].test" + ) + assert schema_fields[1].type.type == NumberTypeClass() + assert schema_fields[1].nativeDataType == "INTEGER" + + +@no_type_check +def test_get_avro_schema_for_sqlalchemy_unknown_column(): + schema_fields = get_schema_fields_for_sqlalchemy_column("invalid", "test") + assert len(schema_fields) == 1 + assert schema_fields[0].type.type == NullTypeClass() + assert schema_fields[0].fieldPath == "[version=2.0].[type=null]" + assert schema_fields[0].nativeDataType == "test" From c7f0bc75c409ac56d6047bdae57b79373b5aad9b Mon Sep 17 00:00:00 2001 From: bossenti Date: Tue, 30 May 2023 18:48:01 +0200 Subject: [PATCH 04/13] chore: make MapType discoverable --- .../src/datahub/ingestion/source/sql/sql_types.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py index 2710a1dd6b67a..cb36fa0a6f9aa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_types.py @@ -362,6 +362,12 @@ def resolve_vertica_modified_type(type_string: str) -> Any: "array": ArrayType, } + +class MapType(types.TupleType): + # Wrapper class around SQLalchemy's TupleType to increase compatibility with DataHub 
+    pass
+
+
 # https://docs.aws.amazon.com/athena/latest/ug/data-types.html
 # https://github.com/dbt-athena/dbt-athena/tree/main
 ATHENA_SQL_TYPES_MAP: Dict[str, Any] = {
@@ -423,8 +429,3 @@ def resolve_vertica_modified_type(type_string: str) -> Any:
     "geography": None,
     "uuid": StringType,
 }
-
-
-class MapType(types.TupleType):
-    # Wrapper class around SQLalchemy's TupleType to increase compatibility with DataHub
-    pass

From f22aedd8c3c055a787c11ad43c9d13c94d3850be Mon Sep 17 00:00:00 2001
From: bossenti
Date: Tue, 30 May 2023 19:35:53 +0200
Subject: [PATCH 05/13] backport to python 3.7
---
 .../src/datahub/ingestion/source/sql/athena.py     |  6 +++---
 .../datahub/utilities/sqlalchemy_type_converter.py | 12 ++++++------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
index b6c4ce5851b5d..7db2a3af17608 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
@@ -2,7 +2,7 @@
 import logging
 import re
 import typing
-from typing import Any, Dict, Iterable, List, Optional, Tuple, cast
+from typing import Any, Dict, Iterable, List, Optional, Tuple, cast, Union
 
 import pydantic
 from pyathena.common import BaseCursor
 from pyathena.model import AthenaTableMetadata
@@ -60,7 +60,7 @@ class CustomAthenaRestDialect(AthenaRestDialect):
     _complex_type_pattern = re.compile(r"(<.+>)")
 
     @typing.no_type_check
-    def _get_column_type(self, type_: str | dict) -> TypeEngine:  # noqa: C901
+    def _get_column_type(self, type_: Union[str, Dict[str, Any]]) -> TypeEngine:  # noqa: C901
         """Derives the data type of the Athena column.
 
         This method is overwritten to extend the behavior of PyAthena.
         Pyathena is not capable of detecting complex data types, e.g.,
@@ -409,7 +409,7 @@ def get_schema_names(self, inspector: Inspector) -> List[str]:
     def get_schema_fields_for_column(
         self,
         dataset_name: str,
-        column: dict,
+        column: Dict,
         pk_constraints: Optional[dict] = None,
         tags: Optional[List[str]] = None,
     ) -> List[SchemaField]:
diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
index 4142988cd859a..b76e545f0b799 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
@@ -1,7 +1,7 @@
 import json
 import logging
 import uuid
-from typing import Union
+from typing import Union, Dict, Type, Any, Optional
 
 from sqlalchemy import types
 from sqlalchemy_bigquery import STRUCT
@@ -21,7 +21,7 @@ class SqlAlchemyColumnToAvroConverter:
     _COMPLEX_TYPES = (STRUCT, types.ARRAY, MapType)
 
     # mapping of primitive SQLalchemy data types to AVRO schema data types
-    PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE: dict[type[types.TypeEngine], str] = {
+    PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE: Dict[Type[types.TypeEngine], str] = {
         types.String: "string",
         types.BINARY: "string",
         types.BOOLEAN: "boolean",
@@ -35,7 +35,7 @@ class SqlAlchemyColumnToAvroConverter:
     @classmethod
     def get_avro_type(
         cls, column_type: Union[types.TypeEngine, STRUCT, MapType], nullable: bool
-    ) -> dict[str, object]:
+    ) -> Dict[str, Any]:
         """Determines the concrete AVRO schema type for a SQLalchemy-typed column"""
 
         if type(column_type) in cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys():
@@ -119,7 +119,7 @@ def get_avro_for_sqlalchemy_column(
         column_name: str,
         column_type: types.TypeEngine,
         nullable: bool,
-    ) -> object | dict[str, object]:
+    ) -> Union[object, Dict[str, object]]:
"""Returns the AVRO schema representation of a SQLalchemy column.""" if isinstance(column_type, cls._COMPLEX_TYPES): return { @@ -140,8 +140,8 @@ def get_avro_for_sqlalchemy_column( def get_schema_fields_for_sqlalchemy_column( column_name: str, column_type: types.TypeEngine, - description: str | None = None, - nullable: bool | None = True, + description: Optional[str] = None, + nullable: Optional[bool] = True, is_part_of_key: bool | None = False, ) -> list[SchemaField]: """Creates SchemaFields from a given SQLalchemy column. From 5aaf3489e8f9ee26792d8c31ae8750a4a1ae5f0a Mon Sep 17 00:00:00 2001 From: bossenti Date: Tue, 30 May 2023 19:58:08 +0200 Subject: [PATCH 06/13] fix formatting --- metadata-ingestion/src/datahub/ingestion/source/sql/athena.py | 4 +++- .../src/datahub/utilities/sqlalchemy_type_converter.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 7db2a3af17608..f1e8c8ab2282d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -60,7 +60,9 @@ class CustomAthenaRestDialect(AthenaRestDialect): _complex_type_pattern = re.compile(r"(<.+>)") @typing.no_type_check - def _get_column_type(self, type_: Union[str, Dict[str, Any]]) -> TypeEngine: # noqa: C901 + def _get_column_type( + self, type_: Union[str, Dict[str, Any]] + ) -> TypeEngine: # noqa: C901 """Derives the data type of the Athena column. This method is overwritten to extend the behavior of PyAthena. diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py index b76e545f0b799..74c1b657973b1 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py @@ -119,7 +119,7 @@ def get_avro_for_sqlalchemy_column( column_name: str, column_type: types.TypeEngine, nullable: bool, - ) -> Union[object, Dict[str, object]]: + ) -> Union[object, Dict[str, object]]: """Returns the AVRO schema representation of a SQLalchemy column.""" if isinstance(column_type, cls._COMPLEX_TYPES): return { From 7dfb376e180e478b3a9217969747626421cc7703 Mon Sep 17 00:00:00 2001 From: bossenti Date: Tue, 30 May 2023 21:13:05 +0200 Subject: [PATCH 07/13] improve linting --- metadata-ingestion/src/datahub/ingestion/source/sql/athena.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index f1e8c8ab2282d..6ad522c4a800f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -2,7 +2,7 @@ import logging import re import typing -from typing import Any, Dict, Iterable, List, Optional, Tuple, cast, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast import pydantic from pyathena.common import BaseCursor From c6f2ba065fe8cac6a7e9746368492fc136e2f6c1 Mon Sep 17 00:00:00 2001 From: bossenti Date: Wed, 31 May 2023 11:29:52 +0200 Subject: [PATCH 08/13] fix import order --- .../src/datahub/utilities/sqlalchemy_type_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py 
index 74c1b657973b1..a0b0b07aa4f03 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
@@ -1,7 +1,7 @@
 import json
 import logging
 import uuid
-from typing import Union, Dict, Type, Any, Optional
+from typing import Any, Dict, Optional, Type, Union
 
 from sqlalchemy import types
 from sqlalchemy_bigquery import STRUCT

From c2dc330ccb4b3dab0168c34ee332fc6b6c1619bd Mon Sep 17 00:00:00 2001
From: bossenti
Date: Fri, 22 Sep 2023 09:07:13 +0200
Subject: [PATCH 09/13] change type hints to be 3.7-compatible
---
 .../src/datahub/utilities/sqlalchemy_type_converter.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
index a0b0b07aa4f03..0ec3ece6c7347 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
@@ -1,7 +1,7 @@
 import json
 import logging
 import uuid
-from typing import Any, Dict, Optional, Type, Union
+from typing import Any, Dict, Optional, Type, Union, List
 
 from sqlalchemy import types
 from sqlalchemy_bigquery import STRUCT
@@ -142,8 +142,8 @@ def get_schema_fields_for_sqlalchemy_column(
     column_type: types.TypeEngine,
     description: Optional[str] = None,
     nullable: Optional[bool] = True,
-    is_part_of_key: bool | None = False,
-) -> list[SchemaField]:
+    is_part_of_key: Optional[bool] = False,
+) -> List[SchemaField]:
     """Creates SchemaFields from a given SQLalchemy column.
 
     This function is analogous to `get_schema_fields_for_hive_column` from datahub.utilities.hive_schema_to_avro.
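Patches 05 through 09 are mechanical, but the rewrite rules are easy to mix up. A compact illustration (not part of any of the diffs) of the Python 3.7-compatible equivalents the series converges on:

    from typing import Dict, List, Optional, Union

    # PEP 604 unions (3.10+) and PEP 585 builtin generics (3.9+) are replaced
    # with their typing-module equivalents:
    #   str | dict              ->  Union[str, dict]
    #   dict[type[X], str]      ->  Dict[Type[X], str]
    #   bool | None = False     ->  Optional[bool] = False
    #   -> list[SchemaField]    ->  -> List[SchemaField]
    def sketch(type_: Union[str, Dict[str, object]], nullable: Optional[bool] = True) -> List[str]:
        # illustrative body only
        return [str(type_), str(nullable)]

Because these modules do not use `from __future__ import annotations`, the annotations are evaluated at import time, so the newer syntax raises a `TypeError` on Python 3.7; that is why default values and class attributes had to be rewritten as well.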
From 50e51ad605b447379deae1308eb86e4f5d4e0810 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Fri, 22 Sep 2023 13:07:01 -0700
Subject: [PATCH 10/13] fix lint + tests
---
 metadata-ingestion/setup.py                            | 1 +
 .../src/datahub/utilities/sqlalchemy_type_converter.py | 3 +--
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index d7ee8d411e59d..2b3591b2bbb25 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -467,6 +467,7 @@ def get_long_description():
         *list(
             dependency
             for plugin in [
+                "athena",
                 "bigquery",
                 "clickhouse",
                 "clickhouse-usage",
diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
index 0ec3ece6c7347..3f73b7abd1d24 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
@@ -1,7 +1,7 @@
 import json
 import logging
 import uuid
-from typing import Any, Dict, Optional, Type, Union, List
+from typing import Any, Dict, List, Optional, Type, Union
 
 from sqlalchemy import types
 from sqlalchemy_bigquery import STRUCT
@@ -85,7 +85,6 @@ def get_avro_type(
                 "key_native_data_type": str(key_type),
             }
         if isinstance(column_type, STRUCT):
-
             fields = []
             for field_def in column_type._STRUCT_fields:
                 field_name, field_type = field_def

From b58b6d5ec47d8ec78ec43263dda12d53dd2087e8 Mon Sep 17 00:00:00 2001
From: bossenti
Date: Tue, 3 Oct 2023 08:50:15 +0200
Subject: [PATCH 11/13] address linting issues
---
 .../src/datahub/utilities/sqlalchemy_type_converter.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
index 0ec3ece6c7347..00fd5b039f3ce 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py
@@ -44,7 +44,7 @@ def get_avro_type(
                 "native_data_type": str(column_type),
                 "_nullable": nullable,
             }
-        if type(column_type) == types.DECIMAL:
+        if isinstance(column_type, types.DECIMAL):
             return {
                 "type": "bytes",
                 "logicalType": "decimal",
@@ -53,14 +53,14 @@ def get_avro_type(
                 "native_data_type": str(column_type),
                 "_nullable": nullable,
             }
-        if type(column_type) == types.DATE:
+        if isinstance(column_type, types.DATE):
             return {
                 "type": "int",
                 "logicalType": "date",
                 "native_data_type": str(column_type),
                 "_nullable": nullable,
             }
-        if type(column_type) == types.TIMESTAMP:
+        if isinstance(column_type, types.TIMESTAMP):
             return {
                 "type": "long",
                 "logicalType": "timestamp-millis",
                 "native_data_type": str(column_type),
@@ -165,7 +165,7 @@ def get_schema_fields_for_sqlalchemy_column(
         )
         # retrieve schema field definitions from the above generated AVRO JSON structure
         schema_fields = avro_schema_to_mce_fields(
-            avro_schema_string=json.dumps(avro_schema_json),
+            avro_schema=json.dumps(avro_schema_json),
             default_nullable=nullable,
             swallow_exceptions=False,
         )

From 4c0da225d225f728e72509f185de9b4c82b7028e Mon Sep 17 00:00:00 2001
From: bossenti
Date: Fri, 13 Oct 2023 20:18:04 +0200
Subject: [PATCH 12/13] refactor: replace central definition of STRUCT by
 registering custom type
---
 .../src/datahub/ingestion/source/sql/athena.py     | 5 ++++-
 .../src/datahub/ingestion/source/sql/sql_common.py | 2 --
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
index 583f1cc34a36c..1010c05b6f443 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
@@ -26,7 +26,7 @@
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws.s3_util import make_s3_urn
 from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes
-from datahub.ingestion.source.sql.sql_common import SQLAlchemySource
+from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, register_custom_type
 from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
 from datahub.ingestion.source.sql.sql_types import MapType
 from datahub.ingestion.source.sql.sql_utils import (
@@ -35,6 +35,7 @@
     gen_database_key,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
+from datahub.metadata.schema_classes import RecordTypeClass
 from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column
 from datahub.utilities.sqlalchemy_type_converter import (
     get_schema_fields_for_sqlalchemy_column,
@@ -42,6 +43,8 @@
 
 logger = logging.getLogger(__name__)
 
+register_custom_type(STRUCT, RecordTypeClass)
+
 
 class CustomAthenaRestDialect(AthenaRestDialect):
     """Custom definition of the Athena dialect.
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 76403dce0d2b8..6524eea8222d4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -21,7 +21,6 @@
 from sqlalchemy.exc import ProgrammingError
 from sqlalchemy.sql import sqltypes as types
 from sqlalchemy.types import TypeDecorator, TypeEngine
-from sqlalchemy_bigquery import STRUCT
 
 from datahub.emitter.mce_builder import (
     make_data_platform_urn,
@@ -159,7 +158,6 @@ class SqlWorkUnit(MetadataWorkUnit):
     types.JSON: RecordTypeClass,
     # additional type definitions that are used by the Athena source
     MapType: MapTypeClass,  # type: ignore
-    STRUCT: RecordTypeClass,
     # Because the postgresql dialect is used internally by many other dialects,
     # we add some postgres types here. This is ok to do because the postgresql
     # dialect is built-in to sqlalchemy.
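The refactor in patch 12 keeps `sql_common.py` free of a hard `sqlalchemy_bigquery` dependency: instead of a central entry in the shared type mapping, the Athena module registers STRUCT at import time. A rough sketch of the registration pattern (the real `register_custom_type` lives in `sql_common.py`; this reimplementation only illustrates the mechanism and is not the exact DataHub code):

    from typing import Dict, Optional, Type

    from sqlalchemy.types import TypeEngine

    # shared, module-level mapping owned by the common SQL layer
    _field_type_mapping: Dict[Type[TypeEngine], Optional[Type]] = {}

    def register_custom_type(tp: Type[TypeEngine], output: Optional[Type] = None) -> None:
        # sources extend the shared mapping when they are imported, so the
        # common layer never imports source-specific SQLAlchemy type libraries
        _field_type_mapping[tp] = output

With this inversion of ownership, only environments that actually install the `athena` plugin need (and pay for) `sqlalchemy-bigquery`.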
From 92c8000bf1cb835e9b7f2e118681bb9109d530d5 Mon Sep 17 00:00:00 2001
From: bossenti
Date: Sun, 15 Oct 2023 12:31:36 +0200
Subject: [PATCH 13/13] style: apply formatting
---
 .../src/datahub/ingestion/source/sql/athena.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
index 1010c05b6f443..dad61e5173166 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
@@ -26,7 +26,10 @@
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws.s3_util import make_s3_urn
 from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes
-from datahub.ingestion.source.sql.sql_common import SQLAlchemySource, register_custom_type
+from datahub.ingestion.source.sql.sql_common import (
+    SQLAlchemySource,
+    register_custom_type,
+)
 from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
 from datahub.ingestion.source.sql.sql_types import MapType
 from datahub.ingestion.source.sql.sql_utils import (
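Taken together, the series lets nested Athena columns round-trip into DataHub's v2 field paths. A usage sketch of the converter introduced in patch 03, with the expected output taken from the unit tests above (assumes a tree with this series applied and the `athena` plugin extras installed):

    from sqlalchemy import types
    from sqlalchemy_bigquery import STRUCT

    from datahub.utilities.sqlalchemy_type_converter import (
        get_schema_fields_for_sqlalchemy_column,
    )

    # a struct column yields one SchemaField for the struct itself
    # plus one per nested member
    fields = get_schema_fields_for_sqlalchemy_column(
        column_name="test",
        column_type=STRUCT(("test", types.INTEGER())),
    )
    for field in fields:
        print(field.fieldPath)
    # [version=2.0].[type=struct].[type=struct].test
    # [version=2.0].[type=struct].[type=struct].test.[type=int].test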