Skip to content

Commit

Permalink
feat(ingestion/redshift): support auto_incremental_lineage (#9010)
Browse files Browse the repository at this point in the history
  • Loading branch information
siddiquebagwan-gslab authored Oct 25, 2023
1 parent 2d1584b commit b612545
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 10 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

## Next

- #9010 - In Redshift source's config `incremental_lineage` is set default to off.

### Breaking Changes

- #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.
Expand Down
10 changes: 3 additions & 7 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,9 @@
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common
| redshift_common
| usage_common
| sqlglot_lib
| {"redshift-connector"},
"redshift-legacy": sql_common | redshift_common,
"redshift-usage-legacy": sql_common | usage_common | redshift_common,
"redshift": sql_common | redshift_common | usage_common | {"redshift-connector"} | sqlglot_lib,
"redshift-legacy": sql_common | redshift_common | sqlglot_lib,
"redshift-usage-legacy": sql_common | redshift_common | sqlglot_lib | usage_common,
"s3": {*s3_base, *data_lake_profiling},
"gcs": {*s3_base, *data_lake_profiling},
"sagemaker": aws_common,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,13 @@ class RedshiftConfig(
)

extract_column_level_lineage: bool = Field(
default=True, description="Whether to extract column level lineage."
default=True,
description="Whether to extract column level lineage. This config works with rest-sink only.",
)

incremental_lineage: bool = Field(
default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run. This config works with rest-sink only.",
)

@root_validator(pre=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from collections import defaultdict
from functools import partial
from typing import Dict, Iterable, List, Optional, Type, Union

import humanfriendly
Expand All @@ -25,6 +26,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
Expand Down Expand Up @@ -369,6 +371,11 @@ def gen_database_container(self, database: str) -> Iterable[MetadataWorkUnit]:
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
partial(
auto_incremental_lineage,
self.ctx.graph,
self.config.incremental_lineage,
),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
Expand Down Expand Up @@ -942,7 +949,9 @@ def generate_lineage(self, database: str) -> Iterable[MetadataWorkUnit]:
)
if lineage_info:
yield from gen_lineage(
dataset_urn, lineage_info, self.config.incremental_lineage
dataset_urn,
lineage_info,
incremental_lineage=False, # incremental lineage generation is taken care by auto_incremental_lineage
)

for schema in self.db_views[database]:
Expand All @@ -956,7 +965,9 @@ def generate_lineage(self, database: str) -> Iterable[MetadataWorkUnit]:
)
if lineage_info:
yield from gen_lineage(
dataset_urn, lineage_info, self.config.incremental_lineage
dataset_urn,
lineage_info,
incremental_lineage=False, # incremental lineage generation is taken care by auto_incremental_lineage
)

def add_config_to_report(self):
Expand Down
6 changes: 6 additions & 0 deletions metadata-ingestion/tests/unit/test_redshift_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from datahub.ingestion.source.redshift.config import RedshiftConfig


def test_incremental_lineage_default_to_false():
config = RedshiftConfig(host_port="localhost:5439", database="test")
assert config.incremental_lineage is False

0 comments on commit b612545

Please sign in to comment.