diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 4d1535f28fa0a..c3a92c782068c 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -57,6 +57,9 @@ qualified dataset name, i.e. `.`. We attempt to supp pattern format by prepending `.*\\.` to dataset patterns lacking a period, so in most cases this should not cause any issues. However, if you have a complex dataset pattern, we recommend you manually convert it to the fully qualified format to avoid any potential issues. +- #9110 - The Unity Catalog source will now generate urns based on `env` properly. If you have +been setting `env` in your recipe to something besides `PROD`, we will now generate urns +with that new env variable, invalidating your existing urns. ### Potential Downtime diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 16820c37d546e..c481eda648327 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -1,7 +1,7 @@ import logging import os from datetime import datetime, timedelta, timezone -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional import pydantic from pydantic import Field @@ -132,6 +132,14 @@ class UnityCatalogSourceConfig( _metastore_id_pattern_removed = pydantic_removed_field("metastore_id_pattern") + catalogs: Optional[List[str]] = pydantic.Field( + default=None, + description=( + "Fixed list of catalogs to ingest." + " If not specified, catalogs will be ingested based on `catalog_pattern`." + ), + ) + catalog_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description="Regex patterns for catalogs to filter in ingestion. Specify regex to match the full `metastore.catalog` name.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 3fb77ce512ed2..375c76db8e971 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -112,6 +112,15 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: for catalog in response: yield self._create_catalog(metastore, catalog) + def catalog( + self, catalog_name: str, metastore: Optional[Metastore] + ) -> Optional[Catalog]: + response = self._workspace_client.catalogs.get(catalog_name) + if not response: + logger.info(f"Catalog {catalog_name} not found") + return None + return self._create_catalog(metastore, response) + def schemas(self, catalog: Catalog) -> Iterable[Schema]: response = self._workspace_client.schemas.list(catalog_name=catalog.name) if not response: diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index b63cf65d55dc8..44b5bbbcb0ceb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -188,9 +188,10 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - self.report.report_ingestion_stage_start("Start warehouse") + self.report.report_ingestion_stage_start("Ingestion Setup") wait_on_warehouse = None if self.config.is_profiling_enabled(): + self.report.report_ingestion_stage_start("Start warehouse") # Can take several minutes, so start now and wait later wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() if wait_on_warehouse is None: @@ -200,8 +201,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) return - self.report.report_ingestion_stage_start("Ingest service principals") - self.build_service_principal_map() + if self.config.include_ownership: + self.report.report_ingestion_stage_start("Ingest service principals") + self.build_service_principal_map() if self.config.include_notebooks: self.report.report_ingestion_stage_start("Ingest notebooks") yield from self.process_notebooks() @@ -317,7 +319,7 @@ def process_metastores(self) -> Iterable[MetadataWorkUnit]: def process_catalogs( self, metastore: Optional[Metastore] ) -> Iterable[MetadataWorkUnit]: - for catalog in self.unity_catalog_api_proxy.catalogs(metastore=metastore): + for catalog in self._get_catalogs(metastore): if not self.config.catalog_pattern.allowed(catalog.id): self.report.catalogs.dropped(catalog.id) continue @@ -327,6 +329,17 @@ def process_catalogs( self.report.catalogs.processed(catalog.id) + def _get_catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: + if self.config.catalogs: + for catalog_name in self.config.catalogs: + catalog = self.unity_catalog_api_proxy.catalog( + catalog_name, metastore=metastore + ) + if catalog: + yield catalog + else: + yield from self.unity_catalog_api_proxy.catalogs(metastore=metastore) + def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: for schema in self.unity_catalog_api_proxy.schemas(catalog=catalog): if not self.config.schema_pattern.allowed(schema.id): @@ -509,6 +522,7 @@ def gen_dataset_urn(self, table_ref: TableReference) -> str: platform=self.platform, platform_instance=self.platform_instance_name, name=str(table_ref), + env=self.config.env, ) def gen_notebook_urn(self, notebook: Union[Notebook, NotebookId]) -> str: @@ -576,6 +590,7 @@ def gen_schema_key(self, schema: Schema) -> ContainerKey: instance=self.config.platform_instance, catalog=schema.catalog.name, metastore=schema.catalog.metastore.name, + env=self.config.env, ) else: return UnitySchemaKey( @@ -583,6 +598,7 @@ def gen_schema_key(self, schema: Schema) -> ContainerKey: platform=self.platform, instance=self.config.platform_instance, catalog=schema.catalog.name, + env=self.config.env, ) def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey: @@ -590,6 +606,7 @@ def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey: metastore=metastore.name, platform=self.platform, instance=self.config.platform_instance, + env=self.config.env, ) def gen_catalog_key(self, catalog: Catalog) -> ContainerKey: @@ -600,12 +617,14 @@ def gen_catalog_key(self, catalog: Catalog) -> ContainerKey: metastore=catalog.metastore.name, platform=self.platform, instance=self.config.platform_instance, + env=self.config.env, ) else: return CatalogKey( catalog=catalog.name, platform=self.platform, instance=self.config.platform_instance, + env=self.config.env, ) def _gen_domain_urn(self, dataset_name: str) -> Optional[str]: