feat(ingest/snowflake): tables from snowflake shares as siblings #8531
Changes from 13 commits
```diff
@@ -1,10 +1,11 @@
 import logging
 from dataclasses import dataclass
 from enum import Enum
-from typing import Dict, List, Optional, cast
+from typing import Dict, List, Optional, Set, cast

 from pydantic import Field, SecretStr, root_validator, validator

-from datahub.configuration.common import AllowDenyPattern
+from datahub.configuration.common import AllowDenyPattern, ConfigModel
 from datahub.configuration.pattern_utils import UUID_REGEX
 from datahub.configuration.validate_field_removal import pydantic_removed_field
 from datahub.configuration.validate_field_rename import pydantic_renamed_field
```
```diff
@@ -42,6 +43,27 @@ class TagOption(str, Enum):
     skip = "skip"


+@dataclass(frozen=True)
+class DatabaseId:
+    database: str = Field(
+        description="Database created from share in consumer account."
+    )
+    platform_instance: str = Field(
+        description="Platform instance of consumer snowflake account."
+    )
+
+
+class SnowflakeShareConfig(ConfigModel):
+    database: str = Field(description="Database from which share is created.")
+    platform_instance: str = Field(
+        description="Platform instance for snowflake account in which share is created."
+    )
+
+    consumers: Set[DatabaseId] = Field(
+        description="List of databases created in consumer accounts."
+    )
+
+
 class SnowflakeV2Config(
     SnowflakeConfig,
     SnowflakeUsageConfig,
```
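With `DatabaseId` and `SnowflakeShareConfig` in place, the `shares` section of a recipe maps each share name to its source database and its consumers. A minimal sketch (the share, database, and platform-instance names here are all hypothetical):

```yaml
shares:
  share_x:
    database: db1
    platform_instance: instance1
    consumers:
      - database: db1_from_share
        platform_instance: instance2
```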
```diff
@@ -120,6 +142,13 @@ class SnowflakeV2Config(
         "upstreams_deny_pattern", "temporary_tables_pattern"
     )

+    shares: Optional[Dict[str, SnowflakeShareConfig]] = Field(
+        default=None,
+        description="Required if current account owns or consumes snowflake share."
+        " If specified, connector creates lineage and siblings relationship between current account's database tables and consumer/producer account's database tables."
+        " Map of share name -> details of share.",
+    )
+
     email_as_user_identifier: bool = Field(
         default=True,
         description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is provided, generates email addresses for snowflake users with unset emails, based on their username.",
```
```diff
@@ -197,3 +226,41 @@ def get_sql_alchemy_url(
     @property
     def parse_view_ddl(self) -> bool:
         return self.include_view_column_lineage
+
+    @validator("shares")
+    def validate_shares(
+        cls, shares: Optional[Dict[str, SnowflakeShareConfig]], values: Dict
+    ) -> Optional[Dict[str, SnowflakeShareConfig]]:
+        current_platform_instance = values.get("platform_instance")
+
+        # Check: platform_instance should be present
+        if shares:
+            assert current_platform_instance is not None, (
+                "Did you forget to set `platform_instance` for current ingestion? "
+                "It is advisable to use `platform_instance` when ingesting from multiple snowflake accounts."
+            )
+
+            databases_included_in_share: List[DatabaseId] = []
+            databases_created_from_share: List[DatabaseId] = []
+
+            for _, share_details in shares.items():
+                shared_db = DatabaseId(
+                    share_details.database, share_details.platform_instance
+                )
+                assert all(
+                    consumer.platform_instance != share_details.platform_instance
+                    for consumer in share_details.consumers
+                ), "Share's platform_instance can not be same as consumer's platform instance. Self-sharing not supported in Snowflake."
+
+                databases_included_in_share.append(shared_db)
+                databases_created_from_share.extend(share_details.consumers)
+
+            for db_from_share in databases_created_from_share:
+                assert (
+                    db_from_share not in databases_included_in_share
+                ), "Database included in a share can not be present as consumer in any share."
+                assert (
+                    databases_created_from_share.count(db_from_share) == 1
+                ), "Same database can not be present as consumer in more than one share."
+
+        return shares
```

> **Reviewer:** Generally we raise `ValueError` in validators, not use assertions. Do you want to change that convention? For now at least, can you change to stay consistent?
>
> **Author:** I believe we allow `ValueError`, `AssertionError`, and `TypeError` as a convention, as also mentioned here: https://datahubproject.io/docs/metadata-ingestion/developing/#coding. Sometimes asserts are more readable/briefer, so I'd prefer them. In this case, I'm okay to change.
>
> **Reviewer:** Oh nice. I like the assert syntax more; I think we're just hesitant because you can disable assertions with a certain flag. I don't feel strongly here, up to you.
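The invariants this validator enforces can be exercised in isolation. Below is a dependency-free sketch of the same checks, where plain dataclasses stand in for the pydantic models and all share/database names are hypothetical:

```python
from dataclasses import dataclass
from typing import Dict, List, Set


@dataclass(frozen=True)
class DatabaseId:
    database: str
    platform_instance: str


@dataclass
class ShareConfig:
    database: str            # database the share is created from
    platform_instance: str   # producer account
    consumers: Set[DatabaseId]


def validate_shares(shares: Dict[str, ShareConfig]) -> None:
    included: List[DatabaseId] = []   # databases included in some share
    consumed: List[DatabaseId] = []   # databases created from some share
    for share in shares.values():
        # Self-sharing is not supported in Snowflake.
        assert all(
            c.platform_instance != share.platform_instance for c in share.consumers
        ), "Share's platform_instance can not be same as consumer's platform instance."
        included.append(DatabaseId(share.database, share.platform_instance))
        consumed.extend(share.consumers)
    for db in consumed:
        assert db not in included, "A shared database can not also be a consumer."
        assert consumed.count(db) == 1, "A database can consume at most one share."


# Valid: producer instance1 shares db1, consumed as db1_from_share on instance2.
validate_shares(
    {"share_x": ShareConfig("db1", "instance1", {DatabaseId("db1_from_share", "instance2")})}
)
```

A self-share (producer and consumer on the same platform instance) or a database that both feeds a share and consumes one fails the corresponding assertion.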
New file, `@@ -0,0 +1,229 @@`:

```python
import logging
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List, Optional

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import (
    DatabaseId,
    SnowflakeShareConfig,
    SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_schema import SnowflakeDatabase
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    Upstream,
    UpstreamLineage,
)

logger: logging.Logger = logging.getLogger(__name__)


@dataclass
class SharedDatabase:
    """
    Represents a shared database from the current platform instance.
    This is either created from an inbound share or included in an outbound share.
    """

    name: str
    created_from_share: bool

    # This will have exactly one entry if created_from_share = True
    shares: List[str]
```

> **Reviewer:** Can you mention that this relies on the invariant that a snowflake database can't both be in a share and the consumer of a share?
>
> **Author:** sure

> **Reviewer:** The name […]
>
> **Author:** Well, technically some databases are created from share while others are used to create a share, i.e. included in share. I am okay to use "primary/is_share_source" = not (created_from_share/secondary).
```python
class SnowflakeSharesHandler(SnowflakeCommonMixin):
    def __init__(
        self,
        config: SnowflakeV2Config,
        report: SnowflakeV2Report,
        dataset_urn_builder: Callable[[str], str],
    ) -> None:
        self.config = config
        self.report = report
        self.logger = logger
        self.dataset_urn_builder = dataset_urn_builder

    def _get_shared_databases(
        self, shares: Dict[str, SnowflakeShareConfig], platform_instance: Optional[str]
    ) -> Dict[str, SharedDatabase]:
        # this is ensured in config validators
        assert platform_instance is not None

        shared_databases: Dict[str, SharedDatabase] = {}

        for share_name, share_details in shares.items():
            if share_details.platform_instance == platform_instance:
                if share_details.database not in shared_databases:
                    shared_databases[share_details.database] = SharedDatabase(
                        name=share_details.database,
                        created_from_share=False,
                        shares=[share_name],
                    )
                else:
                    shared_databases[share_details.database].shares.append(share_name)
            else:
                for consumer in share_details.consumers:
                    if consumer.platform_instance == platform_instance:
                        shared_databases[consumer.database] = SharedDatabase(
                            name=share_details.database,
                            created_from_share=True,
                            shares=[share_name],
                        )
                        break
                else:
                    self.report_warning(
                        f"Skipping Share, as it does not include current platform instance {platform_instance}",
                        share_name,
                    )

        return shared_databases
```

> **Reviewer:** Overall I thought the logic in this method and by users of the […] could be simplified if we did some preprocessing first. What do you think about, in the config file: […] Could definitely have better naming, but once you have these, then I think you can get rid of […]
>
> **Author:** I see, let me check.

> **Reviewer:** Can the same platform instance and database really appear as inbound in […]?
>
> **Author:** a corner case, but yes.
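The classification `_get_shared_databases` performs can be illustrated with a stripped-down, dependency-free sketch (names are hypothetical, and the real method additionally warns about shares that do not involve the current platform instance):

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass(frozen=True)
class DatabaseId:
    database: str
    platform_instance: str


@dataclass
class Share:
    database: str            # database the share is created from
    platform_instance: str   # producer account
    consumers: Set[DatabaseId]


def classify(shares: Dict[str, Share], platform_instance: str) -> Dict[str, bool]:
    """Map local database name -> created_from_share flag for the current instance."""
    result: Dict[str, bool] = {}
    for name, share in shares.items():
        if share.platform_instance == platform_instance:
            # Current account is the producer: database is included in an outbound share.
            result[share.database] = False
        else:
            for consumer in share.consumers:
                if consumer.platform_instance == platform_instance:
                    # Current account is a consumer: database was created from the share.
                    result[consumer.database] = True
    return result


shares = {
    "share_x": Share("db1", "instance1", {DatabaseId("db1_from_share", "instance2")})
}
# From the producer's point of view db1 is outbound; from the
# consumer's, db1_from_share is inbound.
```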
```python
    def get_shares_workunits(
        self, databases: List[SnowflakeDatabase]
    ) -> Iterable[MetadataWorkUnit]:
        shared_databases = self._get_shared_databases(
            self.config.shares or {}, self.config.platform_instance
        )

        # None of the databases are shared
        if not shared_databases:
            return

        logger.debug("Checking databases for inbound or outbound shares.")
        for db in databases:
            if db.name not in shared_databases:
                logger.debug(f"database {db.name} is not shared.")
                continue

            sibling_dbs = self.get_sibling_databases(shared_databases[db.name])

            for schema in db.schemas:
                for table_name in schema.tables + schema.views:
                    # TODO: If this is outbound database,
                    # 1. attempt listing shares using `show shares` to identify name of share associated with this database (cache query result).
                    # 2. if corresponding share is listed, then run `show grants to share <share_name>` to identify exact tables, views included in share.
                    # 3. emit siblings only for the objects listed above.
                    # This will work only if the configured role has accountadmin role access OR is owner of share.
                    # Otherwise ghost nodes may be shown in "Composed Of" section for tables/views in original database which are not granted to share.
                    yield from self.gen_siblings(
                        db.name,
                        schema.name,
                        table_name,
                        not shared_databases[db.name].created_from_share,
                        sibling_dbs,
                    )

                    if shared_databases[db.name].created_from_share:
                        assert len(sibling_dbs) == 1
                        # SnowflakeLineageExtractor is unaware of database->schema->table hierarchy
                        # hence this lineage code is not written in SnowflakeLineageExtractor
                        # also this is not governed by configs include_table_lineage and include_view_lineage
                        yield self.get_upstream_lineage_with_primary_sibling(
                            db.name, schema.name, table_name, sibling_dbs[0]
                        )

        self.report_missing_databases(databases, shared_databases)
```

> **Reviewer:** […]
>
> **Author:** self.config is also used in other places in SnowflakeCommonMixin - primarily in deciding whether to lowercase the urn - hence keeping self.config and refactoring a bit to avoid asserts.

> **Author:** It is not advisable to use a role with "accountadmin" access, hence this is not done. Also, this PR takes care to hide ghost nodes in the siblings relation, so this is not required.
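The TODO above refers to Snowflake's share-introspection commands; run by a sufficiently privileged role (share owner or accountadmin), they would look roughly like this (the share name is hypothetical):

```sql
-- List shares visible to the current role (producer side).
SHOW SHARES;

-- List exactly which objects (tables, views) are granted to a given share.
SHOW GRANTS TO SHARE share_x;
```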
```python
    def get_sibling_databases(self, db: SharedDatabase) -> List[DatabaseId]:
        assert self.config.shares is not None
        sibling_dbs: List[DatabaseId] = []
        if db.created_from_share:
            share_details = self.config.shares[db.shares[0]]
            logger.debug(
                f"database {db.name} is created from inbound share {db.shares[0]}."
            )
            sibling_dbs = [
                DatabaseId(share_details.database, share_details.platform_instance)
            ]
        else:  # not created from share, but is in fact included in share
            logger.debug(
                f"database {db.name} is included as outbound share(s) {db.shares}."
            )
            sibling_dbs = [
                consumer
                for share_name in db.shares
                for consumer in self.config.shares[share_name].consumers
            ]

        return sibling_dbs

    def report_missing_databases(
        self,
        databases: List[SnowflakeDatabase],
        shared_databases: Dict[str, SharedDatabase],
    ) -> None:
        db_names = [db.name for db in databases]
        missing_dbs = [db for db in shared_databases.keys() if db not in db_names]

        if missing_dbs:
            self.report_warning(
                "snowflake-shares",
                f"Databases {missing_dbs} were not ingested. Siblings/Lineage will not be set for these.",
            )

    def gen_siblings(
        self,
        database_name: str,
        schema_name: str,
        table_name: str,
        primary: bool,
        sibling_databases: List[DatabaseId],
    ) -> Iterable[MetadataWorkUnit]:
        if not sibling_databases:
            return
        dataset_identifier = self.get_dataset_identifier(
            table_name, schema_name, database_name
        )
        urn = self.dataset_urn_builder(dataset_identifier)

        sibling_urns = [
            make_dataset_urn_with_platform_instance(
                self.platform,
                self.get_dataset_identifier(
                    table_name, schema_name, sibling_db.database
                ),
                sibling_db.platform_instance,
            )
            for sibling_db in sibling_databases
        ]

        yield MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=Siblings(primary=primary, siblings=sorted(sibling_urns)),
        ).as_workunit()

    def get_upstream_lineage_with_primary_sibling(
        self,
        database_name: str,
        schema_name: str,
        table_name: str,
        primary_sibling_db: DatabaseId,
    ) -> MetadataWorkUnit:
        dataset_identifier = self.get_dataset_identifier(
            table_name, schema_name, database_name
        )
        urn = self.dataset_urn_builder(dataset_identifier)

        upstream_urn = make_dataset_urn_with_platform_instance(
            self.platform,
            self.get_dataset_identifier(
                table_name, schema_name, primary_sibling_db.database
            ),
            primary_sibling_db.platform_instance,
        )

        return MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=UpstreamLineage(
                upstreams=[Upstream(dataset=upstream_urn, type=DatasetLineageType.COPY)]
            ),
        ).as_workunit()
```
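For reference, the sibling and upstream URNs built here follow the standard DataHub dataset-URN shape, with the platform instance prefixed to the dataset name. A minimal sketch, where `make_snowflake_dataset_urn` is a hypothetical stand-in for `make_dataset_urn_with_platform_instance` and the instance/table names are invented:

```python
def make_snowflake_dataset_urn(platform_instance: str, name: str, env: str = "PROD") -> str:
    # DataHub convention: the platform instance is prefixed to the dataset name.
    qualified = f"{platform_instance}.{name}" if platform_instance else name
    return f"urn:li:dataset:(urn:li:dataPlatform:snowflake,{qualified},{env})"


# A table in the consumer database and its sibling in the producer database:
consumer_urn = make_snowflake_dataset_urn("instance2", "db1_from_share.public.orders")
producer_urn = make_snowflake_dataset_urn("instance1", "db1.public.orders")
```

The `Siblings` aspect on the consumer-side URN points at the producer-side URN (and vice versa), while `UpstreamLineage` with `DatasetLineageType.COPY` is emitted only on the consumer side.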
> **Reviewer:** If you have a lot of snowflake recipes, I could see how this could get tiresome to set up for every ingestion pipeline. Thoughts on having a config that could be the same for every recipe, e.g. […]?
>
> **Author:** This has crossed my mind too, and I'd really like "having a config that could be the same for every recipe". The only downside is we require additional unused information - e.g. share name "X" - and it is possible that some of the shares config will not be relevant for some account recipes, making validation and probable errors hard to find. I feel the ease of using the same shares config outweighs the downsides, so let me think more on this and update.
>
> **Author:** I've updated the config to use a similar structure as your example, except using the term `consumers` instead of `outbounds`. Outbound is relative and can be a confusing term; consumer has a specific meaning for snowflake shares and hence is unambiguous.
>
> **Reviewer:** If we don't want to have them specify the share name, we can also do: […] but maybe that's more confusing.
>
> **Author:** Yeah, I like the one with the share name better - more precise and readable. Also, shares do have unique names across accounts. This change primarily makes the shares configuration absolute and exhaustive for all accounts, so configurations need not be thought of with respect to the particular account/recipe.