From 44820d7a9603abe0ef5862251e62c2d048d79a95 Mon Sep 17 00:00:00 2001 From: treff7es Date: Wed, 30 Aug 2023 15:26:12 +0200 Subject: [PATCH 1/5] Converting windows style path to posix one --- .../src/datahub/ingestion/source/s3/source.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index ab5d3a4e007ac..ac4433b7eb1f0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -7,6 +7,7 @@ import time from collections import OrderedDict from datetime import datetime +from pathlib import PurePath from typing import Any, Dict, Iterable, List, Optional, Tuple from more_itertools import peekable @@ -819,7 +820,10 @@ def local_browser(self, path_spec: PathSpec) -> Iterable[Tuple[str, datetime, in dirs.sort(key=functools.cmp_to_key(partitioned_folder_comparator)) for file in sorted(files): - full_path = os.path.join(root, file) + # We need to make sure the path is in posix style which is not true on windows + full_path = PurePath( + os.path.normpath(os.path.join(root, file)) + ).as_posix() yield full_path, datetime.utcfromtimestamp( os.path.getmtime(full_path) ), os.path.getsize(full_path) From 98ac1001753e4ca003980d11b08d2ae9612fcc55 Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 26 Sep 2023 10:16:12 +0200 Subject: [PATCH 2/5] Option to add s3 partitions to schema --- .../src/datahub/ingestion/source/s3/config.py | 5 ++- .../src/datahub/ingestion/source/s3/source.py | 33 ++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py index f1dd622efb746..559311d048093 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/config.py @@ -75,7 +75,10 @@ class DataLakeSourceConfig( default=100, description="Maximum number of rows to use when inferring schemas for TSV and CSV files.", ) - + add_partition_columns_to_schema: bool = Field( + default=False, + description="Whether to add partition fields to the schema.", + ) verify_ssl: Union[bool, str] = Field( default=True, description="Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index ac4433b7eb1f0..47af884169230 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -77,10 +77,11 @@ NullTypeClass, NumberTypeClass, RecordTypeClass, - SchemaFieldDataType, SchemaMetadata, StringTypeClass, TimeTypeClass, + SchemaField, + SchemaFieldDataType, ) from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, @@ -90,6 +91,7 @@ OperationTypeClass, OtherSchemaClass, _Aspect, + SchemaFieldDataTypeClass, ) from datahub.telemetry import stats, telemetry from datahub.utilities.perf_timer import PerfTimer @@ -452,8 +454,37 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: logger.debug(f"Extracted fields in schema: {fields}") fields = sorted(fields, key=lambda f: f.fieldPath) + if self.source_config.add_partition_columns_to_schema: + self.add_partition_columns_to_schema(fields, path_spec, table_data) + return fields + def add_partition_columns_to_schema( + self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] + ): + is_fieldpath_v2 = False + for field in fields: + if field.fieldPath.startswith("[version=2.0]"): + is_fieldpath_v2 = True + break + vars = path_spec.get_named_vars(table_data.full_path) + if "partition_key" in vars: + for partition_key in vars["partition_key"].values(): + fields.append( + SchemaField( + fieldPath=f"{partition_key}" + if not is_fieldpath_v2 + else f"[version=2.0].[type=string].{partition_key}", + nativeDataType="string", + type=SchemaFieldType(StringTypeClass) + if not is_fieldpath_v2 + else SchemaFieldDataTypeClass(type=StringTypeClass()), + isPartitioningKey=True, + nullable=True, + recursive=False, + ) + ) + def get_table_profile( self, table_data: TableData, dataset_urn: str ) -> Iterable[MetadataWorkUnit]: From 709da845acdae3e285beafd04e83d62c0158665f Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 26 Sep 2023 11:18:45 +0200 Subject: [PATCH 3/5] Running isort --- .../src/datahub/ingestion/source/s3/source.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 47af884169230..41bfa86041486 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -77,11 +77,11 @@ NullTypeClass, NumberTypeClass, RecordTypeClass, + SchemaField, + SchemaFieldDataType, SchemaMetadata, StringTypeClass, TimeTypeClass, - SchemaField, - SchemaFieldDataType, ) from datahub.metadata.schema_classes import ( DataPlatformInstanceClass, @@ -90,8 +90,8 @@ OperationClass, OperationTypeClass, OtherSchemaClass, - _Aspect, SchemaFieldDataTypeClass, + _Aspect, ) from datahub.telemetry import stats, telemetry from datahub.utilities.perf_timer import PerfTimer From 82f9a84e27409eb6af7390349ba4ccc0b8ef5bd7 Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 26 Sep 2023 20:18:35 +0200 Subject: [PATCH 4/5] Small fixes --- .../src/datahub/ingestion/source/s3/source.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index 41bfa86041486..e441949a2e2fa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -455,7 +455,9 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: fields = sorted(fields, key=lambda f: f.fieldPath) if self.source_config.add_partition_columns_to_schema: - self.add_partition_columns_to_schema(fields, path_spec, table_data) + self.add_partition_columns_to_schema( + fields=fields, path_spec=path_spec, table_data=table_data + ) return fields @@ -476,7 +478,7 @@ def add_partition_columns_to_schema( if not is_fieldpath_v2 else f"[version=2.0].[type=string].{partition_key}", nativeDataType="string", - type=SchemaFieldType(StringTypeClass) + type=SchemaFieldDataType(StringTypeClass) if not is_fieldpath_v2 else SchemaFieldDataTypeClass(type=StringTypeClass()), isPartitioningKey=True, From 97b713d729b6394e7f51d126a92cd2d35517b6bf Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 2 Oct 2023 12:27:43 +0200 Subject: [PATCH 5/5] Fixing linter issues --- .../src/datahub/ingestion/source/s3/source.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py index e441949a2e2fa..9edbb7c846147 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/s3/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/s3/source.py @@ -456,21 +456,21 @@ def get_fields(self, table_data: TableData, path_spec: PathSpec) -> List: if self.source_config.add_partition_columns_to_schema: self.add_partition_columns_to_schema( - fields=fields, path_spec=path_spec, table_data=table_data + fields=fields, path_spec=path_spec, full_path=table_data.full_path ) return fields def add_partition_columns_to_schema( self, path_spec: PathSpec, full_path: str, fields: List[SchemaField] - ): + ) -> None: is_fieldpath_v2 = False for field in fields: if field.fieldPath.startswith("[version=2.0]"): is_fieldpath_v2 = True break - vars = path_spec.get_named_vars(table_data.full_path) - if "partition_key" in vars: + vars = path_spec.get_named_vars(full_path) + if vars is not None and "partition_key" in vars: for partition_key in vars["partition_key"].values(): fields.append( SchemaField( @@ -478,7 +478,7 @@ def add_partition_columns_to_schema( if not is_fieldpath_v2 else f"[version=2.0].[type=string].{partition_key}", nativeDataType="string", - type=SchemaFieldDataType(StringTypeClass) + type=SchemaFieldDataType(StringTypeClass()) if not is_fieldpath_v2 else SchemaFieldDataTypeClass(type=StringTypeClass()), isPartitioningKey=True,