From 13508a9d888df519a389b6bd187b5f745772627b Mon Sep 17 00:00:00 2001
From: Upendra Rao Vedullapalli
Date: Wed, 4 Oct 2023 15:20:51 +0200
Subject: [PATCH 1/3] feat(bigquery): excluding projects without any datasets from ingestion (#8535)

Co-authored-by: Upendra Vedullapalli
Co-authored-by: Andrew Sikowitz
---
 .../ingestion/source/bigquery_v2/bigquery.py  | 19 +++++--
 .../source/bigquery_v2/bigquery_config.py     |  5 ++
 .../source/bigquery_v2/bigquery_report.py     |  2 +
 .../tests/unit/test_bigquery_source.py        | 53 ++++++++++++++++++-
 4 files changed, 72 insertions(+), 7 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index f6adbcf033bcc..fee181864a2d6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -600,9 +600,6 @@ def _process_project(
         db_views: Dict[str, List[BigqueryView]] = {}
 
         project_id = bigquery_project.id
-
-        yield from self.gen_project_id_containers(project_id)
-
         try:
             bigquery_project.datasets = (
                 self.bigquery_data_dictionary.get_datasets_for_project_id(project_id)
@@ -619,11 +616,23 @@ def _process_project(
             return None
 
         if len(bigquery_project.datasets) == 0:
-            logger.warning(
-                f"No dataset found in {project_id}. Either there are no datasets in this project or missing bigquery.datasets.get permission. You can assign predefined roles/bigquery.metadataViewer role to your service account."
+            more_info = (
+                "Either there are no datasets in this project or missing bigquery.datasets.get permission. "
+                "You can assign predefined roles/bigquery.metadataViewer role to your service account."
             )
+            if self.config.exclude_empty_projects:
+                self.report.report_dropped(project_id)
+                warning_message = f"Excluded project '{project_id}' since no datasets were found. {more_info}"
+            else:
+                yield from self.gen_project_id_containers(project_id)
+                warning_message = (
+                    f"No datasets found in project '{project_id}'. {more_info}"
+                )
+            logger.warning(warning_message)
             return
 
+        yield from self.gen_project_id_containers(project_id)
+
         self.report.num_project_datasets_to_scan[project_id] = len(
             bigquery_project.datasets
         )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index 3b06a4699c566..483355a85ac05 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -265,6 +265,11 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
         description="Maximum number of entries for the in-memory caches of FileBacked data structures.",
     )
 
+    exclude_empty_projects: bool = Field(
+        default=False,
+        description="Option to exclude empty projects from being ingested.",
+    )
+
     @root_validator(pre=False)
     def profile_default_settings(cls, values: Dict) -> Dict:
         # Extra default SQLAlchemy option for better connection pooling and threading.
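For reference, the `exclude_empty_projects` field added above is an ordinary pydantic option on `BigQueryV2Config`, so it can be toggled like any other connector setting. The snippet below is a minimal sketch, not part of the patch; it mirrors the unit test added later in this series and assumes only fields that already exist on the config, such as `project_ids`.

```python
# Minimal sketch (not part of this patch): toggling the new option via
# BigQueryV2Config.parse_obj, mirroring the unit test added in this series.
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

config = BigQueryV2Config.parse_obj(
    {
        "project_ids": ["project-a", "project-b"],  # hypothetical project ids
        "exclude_empty_projects": True,  # new in this patch; defaults to False
    }
)
assert config.exclude_empty_projects
```

When the flag is left at its default, an empty project still gets its container emitted and only a warning is logged, as in the `else` branch of the hunk above.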
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 661589a0c58e5..9d92b011ee285 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -122,6 +122,8 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR usage_state_size: Optional[str] = None + exclude_empty_projects: Optional[bool] = None + schema_api_perf: BigQuerySchemaApiPerfReport = field( default_factory=BigQuerySchemaApiPerfReport ) diff --git a/metadata-ingestion/tests/unit/test_bigquery_source.py b/metadata-ingestion/tests/unit/test_bigquery_source.py index 4fc6c31626ba8..e9e91361f49f4 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_source.py +++ b/metadata-ingestion/tests/unit/test_bigquery_source.py @@ -3,13 +3,14 @@ import os from datetime import datetime, timedelta, timezone from types import SimpleNamespace -from typing import Any, Dict, Optional, cast +from typing import Any, Dict, List, Optional, cast from unittest.mock import MagicMock, Mock, patch import pytest from google.api_core.exceptions import GoogleAPICallError from google.cloud.bigquery.table import Row, TableListItem +from datahub.configuration.common import AllowDenyPattern from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.bigquery_v2.bigquery import BigqueryV2Source from datahub.ingestion.source.bigquery_v2.bigquery_audit import ( @@ -17,9 +18,13 @@ BigqueryTableIdentifier, BigQueryTableRef, ) -from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config +from datahub.ingestion.source.bigquery_v2.bigquery_config import ( + BigQueryConnectionConfig, + BigQueryV2Config, +) from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report from datahub.ingestion.source.bigquery_v2.bigquery_schema import ( + BigqueryDataset, BigqueryProject, BigQuerySchemaApi, BigqueryView, @@ -854,3 +859,47 @@ def test_get_table_name(full_table_name: str, datahub_full_table_name: str) -> N BigqueryTableIdentifier.from_string_name(full_table_name).get_table_name() == datahub_full_table_name ) + + +def test_default_config_for_excluding_projects_and_datasets(): + config = BigQueryV2Config.parse_obj({}) + assert config.exclude_empty_projects is False + config = BigQueryV2Config.parse_obj({"exclude_empty_projects": True}) + assert config.exclude_empty_projects + + +@patch.object(BigQueryConnectionConfig, "get_bigquery_client", new=lambda self: None) +@patch.object(BigQuerySchemaApi, "get_datasets_for_project_id") +def test_excluding_empty_projects_from_ingestion( + get_datasets_for_project_id_mock, +): + project_id_with_datasets = "project-id-with-datasets" + project_id_without_datasets = "project-id-without-datasets" + + def get_datasets_for_project_id_side_effect( + project_id: str, + ) -> List[BigqueryDataset]: + return ( + [] + if project_id == project_id_without_datasets + else [BigqueryDataset("some-dataset")] + ) + + get_datasets_for_project_id_mock.side_effect = ( + get_datasets_for_project_id_side_effect + ) + + base_config = { + "project_ids": [project_id_with_datasets, project_id_without_datasets], + "schema_pattern": AllowDenyPattern(deny=[".*"]), + "include_usage_statistics": False, + "include_table_lineage": False, + } + + config = BigQueryV2Config.parse_obj(base_config) + source = BigqueryV2Source(config=config, 
ctx=PipelineContext(run_id="test-1")) + assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 2 # type: ignore + + config = BigQueryV2Config.parse_obj({**base_config, "exclude_empty_projects": True}) + source = BigqueryV2Source(config=config, ctx=PipelineContext(run_id="test-2")) + assert len({wu.metadata.entityUrn for wu in source.get_workunits()}) == 1 # type: ignore From d3346a04e486fa098129b626e61013cab4f69350 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 4 Oct 2023 10:22:45 -0400 Subject: [PATCH 2/3] feat(ingest/unity): Ingest notebooks and their lineage (#8940) --- .../sources/databricks/unity-catalog_pre.md | 1 + metadata-ingestion/setup.py | 2 +- .../src/datahub/emitter/mcp_builder.py | 12 ++ .../ingestion/source/common/subtypes.py | 3 + .../datahub/ingestion/source/unity/config.py | 9 +- .../datahub/ingestion/source/unity/proxy.py | 89 +++++++---- .../ingestion/source/unity/proxy_types.py | 45 +++++- .../datahub/ingestion/source/unity/report.py | 8 +- .../datahub/ingestion/source/unity/source.py | 148 ++++++++++++++---- .../datahub/ingestion/source/unity/usage.py | 12 +- 10 files changed, 257 insertions(+), 72 deletions(-) diff --git a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md index 2be8846b87bea..ae2883343d7e8 100644 --- a/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md +++ b/metadata-ingestion/docs/sources/databricks/unity-catalog_pre.md @@ -13,6 +13,7 @@ * Ownership of or `SELECT` privilege on any tables and views you want to ingest * [Ownership documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/ownership.html) * [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html) + + To ingest your workspace's notebooks and respective lineage, your service principal must have `CAN_READ` privileges on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions). + To `include_usage_statistics` (enabled by default), your service principal must have `CAN_MANAGE` permissions on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html). + To ingest `profiling` information with `call_analyze` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile. * Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false`. 
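The folder-permission note above gates the new notebook ingestion added by this patch, which is controlled by the `include_notebooks` option introduced on `UnityCatalogSourceConfig` further down. The following is an illustrative sketch only, not part of the patch: the token and workspace URL are placeholders, and usage statistics are disabled purely to keep the example small, mirroring the performance test added later in this series.

```python
# Illustrative sketch only: enabling notebook ingestion on the Unity Catalog
# source. Field names come from this patch; the token and URL are placeholders.
from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig

config = UnityCatalogSourceConfig.parse_obj(
    {
        "token": "<databricks-token>",
        "workspace_url": "https://<workspace>.cloud.databricks.com",
        "include_notebooks": True,  # new in this patch; off by default
        "include_usage_statistics": False,  # kept minimal, as in the perf test
    }
)
assert config.include_notebooks
```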
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 34afa8cdb39a4..fe8e3be4632c4 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -250,7 +250,7 @@ databricks = { # 0.1.11 appears to have authentication issues with azure databricks - "databricks-sdk>=0.1.1, != 0.1.11", + "databricks-sdk>=0.9.0", "pyspark", "requests", } diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 844a29f1c78a3..7419577b367aa 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -9,6 +9,7 @@ make_container_urn, make_data_platform_urn, make_dataplatform_instance_urn, + make_dataset_urn_with_platform_instance, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit @@ -125,6 +126,17 @@ class BucketKey(ContainerKey): bucket_name: str +class NotebookKey(DatahubKey): + notebook_id: int + platform: str + instance: Optional[str] + + def as_urn(self) -> str: + return make_dataset_urn_with_platform_instance( + platform=self.platform, platform_instance=self.instance, name=self.guid() + ) + + class DatahubKeyJSONEncoder(json.JSONEncoder): # overload method default def default(self, obj: Any) -> Any: diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index a2d89d26112f4..741b4789bef21 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -16,6 +16,9 @@ class DatasetSubTypes(str, Enum): SALESFORCE_STANDARD_OBJECT = "Object" POWERBI_DATASET_TABLE = "PowerBI Dataset Table" + # TODO: Create separate entity... + NOTEBOOK = "Notebook" + class DatasetContainerSubTypes(str, Enum): # Generic SubTypes diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 94ff755e3b254..a49c789a82f27 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -127,11 +127,16 @@ class UnityCatalogSourceConfig( description='Attach domains to catalogs, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing".) If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.', ) - include_table_lineage: Optional[bool] = pydantic.Field( + include_table_lineage: bool = pydantic.Field( default=True, description="Option to enable/disable lineage generation.", ) + include_notebooks: bool = pydantic.Field( + default=False, + description="Ingest notebooks, represented as DataHub datasets.", + ) + include_ownership: bool = pydantic.Field( default=False, description="Option to enable/disable ownership generation for metastores, catalogs, schemas, and tables.", @@ -141,7 +146,7 @@ class UnityCatalogSourceConfig( "include_table_ownership", "include_ownership" ) - include_column_lineage: Optional[bool] = pydantic.Field( + include_column_lineage: bool = pydantic.Field( default=True, description="Option to enable/disable lineage generation. 
Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index e92f4ff07b1ad..2401f1c3d163c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -23,6 +23,7 @@ QueryStatementType, QueryStatus, ) +from databricks.sdk.service.workspace import ObjectType import datahub from datahub.ingestion.source.unity.proxy_profiling import ( @@ -33,6 +34,7 @@ Catalog, Column, Metastore, + Notebook, Query, Schema, ServicePrincipal, @@ -137,6 +139,21 @@ def service_principals(self) -> Iterable[ServicePrincipal]: for principal in self._workspace_client.service_principals.list(): yield self._create_service_principal(principal) + def workspace_notebooks(self) -> Iterable[Notebook]: + for obj in self._workspace_client.workspace.list("/", recursive=True): + if obj.object_type == ObjectType.NOTEBOOK: + yield Notebook( + id=obj.object_id, + path=obj.path, + language=obj.language, + created_at=datetime.fromtimestamp( + obj.created_at / 1000, tz=timezone.utc + ), + modified_at=datetime.fromtimestamp( + obj.modified_at / 1000, tz=timezone.utc + ), + ) + def query_history( self, start_time: datetime, @@ -153,7 +170,7 @@ def query_history( "start_time_ms": start_time.timestamp() * 1000, "end_time_ms": end_time.timestamp() * 1000, }, - "statuses": [QueryStatus.FINISHED.value], + "statuses": [QueryStatus.FINISHED], "statement_types": [typ.value for typ in ALLOWED_STATEMENT_TYPES], } ) @@ -196,61 +213,75 @@ def _query_history( method, path, body={**body, "page_token": response["next_page_token"]} ) - def list_lineages_by_table(self, table_name: str) -> dict: + def list_lineages_by_table( + self, table_name: str, include_entity_lineage: bool + ) -> dict: """List table lineage by table name.""" return self._workspace_client.api_client.do( method="GET", - path="/api/2.0/lineage-tracking/table-lineage/get", - body={"table_name": table_name}, + path="/api/2.0/lineage-tracking/table-lineage", + body={ + "table_name": table_name, + "include_entity_lineage": include_entity_lineage, + }, ) def list_lineages_by_column(self, table_name: str, column_name: str) -> dict: """List column lineage by table name and column name.""" return self._workspace_client.api_client.do( "GET", - "/api/2.0/lineage-tracking/column-lineage/get", + "/api/2.0/lineage-tracking/column-lineage", body={"table_name": table_name, "column_name": column_name}, ) - def table_lineage(self, table: Table) -> None: + def table_lineage( + self, table: Table, include_entity_lineage: bool + ) -> Optional[dict]: # Lineage endpoint doesn't exists on 2.1 version try: response: dict = self.list_lineages_by_table( - table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" + table_name=table.ref.qualified_table_name, + include_entity_lineage=include_entity_lineage, ) - table.upstreams = { - TableReference( - table.schema.catalog.metastore.id, - item["catalog_name"], - item["schema_name"], - item["name"], - ): {} - for item in response.get("upstream_tables", []) - } + + for item in response.get("upstreams") or []: + if "tableInfo" in item: + table_ref = TableReference.create_from_lineage( + item["tableInfo"], table.schema.catalog.metastore.id + ) + if table_ref: + table.upstreams[table_ref] = {} + for notebook in item.get("notebookInfos") or []: + 
table.upstream_notebooks.add(notebook["notebook_id"]) + + for item in response.get("downstreams") or []: + for notebook in item.get("notebookInfos") or []: + table.downstream_notebooks.add(notebook["notebook_id"]) + + return response except Exception as e: logger.error(f"Error getting lineage: {e}") + return None - def get_column_lineage(self, table: Table) -> None: + def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None: try: - table_lineage_response: dict = self.list_lineages_by_table( - table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" + table_lineage = self.table_lineage( + table, include_entity_lineage=include_entity_lineage ) - if table_lineage_response: + if table_lineage: for column in table.columns: response: dict = self.list_lineages_by_column( - table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}", + table_name=table.ref.qualified_table_name, column_name=column.name, ) for item in response.get("upstream_cols", []): - table_ref = TableReference( - table.schema.catalog.metastore.id, - item["catalog_name"], - item["schema_name"], - item["table_name"], + table_ref = TableReference.create_from_lineage( + item, table.schema.catalog.metastore.id ) - table.upstreams.setdefault(table_ref, {}).setdefault( - column.name, [] - ).append(item["name"]) + if table_ref: + table.upstreams.setdefault(table_ref, {}).setdefault( + column.name, [] + ).append(item["name"]) except Exception as e: logger.error(f"Error getting lineage: {e}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py index 2b943d8c98e7d..d57f20245913f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py @@ -1,8 +1,10 @@ # Supported types are available at # https://api-docs.databricks.com/rest/latest/unity-catalog-api-specification-2-1.html?_ga=2.151019001.1795147704.1666247755-2119235717.1666247755 +import dataclasses +import logging from dataclasses import dataclass, field from datetime import datetime -from typing import Dict, List, Optional +from typing import Dict, FrozenSet, List, Optional, Set from databricks.sdk.service.catalog import ( CatalogType, @@ -11,6 +13,7 @@ TableType, ) from databricks.sdk.service.sql import QueryStatementType +from databricks.sdk.service.workspace import Language from datahub.metadata.schema_classes import ( ArrayTypeClass, @@ -26,6 +29,8 @@ TimeTypeClass, ) +logger = logging.getLogger(__name__) + DATA_TYPE_REGISTRY: dict = { ColumnTypeName.BOOLEAN: BooleanTypeClass, ColumnTypeName.BYTE: BytesTypeClass, @@ -66,6 +71,9 @@ ALLOWED_STATEMENT_TYPES = {*OPERATION_STATEMENT_TYPES.keys(), QueryStatementType.SELECT} +NotebookId = int + + @dataclass class CommonProperty: id: str @@ -136,6 +144,19 @@ def create(cls, table: "Table") -> "TableReference": table.name, ) + @classmethod + def create_from_lineage(cls, d: dict, metastore: str) -> Optional["TableReference"]: + try: + return cls( + metastore, + d["catalog_name"], + d["schema_name"], + d.get("table_name", d["name"]), # column vs table query output + ) + except Exception as e: + logger.warning(f"Failed to create TableReference from {d}: {e}") + return None + def __str__(self) -> str: return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}" @@ -166,6 +187,8 @@ class Table(CommonProperty): view_definition: Optional[str] properties: Dict[str, str] upstreams: 
Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict) + upstream_notebooks: Set[NotebookId] = field(default_factory=set) + downstream_notebooks: Set[NotebookId] = field(default_factory=set) ref: TableReference = field(init=False) @@ -228,3 +251,23 @@ def __bool__(self): self.max is not None, ) ) + + +@dataclass +class Notebook: + id: NotebookId + path: str + language: Language + created_at: datetime + modified_at: datetime + + upstreams: FrozenSet[TableReference] = field(default_factory=frozenset) + + @classmethod + def add_upstream(cls, upstream: TableReference, notebook: "Notebook") -> "Notebook": + return cls( + **{ # type: ignore + **dataclasses.asdict(notebook), + "upstreams": frozenset([*notebook.upstreams, upstream]), + } + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py index 8382b31a56add..808172a136bb3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py @@ -5,21 +5,23 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalSourceReport, ) +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.utilities.lossy_collections import LossyDict, LossyList @dataclass -class UnityCatalogReport(StaleEntityRemovalSourceReport): +class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport): metastores: EntityFilterReport = EntityFilterReport.field(type="metastore") catalogs: EntityFilterReport = EntityFilterReport.field(type="catalog") schemas: EntityFilterReport = EntityFilterReport.field(type="schema") tables: EntityFilterReport = EntityFilterReport.field(type="table/view") table_profiles: EntityFilterReport = EntityFilterReport.field(type="table profile") + notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook") num_queries: int = 0 num_queries_dropped_parse_failure: int = 0 - num_queries_dropped_missing_table: int = 0 # Can be due to pattern filter - num_queries_dropped_duplicate_table: int = 0 + num_queries_missing_table: int = 0 # Can be due to pattern filter + num_queries_duplicate_table: int = 0 num_queries_parsed_by_spark_plan: int = 0 # Distinguish from Operations emitted for created / updated timestamps diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 493acb939c3bb..f2da1aece9fd4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -2,7 +2,7 @@ import re import time from datetime import timedelta -from typing import Dict, Iterable, List, Optional, Set +from typing import Dict, Iterable, List, Optional, Set, Union from urllib.parse import urljoin from datahub.emitter.mce_builder import ( @@ -18,6 +18,7 @@ CatalogKey, ContainerKey, MetastoreKey, + NotebookKey, UnitySchemaKey, add_dataset_to_container, gen_containers, @@ -56,6 +57,8 @@ Catalog, Column, Metastore, + Notebook, + NotebookId, Schema, ServicePrincipal, Table, @@ -69,6 +72,7 @@ ViewProperties, ) from datahub.metadata.schema_classes import ( + BrowsePathsClass, DataPlatformInstanceClass, DatasetLineageTypeClass, DatasetPropertiesClass, @@ -88,6 +92,7 @@ UpstreamClass, UpstreamLineageClass, ) +from datahub.utilities.file_backed_collections import FileBackedDict from datahub.utilities.hive_schema_to_avro import 
get_schema_fields_for_hive_column from datahub.utilities.registries.domain_registry import DomainRegistry @@ -157,6 +162,7 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig): # Global set of table refs self.table_refs: Set[TableReference] = set() self.view_refs: Set[TableReference] = set() + self.notebooks: FileBackedDict[Notebook] = FileBackedDict() @staticmethod def test_connection(config_dict: dict) -> TestConnectionReport: @@ -176,6 +182,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + self.report.report_ingestion_stage_start("Start warehouse") wait_on_warehouse = None if self.config.is_profiling_enabled(): # Can take several minutes, so start now and wait later @@ -187,10 +194,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) return + self.report.report_ingestion_stage_start("Ingest service principals") self.build_service_principal_map() + if self.config.include_notebooks: + self.report.report_ingestion_stage_start("Ingest notebooks") + yield from self.process_notebooks() + yield from self.process_metastores() + if self.config.include_notebooks: + self.report.report_ingestion_stage_start("Notebook lineage") + for notebook in self.notebooks.values(): + wu = self._gen_notebook_lineage(notebook) + if wu: + yield wu + if self.config.include_usage_statistics: + self.report.report_ingestion_stage_start("Ingest usage") usage_extractor = UnityCatalogUsageExtractor( config=self.config, report=self.report, @@ -203,6 +223,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) if self.config.is_profiling_enabled(): + self.report.report_ingestion_stage_start("Wait on warehouse") assert wait_on_warehouse timeout = timedelta(seconds=self.config.profiling.max_wait_secs) wait_on_warehouse.result(timeout) @@ -212,6 +233,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.unity_catalog_api_proxy, self.gen_dataset_urn, ) + self.report.report_ingestion_stage_start("Profiling") yield from profiling_extractor.get_workunits(self.table_refs) def build_service_principal_map(self) -> None: @@ -223,6 +245,56 @@ def build_service_principal_map(self) -> None: "service-principals", f"Unable to fetch service principals: {e}" ) + def process_notebooks(self) -> Iterable[MetadataWorkUnit]: + for notebook in self.unity_catalog_api_proxy.workspace_notebooks(): + self.notebooks[str(notebook.id)] = notebook + yield from self._gen_notebook_aspects(notebook) + + def _gen_notebook_aspects(self, notebook: Notebook) -> Iterable[MetadataWorkUnit]: + mcps = MetadataChangeProposalWrapper.construct_many( + entityUrn=self.gen_notebook_urn(notebook), + aspects=[ + DatasetPropertiesClass( + name=notebook.path.rsplit("/", 1)[-1], + customProperties={ + "path": notebook.path, + "language": notebook.language.value, + }, + externalUrl=urljoin( + self.config.workspace_url, f"#notebook/{notebook.id}" + ), + created=TimeStampClass(int(notebook.created_at.timestamp() * 1000)), + lastModified=TimeStampClass( + int(notebook.modified_at.timestamp() * 1000) + ), + ), + SubTypesClass(typeNames=[DatasetSubTypes.NOTEBOOK]), + BrowsePathsClass(paths=notebook.path.split("/")), + # TODO: Add DPI aspect + ], + ) + for mcp in mcps: + yield mcp.as_workunit() + + self.report.notebooks.processed(notebook.path) + + def _gen_notebook_lineage(self, notebook: Notebook) -> Optional[MetadataWorkUnit]: + if not notebook.upstreams: + return None + + return 
MetadataChangeProposalWrapper( + entityUrn=self.gen_notebook_urn(notebook), + aspect=UpstreamLineageClass( + upstreams=[ + UpstreamClass( + dataset=self.gen_dataset_urn(upstream_ref), + type=DatasetLineageTypeClass.COPY, + ) + for upstream_ref in notebook.upstreams + ] + ), + ).as_workunit() + def process_metastores(self) -> Iterable[MetadataWorkUnit]: metastore = self.unity_catalog_api_proxy.assigned_metastore() yield from self.gen_metastore_containers(metastore) @@ -247,6 +319,7 @@ def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: self.report.schemas.dropped(schema.id) continue + self.report.report_ingestion_stage_start(f"Ingest schema {schema.id}") yield from self.gen_schema_containers(schema) yield from self.process_tables(schema) @@ -282,13 +355,21 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn ownership = self._create_table_ownership_aspect(table) data_platform_instance = self._create_data_platform_instance_aspect(table) - lineage: Optional[UpstreamLineageClass] = None if self.config.include_column_lineage: - self.unity_catalog_api_proxy.get_column_lineage(table) - lineage = self._generate_column_lineage_aspect(dataset_urn, table) + self.unity_catalog_api_proxy.get_column_lineage( + table, include_entity_lineage=self.config.include_notebooks + ) elif self.config.include_table_lineage: - self.unity_catalog_api_proxy.table_lineage(table) - lineage = self._generate_lineage_aspect(dataset_urn, table) + self.unity_catalog_api_proxy.table_lineage( + table, include_entity_lineage=self.config.include_notebooks + ) + lineage = self._generate_lineage_aspect(dataset_urn, table) + + if self.config.include_notebooks: + for notebook_id in table.downstream_notebooks: + self.notebooks[str(notebook_id)] = Notebook.add_upstream( + table.ref, self.notebooks[str(notebook_id)] + ) yield from [ mcp.as_workunit() @@ -308,7 +389,7 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn ) ] - def _generate_column_lineage_aspect( + def _generate_lineage_aspect( self, dataset_urn: str, table: Table ) -> Optional[UpstreamLineageClass]: upstreams: List[UpstreamClass] = [] @@ -318,6 +399,7 @@ def _generate_column_lineage_aspect( ): upstream_urn = self.gen_dataset_urn(upstream_ref) + # Should be empty if config.include_column_lineage is False finegrained_lineages.extend( FineGrainedLineage( upstreamType=FineGrainedLineageUpstreamType.FIELD_SET, @@ -331,38 +413,28 @@ def _generate_column_lineage_aspect( for d_col, u_cols in sorted(downstream_to_upstream_cols.items()) ) - upstream_table = UpstreamClass( - upstream_urn, - DatasetLineageTypeClass.TRANSFORMED, - ) - upstreams.append(upstream_table) - - if upstreams: - return UpstreamLineageClass( - upstreams=upstreams, fineGrainedLineages=finegrained_lineages - ) - else: - return None - - def _generate_lineage_aspect( - self, dataset_urn: str, table: Table - ) -> Optional[UpstreamLineageClass]: - upstreams: List[UpstreamClass] = [] - for upstream in sorted(table.upstreams.keys()): - upstream_urn = make_dataset_urn_with_platform_instance( - self.platform, - f"{table.schema.catalog.metastore.id}.{upstream}", - self.platform_instance_name, + upstreams.append( + UpstreamClass( + dataset=upstream_urn, + type=DatasetLineageTypeClass.TRANSFORMED, + ) ) - upstream_table = UpstreamClass( - upstream_urn, - DatasetLineageTypeClass.TRANSFORMED, + for notebook in table.upstream_notebooks: + upstreams.append( + UpstreamClass( + dataset=self.gen_notebook_urn(notebook), + 
type=DatasetLineageTypeClass.TRANSFORMED, + ) ) - upstreams.append(upstream_table) if upstreams: - return UpstreamLineageClass(upstreams=upstreams) + return UpstreamLineageClass( + upstreams=upstreams, + fineGrainedLineages=finegrained_lineages + if self.config.include_column_lineage + else None, + ) else: return None @@ -389,6 +461,14 @@ def gen_dataset_urn(self, table_ref: TableReference) -> str: name=str(table_ref), ) + def gen_notebook_urn(self, notebook: Union[Notebook, NotebookId]) -> str: + notebook_id = notebook.id if isinstance(notebook, Notebook) else notebook + return NotebookKey( + notebook_id=notebook_id, + platform=self.platform, + instance=self.config.platform_instance, + ).as_urn() + def gen_schema_containers(self, schema: Schema) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(f"{schema.catalog.name}.{schema.name}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py index 49f56b46fb012..ab21c1a318659 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py @@ -214,12 +214,15 @@ def _resolve_tables( self, tables: List[str], table_map: TableMap ) -> List[TableReference]: """Resolve tables to TableReferences, filtering out unrecognized or unresolvable table names.""" + + missing_table = False + duplicate_table = False output = [] for table in tables: table = str(table) if table not in table_map: logger.debug(f"Dropping query with unrecognized table: {table}") - self.report.num_queries_dropped_missing_table += 1 + missing_table = True else: refs = table_map[table] if len(refs) == 1: @@ -228,6 +231,11 @@ def _resolve_tables( logger.warning( f"Could not resolve table ref for {table}: {len(refs)} duplicates." 
) - self.report.num_queries_dropped_duplicate_table += 1 + duplicate_table = True + + if missing_table: + self.report.num_queries_missing_table += 1 + if duplicate_table: + self.report.num_queries_duplicate_table += 1 return output From 301d3e6b1ccffaf946f128766578faddbc7ac44e Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Wed, 4 Oct 2023 10:23:13 -0400 Subject: [PATCH 3/3] test(ingest/unity): Add Unity Catalog memory performance testing (#8932) --- .../ingestion/source/unity/proxy_types.py | 1 - .../tests/performance/bigquery/__init__.py | 0 .../bigquery_events.py} | 0 .../{ => bigquery}/test_bigquery_usage.py | 22 +-- .../tests/performance/data_generation.py | 53 ++++- .../tests/performance/data_model.py | 31 ++- .../tests/performance/databricks/__init__.py | 0 .../performance/databricks/test_unity.py | 71 +++++++ .../databricks/unity_proxy_mock.py | 183 ++++++++++++++++++ .../tests/performance/helpers.py | 21 ++ .../tests/unit/test_bigquery_usage.py | 7 +- 11 files changed, 356 insertions(+), 33 deletions(-) create mode 100644 metadata-ingestion/tests/performance/bigquery/__init__.py rename metadata-ingestion/tests/performance/{bigquery.py => bigquery/bigquery_events.py} (100%) rename metadata-ingestion/tests/performance/{ => bigquery}/test_bigquery_usage.py (80%) create mode 100644 metadata-ingestion/tests/performance/databricks/__init__.py create mode 100644 metadata-ingestion/tests/performance/databricks/test_unity.py create mode 100644 metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py create mode 100644 metadata-ingestion/tests/performance/helpers.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py index d57f20245913f..54ac2e90d7c7e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py @@ -175,7 +175,6 @@ class Table(CommonProperty): columns: List[Column] storage_location: Optional[str] data_source_format: Optional[DataSourceFormat] - comment: Optional[str] table_type: TableType owner: Optional[str] generation: Optional[int] diff --git a/metadata-ingestion/tests/performance/bigquery/__init__.py b/metadata-ingestion/tests/performance/bigquery/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/tests/performance/bigquery.py b/metadata-ingestion/tests/performance/bigquery/bigquery_events.py similarity index 100% rename from metadata-ingestion/tests/performance/bigquery.py rename to metadata-ingestion/tests/performance/bigquery/bigquery_events.py diff --git a/metadata-ingestion/tests/performance/test_bigquery_usage.py b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py similarity index 80% rename from metadata-ingestion/tests/performance/test_bigquery_usage.py rename to metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py index 7e05ef070b45d..bbc3378450bff 100644 --- a/metadata-ingestion/tests/performance/test_bigquery_usage.py +++ b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py @@ -2,13 +2,11 @@ import os import random from datetime import timedelta -from typing import Iterable, Tuple import humanfriendly import psutil from datahub.emitter.mce_builder import make_dataset_urn -from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.bigquery_v2.bigquery_config import ( BigQueryUsageConfig, BigQueryV2Config, @@ -16,12 +14,13 
@@ from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor from datahub.utilities.perf_timer import PerfTimer -from tests.performance.bigquery import generate_events, ref_from_table +from tests.performance.bigquery.bigquery_events import generate_events, ref_from_table from tests.performance.data_generation import ( NormalDistribution, generate_data, generate_queries, ) +from tests.performance.helpers import workunit_sink def run_test(): @@ -33,7 +32,7 @@ def run_test(): num_views=2000, time_range=timedelta(days=7), ) - all_tables = seed_metadata.tables + seed_metadata.views + all_tables = seed_metadata.all_tables config = BigQueryV2Config( start_time=seed_metadata.start_time, @@ -88,21 +87,6 @@ def run_test(): print(f"Hash collisions: {report.num_usage_query_hash_collisions}") -def workunit_sink(workunits: Iterable[MetadataWorkUnit]) -> Tuple[int, int]: - peak_memory_usage = psutil.Process(os.getpid()).memory_info().rss - i: int = 0 - for i, wu in enumerate(workunits): - if i % 10_000 == 0: - peak_memory_usage = max( - peak_memory_usage, psutil.Process(os.getpid()).memory_info().rss - ) - peak_memory_usage = max( - peak_memory_usage, psutil.Process(os.getpid()).memory_info().rss - ) - - return i, peak_memory_usage - - if __name__ == "__main__": root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) diff --git a/metadata-ingestion/tests/performance/data_generation.py b/metadata-ingestion/tests/performance/data_generation.py index c530848f27f5c..67b156896909a 100644 --- a/metadata-ingestion/tests/performance/data_generation.py +++ b/metadata-ingestion/tests/performance/data_generation.py @@ -11,11 +11,14 @@ import uuid from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from typing import Iterable, List, TypeVar +from typing import Iterable, List, TypeVar, Union, cast from faker import Faker from tests.performance.data_model import ( + Column, + ColumnMapping, + ColumnType, Container, FieldAccess, Query, @@ -52,15 +55,21 @@ def sample_with_floor(self, floor: int = 1) -> int: @dataclass class SeedMetadata: - containers: List[Container] + # Each list is a layer of containers, e.g. 
[[databases], [schemas]] + containers: List[List[Container]] + tables: List[Table] views: List[View] start_time: datetime end_time: datetime + @property + def all_tables(self) -> List[Table]: + return self.tables + cast(List[Table], self.views) + def generate_data( - num_containers: int, + num_containers: Union[List[int], int], num_tables: int, num_views: int, columns_per_table: NormalDistribution = NormalDistribution(5, 2), @@ -68,32 +77,52 @@ def generate_data( view_definition_length: NormalDistribution = NormalDistribution(150, 50), time_range: timedelta = timedelta(days=14), ) -> SeedMetadata: - containers = [Container(f"container-{i}") for i in range(num_containers)] + # Assemble containers + if isinstance(num_containers, int): + num_containers = [num_containers] + + containers: List[List[Container]] = [] + for i, num_in_layer in enumerate(num_containers): + layer = [ + Container( + f"{i}-container-{j}", + parent=random.choice(containers[-1]) if containers else None, + ) + for j in range(num_in_layer) + ] + containers.append(layer) + + # Assemble tables tables = [ Table( f"table-{i}", - container=random.choice(containers), + container=random.choice(containers[-1]), columns=[ f"column-{j}-{uuid.uuid4()}" for j in range(columns_per_table.sample_with_floor()) ], + column_mapping=None, ) for i in range(num_tables) ] views = [ View( f"view-{i}", - container=random.choice(containers), + container=random.choice(containers[-1]), columns=[ f"column-{j}-{uuid.uuid4()}" for j in range(columns_per_table.sample_with_floor()) ], + column_mapping=None, definition=f"{uuid.uuid4()}-{'*' * view_definition_length.sample_with_floor(10)}", parents=random.sample(tables, parents_per_view.sample_with_floor()), ) for i in range(num_views) ] + for table in tables + views: + _generate_column_mapping(table) + now = datetime.now(tz=timezone.utc) return SeedMetadata( containers=containers, @@ -162,6 +191,18 @@ def generate_queries( ) +def _generate_column_mapping(table: Table) -> ColumnMapping: + d = {} + for column in table.columns: + d[column] = Column( + name=column, + type=random.choice(list(ColumnType)), + nullable=random.random() < 0.1, # Fixed 10% chance for now + ) + table.column_mapping = d + return d + + def _sample_list(lst: List[T], dist: NormalDistribution, floor: int = 1) -> List[T]: return random.sample(lst, min(dist.sample_with_floor(floor), len(lst))) diff --git a/metadata-ingestion/tests/performance/data_model.py b/metadata-ingestion/tests/performance/data_model.py index c593e69ceb9a7..9425fa827070e 100644 --- a/metadata-ingestion/tests/performance/data_model.py +++ b/metadata-ingestion/tests/performance/data_model.py @@ -1,10 +1,10 @@ from dataclasses import dataclass from datetime import datetime -from typing import List, Optional +from enum import Enum +from typing import Dict, List, Optional from typing_extensions import Literal -Column = str StatementType = Literal[ # SELECT + values from OperationTypeClass "SELECT", "INSERT", @@ -21,13 +21,36 @@ @dataclass class Container: name: str + parent: Optional["Container"] = None + + +class ColumnType(str, Enum): + # Can add types that take parameters in the future + + INTEGER = "INTEGER" + FLOAT = "FLOAT" # Double precision (64 bit) + STRING = "STRING" + BOOLEAN = "BOOLEAN" + DATETIME = "DATETIME" + + +@dataclass +class Column: + name: str + type: ColumnType + nullable: bool + + +ColumnRef = str +ColumnMapping = Dict[ColumnRef, Column] @dataclass class Table: name: str container: Container - columns: List[Column] + columns: List[ColumnRef] + 
column_mapping: Optional[ColumnMapping] def is_view(self) -> bool: return False @@ -44,7 +67,7 @@ def is_view(self) -> bool: @dataclass class FieldAccess: - column: Column + column: ColumnRef table: Table diff --git a/metadata-ingestion/tests/performance/databricks/__init__.py b/metadata-ingestion/tests/performance/databricks/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/metadata-ingestion/tests/performance/databricks/test_unity.py b/metadata-ingestion/tests/performance/databricks/test_unity.py new file mode 100644 index 0000000000000..cc9558f0692ed --- /dev/null +++ b/metadata-ingestion/tests/performance/databricks/test_unity.py @@ -0,0 +1,71 @@ +import logging +import os +from unittest.mock import patch + +import humanfriendly +import psutil + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig +from datahub.ingestion.source.unity.source import UnityCatalogSource +from datahub.utilities.perf_timer import PerfTimer +from tests.performance.data_generation import ( + NormalDistribution, + generate_data, + generate_queries, +) +from tests.performance.databricks.unity_proxy_mock import UnityCatalogApiProxyMock +from tests.performance.helpers import workunit_sink + + +def run_test(): + seed_metadata = generate_data( + num_containers=[1, 100, 5000], + num_tables=50000, + num_views=10000, + columns_per_table=NormalDistribution(100, 50), + parents_per_view=NormalDistribution(5, 5), + view_definition_length=NormalDistribution(1000, 300), + ) + queries = generate_queries( + seed_metadata, + num_selects=100000, + num_operations=100000, + num_unique_queries=10000, + num_users=1000, + ) + proxy_mock = UnityCatalogApiProxyMock( + seed_metadata, queries=queries, num_service_principals=10000 + ) + print("Data generated") + + config = UnityCatalogSourceConfig( + token="", workspace_url="http://localhost:1234", include_usage_statistics=False + ) + ctx = PipelineContext(run_id="test") + with patch( + "datahub.ingestion.source.unity.source.UnityCatalogApiProxy", + lambda *args, **kwargs: proxy_mock, + ): + source: UnityCatalogSource = UnityCatalogSource(ctx, config) + + pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss + print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}") + + with PerfTimer() as timer: + workunits = source.get_workunits() + num_workunits, peak_memory_usage = workunit_sink(workunits) + print(f"Workunits Generated: {num_workunits}") + print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + + print( + f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" + ) + print(source.report.aspects) + + +if __name__ == "__main__": + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + root_logger.addHandler(logging.StreamHandler()) + run_test() diff --git a/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py b/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py new file mode 100644 index 0000000000000..593163e12bf0a --- /dev/null +++ b/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py @@ -0,0 +1,183 @@ +import uuid +from collections import defaultdict +from datetime import datetime, timezone +from typing import Dict, Iterable, List + +from databricks.sdk.service.catalog import ColumnTypeName +from databricks.sdk.service.sql import QueryStatementType + +from datahub.ingestion.source.unity.proxy_types import ( + Catalog, + CatalogType, + Column, + 
Metastore, + Query, + Schema, + ServicePrincipal, + Table, + TableType, +) +from tests.performance import data_model +from tests.performance.data_generation import SeedMetadata +from tests.performance.data_model import ColumnType, StatementType + + +class UnityCatalogApiProxyMock: + """Mimics UnityCatalogApiProxy for performance testing.""" + + def __init__( + self, + seed_metadata: SeedMetadata, + queries: Iterable[data_model.Query] = (), + num_service_principals: int = 0, + ) -> None: + self.seed_metadata = seed_metadata + self.queries = queries + self.num_service_principals = num_service_principals + self.warehouse_id = "invalid-warehouse-id" + + # Cache for performance + self._schema_to_table: Dict[str, List[data_model.Table]] = defaultdict(list) + for table in seed_metadata.all_tables: + self._schema_to_table[table.container.name].append(table) + + def check_basic_connectivity(self) -> bool: + return True + + def assigned_metastore(self) -> Metastore: + container = self.seed_metadata.containers[0][0] + return Metastore( + id=container.name, + name=container.name, + global_metastore_id=container.name, + metastore_id=container.name, + comment=None, + owner=None, + cloud=None, + region=None, + ) + + def catalogs(self, metastore: Metastore) -> Iterable[Catalog]: + for container in self.seed_metadata.containers[1]: + if not container.parent or metastore.name != container.parent.name: + continue + + yield Catalog( + id=f"{metastore.id}.{container.name}", + name=container.name, + metastore=metastore, + comment=None, + owner=None, + type=CatalogType.MANAGED_CATALOG, + ) + + def schemas(self, catalog: Catalog) -> Iterable[Schema]: + for container in self.seed_metadata.containers[2]: + # Assumes all catalog names are unique + if not container.parent or catalog.name != container.parent.name: + continue + + yield Schema( + id=f"{catalog.id}.{container.name}", + name=container.name, + catalog=catalog, + comment=None, + owner=None, + ) + + def tables(self, schema: Schema) -> Iterable[Table]: + for table in self._schema_to_table[schema.name]: + columns = [] + if table.column_mapping: + for i, col_name in enumerate(table.columns): + column = table.column_mapping[col_name] + columns.append( + Column( + id=column.name, + name=column.name, + type_name=self._convert_column_type(column.type), + type_text=column.type.value, + nullable=column.nullable, + position=i, + comment=None, + type_precision=0, + type_scale=0, + ) + ) + + yield Table( + id=f"{schema.id}.{table.name}", + name=table.name, + schema=schema, + table_type=TableType.VIEW if table.is_view() else TableType.MANAGED, + columns=columns, + created_at=datetime.now(tz=timezone.utc), + comment=None, + owner=None, + storage_location=None, + data_source_format=None, + generation=None, + created_by="", + updated_at=None, + updated_by=None, + table_id="", + view_definition=table.definition + if isinstance(table, data_model.View) + else None, + properties={}, + ) + + def service_principals(self) -> Iterable[ServicePrincipal]: + for i in range(self.num_service_principals): + yield ServicePrincipal( + id=str(i), + application_id=str(uuid.uuid4()), + display_name=f"user-{i}", + active=True, + ) + + def query_history( + self, + start_time: datetime, + end_time: datetime, + ) -> Iterable[Query]: + for i, query in enumerate(self.queries): + yield Query( + query_id=str(i), + query_text=query.text, + statement_type=self._convert_statement_type(query.type), + start_time=query.timestamp, + end_time=query.timestamp, + user_id=hash(query.actor), + 
user_name=query.actor, + executed_as_user_id=hash(query.actor), + executed_as_user_name=None, + ) + + def table_lineage(self, table: Table) -> None: + pass + + def get_column_lineage(self, table: Table) -> None: + pass + + @staticmethod + def _convert_column_type(t: ColumnType) -> ColumnTypeName: + if t == ColumnType.INTEGER: + return ColumnTypeName.INT + elif t == ColumnType.FLOAT: + return ColumnTypeName.DOUBLE + elif t == ColumnType.STRING: + return ColumnTypeName.STRING + elif t == ColumnType.BOOLEAN: + return ColumnTypeName.BOOLEAN + elif t == ColumnType.DATETIME: + return ColumnTypeName.TIMESTAMP + else: + raise ValueError(f"Unknown column type: {t}") + + @staticmethod + def _convert_statement_type(t: StatementType) -> QueryStatementType: + if t == "CUSTOM" or t == "UNKNOWN": + return QueryStatementType.OTHER + else: + return QueryStatementType[t] diff --git a/metadata-ingestion/tests/performance/helpers.py b/metadata-ingestion/tests/performance/helpers.py new file mode 100644 index 0000000000000..eb98e53670c96 --- /dev/null +++ b/metadata-ingestion/tests/performance/helpers.py @@ -0,0 +1,21 @@ +import os +from typing import Iterable, Tuple + +import psutil + +from datahub.ingestion.api.workunit import MetadataWorkUnit + + +def workunit_sink(workunits: Iterable[MetadataWorkUnit]) -> Tuple[int, int]: + peak_memory_usage = psutil.Process(os.getpid()).memory_info().rss + i: int = 0 + for i, wu in enumerate(workunits): + if i % 10_000 == 0: + peak_memory_usage = max( + peak_memory_usage, psutil.Process(os.getpid()).memory_info().rss + ) + peak_memory_usage = max( + peak_memory_usage, psutil.Process(os.getpid()).memory_info().rss + ) + + return i, peak_memory_usage diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage.py b/metadata-ingestion/tests/unit/test_bigquery_usage.py index e06c6fb3fe7e5..1eb5d8b00e27c 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage.py @@ -35,7 +35,7 @@ TimeWindowSizeClass, ) from datahub.testing.compare_metadata_json import diff_metadata_json -from tests.performance.bigquery import generate_events, ref_from_table +from tests.performance.bigquery.bigquery_events import generate_events, ref_from_table from tests.performance.data_generation import generate_data, generate_queries from tests.performance.data_model import Container, FieldAccess, Query, Table, View @@ -45,14 +45,15 @@ ACTOR_2, ACTOR_2_URN = "b@acryl.io", "urn:li:corpuser:b" DATABASE_1 = Container("database_1") DATABASE_2 = Container("database_2") -TABLE_1 = Table("table_1", DATABASE_1, ["id", "name", "age"]) -TABLE_2 = Table("table_2", DATABASE_1, ["id", "table_1_id", "value"]) +TABLE_1 = Table("table_1", DATABASE_1, ["id", "name", "age"], None) +TABLE_2 = Table("table_2", DATABASE_1, ["id", "table_1_id", "value"], None) VIEW_1 = View( name="view_1", container=DATABASE_1, columns=["id", "name", "total"], definition="VIEW DEFINITION 1", parents=[TABLE_1, TABLE_2], + column_mapping=None, ) ALL_TABLES = [TABLE_1, TABLE_2, VIEW_1]
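Taken together, the reworked performance helpers model containers as layers, attach a generated `column_mapping` to every table, and expose `SeedMetadata.all_tables`. The sketch below is not part of the patch; it drives the same `generate_data` API as the Databricks performance test above, just with much smaller sizes so it finishes quickly.

```python
# Small-scale sketch of the updated data-generation helpers used by the
# performance tests in this patch; sizes are shrunk so it runs in seconds.
from tests.performance.data_generation import NormalDistribution, generate_data

seed_metadata = generate_data(
    num_containers=[1, 5, 10],  # layered containers, e.g. metastore -> catalogs -> schemas
    num_tables=50,
    num_views=10,
    columns_per_table=NormalDistribution(5, 2),
)

# `all_tables` now includes views, and every table gets a generated column mapping.
for table in seed_metadata.all_tables:
    assert table.column_mapping is not None
    print(table.container.name, table.name, len(table.columns))
```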