Skip to content

Commit

Permalink
feat(ingest/redshift): redshift lineage v2 (#9904)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Feb 24, 2024
1 parent def4b24 commit 3921588
Show file tree
Hide file tree
Showing 16 changed files with 969 additions and 85 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
`datahub delete --platform databricks --soft` and then reingesting with latest cli version.

- #9601 - The Unity Catalog (UC) ingestion source config `include_hive_metastore` is now enabled by default. This requires config `warehouse_id` to be set. You can disable `include_hive_metastore` by setting it to `False` to avoid ingesting the legacy hive metastore catalog in Databricks.
- #9904 - The default Redshift `table_lineage_mode` is now `MIXED`, instead of `STL_SCAN_BASED`. Improved lineage generation is also available by enabling `use_lineage_v2`. This v2 implementation will become the default in a future release.

### Potential Downtime

Expand Down
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,3 @@ class LineageConfig(ConfigModel):
default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

sql_parser_use_external_process: bool = Field(
default=False,
description="When enabled, sql parser will run in isolated in a separate process. This can affect processing time but can protect from sql parser's mem leak.",
)
15 changes: 10 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/redshift/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class RedshiftConfig(
scheme: str = Field(
default="redshift+redshift_connector",
description="",
hidden_from_schema=True,
hidden_from_docs=True,
)

_database_alias_removed = pydantic_removed_field("database_alias")
Expand All @@ -94,6 +94,11 @@ class RedshiftConfig(
description="The default schema to use if the sql parser fails to parse the schema with `sql_based` lineage collector",
)

use_lineage_v2: bool = Field(
default=False,
description="Whether to use the new SQL-based lineage and usage collector.",
)

include_table_lineage: bool = Field(
default=True, description="Whether table lineage should be ingested."
)
Expand All @@ -113,11 +118,11 @@ class RedshiftConfig(
)

include_table_rename_lineage: bool = Field(
default=False,
default=True,
description="Whether we should follow `alter table ... rename to` statements when computing lineage. ",
)
table_lineage_mode: Optional[LineageMode] = Field(
default=LineageMode.STL_SCAN_BASED,
table_lineage_mode: LineageMode = Field(
default=LineageMode.MIXED,
description="Which table lineage collector mode to use. Available modes are: [stl_scan_based, sql_based, mixed]",
)
extra_client_options: Dict[str, Any] = {}
Expand All @@ -138,7 +143,7 @@ class RedshiftConfig(
)

resolve_temp_table_in_lineage: bool = Field(
default=False,
default=True,
description="Whether to resolve temp table appear in lineage to upstream permanent tables.",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ def get_lineage(
table: Union[RedshiftTable, RedshiftView],
dataset_urn: str,
schema: RedshiftSchema,
) -> Optional[Tuple[UpstreamLineageClass, Dict[str, str]]]:
) -> Optional[UpstreamLineageClass]:
upstream_lineage: List[UpstreamClass] = []

cll_lineage: List[FineGrainedLineage] = []
Expand Down Expand Up @@ -811,11 +811,9 @@ def get_lineage(
else:
return None

return (
UpstreamLineage(
upstreams=upstream_lineage, fineGrainedLineages=cll_lineage or None
),
{},
return UpstreamLineage(
upstreams=upstream_lineage,
fineGrainedLineages=cll_lineage or None,
)

def report_status(self, step: str, status: bool) -> None:
Expand Down
Loading

0 comments on commit 3921588

Please sign in to comment.