Skip to content

Commit

Permalink
Adding config option to lowercase dataset urns
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es committed Oct 3, 2023
1 parent 241a676 commit 461e8d8
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 19 deletions.
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/configuration/source_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ class DatasetSourceConfigMixin(PlatformInstanceConfigMixin, EnvConfigMixin):
"""


LOWER_CASE_URN_CONFIG_KEY = "convert_urns_to_lowercase"


class LowerCaseDatasetUrnConfigMixin(ConfigModel):
convert_urns_to_lowercase2: bool = Field(
default=False,
alias=LOWER_CASE_URN_CONFIG_KEY,
description="Whether to convert dataset urns to lowercase.",
)


class DatasetLineageProviderConfigBase(EnvConfigMixin):
"""
Any non-Dataset source that produces lineage to Datasets should inherit this class.
Expand Down
14 changes: 8 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
from pydantic import BaseModel

from datahub.configuration.common import ConfigModel
from datahub.configuration.source_common import PlatformInstanceConfigMixin
from datahub.configuration.source_common import (
LOWER_CASE_URN_CONFIG_KEY,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
Expand Down Expand Up @@ -194,12 +197,11 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
)

auto_lowercase_urns: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.flags.auto_lowercase_urns
):
if self.ctx.pipeline_config and self.ctx.pipeline_config.source.config:
auto_lowercase_urns = self._get_auto_lowercase_urn_processor(
enabled=self.ctx.pipeline_config.flags.auto_lowercase_urns
enabled=self.ctx.pipeline_config.source.config.get(
LOWER_CASE_URN_CONFIG_KEY, False
)
)

return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
make_dataplatform_instance_urn,
make_dataset_urn,
make_tag_urn,
set_dataset_urn_to_lower,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import BigQueryDatasetKey, ContainerKey, ProjectIdKey
Expand Down Expand Up @@ -218,8 +217,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
if self.config.enable_legacy_sharded_table_support:
BigqueryTableIdentifier._BQ_SHARDED_TABLE_SUFFIX = ""

set_dataset_urn_to_lower(self.config.convert_urns_to_lowercase)

self.bigquery_data_dictionary = BigQuerySchemaApi(
self.report.schema_api_perf, self.config.get_bigquery_client()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="This flag enables the data lineage extraction from Data Lineage API exposed by Google Data Catalog. NOTE: This extractor can't build views lineage. It's recommended to enable the view's DDL parsing. Read the docs to have more information about: https://cloud.google.com/data-catalog/docs/concepts/about-data-lineage",
)

convert_urns_to_lowercase: bool = Field(
default=False,
description="Convert urns to lowercase.",
)

enable_legacy_sharded_table_support: bool = Field(
default=True,
description="Use the legacy sharded table urn suffix added.",
Expand Down
11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
make_data_platform_urn,
Expand Down Expand Up @@ -76,7 +79,11 @@ class KafkaTopicConfigKeys(str, Enum):
UNCLEAN_LEADER_ELECTION_CONFIG = "unclean.leader.election.enable"


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class KafkaSourceConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()

topic_patterns: AllowDenyPattern = AllowDenyPattern(allow=[".*"], deny=["^_.*"])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.pydantic_field_deprecation import pydantic_field_deprecated
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
Expand All @@ -21,7 +24,11 @@
logger: logging.Logger = logging.getLogger(__name__)


class SQLCommonConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
class SQLCommonConfig(
StatefulIngestionConfigBase,
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
options: dict = pydantic.Field(
default_factory=dict,
description="Any options specified here will be passed to [SQLAlchemy.create_engine](https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine) as kwargs.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from pydantic import Field

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
)
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.state.stale_entity_removal_handler import (
Expand Down Expand Up @@ -87,6 +90,7 @@ class UnityCatalogSourceConfig(
BaseUsageConfig,
DatasetSourceConfigMixin,
StatefulProfilingConfigMixin,
LowerCaseDatasetUrnConfigMixin,
):
token: str = pydantic.Field(description="Databricks personal access token")
workspace_url: str = pydantic.Field(
Expand Down

0 comments on commit 461e8d8

Please sign in to comment.