Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/unity): Support specifying catalogs directly; pass env correctly #9110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ qualified dataset name, i.e. `<project_name>.<dataset_name>`. We attempt to supp
pattern format by prepending `.*\\.` to dataset patterns lacking a period, so in most cases this
should not cause any issues. However, if you have a complex dataset pattern, we recommend you
manually convert it to the fully qualified format to avoid any potential issues.
- #9110 - The Unity Catalog source now generates urns that respect the `env` set in your recipe. If you
have been setting `env` to something other than `PROD`, urns will now be generated with that
`env` value, so they will differ from, and effectively invalidate, your existing urns.

### Potential Downtime

Expand Down
10 changes: 9 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/unity/config.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

import pydantic
from pydantic import Field
Expand Down Expand Up @@ -132,6 +132,14 @@ class UnityCatalogSourceConfig(

_metastore_id_pattern_removed = pydantic_removed_field("metastore_id_pattern")

catalogs: Optional[List[str]] = pydantic.Field(
default=None,
description=(
"Fixed list of catalogs to ingest."
" If not specified, catalogs will be ingested based on `catalog_pattern`."
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our `AllowDenyPattern` has an `is_fully_specified_allow_list` property that could be used as a shortcut here, which would avoid requiring an additional config option.

),
)

catalog_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for catalogs to filter in ingestion. Specify regex to match the full `metastore.catalog` name.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
for catalog in response:
yield self._create_catalog(metastore, catalog)

def catalog(
    self, catalog_name: str, metastore: Optional[Metastore]
) -> Optional[Catalog]:
    """Look up one catalog by name through the workspace client.

    Args:
        catalog_name: Name passed to the workspace client's catalog lookup.
        metastore: Metastore forwarded unchanged to ``_create_catalog``;
            may be ``None``.

    Returns:
        The result of ``_create_catalog`` for the fetched catalog, or
        ``None`` when the lookup yields a falsy response.

    NOTE(review): whether the client returns a falsy value or raises for a
    missing catalog is not visible from here -- confirm against the SDK.
    """
    catalog_info = self._workspace_client.catalogs.get(catalog_name)
    if catalog_info:
        return self._create_catalog(metastore, catalog_info)

    # Nothing came back for this name: record it and signal "no catalog".
    logger.info(f"Catalog {catalog_name} not found")
    return None

def schemas(self, catalog: Catalog) -> Iterable[Schema]:
response = self._workspace_client.schemas.list(catalog_name=catalog.name)
if not response:
Expand Down
27 changes: 23 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,10 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_ingestion_stage_start("Start warehouse")
self.report.report_ingestion_stage_start("Ingestion Setup")
wait_on_warehouse = None
if self.config.is_profiling_enabled():
self.report.report_ingestion_stage_start("Start warehouse")
# Can take several minutes, so start now and wait later
wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse()
if wait_on_warehouse is None:
Expand All @@ -200,8 +201,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
)
return

self.report.report_ingestion_stage_start("Ingest service principals")
self.build_service_principal_map()
if self.config.include_ownership:
self.report.report_ingestion_stage_start("Ingest service principals")
self.build_service_principal_map()
if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Ingest notebooks")
yield from self.process_notebooks()
Expand Down Expand Up @@ -317,7 +319,7 @@ def process_metastores(self) -> Iterable[MetadataWorkUnit]:
def process_catalogs(
self, metastore: Optional[Metastore]
) -> Iterable[MetadataWorkUnit]:
for catalog in self.unity_catalog_api_proxy.catalogs(metastore=metastore):
for catalog in self._get_catalogs(metastore):
if not self.config.catalog_pattern.allowed(catalog.id):
self.report.catalogs.dropped(catalog.id)
continue
Expand All @@ -327,6 +329,17 @@ def process_catalogs(

self.report.catalogs.processed(catalog.id)

def _get_catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
    """Yield the catalogs to process for the given metastore.

    When ``config.catalogs`` is empty or unset, every catalog returned by the
    API proxy's ``catalogs`` listing is yielded. Otherwise each configured
    name is looked up individually and only the ones the proxy resolves to a
    truthy catalog are yielded.
    """
    if not self.config.catalogs:
        # No fixed list configured: fall back to listing everything.
        yield from self.unity_catalog_api_proxy.catalogs(metastore=metastore)
        return

    for configured_name in self.config.catalogs:
        resolved = self.unity_catalog_api_proxy.catalog(
            configured_name, metastore=metastore
        )
        # Names the proxy could not resolve are skipped.
        if resolved:
            yield resolved

def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]:
for schema in self.unity_catalog_api_proxy.schemas(catalog=catalog):
if not self.config.schema_pattern.allowed(schema.id):
Expand Down Expand Up @@ -509,6 +522,7 @@ def gen_dataset_urn(self, table_ref: TableReference) -> str:
platform=self.platform,
platform_instance=self.platform_instance_name,
name=str(table_ref),
env=self.config.env,
)

def gen_notebook_urn(self, notebook: Union[Notebook, NotebookId]) -> str:
Expand Down Expand Up @@ -576,20 +590,23 @@ def gen_schema_key(self, schema: Schema) -> ContainerKey:
instance=self.config.platform_instance,
catalog=schema.catalog.name,
metastore=schema.catalog.metastore.name,
env=self.config.env,
)
else:
return UnitySchemaKey(
unity_schema=schema.name,
platform=self.platform,
instance=self.config.platform_instance,
catalog=schema.catalog.name,
env=self.config.env,
)

def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey:
    """Build the ``MetastoreKey`` for ``metastore``.

    The key combines the metastore's name with this source's platform and
    the configured platform instance and ``env``.
    """
    source_config = self.config
    return MetastoreKey(
        metastore=metastore.name,
        platform=self.platform,
        instance=source_config.platform_instance,
        env=source_config.env,
    )

def gen_catalog_key(self, catalog: Catalog) -> ContainerKey:
Expand All @@ -600,12 +617,14 @@ def gen_catalog_key(self, catalog: Catalog) -> ContainerKey:
metastore=catalog.metastore.name,
platform=self.platform,
instance=self.config.platform_instance,
env=self.config.env,
)
else:
return CatalogKey(
catalog=catalog.name,
platform=self.platform,
instance=self.config.platform_instance,
env=self.config.env,
)

def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
Expand Down
Loading