Skip to content

Commit

Permalink
perf(ingest/unity): Use ThreadPoolExecutor for CLL
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz committed Oct 4, 2023
1 parent d224996 commit a4df8dc
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 35 deletions.
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,17 @@ class UnityCatalogSourceConfig(
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
)

column_lineage_column_limit: int = pydantic.Field(
default=300,
description="Limit the number of columns to get column level lineage. ",
)

lineage_max_workers: int = pydantic.Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for column lineage thread pool executor. Set to 1 to disable.",
hidden_from_docs=True,
)

include_usage_statistics: bool = Field(
default=True,
description="Generate usage statistics.",
Expand Down
49 changes: 23 additions & 26 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import dataclasses
import logging
from concurrent.futures import ThreadPoolExecutor, wait
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, List, Optional, Union
from unittest.mock import patch
Expand Down Expand Up @@ -45,6 +46,8 @@

logger: logging.Logger = logging.getLogger(__name__)

MAX_WORKERS = 100


class TableInfoWithGeneration(TableInfo):
generation: Optional[int] = None
Expand Down Expand Up @@ -233,9 +236,7 @@ def list_lineages_by_column(self, table_name: str, column_name: str) -> dict:
body={"table_name": table_name, "column_name": column_name},
)

def table_lineage(
self, table: Table, include_entity_lineage: bool
) -> Optional[dict]:
def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
# Lineage endpoint doesn't exists on 2.1 version
try:
response: dict = self.list_lineages_by_table(
Expand All @@ -256,34 +257,30 @@ def table_lineage(
for item in response.get("downstreams") or []:
for notebook in item.get("notebookInfos") or []:
table.downstream_notebooks.add(notebook["notebook_id"])

return response
except Exception as e:
logger.error(f"Error getting lineage: {e}")
return None
logger.warning(
f"Error getting lineage on table {table.ref}: {e}", exc_info=True
)

def get_column_lineage(self, table: Table, include_entity_lineage: bool) -> None:
def get_column_lineage(self, table: Table, column_name: str) -> None:
try:
table_lineage = self.table_lineage(
table, include_entity_lineage=include_entity_lineage
response: dict = self.list_lineages_by_column(
table_name=table.ref.qualified_table_name,
column_name=column_name,
)
if table_lineage:
for column in table.columns:
response: dict = self.list_lineages_by_column(
table_name=table.ref.qualified_table_name,
column_name=column.name,
)
for item in response.get("upstream_cols", []):
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore
)
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column.name, []
).append(item["name"])

for item in response.get("upstream_cols") or []:
table_ref = TableReference.create_from_lineage(
item, table.schema.catalog.metastore
)
if table_ref:
table.upstreams.setdefault(table_ref, {}).setdefault(
column_name, []
).append(item["name"])
except Exception as e:
logger.error(f"Error getting lineage: {e}")
logger.warning(
f"Error getting column lineage on table {table.ref}, column {column_name}: {e}",
exc_info=True,
)

@staticmethod
def _escape_sequence(value: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
table_profiles: EntityFilterReport = EntityFilterReport.field(type="table profile")
notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")

num_column_lineage_skipped_column_count: int = 0

num_queries: int = 0
num_queries_dropped_parse_failure: int = 0
num_queries_missing_table: int = 0 # Can be due to pattern filter
Expand Down
33 changes: 24 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait
from datetime import timedelta
from typing import Dict, Iterable, List, Optional, Set, Union
from urllib.parse import urljoin
Expand Down Expand Up @@ -367,15 +368,7 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
ownership = self._create_table_ownership_aspect(table)
data_platform_instance = self._create_data_platform_instance_aspect()

if self.config.include_column_lineage:
self.unity_catalog_api_proxy.get_column_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
elif self.config.include_table_lineage:
self.unity_catalog_api_proxy.table_lineage(
table, include_entity_lineage=self.config.include_notebooks
)
lineage = self._generate_lineage_aspect(dataset_urn, table)
lineage = self.ingest_lineage(table)

if self.config.include_notebooks:
for notebook_id in table.downstream_notebooks:
Expand All @@ -401,6 +394,28 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
)
]

def ingest_lineage(self, table: Table) -> Optional[MetadataWorkUnit]:
if self.config.include_table_lineage:
self.unity_catalog_api_proxy.table_lineage(
table, include_entity_lineage=self.config.include_notebooks
)

if self.config.include_column_lineage and table.upstreams:
if len(table.columns) > self.config.column_lineage_column_limit:
self.report.num_column_lineage_skipped_column_count += 1

with ThreadPoolExecutor(
max_workers=self.config.lineage_max_workers
) as executor:
for column in table.columns[: self.config.column_lineage_column_limit]:
executor.submit(
self.unity_catalog_api_proxy.get_column_lineage,
table,
column.name,
)

return self._generate_lineage_aspect(self.gen_dataset_urn(table.ref), table)

def _generate_lineage_aspect(
self, dataset_urn: str, table: Table
) -> Optional[UpstreamLineageClass]:
Expand Down

0 comments on commit a4df8dc

Please sign in to comment.