feat(ingest/unity): GE Profiling #8951

Merged: 18 commits, Dec 6, 2023
4 changes: 4 additions & 0 deletions docs/how/updating-datahub.md
@@ -12,6 +12,10 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #9257: The Python SDK urn types are now autogenerated. The new classes are largely backwards compatible with the previous, manually written classes, but many older methods are now deprecated in favor of a more uniform interface. The only breaking change is that the signature for the direct constructor e.g. `TagUrn("tag", ["tag_name"])` is no longer supported, and the simpler `TagUrn("tag_name")` should be used instead.
The canonical place to import the urn classes from is `datahub.metadata.urns.*`. Other import paths, like `datahub.utilities.urns.corpuser_urn.CorpuserUrn` are retained for backwards compatibility, but are considered deprecated.
- #9286: The `DataHubRestEmitter.emit` method no longer returns anything. It previously returned a tuple of timestamps.
- #8951: A Great Expectations-based profiler has been added for the Unity Catalog source.
To use the old profiler, set `method: analyze` under the `profiling` section in your recipe.
To use the new profiler, set `method: ge`. Profiling is disabled by default, so one of these
methods must be specified to enable it.

### Potential Downtime

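To make the `method` switch from the #8951 note above concrete, here is a minimal sketch of selecting the new GE profiler through the source config added in this PR. The field names come from this diff; the workspace URL, token, and warehouse id are placeholder values, and the exact set of required fields may differ:

```python
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig

# "method" acts as a pydantic discriminator: "ge" selects
# UnityCatalogGEProfilerConfig, "analyze" selects UnityCatalogAnalyzeProfilerConfig.
config = UnityCatalogSourceConfig.parse_obj(
    {
        "workspace_url": "https://example.cloud.databricks.com",  # placeholder
        "token": "dapi-placeholder",  # placeholder
        "profiling": {
            "method": "ge",  # or "analyze" for the old profiler
            "enabled": True,
            "warehouse_id": "abc123",  # placeholder SQL warehouse id
        },
    }
)
assert config.is_ge_profiling()
```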
3 changes: 2 additions & 1 deletion metadata-ingestion/setup.py
@@ -262,6 +262,7 @@
"databricks-sdk>=0.9.0",
"pyspark~=3.3.0",
"requests",
"databricks-sql-connector>=2.8.0", # Only added in 2.4.0, bug fixes since
}

mysql = sql_common | {"pymysql>=1.0.2"}
@@ -396,7 +397,7 @@
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"unity-catalog": databricks | sqllineage_lib,
"unity-catalog": databricks | sql_common | sqllineage_lib,
"fivetran": snowflake_common,
}

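As an aside on the setup.py change: the extras are plain Python sets of requirement strings, so adding `sql_common` to the `unity-catalog` plugin is just a set union. A toy sketch; the members shown are placeholders except those visible in this diff, and the real sets are much larger:

```python
# Placeholder stand-ins for the requirement sets defined earlier in setup.py.
databricks = {"databricks-sdk>=0.9.0", "pyspark~=3.3.0", "requests"}
sql_common = {"sqlalchemy"}      # placeholder; the real set pins many packages
sqllineage_lib = {"sqllineage"}  # placeholder

plugins = {
    # Union of the three sets, mirroring the new line in this diff.
    "unity-catalog": databricks | sql_common | sqllineage_lib,
}
```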
@@ -183,7 +183,7 @@ def get_workunits(
return
yield from self.generate_profile_workunits(
profile_requests,
- self.config.profiling.max_workers,
+ max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)
@@ -27,6 +27,7 @@

import sqlalchemy as sa
import sqlalchemy.sql.compiler
from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping
from great_expectations.core.util import convert_to_json_serializable
from great_expectations.data_context import AbstractDataContext, BaseDataContext
from great_expectations.data_context.types.base import (
@@ -77,8 +78,26 @@
SNOWFLAKE = "snowflake"
BIGQUERY = "bigquery"
REDSHIFT = "redshift"
DATABRICKS = "databricks"
TRINO = "trino"

# Type names for Databricks, to match Title Case types in sqlalchemy
ProfilerTypeMapping.INT_TYPE_NAMES.append("Integer")
ProfilerTypeMapping.INT_TYPE_NAMES.append("SmallInteger")
ProfilerTypeMapping.INT_TYPE_NAMES.append("BigInteger")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Float")
ProfilerTypeMapping.FLOAT_TYPE_NAMES.append("Numeric")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("String")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Text")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("Unicode")
ProfilerTypeMapping.STRING_TYPE_NAMES.append("UnicodeText")
ProfilerTypeMapping.BOOLEAN_TYPE_NAMES.append("Boolean")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Date")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("DateTime")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Time")
ProfilerTypeMapping.DATETIME_TYPE_NAMES.append("Interval")
ProfilerTypeMapping.BINARY_TYPE_NAMES.append("LargeBinary")

# The reason for this wacky structure is quite fun. GE basically assumes that
# the config structures were generated directly from YML and further assumes that
# they can be `deepcopy`'d without issue. The SQLAlchemy engine and connection
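For context on the type-name appends above: GE decides how to profile a column by checking the column type's name against these `ProfilerTypeMapping` lists, and the Databricks SQLAlchemy dialect reports Title Case names such as `Integer` rather than `INTEGER`. A rough sketch of that membership check; the `bucket` helper is illustrative, not GE's API:

```python
from great_expectations.core.profiler_types_mapping import ProfilerTypeMapping

def bucket(type_name: str) -> str:
    # Illustrative only: GE's real dispatch is internal, but it keys off
    # membership in these name lists.
    if type_name in ProfilerTypeMapping.INT_TYPE_NAMES:
        return "int"
    if type_name in ProfilerTypeMapping.FLOAT_TYPE_NAMES:
        return "float"
    if type_name in ProfilerTypeMapping.STRING_TYPE_NAMES:
        return "string"
    if type_name in ProfilerTypeMapping.DATETIME_TYPE_NAMES:
        return "datetime"
    return "unknown"

# True only after the appends above have run (i.e. once this module is imported).
assert bucket("Integer") == "int"
```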
@@ -697,6 +716,9 @@ def generate_dataset_profile(  # noqa: C901 (complexity)
1, unique_count / non_null_count
)

if not profile.rowCount:
continue

self._get_dataset_column_sample_values(column_profile, column)

if (
@@ -1172,7 +1194,7 @@ def _get_ge_dataset(
},
)

- if platform == BIGQUERY:
+ if platform == BIGQUERY or platform == DATABRICKS:
# This is done because GE generates the name as DATASET.TABLE,
# but we want PROJECT.DATASET.TABLE instead for multi-project setups
name_parts = pretty_name.split(".")
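On the hunk above: for Databricks the pretty name is `catalog.schema.table`, and the comment indicates GE would otherwise label the batch with only the trailing two parts. The downstream batch-kwargs handling is elided in this hunk, so the following is only a hedged sketch of the split, with a placeholder name:

```python
pretty_name = "hive_metastore.sales.orders"  # placeholder catalog.schema.table
name_parts = pretty_name.split(".")

# Hypothetical handling: keep the catalog/project part so the profile is not
# reported as just "sales.orders" in multi-catalog (multi-project) setups.
project, dataset_table = name_parts[0], ".".join(name_parts[1:])
assert (project, dataset_table) == ("hive_metastore", "sales.orders")
```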
@@ -59,8 +59,7 @@ def get_workunits(

yield from self.generate_profile_workunits(
profile_requests,
- self.config.profiling.max_workers,
- db,
+ max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)
@@ -62,8 +62,7 @@ def get_workunits(

yield from self.generate_profile_workunits(
profile_requests,
- self.config.profiling.max_workers,
- database.name,
+ max_workers=self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
)
@@ -70,7 +70,6 @@ def generate_profile_workunits(
self,
requests: List[TableProfilerRequest],
max_workers: int,
Collaborator: we can force these to be kwargs for clarity

Suggested change:
- max_workers: int,
+ *, max_workers: int,

db_name: Optional[str] = None,
platform: Optional[str] = None,
profiler_args: Optional[Dict] = None,
) -> Iterable[MetadataWorkUnit]:
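For readers unfamiliar with the reviewer's suggestion: a bare `*` in a parameter list makes every parameter after it keyword-only, which is why the call sites in this PR now pass `max_workers=...` explicitly. A minimal sketch:

```python
from typing import Optional


def generate_profile_workunits(
    requests: list, *, max_workers: int, platform: Optional[str] = None
) -> None:
    """Sketch: everything after the bare * must be passed by keyword."""
    ...


# generate_profile_workunits([], 10)              # TypeError: too many positional args
# generate_profile_workunits([], max_workers=10)  # OK
```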
@@ -98,7 +97,7 @@
return

# Otherwise, if column level profiling is enabled, use GE profiler.
- ge_profiler = self.get_profiler_instance(db_name)
+ ge_profiler = self.get_profiler_instance()

for ge_profiler_request, profile in ge_profiler.generate_profiles(
ge_profile_requests, max_workers, platform, profiler_args
@@ -199,9 +198,7 @@ def get_inspectors(self) -> Iterable[Inspector]:
inspector = inspect(conn)
yield inspector

- def get_profiler_instance(
-     self, db_name: Optional[str] = None
- ) -> "DatahubGEProfiler":
+ def get_profiler_instance(self) -> "DatahubGEProfiler":
logger.debug(f"Getting profiler instance from {self.platform}")
url = self.config.get_sql_alchemy_url()

@@ -6,7 +6,7 @@

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
- from datahub.ingestion.source.unity.config import UnityCatalogProfilerConfig
+ from datahub.ingestion.source.unity.config import UnityCatalogAnalyzeProfilerConfig
from datahub.ingestion.source.unity.proxy import UnityCatalogApiProxy
from datahub.ingestion.source.unity.proxy_types import (
ColumnProfile,
@@ -23,8 +23,8 @@


@dataclass
- class UnityCatalogProfiler:
-     config: UnityCatalogProfilerConfig
+ class UnityCatalogAnalyzeProfiler:
+     config: UnityCatalogAnalyzeProfilerConfig
report: UnityCatalogReport
proxy: UnityCatalogApiProxy
dataset_urn_builder: Callable[[TableReference], str]
72 changes: 56 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -1,10 +1,12 @@
import logging
import os
from datetime import datetime, timedelta, timezone
- from typing import Any, Dict, List, Optional
+ from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlparse

import pydantic
from pydantic import Field
from typing_extensions import Literal

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import (
@@ -13,6 +15,9 @@
)
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.ge_data_profiler import DATABRICKS
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, make_sqlalchemy_uri
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
@@ -31,24 +36,12 @@


class UnityCatalogProfilerConfig(ConfigModel):
# TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)
method: str
Collaborator: how does this show up in the docs? does this need a Field(description="docs")?

Collaborator (author): Looks like it doesn't show up at all. I'll add a description, but in general our docs support for discriminated unions is not very good; we don't show which type supports which options. I'll update example recipes to help here.


warehouse_id: Optional[str] = Field(
default=None, description="SQL Warehouse id, for running profiling queries."
)

profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only or include column-level profiling as well.",
)

pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
@@ -58,6 +51,24 @@ class UnityCatalogProfilerConfig(ConfigModel):
),
)


class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig):
method: Literal["analyze"] = "analyze"

# TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig
Collaborator: yes please

enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
operation_config: OperationConfig = Field(
default_factory=OperationConfig,
description="Experimental feature. To specify operation configs.",
)

profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only or include column-level profiling as well.",
)

call_analyze: bool = Field(
default=True,
description=(
@@ -89,7 +100,17 @@ def include_columns(self):
return not self.profile_table_level_only


class UnityCatalogGEProfilerConfig(UnityCatalogProfilerConfig, GEProfilingConfig):
method: Literal["ge"] = "ge"

max_wait_secs: Optional[int] = Field(
default=None,
description="Maximum time to wait for a table to be profiled.",
)


class UnityCatalogSourceConfig(
SQLCommonConfig,
StatefulIngestionConfigBase,
BaseUsageConfig,
DatasetSourceConfigMixin,
@@ -217,15 +238,34 @@ class UnityCatalogSourceConfig(
description="Generate usage statistics.",
)

- profiling: UnityCatalogProfilerConfig = Field(
-     default=UnityCatalogProfilerConfig(), description="Data profiling configuration"
+ profiling: Union[UnityCatalogGEProfilerConfig, UnityCatalogAnalyzeProfilerConfig] = Field(  # type: ignore
+     default=UnityCatalogGEProfilerConfig(),
+     description="Data profiling configuration",
+     discriminator="method",
)

scheme: str = DATABRICKS

def get_sql_alchemy_url(self):
return make_sqlalchemy_uri(
scheme=self.scheme,
username="token",
password=self.token,
at=urlparse(self.workspace_url).netloc,
db=None,
uri_opts={
"http_path": f"/sql/1.0/warehouses/{self.profiling.warehouse_id}"
Collaborator: This assumes use of SQL warehouse. Looks like this may take different formats.

> http-path is the HTTP Path either to a Databricks SQL endpoint (e.g. /sql/1.0/endpoints/1234567890abcdef), or to a Databricks Runtime interactive cluster (e.g. /sql/protocolv1/o/1234567890123456/1234-123456-slid123).

Ref - https://pypi.org/project/databricks-sql-connector/

Collaborator (author): Hmm, good point. It's going to be a bit annoying to support both profilers :|

Collaborator (author): Gonna hold off on this... want to get this in before I never get to it again.

},
)

def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)

def is_ge_profiling(self) -> bool:
return self.profiling.method == "ge"

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="Unity Catalog Stateful Ingestion Config."
)
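To make `get_sql_alchemy_url` concrete: with `scheme` fixed to `databricks`, the literal username `token`, and the warehouse `http_path`, the resulting URI looks roughly like the sketch below. Host, token, and warehouse id are placeholders, and as the review thread notes, interactive-cluster HTTP paths are not handled yet:

```python
from urllib.parse import urlparse

# Placeholders standing in for the config values used above.
workspace_url = "https://example.cloud.databricks.com"
token = "dapi-placeholder"
warehouse_id = "abc123"

# Rough equivalent of the make_sqlalchemy_uri(...) call in this diff.
uri = (
    f"databricks://token:{token}@{urlparse(workspace_url).netloc}"
    f"?http_path=/sql/1.0/warehouses/{warehouse_id}"
)
print(uri)
# databricks://token:dapi-placeholder@example.cloud.databricks.com?http_path=/sql/1.0/warehouses/abc123
```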