Commit c381806

feat(ingestion): Adding config option to auto lowercase dataset urns (#…

treff7es authored Oct 12, 2023
1 parent dd418de commit c381806

Showing 10 changed files with 170 additions and 20 deletions.
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
@@ -54,6 +54,13 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin):
"""


class LowerCaseDatasetUrnConfigMixin(ConfigModel):
convert_urns_to_lowercase: bool = Field(
default=False,
description="Whether to convert dataset urns to lowercase.",
)


class DatasetLineageProviderConfigBase(EnvConfigMixin):
"""
Any non-Dataset source that produces lineage to Datasets should inherit this class.
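
A minimal sketch (not part of the commit) of how a source config picks up the new flag simply by inheriting the mixin; ExampleSourceConfig is a hypothetical class used only for illustration, and the pydantic v1 parse_obj API is assumed:

from datahub.configuration.source_common import (
    DatasetSourceConfigMixin,
    LowerCaseDatasetUrnConfigMixin,
)


class ExampleSourceConfig(DatasetSourceConfigMixin, LowerCaseDatasetUrnConfigMixin):
    """Hypothetical source config; convert_urns_to_lowercase comes from the mixin."""


# The flag defaults to False and can be switched on from a recipe's config block.
assert ExampleSourceConfig.parse_obj({}).convert_urns_to_lowercase is False
assert ExampleSourceConfig.parse_obj(
    {"convert_urns_to_lowercase": True}
).convert_urns_to_lowercase is True
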
24 changes: 24 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -29,6 +29,7 @@
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_lowercase_urns,
auto_materialize_referenced_tags,
auto_status_aspect,
auto_workunit_reporter,
@@ -192,7 +193,30 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
)

auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.source
and self.ctx.pipeline_config.source.config
and (
(
hasattr(
self.ctx.pipeline_config.source.config,
"convert_urns_to_lowercase",
)
and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase
)
or (
hasattr(self.ctx.pipeline_config.source.config, "get")
and self.ctx.pipeline_config.source.config.get(
"convert_urns_to_lowercase"
)
)
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns
return [
auto_lowercase_dataset_urns,
auto_status_aspect,
auto_materialize_referenced_tags,
browse_path_processor,
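
A simplified restatement (not the committed code) of the condition above: the lowercasing processor is enabled when the source config exposes convert_urns_to_lowercase either as a pydantic attribute or as a plain dict key. The helper name below is hypothetical:

from typing import Any


def wants_lowercase_urns(source_config: Any) -> bool:
    # Pydantic-style configs expose the flag as an attribute ...
    if getattr(source_config, "convert_urns_to_lowercase", False):
        return True
    # ... while raw dict-style configs expose it as a key.
    return bool(
        hasattr(source_config, "get")
        and source_config.get("convert_urns_to_lowercase")
    )
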
20 changes: 18 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -35,7 +35,7 @@
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns
from datahub.utilities.urns.urn_iter import list_urns, lowercase_dataset_urns

if TYPE_CHECKING:
from datahub.ingestion.api.source import SourceReport
@@ -70,7 +70,6 @@ def auto_status_aspect(
for wu in stream:
urn = wu.get_urn()
all_urns.add(urn)

if not wu.is_primary_source:
# If this is a non-primary source, we pretend like we've seen the status
# aspect so that we don't try to emit a removal for it.
@@ -173,6 +172,23 @@ def auto_materialize_referenced_tags(
).as_workunit()


def auto_lowercase_urns(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""Lowercase all dataset urns"""

for wu in stream:
try:
old_urn = wu.get_urn()
lowercase_dataset_urns(wu.metadata)
wu.id = wu.id.replace(old_urn, wu.get_urn())

yield wu
except Exception as e:
logger.warning(f"Failed to lowercase urns for {wu}: {e}", exc_info=True)
yield wu


def auto_browse_path_v2(
stream: Iterable[MetadataWorkUnit],
*,
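
For illustration, a small sketch of pushing workunits through the new processor, mirroring the unit test added at the end of this commit; it assumes auto_workunit wraps the MCP into a MetadataWorkUnit as in the existing helpers:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_lowercase_urns, auto_workunit
from datahub.metadata.schema_classes import StatusClass

mcp = MetadataChangeProposalWrapper(
    entityUrn=make_dataset_urn("snowflake", "MyDb.MySchema.MyTable", "PROD"),
    aspect=StatusClass(removed=False),
)

for wu in auto_lowercase_urns(auto_workunit([mcp])):
    # Only the dataset name portion of the urn is expected to change case.
    assert "mydb.myschema.mytable" in wu.get_urn()
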
@@ -16,7 +16,6 @@
make_dataplatform_instance_urn,
make_dataset_urn,
make_tag_urn,
set_dataset_urn_to_lower,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
@@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
if self.config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase)

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
)
@@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",
)

convert_urns_to_lowercase: bool = Field(
default=False,
description="Convert urns to lowercase.",
)

enable_legacy_sharded_table_support: bool = Field(
default=True,
description="Use the legacy sharded table urn suffix added.",
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -18,7 +18,10 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
make_data_platform_urn,
@@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum):
UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class KafkaSourceConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
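
A hedged end-to-end example of what the mixin enables for the Kafka source: the flag can now be set in an ordinary recipe. The dict mirrors a YAML recipe; the bootstrap address and console sink are placeholders, and the standard Pipeline entry point is assumed:

from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {
        "type": "kafka",
        "config": {
            "connection": {"bootstrap": "localhost:9092"},  # placeholder broker
            "convert_urns_to_lowercase": True,
        },
    },
    "sink": {"type": "console"},
}

pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.raise_from_status()
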
@@ -7,7 +7,10 @@
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -21,7 +24,11 @@
logger: logging.Logger = logging.getLogger(__name__)


class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class SQLCommonConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
options: dict = pydantic.Field(
default_factory=dict,
description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",
@@ -7,7 +7,10 @@
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -91,6 +94,7 @@ class UnityCatalogSourceConfig(
BaseUsageConfig,
DatasetSourceConfigMixin,
StatefulProfilingConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
token: str = pydantic.Field(description="Databricks personal access token")
workspace_url: str = pydantic.Field(
33 changes: 28 additions & 5 deletions metadata-ingestion/src/datahub/utilities/urns/urn_iter.py
@@ -3,7 +3,11 @@
from avro.schema import Field, RecordSchema

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import DictWrapper
from datahub.metadata.schema_classes import (
DictWrapper,
MetadataChangeEventClass,
MetadataChangeProposalClass,
)
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn, guess_entity_type

@@ -32,7 +36,7 @@ def list_urns_with_path(

if isinstance(model, MetadataChangeProposalWrapper):
if model.entityUrn:
urns.append((model.entityUrn, ["urn"]))
urns.append((model.entityUrn, ["entityUrn"]))
if model.entityKeyAspect:
urns.extend(
_add_prefix_to_paths(
@@ -83,7 +87,15 @@ def list_urns(model: Union[DictWrapper, MetadataChangeProposalWrapper]) -> List[
return [urn for urn, _ in list_urns_with_path(model)]


def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None:
def transform_urns(
model: Union[
DictWrapper,
MetadataChangeEventClass,
MetadataChangeProposalClass,
MetadataChangeProposalWrapper,
],
func: Callable[[str], str],
) -> None:
"""
Rewrites all URNs in the given object according to the given function.
"""
Expand All @@ -95,14 +107,18 @@ def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None:


def _modify_at_path(
model: Union[DictWrapper, list], path: _Path, new_value: str
model: Union[DictWrapper, MetadataChangeProposalWrapper, list],
path: _Path,
new_value: str,
) -> None:
assert len(path) > 0

if len(path) == 1:
if isinstance(path[0], int):
assert isinstance(model, list)
model[path[0]] = new_value
elif isinstance(model, MetadataChangeProposalWrapper):
setattr(model, path[0], new_value)
else:
assert isinstance(model, DictWrapper)
model._inner_dict[path[0]] = new_value
@@ -120,7 +136,14 @@ def _lowercase_dataset_urn(dataset_urn: str) -> str:
return str(cur_urn)


def lowercase_dataset_urns(model: DictWrapper) -> None:
def lowercase_dataset_urns(
model: Union[
DictWrapper,
MetadataChangeEventClass,
MetadataChangeProposalClass,
MetadataChangeProposalWrapper,
]
) -> None:
def modify_urn(urn: str) -> str:
if guess_entity_type(urn) == "dataset":
return _lowercase_dataset_urn(urn)
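
For context, a small sketch (not from the commit) of the widened lowercase_dataset_urns helper applied directly to a wrapper object; non-dataset urns such as tags are expected to pass through unchanged:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass
from datahub.utilities.urns.urn_iter import lowercase_dataset_urns

mcpw = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:snowflake,MyDb.MySchema.MyTable,PROD)",
    aspect=GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:PII")]),
)
lowercase_dataset_urns(mcpw)

# The dataset urn is rewritten in place; the tag urn keeps its original case.
assert mcpw.entityUrn is not None and "mydb.myschema.mytable" in mcpw.entityUrn
assert mcpw.aspect.tags[0].tag == "urn:li:tag:PII"
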
@@ -16,6 +16,7 @@
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_empty_dataset_usage_statistics,
auto_lowercase_urns,
auto_status_aspect,
auto_workunit,
)
@@ -275,6 +276,75 @@ def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock):
assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"])


def test_auto_lowercase_aspects():
mcws = auto_workunit(
[
MetadataChangeProposalWrapper(
entityUrn=make_dataset_urn(
"bigquery", "myProject.mySchema.myTable", "PROD"
),
aspect=models.DatasetKeyClass(
"urn:li:dataPlatform:bigquery", "myProject.mySchema.myTable", "PROD"
),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.ContainerPropertiesClass(
name="test",
),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-Public-Data.Covid19_Aha.staffing,PROD)",
aspects=[
models.DatasetPropertiesClass(
customProperties={
"key": "value",
},
),
],
),
),
]
)

expected = [
*list(
auto_workunit(
[
MetadataChangeProposalWrapper(
entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)",
aspect=models.DatasetKeyClass(
"urn:li:dataPlatform:bigquery",
"myProject.mySchema.myTable",
"PROD",
),
),
MetadataChangeProposalWrapper(
entityUrn="urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a",
aspect=models.ContainerPropertiesClass(
name="test",
),
),
models.MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_aha.staffing,PROD)",
aspects=[
models.DatasetPropertiesClass(
customProperties={
"key": "value",
},
),
],
),
),
]
)
),
]
assert list(auto_lowercase_urns(mcws)) == expected


@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock):
structure = {"a": {"b": ["c"]}}