Skip to content

Commit

Permalink
feat(ingestion/redshift): collapse lineage to permanent table (#9704)
Browse files: browse the repository at this point in the history
Co-authored-by: Harshal Sheth <[email protected]>
Co-authored-by: treff7es <[email protected]>
  • Loading branch information
3 people authored Feb 1, 2024
1 parent eb97120 commit 5331304
Show file tree
Hide file tree
Showing 11 changed files with 1,515 additions and 59 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Convenience functions for creating MCEs"""

import hashlib
import json
import logging
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/api/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def _set_dataset_urn_to_lower_if_needed(self) -> None:
# TODO: Get rid of this function once lower-casing is the standard.
if self.graph:
server_config = self.graph.get_config()
if server_config and server_config.get("datasetUrnNameCasing"):
if server_config and server_config.get("datasetUrnNameCasing") is True:
set_dataset_urn_to_lower(True)

def register_checkpointer(self, committable: Committable) -> None:
Expand Down
17 changes: 10 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ class RedshiftConfig(
description="The default schema to use if the sql parser fails to parse the schema with `sql_based` lineage collector",
)

include_table_lineage: Optional[bool] = Field(
include_table_lineage: bool = Field(
default=True, description="Whether table lineage should be ingested."
)
include_copy_lineage: Optional[bool] = Field(
include_copy_lineage: bool = Field(
default=True,
description="Whether lineage should be collected from copy commands",
)
Expand All @@ -107,17 +107,15 @@ class RedshiftConfig(
description="Generate usage statistic. email_domain config parameter needs to be set if enabled",
)

include_unload_lineage: Optional[bool] = Field(
include_unload_lineage: bool = Field(
default=True,
description="Whether lineage should be collected from unload commands",
)

capture_lineage_query_parser_failures: Optional[bool] = Field(
hide_from_schema=True,
include_table_rename_lineage: bool = Field(
default=False,
description="Whether to capture lineage query parser errors with dataset properties for debugging",
description="Whether we should follow `alter table ... rename to` statements when computing lineage. ",
)

table_lineage_mode: Optional[LineageMode] = Field(
default=LineageMode.STL_SCAN_BASED,
description="Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed]",
Expand All @@ -139,6 +137,11 @@ class RedshiftConfig(
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.",
)

resolve_temp_table_in_lineage: bool = Field(
default=False,
description="Whether to resolve temp table appear in lineage to upstream permanent tables.",
)

@root_validator(pre=True)
def check_email_is_set_on_usage(cls, values):
if values.get("include_usage_statistics"):
Expand Down
Loading

0 comments on commit 5331304

Please sign in to comment.