Iceberg performance improvement #11182

Status: Open. Wants to merge 41 commits into base: master.

Commits (41)
437038a
Initial approach to boosting and measuring iceberg performance
skrydal Aug 14, 2024
6e62afe
Fixing catalog init
skrydal Aug 15, 2024
71e8990
Linting
skrydal Aug 15, 2024
f8881f0
Linting
skrydal Aug 15, 2024
a099eea
Removing unused import
skrydal Aug 15, 2024
7b51e21
Merge branch 'datahub-project:master' into iceberg_performance
skrydal Aug 15, 2024
71baf6f
Added warning comment, TODO
skrydal Aug 16, 2024
b04324d
Improving parallel handling, added verbosity
skrydal Aug 21, 2024
a7c842f
Linting
skrydal Aug 21, 2024
0320a23
Merge branch 'master' into iceberg_performance
skrydal Aug 21, 2024
63457cc
Better logging
skrydal Aug 21, 2024
0309a00
Improving error handling
skrydal Aug 21, 2024
598d1d2
Linting
skrydal Aug 21, 2024
1b2d6e7
Commenting out a sleep
skrydal Aug 21, 2024
eda1ee6
Back to thread local
skrydal Aug 21, 2024
7ab737f
Merge branch 'master' into iceberg_performance
skrydal Sep 26, 2024
609eddf
Merge branch 'master' into iceberg_performance
skrydal Oct 21, 2024
75851fb
Added graceful handling of some errors
skrydal Oct 21, 2024
750508c
Merge branch 'master' into iceberg_performance
skrydal Oct 21, 2024
15c3a1d
Increased verbosity for listing namespaces
skrydal Oct 21, 2024
47ee246
Added threading info dump
skrydal Oct 21, 2024
fad02ec
Merge branch 'master' into iceberg_performance
skrydal Nov 4, 2024
a297551
Merge branch 'master' into iceberg_performance
skrydal Nov 7, 2024
0c53eb8
Reverting exhaustive logging in threaded executor
skrydal Nov 7, 2024
f5489cf
Reducing verbosity
skrydal Nov 8, 2024
43e703e
Further improvements
skrydal Nov 8, 2024
5649bcc
Removed unused import
skrydal Nov 8, 2024
0f9d412
Linting
skrydal Nov 8, 2024
d88f739
Merge branch 'master' into iceberg_performance
skrydal Nov 12, 2024
aeca08d
Merge branch 'master' into iceberg_performance
skrydal Nov 12, 2024
da7cf6b
Merge branch 'master' into iceberg_performance
skrydal Nov 13, 2024
7bafba9
Merge branch 'master' into iceberg_performance
skrydal Nov 14, 2024
172a5e5
Introduced integration tests using multiple threads, removed overly
skrydal Nov 15, 2024
717a6ec
Minor linting
skrydal Nov 15, 2024
01ec9aa
More robust unit tests
skrydal Nov 15, 2024
8149340
Merge branch 'master' into iceberg_performance
skrydal Nov 15, 2024
e6b0d36
More tests
skrydal Nov 15, 2024
46c10cc
Changing logging to f-strings and adding missing typing info
skrydal Nov 15, 2024
733ed5a
Added handling of exceptions while listing tables for namespaces
skrydal Nov 15, 2024
f9ac4c7
Added handling of exceptions when listing tables
skrydal Nov 15, 2024
3ae97a2
Extended reporting
skrydal Nov 15, 2024
178 changes: 130 additions & 48 deletions metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -1,10 +1,15 @@
import json
import logging
import threading
import uuid
from typing import Any, Dict, Iterable, List, Optional

from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchIcebergTableError
from pyiceberg.exceptions import (
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
)
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
from pyiceberg.table import Table
from pyiceberg.typedef import Identifier
@@ -75,6 +80,8 @@
OwnershipClass,
OwnershipTypeClass,
)
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

LOGGER = logging.getLogger(__name__)
logging.getLogger("azure.core.pipeline.policies.http_logging_policy").setLevel(
@@ -130,74 +137,149 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
]

def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
for namespace in catalog.list_namespaces():
yield from catalog.list_tables(namespace)
namespaces = catalog.list_namespaces()
LOGGER.debug(
f"Retrieved {len(namespaces)} namespaces, first 10: {namespaces[:10]}"
)
self.report.report_no_listed_namespaces(len(namespaces))
tables_count = 0
for namespace in namespaces:
try:
tables = catalog.list_tables(namespace)
tables_count += len(tables)
LOGGER.debug(
f"Retrieved {len(tables)} tables for namespace: {namespace}, in total retrieved {tables_count}, first 10: {tables[:10]}"
)
self.report.report_listed_tables_for_namespace(
".".join(namespace), len(tables)
)
yield from tables
except NoSuchNamespaceError:
self.report.report_warning(
"no-such-namespace",
f"Couldn't list tables for namespace {namespace} due to NoSuchNamespaceError exception",
)
LOGGER.warning(
f"NoSuchNamespaceError exception while trying to get list of tables from namespace {namespace}, skipping it",
)
except Exception as e:
self.report.report_failure(
"listing-tables-exception",
f"Couldn't list tables for namespace {namespace} due to {e}",
)
LOGGER.exception(
f"Unexpected exception while trying to get list of tables for namespace {namespace}, skipping it"
)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
try:
catalog = self.config.get_catalog()
except Exception as e:
LOGGER.error("Failed to get catalog", exc_info=True)
self.report.report_failure("get-catalog", f"Failed to get catalog: {e}")
return
thread_local = threading.local()
Contributor comment:

Is this needed? Why not just pass catalog to _process_dataset?

    def _process_dataset(dataset_path, catalog):

Author (Collaborator) reply:

The Catalog object cannot be used across threads (at least not for every catalog type).

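For illustration, a minimal sketch of the per-thread catalog pattern the reply describes; the helper name and factory argument are made up here, while in the PR the factory is self.config.get_catalog():

    import threading
    from typing import Callable

    from pyiceberg.catalog import Catalog

    thread_local = threading.local()


    def get_thread_catalog(make_catalog: Callable[[], Catalog]) -> Catalog:
        # Each worker thread lazily builds and caches its own Catalog instance,
        # because a Catalog may not be safe to share across threads.
        if not hasattr(thread_local, "local_catalog"):
            thread_local.local_catalog = make_catalog()
        return thread_local.local_catalog
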

for dataset_path in self._get_datasets(catalog):
def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
LOGGER.debug(f"Processing dataset for path {dataset_path}")
dataset_name = ".".join(dataset_path)
if not self.config.table_pattern.allowed(dataset_name):
# Dataset name is rejected by pattern, report as dropped.
self.report.report_dropped(dataset_name)
continue

return
try:
# Try to load an Iceberg table. Might not contain one, this will be caught by NoSuchIcebergTableError.
table = catalog.load_table(dataset_path)
if not hasattr(thread_local, "local_catalog"):
LOGGER.debug(
f"Didn't find local_catalog in thread_local ({thread_local}), initializing new catalog"
)
thread_local.local_catalog = self.config.get_catalog()

with PerfTimer() as timer:
table = thread_local.local_catalog.load_table(dataset_path)
time_taken = timer.elapsed_seconds()
self.report.report_table_load_time(time_taken)
LOGGER.debug(
f"Loaded table: {table.identifier}, time taken: {time_taken}"
)
yield from self._create_iceberg_workunit(dataset_name, table)
except NoSuchPropertyException as e:
self.report.report_warning(
"table-property-missing",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchPropertyException while processing table {dataset_path}, skipping it.",
)
except NoSuchIcebergTableError as e:
self.report.report_warning(
"no-iceberg-table",
f"Failed to create workunit for {dataset_name}. {e}",
)
LOGGER.warning(
f"NoSuchIcebergTableError while processing table {dataset_path}, skipping it.",
)
except Exception as e:
self.report.report_failure("general", f"Failed to create workunit: {e}")
LOGGER.exception(
f"Exception while processing table {dataset_path}, skipping it.",
)

try:
catalog = self.config.get_catalog()
except Exception as e:
self.report.report_failure("get-catalog", f"Failed to get catalog: {e}")
return

for wu in ThreadedIteratorExecutor.process(
worker_func=_process_dataset,
args_list=[(dataset_path,) for dataset_path in self._get_datasets(catalog)],
max_workers=self.config.processing_threads,
):
yield wu

def _create_iceberg_workunit(
self, dataset_name: str, table: Table
) -> Iterable[MetadataWorkUnit]:
self.report.report_table_scanned(dataset_name)
dataset_urn: str = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[Status(removed=False)],
)

# Dataset properties aspect.
custom_properties = table.metadata.properties.copy()
custom_properties["location"] = table.metadata.location
custom_properties["format-version"] = str(table.metadata.format_version)
custom_properties["partition-spec"] = str(self._get_partition_aspect(table))
if table.current_snapshot():
custom_properties["snapshot-id"] = str(table.current_snapshot().snapshot_id)
custom_properties["manifest-list"] = table.current_snapshot().manifest_list
dataset_properties = DatasetPropertiesClass(
name=table.name()[-1],
tags=[],
description=table.metadata.properties.get("comment", None),
customProperties=custom_properties,
)
dataset_snapshot.aspects.append(dataset_properties)
with PerfTimer() as timer:
self.report.report_table_scanned(dataset_name)
LOGGER.debug(f"Processing table {dataset_name}")
dataset_urn: str = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[Status(removed=False)],
)

# Dataset ownership aspect.
dataset_ownership = self._get_ownership_aspect(table)
if dataset_ownership:
dataset_snapshot.aspects.append(dataset_ownership)
# Dataset properties aspect.
custom_properties = table.metadata.properties.copy()
custom_properties["location"] = table.metadata.location
custom_properties["format-version"] = str(table.metadata.format_version)
custom_properties["partition-spec"] = str(self._get_partition_aspect(table))
if table.current_snapshot():
custom_properties["snapshot-id"] = str(
table.current_snapshot().snapshot_id
)
custom_properties[
"manifest-list"
] = table.current_snapshot().manifest_list
dataset_properties = DatasetPropertiesClass(
name=table.name()[-1],
tags=[],
description=table.metadata.properties.get("comment", None),
customProperties=custom_properties,
)
dataset_snapshot.aspects.append(dataset_properties)
# Dataset ownership aspect.
dataset_ownership = self._get_ownership_aspect(table)
if dataset_ownership:
LOGGER.debug(
f"Adding ownership: {dataset_ownership} to the dataset {dataset_name}"
)
dataset_snapshot.aspects.append(dataset_ownership)

schema_metadata = self._create_schema_metadata(dataset_name, table)
dataset_snapshot.aspects.append(schema_metadata)
schema_metadata = self._create_schema_metadata(dataset_name, table)
dataset_snapshot.aspects.append(schema_metadata)

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
self.report.report_table_processing_time(timer.elapsed_seconds())
yield MetadataWorkUnit(id=dataset_name, mce=mce)

dpi_aspect = self._get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
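The rest of this file's diff is collapsed. For orientation, a minimal sketch of the fan-out pattern used in get_workunits_internal above, mirroring the ThreadedIteratorExecutor.process call shown in the diff; the worker function and dataset paths below are purely illustrative:

    from typing import Iterable, Tuple

    from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor


    def process_one(path: Tuple[str, ...]) -> Iterable[str]:
        # A worker takes one dataset path and yields zero or more results;
        # results from all worker threads are merged into a single iterator.
        yield ".".join(path)


    paths = [("db", "table_a"), ("db", "table_b")]
    for result in ThreadedIteratorExecutor.process(
        worker_func=process_one,
        args_list=[(p,) for p in paths],
        max_workers=4,
    ):
        print(result)
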
Changes to the Iceberg source config and report module:

@@ -2,6 +2,7 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from humanfriendly import format_timespan
from pydantic import Field, validator
from pyiceberg.catalog import Catalog, load_catalog

@@ -75,6 +76,9 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin)
description="Iceberg table property to look for a `CorpGroup` owner. Can only hold a single group value. If property has no value, no owner information will be emitted.",
)
profiling: IcebergProfilingConfig = IcebergProfilingConfig()
processing_threads: int = Field(
default=1, description="How many threads will be processing tables"
)

@validator("catalog", pre=True, always=True)
def handle_deprecated_catalog_format(cls, value):
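As a hedged example, a recipe-style configuration (expressed as a Python dict) that enables the new processing_threads option added above; the catalog entry is a placeholder, not a working catalog:

    # Hypothetical source configuration; only processing_threads is the point here.
    iceberg_source_config = {
        "catalog": {
            "my_catalog": {
                "type": "rest",  # placeholder pyiceberg catalog settings
                "uri": "http://localhost:8181",
            }
        },
        # New option from this PR: number of threads used to process tables (default 1).
        "processing_threads": 8,
    }
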
@@ -131,17 +135,70 @@ def get_catalog(self) -> Catalog:

# Retrieve the dict associated with the one catalog entry
catalog_name, catalog_config = next(iter(self.catalog.items()))
logger.debug(
"Initializing the catalog %s with config: %s", catalog_name, catalog_config
)
return load_catalog(name=catalog_name, **catalog_config)


class TimingClass:
times: List[int]

def __init__(self):
self.times = []

def add_timing(self, t):
self.times.append(t)

def __str__(self):
if len(self.times) == 0:
return "no timings reported"
self.times.sort()
total = sum(self.times)
avg = total / len(self.times)
return str(
{
"average_time": format_timespan(avg, detailed=True, max_units=3),
"min_time": format_timespan(self.times[0], detailed=True, max_units=3),
"max_time": format_timespan(self.times[-1], detailed=True, max_units=3),
# total_time does not provide correct information in case we run in more than 1 thread
"total_time": format_timespan(total, detailed=True, max_units=3),
}
)


@dataclass
class IcebergSourceReport(StaleEntityRemovalSourceReport):
tables_scanned: int = 0
entities_profiled: int = 0
filtered: List[str] = field(default_factory=list)
load_table_timings: TimingClass = field(default_factory=TimingClass)
processing_table_timings: TimingClass = field(default_factory=TimingClass)
profiling_table_timings: TimingClass = field(default_factory=TimingClass)
listed_namespaces: int = 0
total_listed_tables: int = 0
tables_listed_per_namespace: Dict = field(default_factory=dict)

def report_listed_tables_for_namespace(
self, namespace: str, no_tables: int
) -> None:
self.tables_listed_per_namespace[namespace] = no_tables
self.total_listed_tables += no_tables

def report_no_listed_namespaces(self, amount: int) -> None:
self.listed_namespaces = amount

def report_table_scanned(self, name: str) -> None:
self.tables_scanned += 1

def report_dropped(self, ent_name: str) -> None:
self.filtered.append(ent_name)

def report_table_load_time(self, t: float) -> None:
self.load_table_timings.add_timing(t)

def report_table_processing_time(self, t: float) -> None:
self.processing_table_timings.add_timing(t)

def report_table_profiling_time(self, t: float) -> None:
self.profiling_table_timings.add_timing(t)
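
To show how the timing pieces above fit together, a small sketch using the TimingClass defined earlier in this diff together with PerfTimer; the sleep stands in for a real catalog.load_table call:

    import time

    from datahub.utilities.perf_timer import PerfTimer

    timings = TimingClass()
    with PerfTimer() as timer:
        time.sleep(0.1)  # stands in for catalog.load_table(...)
    timings.add_timing(timer.elapsed_seconds())
    print(timings)  # prints average/min/max/total as formatted timespans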