Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/snowflake): tables from snowflake shares as siblings #8531

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
47bb20d
feat(ingest/snowflake): tables from snowflake shares as siblings
mayurinehate Jul 26, 2023
752b2dd
add unit tests, docs, logging
mayurinehate Jul 27, 2023
daf87ef
remove node itself from sibling, more TODO
mayurinehate Jul 27, 2023
8018e5a
update tests, comments
mayurinehate Jul 31, 2023
f704d9c
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Jul 31, 2023
69e3792
revert push down filtering changes near query
mayurinehate Aug 1, 2023
4ce37e5
change config structure to allow same shares config across accounts
mayurinehate Aug 1, 2023
b33cafd
fix
mayurinehate Aug 1, 2023
ea1ba40
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Aug 1, 2023
f19d1ed
fix indent
mayurinehate Aug 1, 2023
89a06b1
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Aug 4, 2023
4f6035a
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Aug 14, 2023
6d4be65
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Aug 22, 2023
cfd3924
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Aug 23, 2023
78eba30
update doc content, refractor to avoid assers
mayurinehate Aug 23, 2023
d1bc494
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Aug 23, 2023
1a77245
simplification refractor
mayurinehate Aug 23, 2023
c55e37a
Merge remote-tracking branch 'refs/remotes/origin/snowflake_shares_li…
mayurinehate Aug 23, 2023
afbe095
Update snowflake_pre.md
mayurinehate Aug 23, 2023
02585c4
remove no-op
mayurinehate Aug 23, 2023
b28ebb1
Merge remote-tracking branch 'refs/remotes/origin/snowflake_shares_li…
mayurinehate Aug 23, 2023
6cde852
Merge branch 'master' into snowflake_shares_lineage_siblings
mayurinehate Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions metadata-ingestion/docs/sources/snowflake/snowflake_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,24 @@ The steps slightly differ based on which you decide to use.
including `client_id` and `client_secret`, plus your Okta user's `Username` and `Password`
* Note: the `username` and `password` config options are not nested under `oauth_config`

