From f22aedd8c3c055a787c11ad43c9d13c94d3850be Mon Sep 17 00:00:00 2001 From: bossenti Date: Tue, 30 May 2023 19:35:53 +0200 Subject: [PATCH] backport to python 3.7 --- .../src/datahub/ingestion/source/sql/athena.py | 6 +++--- .../datahub/utilities/sqlalchemy_type_converter.py | 12 ++++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index b6c4ce5851b5d..7db2a3af17608 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -2,7 +2,7 @@ import logging import re import typing -from typing import Any, Dict, Iterable, List, Optional, Tuple, cast +from typing import Any, Dict, Iterable, List, Optional, Tuple, cast, Union import pydantic from pyathena.common import BaseCursor @@ -60,7 +60,7 @@ class CustomAthenaRestDialect(AthenaRestDialect): _complex_type_pattern = re.compile(r"(<.+>)") @typing.no_type_check - def _get_column_type(self, type_: str | dict) -> TypeEngine: # noqa: C901 + def _get_column_type(self, type_: Union[str, Dict[str, Any]]) -> TypeEngine: # noqa: C901 """Derives the data type of the Athena column. This method is overwritten to extend the behavior of PyAthena. @@ -409,7 +409,7 @@ def get_schema_names(self, inspector: Inspector) -> List[str]: def get_schema_fields_for_column( self, dataset_name: str, - column: dict, + column: Dict, pk_constraints: Optional[dict] = None, tags: Optional[List[str]] = None, ) -> List[SchemaField]: diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py index 4142988cd859a..b76e545f0b799 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_type_converter.py @@ -1,7 +1,7 @@ import json import logging import uuid -from typing import Union +from typing import Union, Dict, Type, Any, Optional from sqlalchemy import types from sqlalchemy_bigquery import STRUCT @@ -21,7 +21,7 @@ class SqlAlchemyColumnToAvroConverter: _COMPLEX_TYPES = (STRUCT, types.ARRAY, MapType) # mapping of primitive SQLalchemy data types to AVRO schema data types - PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE: dict[type[types.TypeEngine], str] = { + PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE: Dict[Type[types.TypeEngine], str] = { types.String: "string", types.BINARY: "string", types.BOOLEAN: "boolean", @@ -35,7 +35,7 @@ class SqlAlchemyColumnToAvroConverter: @classmethod def get_avro_type( cls, column_type: Union[types.TypeEngine, STRUCT, MapType], nullable: bool - ) -> dict[str, object]: + ) -> Dict[str, Any]: """Determines the concrete AVRO schema type for a SQLalchemy-typed column""" if type(column_type) in cls.PRIMITIVE_SQL_ALCHEMY_TYPE_TO_AVRO_TYPE.keys(): @@ -119,7 +119,7 @@ def get_avro_for_sqlalchemy_column( column_name: str, column_type: types.TypeEngine, nullable: bool, - ) -> object | dict[str, object]: + ) -> Union[object, Dict[str, object]]: """Returns the AVRO schema representation of a SQLalchemy column.""" if isinstance(column_type, cls._COMPLEX_TYPES): return { @@ -140,8 +140,8 @@ def get_avro_for_sqlalchemy_column( def get_schema_fields_for_sqlalchemy_column( column_name: str, column_type: types.TypeEngine, - description: str | None = None, - nullable: bool | None = True, + description: Optional[str] = None, + nullable: Optional[bool] = True, is_part_of_key: bool | None = False, ) -> list[SchemaField]: """Creates SchemaFields from a given SQLalchemy column.