From 4d9b87112b81e92a9fd0f5aaf7dc3225f0a141d7 Mon Sep 17 00:00:00 2001 From: treff7es Date: Mon, 27 May 2024 14:04:46 +0200 Subject: [PATCH] Adding path_spec support for copy command --- .../ingestion/source/redshift/lineage.py | 41 ++++++++++--------- .../ingestion/source/redshift/report.py | 6 ++- 2 files changed, 26 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py index dadb06b6a95e2..a5b2ffe00c535 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py @@ -269,9 +269,14 @@ def _get_s3_path(self, path: str) -> Optional[str]: if self.config.s3_lineage_config: for path_spec in self.config.s3_lineage_config.path_specs: if path_spec.allowed(path): + self.report.num_s3_lineage_path_spec_match += 1 _, table_path = path_spec.extract_table_name_and_path(path) return table_path + if self.config.s3_lineage_config.path_specs: + self.report.num_s3_lineage_path_spec_mismatch += 1 + self.report.s3_lineage_path_spec_mismatch.append(path) + if ( self.config.s3_lineage_config.ignore_non_path_spec_path and len(self.config.s3_lineage_config.path_specs) > 0 @@ -373,29 +378,24 @@ def _get_sources( self.report.num_lineage_dropped_query_parser += 1 else: if lineage_type == lineage_type.COPY and filename is not None: - platform = LineageDatasetPlatform.S3 - path = filename.strip() - if urlparse(path).scheme != "s3": - logger.warning( - "Only s3 source supported with copy. The source was: {path}." + try: + platform = LineageDatasetPlatform.S3 + # Following call requires 'filename' key in lineage_row + source_path = self._build_s3_path_from_row(filename) + urn = make_dataset_urn_with_platform_instance( + platform=platform.value, + name=source_path, + env=self.config.env, + platform_instance=( + self.config.platform_instance_map.get(platform.value) + if self.config.platform_instance_map is not None + else None + ), ) + except ValueError as e: + self.warn(logger, "non-s3-lineage", str(e)) self.report.num_lineage_dropped_not_support_copy_path += 1 return [], None - s3_path = self._get_s3_path(path) - if s3_path is None: - return [], None - - path = strip_s3_prefix(s3_path) - urn = make_dataset_urn_with_platform_instance( - platform=platform.value, - name=path, - env=self.config.env, - platform_instance=( - self.config.platform_instance_map.get(platform.value) - if self.config.platform_instance_map is not None - else None - ), - ) elif source_schema is not None and source_table is not None: platform = LineageDatasetPlatform.REDSHIFT path = f"{db_name}.{source_schema}.{source_table}" @@ -569,6 +569,7 @@ def _get_target_lineage( ) except ValueError as e: self.warn(logger, "non-s3-lineage", str(e)) + self.report.num_lineage_dropped_not_support_copy_path += 1 return None else: target_platform = LineageDatasetPlatform.REDSHIFT diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py index ff28ed2c5e849..ce941a8282dd8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/report.py @@ -7,7 +7,7 @@ from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.ingestion.source_report.time_window import BaseTimeWindowReport from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport -from datahub.utilities.lossy_collections import LossyDict +from datahub.utilities.lossy_collections import LossyDict, LossyList from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.stats_collections import TopKDict @@ -37,6 +37,7 @@ class RedshiftReport( lineage_mem_size: Dict[str, str] = field(default_factory=TopKDict) tables_in_mem_size: Dict[str, str] = field(default_factory=TopKDict) views_in_mem_size: Dict[str, str] = field(default_factory=TopKDict) + s3_lineage_path_spec_mismatch: LossyList = field(default_factory=LossyList) num_operational_stats_filtered: int = 0 num_repeated_operations_dropped: int = 0 num_usage_stat_skipped: int = 0 @@ -46,6 +47,9 @@ class RedshiftReport( num_lineage_processed_temp_tables = 0 num_lineage_dropped_s3_path: int = 0 num_alter_table_parse_errors: int = 0 + num_lineage_processed_temp_tables: int = 0 + num_s3_lineage_path_spec_mismatch: int = 0 + num_s3_lineage_path_spec_match: int = 0 lineage_start_time: Optional[datetime] = None lineage_end_time: Optional[datetime] = None