### Snowflake Shares
If you are using [Snowflake Shares](https://docs.snowflake.com/en/user-guide/data-sharing-provider) to share data across different snowflake accounts, and you have set up DataHub recipes for ingesting metadata from all these accounts, you may end up having multiple similar dataset entities corresponding to virtual versions of same table in different snowflake accounts. DataHub Snowflake connector can automatically link such tables together through Siblings and Lineage relationship if user provides information necessary to establish the relationship using configuration `shares` in recipe.

#### Example
- Snowflake account `account1` (ingested as platform_instance `instance1`) owns a database `db1`. A share `X` is created in `account1` that includes database `db1` along with schemas and tables inside it.
- Now, `X` is shared with snowflake account `account2` (ingested as platform_instance `instance2`). A database `db1_from_X` is created from inbound share `X` in `account2`. In this case, all tables and views included in share `X` will also be present in `instance2`.`db1_from_X`.
- This can be represented in `shares` configuration section as
```yaml
shares:
X: # name of the share
database_name: db1
platform_instance: instance1
consumers: # list of all databases created from share X
- database_name: db1_from_X
platform_instance: instance2

```
- If share `X` is shared with more snowflake accounts and database is created from share `X` in those account then additional entries need to be added in `consumers` list for share `X`, one per snowflake account. The same `shares` config can then be copied across recipes of all accounts.
### Caveats

- Some of the features are only available in the Snowflake Enterprise Edition. This doc has notes mentioning where this applies.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import logging
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, cast
from typing import Dict, List, Optional, Set, cast

from pydantic import Field, SecretStr, root_validator, validator

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
Expand Down Expand Up @@ -42,6 +44,31 @@ class TagOption(str, Enum):
skip = "skip"


@dataclass(frozen=True)
class DatabaseId:
database: str = Field(
description="Database created from share in consumer account."
)
platform_instance: str = Field(
description="Platform instance of consumer snowflake account."
)


class SnowflakeShareConfig(ConfigModel):
database: str = Field(description="Database from which share is created.")
platform_instance: str = Field(
description="Platform instance for snowflake account in which share is created."
)

consumers: Set[DatabaseId] = Field(
description="List of databases created in consumer accounts."
)

@property
def source_database(self) -> DatabaseId:
return DatabaseId(self.database, self.platform_instance)


class SnowflakeV2Config(
SnowflakeConfig,
SnowflakeUsageConfig,
Expand Down Expand Up @@ -120,6 +147,13 @@ class SnowflakeV2Config(
"upstreams_deny_pattern", "temporary_tables_pattern"
)

shares: Optional[Dict[str, SnowflakeShareConfig]] = Field(
default=None,
description="Required if current account owns or consumes snowflake share."
" If specified, connector creates lineage and siblings relationship between current account's database tables and consumer/producer account's database tables."
" Map of share name -> details of share.",
)

email_as_user_identifier: bool = Field(
default=True,
description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is provided, generates email addresses for snowflake users with unset emails, based on their username.",
Expand Down Expand Up @@ -197,3 +231,77 @@ def get_sql_alchemy_url(
@property
def parse_view_ddl(self) -> bool:
return self.include_view_column_lineage

@validator("shares")
def validate_shares(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we raise ValueError in validators, not use assertions. Do you want to change that convention? For now at least, can you change to stay consistent?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we allow ValueError, AssertionError, TypeError as a convention, as also mentioned here - https://datahubproject.io/docs/metadata-ingestion/developing/#coding

Sometimes asserts are more readable /briefer so I'd prefer them. In this case, I'm okay to change.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice. I like the assert syntax more, I think we're just hesitant because you can disable assertions with a certain flag. I don't feel strongly here, up to you

cls, shares: Optional[Dict[str, SnowflakeShareConfig]], values: Dict
) -> Optional[Dict[str, SnowflakeShareConfig]]:
current_platform_instance = values.get("platform_instance")

if shares:
# Check: platform_instance should be present
assert current_platform_instance is not None, (
"Did you forget to set `platform_instance` for current ingestion ? "
"It is required to use `platform_instance` when ingesting from multiple snowflake accounts."
)

databases_included_in_share: List[DatabaseId] = []
databases_created_from_share: List[DatabaseId] = []

for share_details in shares.values():
shared_db = DatabaseId(
share_details.database, share_details.platform_instance
)
assert all(
consumer.platform_instance != share_details.platform_instance
for consumer in share_details.consumers
), "Share's platform_instance can not be same as consumer's platform instance. Self-sharing not supported in Snowflake."

databases_included_in_share.append(shared_db)
databases_created_from_share.extend(share_details.consumers)

for db_from_share in databases_created_from_share:
asikowitz marked this conversation as resolved.
Show resolved Hide resolved
assert (
db_from_share not in databases_included_in_share
), "Database included in a share can not be present as consumer in any share."
assert (
databases_created_from_share.count(db_from_share) == 1
), "Same database can not be present as consumer in more than one share."

return shares

def outbounds(self) -> Dict[str, Set[DatabaseId]]:
"""
Returns mapping of
database included in current account's outbound share -> all databases created from this share in other accounts
"""
outbounds: Dict[str, Set[DatabaseId]] = defaultdict(set)
if self.shares:
for share_name, share_details in self.shares.items():
if share_details.platform_instance == self.platform_instance:
logger.debug(
f"database {share_details.database} is included in outbound share(s) {share_name}."
)
outbounds[share_details.database].update(share_details.consumers)
return outbounds

def inbounds(self) -> Dict[str, DatabaseId]:
"""
Returns mapping of
database created from an current account's inbound share -> other-account database from which this share was created
"""
inbounds: Dict[str, DatabaseId] = {}
if self.shares:
for share_name, share_details in self.shares.items():
for consumer in share_details.consumers:
if consumer.platform_instance == self.platform_instance:
logger.debug(
f"database {consumer.database} is created from inbound share {share_name}."
)
inbounds[consumer.database] = share_details.source_database
break
else:
logger.info(
f"Skipping Share {share_name}, as it does not include current platform instance {self.platform_instance}",
)
return inbounds
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ def get_tables_for_database(
for table in cur:
if table["TABLE_SCHEMA"] not in tables:
tables[table["TABLE_SCHEMA"]] = []

tables[table["TABLE_SCHEMA"]].append(
SnowflakeTable(
name=table["TABLE_NAME"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import logging
from typing import Callable, Iterable, List

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import (
DatabaseId,
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_schema import SnowflakeDatabase
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
Upstream,
UpstreamLineage,
)

logger: logging.Logger = logging.getLogger(__name__)


class SnowflakeSharesHandler(SnowflakeCommonMixin):
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
dataset_urn_builder: Callable[[str], str],
) -> None:
self.config = config
self.report = report
self.logger = logger
self.dataset_urn_builder = dataset_urn_builder

def get_shares_workunits(
self, databases: List[SnowflakeDatabase]
) -> Iterable[MetadataWorkUnit]:
inbounds = self.config.inbounds()
outbounds = self.config.outbounds()
# None of the databases are shared
if not (inbounds or outbounds):
return

logger.debug("Checking databases for inbound or outbound shares.")
for db in databases:
is_inbound = db.name in inbounds
is_outbound = db.name in outbounds

if not (is_inbound or is_outbound):
logger.debug(f"database {db.name} is not shared.")
continue

sibling_dbs = (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is typed as Collection or Iterable then you don't have to cast to list. Doesn't matter though

list(outbounds[db.name]) if is_outbound else [inbounds[db.name]]
)

for schema in db.schemas:
for table_name in schema.tables + schema.views:
# TODO: If this is outbound database,
# 1. attempt listing shares using `show shares` to identify name of share associated with this database (cache query result).
# 2. if corresponding share is listed, then run `show grants to share <share_name>` to identify exact tables, views included in share.
# 3. emit siblings only for the objects listed above.
# This will work only if the configured role has accountadmin role access OR is owner of share.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not advisable to use role with "accountadmin" access hence this is not done. Also this PR takes care to hide ghost nodes in siblings relation so this is not required.

# Otherwise ghost nodes may be shown in "Composed Of" section for tables/views in original database which are not granted to share.
yield from self.gen_siblings(
db.name,
schema.name,
table_name,
is_outbound,
sibling_dbs,
)

if is_inbound:
assert len(sibling_dbs) == 1
# SnowflakeLineageExtractor is unaware of database->schema->table hierarchy
# hence this lineage code is not written in SnowflakeLineageExtractor
# also this is not governed by configs include_table_lineage and include_view_lineage
yield self.get_upstream_lineage_with_primary_sibling(
db.name, schema.name, table_name, sibling_dbs[0]
)

self.report_missing_databases(
databases, list(inbounds.keys()), list(outbounds.keys())
)

def report_missing_databases(
self,
databases: List[SnowflakeDatabase],
inbounds: List[str],
outbounds: List[str],
) -> None:
db_names = [db.name for db in databases]
missing_dbs = [db for db in inbounds + outbounds if db not in db_names]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could alternatively not cast to list and do inbounds | outbounds


if missing_dbs:
self.report_warning(
"snowflake-shares",
f"Databases {missing_dbs} were not ingested. Siblings/Lineage will not be set for these.",
)

def gen_siblings(
self,
database_name: str,
schema_name: str,
table_name: str,
primary: bool,
sibling_databases: List[DatabaseId],
) -> Iterable[MetadataWorkUnit]:
if not sibling_databases:
return
dataset_identifier = self.get_dataset_identifier(
table_name, schema_name, database_name
)
urn = self.dataset_urn_builder(dataset_identifier)

sibling_urns = [
make_dataset_urn_with_platform_instance(
self.platform,
self.get_dataset_identifier(
table_name, schema_name, sibling_db.database
),
sibling_db.platform_instance,
)
for sibling_db in sibling_databases
]

yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=Siblings(primary=primary, siblings=sorted(sibling_urns)),
).as_workunit()

def get_upstream_lineage_with_primary_sibling(
self,
database_name: str,
schema_name: str,
table_name: str,
primary_sibling_db: DatabaseId,
) -> MetadataWorkUnit:
dataset_identifier = self.get_dataset_identifier(
table_name, schema_name, database_name
)
urn = self.dataset_urn_builder(dataset_identifier)

upstream_urn = make_dataset_urn_with_platform_instance(
self.platform,
self.get_dataset_identifier(
table_name, schema_name, primary_sibling_db.database
),
primary_sibling_db.platform_instance,
)

return MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=UpstreamLineage(
upstreams=[Upstream(dataset=upstream_urn, type=DatasetLineageType.COPY)]
),
).as_workunit()
Loading
Loading