fix(ingest/unity): Remove metastore from ingestion and urns; standardize platform instance; add notebook filter #8943

Merged
7 changes: 7 additions & 0 deletions docs/how/updating-datahub.md
@@ -3,12 +3,19 @@
 This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.

 ## Next
+- #8943 - There is a new config param, `include_metastore`, that provides the option of not ingesting
+  the Unity Catalog metastore associated with your Databricks workspace. We recommend setting this to `false`.
+  However, if you have previously ingested from unity catalog, setting this to `false` is a breaking change; see the Breaking Changes section for details.

 ### Breaking Changes

 - #8810 - Removed support for SQLAlchemy 1.3.x. Only SQLAlchemy 1.4.x is supported now.
 - #8853 - The Airflow plugin no longer supports Airflow 2.0.x or Python 3.7. See the docs for more details.
 - #8853 - Introduced the Airflow plugin v2. If you're using Airflow 2.3+, the v2 plugin will be enabled by default, and so you'll need to switch your requirements to include `pip install 'acryl-datahub-airflow-plugin[plugin-v2]'`. To continue using the v1 plugin, set the `DATAHUB_AIRFLOW_PLUGIN_USE_V1_PLUGIN` environment variable to `true`.
+- #8943 - All Unity Catalog urns are changed if a new config param, `include_metastore`, is set to `false`.
+  This is set to `true` by default at the moment, but this default will be changed in the future.
+  To handle the change in urns, we recommend soft deleting all databricks data via the DataHub CLI: `datahub delete --platform databricks --soft`,
+  and then re-ingesting from unity catalog with `include_metastore: false`.

 ### Potential Downtime

**Collaborator** commented on the new #8943 entries:

> The phrasing is a bit confusing, and splitting this across the two sections isn't ideal. Maybe let's do: "The unity catalog ingestion source has a new option `include_metastore`, which will cause all urns to be changed when disabled. This is currently enabled by default to preserve compatibility, but will be disabled by default and then removed in the future. If stateful ingestion is enabled, simply setting `include_metastore: false` will perform all required cleanup. Otherwise, we recommend soft deleting all databricks data via the DataHub CLI: `datahub delete --platform databricks --soft` and then re-ingesting with `include_metastore: false`."

**Collaborator** commented on the soft-delete recommendation:

> Add a note that if you have stateful ingestion enabled, you don't need to do anything.
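The migration the docs describe can also be exercised programmatically. Below is a minimal sketch, not taken from the PR, of opting out when building the source config; it assumes `token` and `workspace_url` are the only required connection fields, and the credential values are placeholders.

```python
# Sketch: constructing the Unity Catalog source config with the new flag.
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig

config = UnityCatalogSourceConfig.parse_obj(
    {
        "token": "<databricks-personal-access-token>",  # placeholder
        "workspace_url": "https://my-workspace.cloud.databricks.com",
        "include_metastore": False,  # omit the metastore container from urns
    }
)
assert config.include_metastore is False
```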
10 changes: 9 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -106,7 +106,15 @@ class MetastoreKey(ContainerKey):
     metastore: str


-class CatalogKey(MetastoreKey):
+class CatalogKeyWithMetastore(MetastoreKey):
+    catalog: str
+
+
+class UnitySchemaKeyWithMetastore(CatalogKeyWithMetastore):
+    unity_schema: str
+
+
+class CatalogKey(ContainerKey):
     catalog: str

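To see why the new `CatalogKey` changes urns, here is an illustrative sketch; it assumes `platform` is the only required base field on `ContainerKey` and that the usual `guid()` helper from `mcp_builder` is available, and all names are made up.

```python
# Sketch: the metastore field participates in the container key's guid,
# so the same catalog maps to different containers under the two key shapes.
from datahub.emitter.mcp_builder import CatalogKey, CatalogKeyWithMetastore

legacy_key = CatalogKeyWithMetastore(
    platform="databricks", metastore="my_metastore", catalog="main"
)
new_key = CatalogKey(platform="databricks", catalog="main")

# Different guids mean different container urns for everything underneath.
print(legacy_key.guid() != new_key.guid())  # expected: True
```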
42 changes: 39 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -1,3 +1,4 @@
+import logging
 import os
 from datetime import datetime, timedelta, timezone
 from typing import Any, Dict, Optional
@@ -22,6 +23,8 @@
     is_profiling_enabled,
 )

+logger = logging.getLogger(__name__)
+

 class UnityCatalogProfilerConfig(ConfigModel):
     # TODO: Reduce duplicate code with DataLakeProfilerConfig, GEProfilingConfig, SQLAlchemyConfig
@@ -97,9 +100,20 @@ class UnityCatalogSourceConfig(
         description="Name of the workspace. Default to deployment name present in workspace_url",
     )

-    ingest_data_platform_instance_aspect: Optional[bool] = pydantic.Field(
-        default=False,
-        description="Option to enable/disable ingestion of the data platform instance aspect. The default data platform instance id for a dataset is workspace_name",
+    include_metastore: bool = pydantic.Field(
+        default=True,
+        description=(
+            "Whether to ingest the workspace's metastore as a container and include it in all urns."
+            " Changing this will affect the urns of all entities in the workspace."
+            " This will be disabled by default in the future,"
+            " so it is recommended to set this to False for new ingestions."
+            " If you have an existing unity catalog ingestion, we recommend deleting existing data"
+            " via the cli: `datahub delete --platform databricks` and re-ingesting."
+        ),
     )

+    _ingest_data_platform_instance_aspect_removed = pydantic_removed_field(
+        "ingest_data_platform_instance_aspect"
+    )
+
     _only_ingest_assigned_metastore_removed = pydantic_removed_field(
@@ -122,6 +136,16 @@ class UnityCatalogSourceConfig(
         default=AllowDenyPattern.allow_all(),
         description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in `catalog.schema.table` format. e.g. to match all tables starting with customer in Customer catalog and public schema, use the regex `Customer\\.public\\.customer.*`.",
     )
+
+    notebook_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description=(
+            "Regex patterns for notebooks to filter in ingestion, based on notebook *path*."
+            " Specify regex to match the entire notebook path in `/<dir>/.../<name>` format."
+            " e.g. to match all notebooks in the root Shared directory, use the regex `/Shared/.*`."
+        ),
+    )
+
     domain: Dict[str, AllowDenyPattern] = Field(
         default=dict(),
         description='Attach domains to catalogs, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.',
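For context, `notebook_pattern` follows datahub's standard `AllowDenyPattern` semantics, applied to the notebook path. A usage sketch with hypothetical paths:

```python
# Sketch: filtering notebooks by path with datahub's AllowDenyPattern.
from datahub.configuration.common import AllowDenyPattern

pattern = AllowDenyPattern(allow=["/Shared/.*"], deny=["/Shared/scratch/.*"])

print(pattern.allowed("/Shared/reports/daily"))    # True: allowed, not denied
print(pattern.allowed("/Shared/scratch/tmp"))      # False: matches a deny rule
print(pattern.allowed("/Users/alice/experiment"))  # False: not in the allow list
```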
@@ -182,3 +206,15 @@ def workspace_url_should_start_with_http_scheme(cls, workspace_url: str) -> str:
             "Workspace URL must start with http scheme. e.g. https://my-workspace.cloud.databricks.com"
         )
         return workspace_url
+
+    @pydantic.validator("include_metastore")
+    def include_metastore_warning(cls, v: bool) -> bool:
+        if v:
+            msg = (
+                "include_metastore is enabled."
+                " This is not recommended and will be disabled by default in the future, which is a breaking change."
+                " All databricks urns will change if you re-ingest with this disabled."
+                " We recommend soft deleting all databricks data and re-ingesting with include_metastore set to False."
+            )
+            logger.warning(msg)
+        return v

**Collaborator** commented on the new validator:

> Let's also call `add_global_warning`.

**Collaborator (author)** replied:

> What's that do?
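As a quick behavioral sketch (same placeholder fields as the earlier example): leaving the flag at its default should emit the deprecation warning via the module logger.

```python
# Sketch: the validator warns when include_metastore is left at its default.
import logging

from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig

logging.basicConfig(level=logging.WARNING)
UnityCatalogSourceConfig.parse_obj(
    {
        "token": "<databricks-personal-access-token>",  # placeholder
        "workspace_url": "https://my-workspace.cloud.databricks.com",
        # include_metastore defaults to True, so parsing logs a warning like:
        # "include_metastore is enabled. This is not recommended..."
    }
)
```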
16 changes: 9 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -97,14 +97,13 @@ def __init__(
         self.report = report

     def check_basic_connectivity(self) -> bool:
-        self._workspace_client.metastores.summary()
-        return True
+        return bool(self._workspace_client.catalogs.list())

     def assigned_metastore(self) -> Metastore:
         response = self._workspace_client.metastores.summary()
         return self._create_metastore(response)

-    def catalogs(self, metastore: Metastore) -> Iterable[Catalog]:
+    def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
         response = self._workspace_client.catalogs.list()
         if not response:
             logger.info("Catalogs not found")
@@ -247,7 +246,7 @@ def table_lineage(
         for item in response.get("upstreams") or []:
             if "tableInfo" in item:
                 table_ref = TableReference.create_from_lineage(
-                    item["tableInfo"], table.schema.catalog.metastore.id
+                    item["tableInfo"], table.schema.catalog.metastore
                 )
                 if table_ref:
                     table.upstreams[table_ref] = {}
@@ -276,7 +275,7 @@ def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None
         )
         for item in response.get("upstream_cols", []):
             table_ref = TableReference.create_from_lineage(
-                item, table.schema.catalog.metastore.id
+                item, table.schema.catalog.metastore
             )
             if table_ref:
                 table.upstreams.setdefault(table_ref, {}).setdefault(
@@ -305,10 +304,13 @@ def _create_metastore(
             comment=None,
         )

-    def _create_catalog(self, metastore: Metastore, obj: CatalogInfo) -> Catalog:
+    def _create_catalog(
+        self, metastore: Optional[Metastore], obj: CatalogInfo
+    ) -> Catalog:
+        catalog_name = self._escape_sequence(obj.name)
         return Catalog(
             name=obj.name,
-            id=f"{metastore.id}.{self._escape_sequence(obj.name)}",
+            id=f"{metastore.id}.{catalog_name}" if metastore else catalog_name,
             metastore=metastore,
             comment=obj.comment,
             owner=obj.owner,
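A standalone sketch of the new connectivity probe, assuming the `databricks-sdk` `WorkspaceClient`; host and token are placeholders. One plausible motivation for the switch is that `catalogs.list()` succeeds for tokens that lack the permissions needed by `metastores.summary()`.

```python
# Sketch: the same "can we list any catalog?" check the source now performs.
from databricks.sdk import WorkspaceClient

w = WorkspaceClient(
    host="https://my-workspace.cloud.databricks.com",  # placeholder
    token="<databricks-personal-access-token>",        # placeholder
)
# Truthy iff at least one Unity Catalog catalog is visible to this token.
print(bool(list(w.catalogs.list())))
```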
metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py
@@ -92,7 +92,7 @@ class Metastore(CommonProperty):

 @dataclass
 class Catalog(CommonProperty):
-    metastore: Metastore
+    metastore: Optional[Metastore]
     owner: Optional[str]
     type: CatalogType

@@ -130,25 +130,29 @@ class ServicePrincipal:

 @dataclass(frozen=True, order=True)
 class TableReference:
-    metastore: str
+    metastore: Optional[str]
     catalog: str
     schema: str
     table: str

     @classmethod
     def create(cls, table: "Table") -> "TableReference":
         return cls(
-            table.schema.catalog.metastore.id,
+            table.schema.catalog.metastore.id
+            if table.schema.catalog.metastore
+            else None,
             table.schema.catalog.name,
             table.schema.name,
             table.name,
         )

     @classmethod
-    def create_from_lineage(cls, d: dict, metastore: str) -> Optional["TableReference"]:
+    def create_from_lineage(
+        cls, d: dict, metastore: Optional[Metastore]
+    ) -> Optional["TableReference"]:
         try:
             return cls(
-                metastore,
+                metastore.id if metastore else None,
                 d["catalog_name"],
                 d["schema_name"],
                 d.get("table_name", d["name"]),  # column vs table query output
@@ -158,7 +162,10 @@ def create_from_lineage(cls, d: dict, metastore: str) -> Optional["TableReference"]:
             return None

     def __str__(self) -> str:
-        return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}"
+        if self.metastore:
+            return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}"
+        else:
+            return self.qualified_table_name

     @property
     def qualified_table_name(self) -> str:
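A small sketch of how the two reference forms render; the module path follows the file above, and it assumes `qualified_table_name` formats `catalog.schema.table`.

```python
# Sketch: TableReference rendering with and without a metastore id.
from datahub.ingestion.source.unity.proxy_types import TableReference

with_metastore = TableReference("abc-123", "main", "default", "orders")
without_metastore = TableReference(None, "main", "default", "orders")

print(str(with_metastore))     # "abc-123.main.default.orders"
print(str(without_metastore))  # "main.default.orders" via qualified_table_name
```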