Skip to content

Commit

Permalink
test(ingest/unity): Unity catalog data generation (datahub-project#8949)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored and Salman-Apptware committed Dec 15, 2023
1 parent dd3d149 commit 1903ada
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 101 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@
"databricks-sdk>=0.9.0",
"pyspark~=3.3.0",
"requests",
"databricks-sql-connector",
}

mysql = sql_common | {"pymysql>=1.0.2"}
Expand Down
10 changes: 5 additions & 5 deletions metadata-ingestion/tests/performance/bigquery/bigquery_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import random
import uuid
from collections import defaultdict
from typing import Dict, Iterable, List, cast
from typing import Dict, Iterable, List, Set

from typing_extensions import get_args

Expand All @@ -15,7 +15,7 @@
)
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
from datahub.ingestion.source.bigquery_v2.usage import OPERATION_STATEMENT_TYPES
from tests.performance.data_model import Query, StatementType, Table, View
from tests.performance.data_model import Query, StatementType, Table

# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason
READ_REASONS = [
Expand Down Expand Up @@ -86,7 +86,7 @@ def generate_events(
ref_from_table(parent, table_to_project)
for field in query.fields_accessed
if field.table.is_view()
for parent in cast(View, field.table).parents
for parent in field.table.upstreams
)
),
referencedViews=referencedViews,
Expand All @@ -96,15 +96,15 @@ def generate_events(
query_on_view=True if referencedViews else False,
)
)
table_accesses = defaultdict(set)
table_accesses: Dict[BigQueryTableRef, Set[str]] = defaultdict(set)
for field in query.fields_accessed:
if not field.table.is_view():
table_accesses[ref_from_table(field.table, table_to_project)].add(
field.column
)
else:
# assuming that same fields are accessed in parent tables
for parent in cast(View, field.table).parents:
for parent in field.table.upstreams:
table_accesses[ref_from_table(parent, table_to_project)].add(
field.column
)
Expand Down
153 changes: 110 additions & 43 deletions metadata-ingestion/tests/performance/data_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
This is a work in progress, built piecemeal as needed.
"""
import random
import uuid
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, TypeVar, Union, cast
from typing import Collection, Iterable, List, Optional, TypeVar, Union, cast

from faker import Faker

from tests.performance.data_model import (
Column,
ColumnMapping,
ColumnType,
Container,
FieldAccess,
Expand All @@ -40,17 +40,46 @@
"UNKNOWN",
]

ID_COLUMN = "id" # Use to allow joins between all tables


class Distribution(metaclass=ABCMeta):
@abstractmethod
def _sample(self) -> int:
raise NotImplementedError

def sample(
self, *, floor: Optional[int] = None, ceiling: Optional[int] = None
) -> int:
value = self._sample()
if floor is not None:
value = max(value, floor)
if ceiling is not None:
value = min(value, ceiling)
return value


@dataclass(frozen=True)
class NormalDistribution:
class NormalDistribution(Distribution):
mu: float
sigma: float

def sample(self) -> int:
def _sample(self) -> int:
return int(random.gauss(mu=self.mu, sigma=self.sigma))

def sample_with_floor(self, floor: int = 1) -> int:
return max(int(random.gauss(mu=self.mu, sigma=self.sigma)), floor)

@dataclass(frozen=True)
class LomaxDistribution(Distribution):
"""See https://en.wikipedia.org/wiki/Lomax_distribution.
Equivalent to pareto(scale, shape) - scale; scale * beta_prime(1, shape)
"""

scale: float
shape: float

def _sample(self) -> int:
return int(self.scale * (random.paretovariate(self.shape) - 1))


@dataclass
Expand All @@ -72,9 +101,9 @@ def generate_data(
num_containers: Union[List[int], int],
num_tables: int,
num_views: int,
columns_per_table: NormalDistribution = NormalDistribution(5, 2),
parents_per_view: NormalDistribution = NormalDistribution(2, 1),
view_definition_length: NormalDistribution = NormalDistribution(150, 50),
columns_per_table: Distribution = NormalDistribution(5, 2),
parents_per_view: Distribution = NormalDistribution(2, 1),
view_definition_length: Distribution = NormalDistribution(150, 50),
time_range: timedelta = timedelta(days=14),
) -> SeedMetadata:
# Assemble containers
Expand All @@ -85,43 +114,32 @@ def generate_data(
for i, num_in_layer in enumerate(num_containers):
layer = [
Container(
f"{i}-container-{j}",
f"{_container_type(i)}_{j}",
parent=random.choice(containers[-1]) if containers else None,
)
for j in range(num_in_layer)
]
containers.append(layer)

# Assemble tables
# Assemble tables and views, lineage, and definitions
tables = [
Table(
f"table-{i}",
container=random.choice(containers[-1]),
columns=[
f"column-{j}-{uuid.uuid4()}"
for j in range(columns_per_table.sample_with_floor())
],
column_mapping=None,
)
for i in range(num_tables)
_generate_table(i, containers[-1], columns_per_table) for i in range(num_tables)
]
views = [
View(
f"view-{i}",
container=random.choice(containers[-1]),
columns=[
f"column-{j}-{uuid.uuid4()}"
for j in range(columns_per_table.sample_with_floor())
],
column_mapping=None,
definition=f"{uuid.uuid4()}-{'*' * view_definition_length.sample_with_floor(10)}",
parents=random.sample(tables, parents_per_view.sample_with_floor()),
**{ # type: ignore
**_generate_table(i, containers[-1], columns_per_table).__dict__,
"name": f"view_{i}",
"definition": f"--{'*' * view_definition_length.sample(floor=0)}",
},
)
for i in range(num_views)
]

for table in tables + views:
_generate_column_mapping(table)
for view in views:
view.upstreams = random.sample(tables, k=parents_per_view.sample(floor=1))

generate_lineage(tables, views)

now = datetime.now(tz=timezone.utc)
return SeedMetadata(
Expand All @@ -133,6 +151,33 @@ def generate_data(
)


def generate_lineage(
tables: Collection[Table],
views: Collection[Table],
# Percentiles: 75th=0, 80th=1, 95th=2, 99th=4, 99.99th=15
upstream_distribution: Distribution = LomaxDistribution(scale=3, shape=5),
) -> None:
num_upstreams = [upstream_distribution.sample(ceiling=100) for _ in tables]
# Prioritize tables with a lot of upstreams themselves
factor = 1 + len(tables) // 10
table_weights = [1 + (num_upstreams[i] * factor) for i in range(len(tables))]
view_weights = [1] * len(views)

# TODO: Python 3.9 use random.sample with counts
sample = []
for table, weight in zip(tables, table_weights):
for _ in range(weight):
sample.append(table)
for view, weight in zip(views, view_weights):
for _ in range(weight):
sample.append(view)
for i, table in enumerate(tables):
table.upstreams = random.sample( # type: ignore
sample,
k=num_upstreams[i],
)


def generate_queries(
seed_metadata: SeedMetadata,
num_selects: int,
Expand All @@ -146,12 +191,12 @@ def generate_queries(
) -> Iterable[Query]:
faker = Faker()
query_texts = [
faker.paragraph(query_length.sample_with_floor(30) // 30)
faker.paragraph(query_length.sample(floor=30) // 30)
for _ in range(num_unique_queries)
]

all_tables = seed_metadata.tables + seed_metadata.views
users = [f"user-{i}@xyz.com" for i in range(num_users)]
users = [f"user_{i}@xyz.com" for i in range(num_users)]
for i in range(num_selects): # Pure SELECT statements
tables = _sample_list(all_tables, tables_per_select)
all_columns = [
Expand Down Expand Up @@ -191,21 +236,43 @@ def generate_queries(
)


def _generate_column_mapping(table: Table) -> ColumnMapping:
d = {}
for column in table.columns:
d[column] = Column(
name=column,
def _container_type(i: int) -> str:
if i == 0:
return "database"
elif i == 1:
return "schema"
else:
return f"{i}container"


def _generate_table(
i: int, parents: List[Container], columns_per_table: Distribution
) -> Table:
num_columns = columns_per_table.sample(floor=1)

columns = OrderedDict({ID_COLUMN: Column(ID_COLUMN, ColumnType.INTEGER, False)})
for j in range(num_columns):
name = f"column_{j}"
columns[name] = Column(
name=name,
type=random.choice(list(ColumnType)),
nullable=random.random() < 0.1, # Fixed 10% chance for now
)
table.column_mapping = d
return d
return Table(
f"table_{i}",
container=random.choice(parents),
columns=columns,
upstreams=[],
)


def _sample_list(lst: List[T], dist: NormalDistribution, floor: int = 1) -> List[T]:
return random.sample(lst, min(dist.sample_with_floor(floor), len(lst)))
return random.sample(lst, min(dist.sample(floor=floor), len(lst)))


def _random_time_between(start: datetime, end: datetime) -> datetime:
return start + timedelta(seconds=(end - start).total_seconds() * random.random())


if __name__ == "__main__":
z = generate_data(10, 1000, 10)
54 changes: 45 additions & 9 deletions metadata-ingestion/tests/performance/data_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from dataclasses import dataclass
import typing
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

from typing_extensions import Literal

Expand Down Expand Up @@ -37,29 +39,63 @@ class ColumnType(str, Enum):
@dataclass
class Column:
name: str
type: ColumnType
nullable: bool
type: ColumnType = ColumnType.STRING
nullable: bool = False


ColumnRef = str
ColumnMapping = Dict[ColumnRef, Column]


@dataclass
@dataclass(init=False)
class Table:
name: str
container: Container
columns: List[ColumnRef]
column_mapping: Optional[ColumnMapping]
columns: typing.OrderedDict[ColumnRef, Column] = field(repr=False)
upstreams: List["Table"] = field(repr=False)

def __init__(
self,
name: str,
container: Container,
columns: Union[List[str], Dict[str, Column]],
upstreams: List["Table"],
):
self.name = name
self.container = container
self.upstreams = upstreams
if isinstance(columns, list):
self.columns = OrderedDict((col, Column(col)) for col in columns)
elif isinstance(columns, dict):
self.columns = OrderedDict(columns)

@property
def name_components(self) -> List[str]:
lst = [self.name]
container: Optional[Container] = self.container
while container:
lst.append(container.name)
container = container.parent
return lst[::-1]

def is_view(self) -> bool:
return False


@dataclass
@dataclass(init=False)
class View(Table):
definition: str
parents: List[Table]

def __init__(
self,
name: str,
container: Container,
columns: Union[List[str], Dict[str, Column]],
upstreams: List["Table"],
definition: str,
):
super().__init__(name, container, columns, upstreams)
self.definition = definition

def is_view(self) -> bool:
return True
Expand Down
Loading

0 comments on commit 1903ada

Please sign in to comment.