Merge branch 'master' into dynamodb-issues
maggiehays authored Oct 4, 2023
2 parents 66a33ad + 301d3e6 commit bd47fe4
Showing 24 changed files with 685 additions and 112 deletions.
@@ -13,6 +13,7 @@
* Ownership of or `SELECT` privilege on any tables and views you want to ingest
* [Ownership documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/ownership.html)
* [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html)
+ To ingest your workspace's notebooks and their lineage, your service principal must have `CAN_READ` privileges on the folders containing the notebooks you want to ingest: [guide](https://docs.databricks.com/en/security/auth-authz/access-control/workspace-acl.html#folder-permissions).
+ To `include_usage_statistics` (enabled by default), your service principal must have `CAN_MANAGE` permissions on any SQL Warehouses you want to ingest: [guide](https://docs.databricks.com/security/auth-authz/access-control/sql-endpoint-acl.html).
+ To ingest `profiling` information with `call_analyze` (enabled by default), your service principal must have ownership or `MODIFY` privilege on any tables you want to profile.
* Alternatively, you can run [ANALYZE TABLE](https://docs.databricks.com/sql/language-manual/sql-ref-syntax-aux-analyze-table.html) yourself on any tables you want to profile, then set `call_analyze` to `false`.
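As a rough illustration of wiring these options together, here is a minimal recipe sketch run through DataHub's Python pipeline API. The workspace URL, token, and sink are placeholders, and the exact nesting of the profiling block (and any additional required fields, such as a SQL warehouse id) is an assumption rather than something stated on this page.

```python
# Sketch only: placeholder host/token/sink; profiling nesting is assumed, not confirmed by this diff.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": "https://<workspace>.cloud.databricks.com",
                "token": "<service-principal-token>",
                "include_usage_statistics": True,  # needs CAN_MANAGE on the SQL warehouses
                "include_notebooks": True,         # needs CAN_READ on the notebook folders
                "profiling": {
                    "enabled": True,
                    "call_analyze": False,  # set to false if you run ANALYZE TABLE yourself
                },
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```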
2 changes: 1 addition & 1 deletion metadata-ingestion/setup.py
@@ -250,7 +250,7 @@

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
"databricks-sdk>=0.1.1, != 0.1.11",
"databricks-sdk>=0.9.0",
"pyspark",
"requests",
}
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -9,6 +9,7 @@
make_container_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -125,6 +126,17 @@ class BucketKey(ContainerKey):
bucket_name: str


class NotebookKey(DatahubKey):
notebook_id: int
platform: str
instance: Optional[str]

def as_urn(self) -> str:
return make_dataset_urn_with_platform_instance(
platform=self.platform, platform_instance=self.instance, name=self.guid()
)
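A quick usage sketch for the new key class (assuming this change is merged into `mcp_builder` as shown; `guid()` is the stable hash provided by the `DatahubKey` base class):

```python
# Sketch: deriving a dataset URN for a Databricks notebook from NotebookKey.
from datahub.emitter.mcp_builder import NotebookKey  # assumes this change is merged

key = NotebookKey(notebook_id=123456, platform="databricks", instance=None)
print(key.as_urn())
# e.g. urn:li:dataset:(urn:li:dataPlatform:databricks,<guid>,PROD) -- the name portion is the
# guid() hash of the key fields, not the notebook path.
```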


class DatahubKeyJSONEncoder(json.JSONEncoder):
# overload method default
def default(self, obj: Any) -> Any:
@@ -600,9 +600,6 @@ def _process_project(
db_views: Dict[str, List[BigqueryView]] = {}

project_id = bigquery_project.id

yield from self.gen_project_id_containers(project_id)

try:
bigquery_project.datasets = (
self.bigquery_data_dictionary.get_datasets_for_project_id(project_id)
@@ -619,11 +616,23 @@
return None

if len(bigquery_project.datasets) == 0:
logger.warning(
f"No dataset found in {project_id}. Either there are no datasets in this project or missing bigquery.datasets.get permission. You can assign predefined roles/bigquery.metadataViewer role to your service account."
more_info = (
"Either there are no datasets in this project or missing bigquery.datasets.get permission. "
"You can assign predefined roles/bigquery.metadataViewer role to your service account."
)
if self.config.exclude_empty_projects:
self.report.report_dropped(project_id)
warning_message = f"Excluded project '{project_id}' since no datasets were found. {more_info}"
else:
yield from self.gen_project_id_containers(project_id)
warning_message = (
f"No datasets found in project '{project_id}'. {more_info}"
)
logger.warning(warning_message)
return

yield from self.gen_project_id_containers(project_id)

self.report.num_project_datasets_to_scan[project_id] = len(
bigquery_project.datasets
)
@@ -265,6 +265,11 @@ def validate_column_lineage(cls, v: bool, values: Dict[str, Any]) -> bool:
description="Maximum number of entries for the in-memory caches of FileBacked data structures.",
)

exclude_empty_projects: bool = Field(
default=False,
description="Option to exclude empty projects from being ingested.",
)
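For reference, a sketch of the source-config fragment that turns this flag on; only `exclude_empty_projects` is taken from this change, the surrounding recipe layout is assumed.

```python
# Sketch: recipe fragment (as a Python dict) enabling the new flag for the bigquery source.
bigquery_source = {
    "type": "bigquery",
    "config": {
        "exclude_empty_projects": True,  # drop projects that contain no datasets
    },
}
```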

@root_validator(pre=False)
def profile_default_settings(cls, values: Dict) -> Dict:
# Extra default SQLAlchemy option for better connection pooling and threading.
@@ -122,6 +122,8 @@ class BigQueryV2Report(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowR

usage_state_size: Optional[str] = None

exclude_empty_projects: Optional[bool] = None

schema_api_perf: BigQuerySchemaApiPerfReport = field(
default_factory=BigQuerySchemaApiPerfReport
)
@@ -16,6 +16,9 @@ class DatasetSubTypes(str, Enum):
SALESFORCE_STANDARD_OBJECT = "Object"
POWERBI_DATASET_TABLE = "PowerBI Dataset Table"

# TODO: Create separate entity...
NOTEBOOK = "Notebook"


class DatasetContainerSubTypes(str, Enum):
# Generic SubTypes
@@ -127,11 +127,16 @@ class UnityCatalogSourceConfig(
description='Attach domains to catalogs, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing". If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.',
)

include_table_lineage: Optional[bool] = pydantic.Field(
include_table_lineage: bool = pydantic.Field(
default=True,
description="Option to enable/disable lineage generation.",
)

include_notebooks: bool = pydantic.Field(
default=False,
description="Ingest notebooks, represented as DataHub datasets.",
)

include_ownership: bool = pydantic.Field(
default=False,
description="Option to enable/disable ownership generation for metastores, catalogs, schemas, and tables.",
@@ -141,7 +146,7 @@
"include_table_ownership", "include_ownership"
)

include_column_lineage: Optional[bool] = pydantic.Field(
include_column_lineage: bool = pydantic.Field(
default=True,
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
)
89 changes: 60 additions & 29 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -23,6 +23,7 @@
QueryStatementType,
QueryStatus,
)
from databricks.sdk.service.workspace import ObjectType

import datahub
from datahub.ingestion.source.unity.proxy_profiling import (
@@ -33,6 +34,7 @@
Catalog,
Column,
Metastore,
Notebook,
Query,
Schema,
ServicePrincipal,
@@ -137,6 +139,21 @@ def service_principals(self) -> Iterable[ServicePrincipal]:
for principal in self._workspace_client.service_principals.list():
yield self._create_service_principal(principal)

def workspace_notebooks(self) -> Iterable[Notebook]:
for obj in self._workspace_client.workspace.list("/", recursive=True):
if obj.object_type == ObjectType.NOTEBOOK:
yield Notebook(
id=obj.object_id,
path=obj.path,
language=obj.language,
created_at=datetime.fromtimestamp(
obj.created_at / 1000, tz=timezone.utc
),
modified_at=datetime.fromtimestamp(
obj.modified_at / 1000, tz=timezone.utc
),
)
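The method above is a thin wrapper over the Databricks SDK's workspace listing; a standalone sketch of that underlying call follows (host and token are placeholders, and `databricks-sdk>=0.9.0` is assumed per the `setup.py` change above):

```python
# Standalone sketch of the SDK call that workspace_notebooks() wraps; host/token are placeholders.
from datetime import datetime, timezone

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.workspace import ObjectType

w = WorkspaceClient(host="https://<workspace>.cloud.databricks.com", token="<token>")
for obj in w.workspace.list("/", recursive=True):
    if obj.object_type == ObjectType.NOTEBOOK:
        created = datetime.fromtimestamp(obj.created_at / 1000, tz=timezone.utc)
        print(obj.object_id, obj.path, obj.language, created.isoformat())
```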

def query_history(
self,
start_time: datetime,
@@ -153,7 +170,7 @@ def query_history(
"start_time_ms": start_time.timestamp() * 1000,
"end_time_ms": end_time.timestamp() * 1000,
},
"statuses": [QueryStatus.FINISHED.value],
"statuses": [QueryStatus.FINISHED],
"statement_types": [typ.value for typ in ALLOWED_STATEMENT_TYPES],
}
)
@@ -196,61 +213,75 @@ def _query_history(
method, path, body={**body, "page_token": response["next_page_token"]}
)

def list_lineages_by_table(self, table_name: str) -> dict:
def list_lineages_by_table(
self, table_name: str, include_entity_lineage: bool
) -> dict:
"""List table lineage by table name."""
return self._workspace_client.api_client.do(
method="GET",
path="/api/2.0/lineage-tracking/table-lineage/get",
body={"table_name": table_name},
path="/api/2.0/lineage-tracking/table-lineage",
body={
"table_name": table_name,
"include_entity_lineage": include_entity_lineage,
},
)

def list_lineages_by_column(self, table_name: str, column_name: str) -> dict:
"""List column lineage by table name and column name."""
return self._workspace_client.api_client.do(
"GET",
"/api/2.0/lineage-tracking/column-lineage/get",
"/api/2.0/lineage-tracking/column-lineage",
body={"table_name": table_name, "column_name": column_name},
)

def table_lineage(self, table: Table) -> None:
def table_lineage(
self, table: Table, include_entity_lineage: bool
) -> Optional[dict]:
# Lineage endpoint doesn't exist in API version 2.1
try:
response: dict = self.list_lineages_by_table(
table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}"
table_name=table.ref.qualified_table_name,
include_entity_lineage=include_entity_lineage,
)
table.upstreams = {
TableReference(
table.schema.catalog.metastore.id,
item["catalog_name"],
item["schema_name"],
item["name"],
): {}
for item in response.get("upstream_tables", [])
}

for item in response.get("upstreams") or []:
if "tableInfo" in item:
table_ref = TableReference.create_from_lineage(
item["tableInfo"], table.schema.catalog.metastore.id
)
if table_ref:
table.upstreams[table_ref] = {}
for notebook in item.get("notebookInfos") or []:
table.upstream_notebooks.add(notebook["notebook_id"])

for item in response.get("downstreams") or []:
for notebook in item.get("notebookInfos") or []:
table.downstream_notebooks.add(notebook["notebook_id"])

return response
except Exception as e:
logger.error(f"Error getting lineage: {e}")
return None

def get_column_lineage(self, table: Table) -> None:
def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None:
try:
table_lineage_response: dict = self.list_lineages_by_table(
table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}"
table_lineage = self.table_lineage(
table, include_entity_lineage=include_entity_lineage
)
if table_lineage_response:
if table_lineage:
for column in table.columns:
response: dict = self.list_lineages_by_column(
table_name=f"{table.schema.catalog.name}.{table.schema.name}.{table.name}",
table_name=table.ref.qualified_table_name,
column_name=column.name,
)
for item in response.get("upstream_cols", []):
table_ref = TableReference(
table.schema.catalog.metastore.id,
item["catalog_name"],
item["schema_name"],
item["table_name"],
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore.id
)
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])

except Exception as e:
logger.error(f"Error getting lineage: {e}")
@@ -1,8 +1,10 @@
# Supported types are available at
# https://api-docs.databricks.com/rest/latest/unity-catalog-api-specification-2-1.html?_ga=2.151019001.1795147704.1666247755-2119235717.1666247755
import dataclasses
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
from typing import Dict, FrozenSet, List, Optional, Set

from databricks.sdk.service.catalog import (
CatalogType,
@@ -11,6 +13,7 @@
TableType,
)
from databricks.sdk.service.sql import QueryStatementType
from databricks.sdk.service.workspace import Language

from datahub.metadata.schema_classes import (
ArrayTypeClass,
@@ -26,6 +29,8 @@
TimeTypeClass,
)

logger = logging.getLogger(__name__)

DATA_TYPE_REGISTRY: dict = {
ColumnTypeName.BOOLEAN: BooleanTypeClass,
ColumnTypeName.BYTE: BytesTypeClass,
@@ -66,6 +71,9 @@
ALLOWED_STATEMENT_TYPES = {*OPERATION_STATEMENT_TYPES.keys(), QueryStatementType.SELECT}


NotebookId = int


@dataclass
class CommonProperty:
id: str
@@ -136,6 +144,19 @@ def create(cls, table: "Table") -> "TableReference":
table.name,
)

@classmethod
def create_from_lineage(cls, d: dict, metastore: str) -> Optional["TableReference"]:
try:
return cls(
metastore,
d["catalog_name"],
d["schema_name"],
d.get("table_name", d["name"]), # column vs table query output
)
except Exception as e:
logger.warning(f"Failed to create TableReference from {d}: {e}")
return None

def __str__(self) -> str:
return f"{self.metastore}.{self.catalog}.{self.schema}.{self.table}"
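To make the "column vs table query output" comment above concrete, here are hypothetical payloads of the two shapes `create_from_lineage` accepts; the keys mirror the parsing code, the values and the import path are invented for illustration.

```python
# Hypothetical lineage payloads; keys mirror create_from_lineage, values are made up.
from datahub.ingestion.source.unity.proxy_types import TableReference  # module path assumed

table_info = {"catalog_name": "main", "schema_name": "sales", "name": "orders"}
upstream_col = {
    "catalog_name": "main",
    "schema_name": "sales",
    "table_name": "orders",
    "name": "order_id",  # here "name" is the column; the table comes from "table_name"
}

print(TableReference.create_from_lineage(table_info, "metastore-uuid"))   # metastore-uuid.main.sales.orders
print(TableReference.create_from_lineage(upstream_col, "metastore-uuid"))  # metastore-uuid.main.sales.orders
```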

@@ -154,7 +175,6 @@ class Table(CommonProperty):
columns: List[Column]
storage_location: Optional[str]
data_source_format: Optional[DataSourceFormat]
comment: Optional[str]
table_type: TableType
owner: Optional[str]
generation: Optional[int]
@@ -166,6 +186,8 @@
view_definition: Optional[str]
properties: Dict[str, str]
upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict)
upstream_notebooks: Set[NotebookId] = field(default_factory=set)
downstream_notebooks: Set[NotebookId] = field(default_factory=set)

ref: TableReference = field(init=False)

@@ -228,3 +250,23 @@ def __bool__(self):
self.max is not None,
)
)


@dataclass
class Notebook:
id: NotebookId
path: str
language: Language
created_at: datetime
modified_at: datetime

upstreams: FrozenSet[TableReference] = field(default_factory=frozenset)

@classmethod
def add_upstream(cls, upstream: TableReference, notebook: "Notebook") -> "Notebook":
return cls(
**{ # type: ignore
**dataclasses.asdict(notebook),
"upstreams": frozenset([*notebook.upstreams, upstream]),
}
)
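A short sketch of the copy-and-extend update `add_upstream` performs: it returns a new `Notebook` carrying the extra upstream rather than mutating the original. The module path and all example values below are assumptions for illustration.

```python
# Sketch of Notebook.add_upstream: a new Notebook is returned, the original stays unchanged.
from datetime import datetime, timezone

from databricks.sdk.service.workspace import Language

from datahub.ingestion.source.unity.proxy_types import Notebook, TableReference  # path assumed

nb = Notebook(
    id=123456,
    path="/Users/alice@example.com/etl/orders_enrichment",  # made-up notebook
    language=Language.PYTHON,
    created_at=datetime(2023, 9, 1, tzinfo=timezone.utc),
    modified_at=datetime(2023, 10, 1, tzinfo=timezone.utc),
)
ref = TableReference("metastore-uuid", "main", "sales", "orders")

nb2 = Notebook.add_upstream(ref, nb)
# nb.upstreams is still empty; nb2.upstreams == frozenset({ref})
```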