Skip to content

Commit

Permalink
feat(ingest/unity): Use ThreadPoolExecutor for CLL (#8952)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Oct 6, 2023
1 parent ea87feb commit c80da8f
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 35 deletions.
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,17 @@ class UnityCatalogSourceConfig(
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
)

column_lineage_column_limit: int = pydantic.Field(
default=300,
description="Limit the number of columns to get column level lineage. ",
)

lineage_max_workers: int = pydantic.Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for column lineage thread pool executor. Set to 1 to disable.",
hidden_from_docs=True,
)

include_usage_statistics: bool = Field(
default=True,
description="Generate usage statistics.",
Expand Down
46 changes: 20 additions & 26 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,7 @@ def list_lineages_by_column(self, table_name: str, column_name: str) -> dict:
body={"table_name": table_name, "column_name": column_name},
)

def table_lineage(
self, table: Table, include_entity_lineage: bool
) -> Optional[dict]:
def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
# Lineage endpoint doesn't exist on version 2.1
try:
response: dict = self.list_lineages_by_table(
Expand All @@ -256,34 +254,30 @@ def table_lineage(
for item in response.get("downstreams") or []:
for notebook in item.get("notebookInfos") or []:
table.downstream_notebooks.add(notebook["notebook_id"])

return response
except Exception as e:
logger.error(f"Error getting lineage: {e}")
return None
logger.warning(
f"Error getting lineage on table {table.ref}: {e}", exc_info=True
)

def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None:
def get_column_lineage(self, table: Table, column_name: str) -> None:
try:
table_lineage = self.table_lineage(
table, include_entity_lineage=include_entity_lineage
response: dict = self.list_lineages_by_column(
table_name=table.ref.qualified_table_name,
column_name=column_name,
)
if table_lineage:
for column in table.columns:
response: dict = self.list_lineages_by_column(
table_name=table.ref.qualified_table_name,
column_name=column.name,
)
for item in response.get("upstream_cols", []):
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore
)
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])

for item in response.get("upstream_cols") or []:
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore
)
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column_name, []
).append(item["name"])
except Exception as e:
logger.error(f"Error getting lineage: {e}")
logger.warning(
f"Error getting column lineage on table {table.ref}, column {column_name}: {e}",
exc_info=True,
)

@staticmethod
def _escape_sequence(value: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
table_profiles: EntityFilterReport = EntityFilterReport.field(type="table profile")
notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")

num_column_lineage_skipped_column_count: int = 0

num_queries: int = 0
num_queries_dropped_parse_failure: int = 0
num_queries_missing_table: int = 0 # Can be due to pattern filter
Expand Down
33 changes: 24 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import re
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from typing import Dict, Iterable, List, Optional, Set, Union
from urllib.parse import urljoin
Expand Down Expand Up @@ -367,15 +368,7 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
ownership = self._create_table_ownership_aspect(table)
data_platform_instance = self._create_data_platform_instance_aspect()

if self.config.include_column_lineage:
self.unity_catalog_api_proxy.get_column_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
elif self.config.include_table_lineage:
self.unity_catalog_api_proxy.table_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
lineage = self._generate_lineage_aspect(dataset_urn, table)
lineage = self.ingest_lineage(table)

if self.config.include_notebooks:
for notebook_id in table.downstream_notebooks:
Expand All @@ -401,6 +394,28 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
)
]

def ingest_lineage(self, table: Table) -> Optional[UpstreamLineageClass]:
    """Collect lineage for ``table`` and build its upstream lineage aspect.

    Table-level lineage is requested first when ``include_table_lineage`` is
    enabled. Column-level lineage is requested only when
    ``include_column_lineage`` is enabled *and* ``table.upstreams`` is
    non-empty (presumably populated by the table-level call -- verify against
    the proxy implementation). One request is issued per column, fanned out
    over a thread pool, and limited to the first
    ``column_lineage_column_limit`` columns.

    Args:
        table: The table whose lineage should be fetched; the proxy calls
            record their findings on this object.

    Returns:
        Whatever ``_generate_lineage_aspect`` produces for the table's
        dataset URN (``None`` is possible per its signature).
    """
    if self.config.include_table_lineage:
        self.unity_catalog_api_proxy.table_lineage(
            table, include_entity_lineage=self.config.include_notebooks
        )

    if self.config.include_column_lineage and table.upstreams:
        column_limit = self.config.column_lineage_column_limit
        if len(table.columns) > column_limit:
            # Counts tables whose column list was truncated to the limit.
            self.report.num_column_lineage_skipped_column_count += 1

        # ThreadPoolExecutor raises ValueError for max_workers <= 0; a
        # misconfigured value (e.g. 0 meant as "disabled") should degrade to
        # a single worker instead of aborting ingestion of this table.
        max_workers = max(1, self.config.lineage_max_workers)

        # Leaving the ``with`` block waits for every submitted task, so all
        # column lineage has been recorded before the aspect is generated.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            for column in table.columns[:column_limit]:
                executor.submit(
                    self.unity_catalog_api_proxy.get_column_lineage,
                    table,
                    column.name,
                )

    return self._generate_lineage_aspect(self.gen_dataset_urn(table.ref), table)

def _generate_lineage_aspect(
self, dataset_urn: str, table: Table
) -> Optional[UpstreamLineageClass]:
Expand Down

0 comments on commit c80da8f

Please sign in to comment.