Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/redshift): add path_spec support for copy command #10596

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,14 @@ def _get_s3_path(self, path: str) -> Optional[str]:
if self.config.s3_lineage_config:
for path_spec in self.config.s3_lineage_config.path_specs:
if path_spec.allowed(path):
self.report.num_s3_lineage_path_spec_match += 1
_, table_path = path_spec.extract_table_name_and_path(path)
return table_path

if self.config.s3_lineage_config.path_specs:
self.report.num_s3_lineage_path_spec_mismatch += 1
self.report.s3_lineage_path_spec_mismatch.append(path)

if (
self.config.s3_lineage_config.ignore_non_path_spec_path
and len(self.config.s3_lineage_config.path_specs) > 0
Expand Down Expand Up @@ -373,29 +378,24 @@ def _get_sources(
self.report.num_lineage_dropped_query_parser += 1
else:
if lineage_type == lineage_type.COPY and filename is not None:
platform = LineageDatasetPlatform.S3
path = filename.strip()
if urlparse(path).scheme != "s3":
logger.warning(
"Only s3 source supported with copy. The source was: {path}."
try:
platform = LineageDatasetPlatform.S3
# Following call requires 'filename' key in lineage_row
source_path = self._build_s3_path_from_row(filename)
urn = make_dataset_urn_with_platform_instance(
platform=platform.value,
name=source_path,
env=self.config.env,
platform_instance=(
self.config.platform_instance_map.get(platform.value)
if self.config.platform_instance_map is not None
else None
),
)
except ValueError as e:
self.warn(logger, "non-s3-lineage", str(e))
self.report.num_lineage_dropped_not_support_copy_path += 1
return [], None
s3_path = self._get_s3_path(path)
if s3_path is None:
return [], None

path = strip_s3_prefix(s3_path)
urn = make_dataset_urn_with_platform_instance(
platform=platform.value,
name=path,
env=self.config.env,
platform_instance=(
self.config.platform_instance_map.get(platform.value)
if self.config.platform_instance_map is not None
else None
),
)
elif source_schema is not None and source_table is not None:
platform = LineageDatasetPlatform.REDSHIFT
path = f"{db_name}.{source_schema}.{source_table}"
Expand Down Expand Up @@ -569,6 +569,7 @@ def _get_target_lineage(
)
except ValueError as e:
self.warn(logger, "non-s3-lineage", str(e))
self.report.num_lineage_dropped_not_support_copy_path += 1
return None
else:
target_platform = LineageDatasetPlatform.REDSHIFT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.sql_parsing.sql_parsing_aggregator import SqlAggregatorReport
from datahub.utilities.lossy_collections import LossyDict
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.stats_collections import TopKDict

Expand Down Expand Up @@ -37,6 +37,7 @@ class RedshiftReport(
lineage_mem_size: Dict[str, str] = field(default_factory=TopKDict)
tables_in_mem_size: Dict[str, str] = field(default_factory=TopKDict)
views_in_mem_size: Dict[str, str] = field(default_factory=TopKDict)
s3_lineage_path_spec_mismatch: LossyList = field(default_factory=LossyList)
num_operational_stats_filtered: int = 0
num_repeated_operations_dropped: int = 0
num_usage_stat_skipped: int = 0
Expand All @@ -46,6 +47,9 @@ class RedshiftReport(
num_lineage_processed_temp_tables = 0
num_lineage_dropped_s3_path: int = 0
num_alter_table_parse_errors: int = 0
num_lineage_processed_temp_tables: int = 0
num_s3_lineage_path_spec_mismatch: int = 0
num_s3_lineage_path_spec_match: int = 0

lineage_start_time: Optional[datetime] = None
lineage_end_time: Optional[datetime] = None
Expand Down
Loading