Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat(ingest/snowflake): initialize schema resolver from datahub for l… #8903

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass
from datetime import datetime
from json.decoder import JSONDecodeError
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type

from avro.schema import RecordSchema
from deprecated import deprecated
Expand Down Expand Up @@ -993,14 +993,13 @@ def _make_schema_resolver(

def initialize_schema_resolver_from_datahub(
self, platform: str, platform_instance: Optional[str], env: str
) -> Tuple["SchemaResolver", Set[str]]:
) -> "SchemaResolver":
logger.info("Initializing schema resolver")
schema_resolver = self._make_schema_resolver(
platform, platform_instance, env, include_graph=False
)

logger.info(f"Fetching schemas for platform {platform}, env {env}")
urns = []
count = 0
with PerfTimer() as timer:
for urn, schema_info in self._bulk_fetch_schema_info_by_filter(
Expand All @@ -1009,7 +1008,6 @@ def initialize_schema_resolver_from_datahub(
env=env,
):
try:
urns.append(urn)
schema_resolver.add_graphql_schema_metadata(urn, schema_info)
count += 1
except Exception:
Expand All @@ -1024,7 +1022,7 @@ def initialize_schema_resolver_from_datahub(
)

logger.info("Finished initializing schema resolver")
return schema_resolver, set(urns)
return schema_resolver

def parse_sql_lineage(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def _init_schema_resolver(self) -> SchemaResolver:
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)[0]
)
else:
logger.warning(
"Failed to load schema info from DataHub as DataHubGraph is missing.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class SnowflakeV2Config(
)

include_view_column_lineage: bool = Field(
default=False,
description="Populates view->view and table->view column lineage.",
default=True,
description="Populates view->view and table->view column lineage using DataHub's sql parser.",
)

_check_role_grants_removed = pydantic_removed_field("check_role_grants")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,11 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
# Caches tables for a single database. Consider moving to disk or S3 when possible.
self.db_tables: Dict[str, List[SnowflakeTable]] = {}

self.sql_parser_schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.view_definitions: FileBackedDict[str] = FileBackedDict()
self.add_config_to_report()

self.sql_parser_schema_resolver = self._init_schema_resolver()

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
config = SnowflakeV2Config.parse_obj(config_dict)
Expand Down Expand Up @@ -493,6 +490,24 @@ def query(query):

return _report

def _init_schema_resolver(self) -> SchemaResolver:
if not self.config.include_technical_schema and self.config.parse_view_ddl:
if self.ctx.graph:
return self.ctx.graph.initialize_schema_resolver_from_datahub(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of initialize_schema_resolver_from_datahub returning Tuple[SchemaResolver, set[urns]], can we change it so that it only returns SchemaResolver, and in turn SchemaResolver has a method to get a set of loaded urns?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense. What would we call this method in SchemaResolver ? get_urns ?

platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
else:
logger.warning(
"Failed to load schema info from DataHub as DataHubGraph is missing.",
)
return SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
Expand Down Expand Up @@ -764,7 +779,7 @@ def _process_schema(
)
self.db_tables[schema_name] = tables

if self.config.include_technical_schema or self.config.parse_view_ddl:
if self.config.include_technical_schema:
for table in tables:
yield from self._process_table(table, schema_name, db_name)

Expand All @@ -776,7 +791,7 @@ def _process_schema(
if view.view_definition:
self.view_definitions[key] = view.view_definition

if self.config.include_technical_schema or self.config.parse_view_ddl:
if self.config.include_technical_schema:
for view in views:
yield from self._process_view(view, schema_name, db_name)

Expand Down Expand Up @@ -892,8 +907,6 @@ def _process_table(
yield from self._process_tag(tag)

yield from self.gen_dataset_workunits(table, schema_name, db_name)
elif self.config.parse_view_ddl:
self.gen_schema_metadata(table, schema_name, db_name)

def fetch_sample_data_for_classification(
self, table: SnowflakeTable, schema_name: str, db_name: str, dataset_name: str
Expand Down Expand Up @@ -1004,8 +1017,6 @@ def _process_view(
yield from self._process_tag(tag)

yield from self.gen_dataset_workunits(view, schema_name, db_name)
elif self.config.parse_view_ddl:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these changes confuse me a bit - are we dropping parse_view_ddl, or is that functionality handled elsewhere?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These conditions were part of schema extraction process, such that - even if schema ingestion is disabled but parse_view_ddl is enabled, the code to fetch and generate schema metadata from snowflake would run, in order for that to be used during view definitions cll extraction.

Actual view definition parsing logic is in snowflake_lineage_v2.py and the only thing it requires is view_definitions to be populated. This code is still intact.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok that broadly makes sense

self.gen_schema_metadata(view, schema_name, db_name)

def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
tag_identifier = tag.identifier()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,12 @@ def __init__(self, ctx: PipelineContext, config: SqlQueriesSourceConfig):
self.builder = SqlParsingBuilder(usage_config=self.config.usage)

if self.config.use_schema_resolver:
schema_resolver, urns = self.graph.initialize_schema_resolver_from_datahub(
self.schema_resolver = self.graph.initialize_schema_resolver_from_datahub(
platform=self.config.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.schema_resolver = schema_resolver
self.urns = urns
self.urns = self.schema_resolver.get_urns()
else:
self.schema_resolver = self.graph._make_schema_resolver(
platform=self.config.platform,
Expand Down
5 changes: 3 additions & 2 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ def __init__(
shared_connection=shared_conn,
)

def get_urns(self) -> Set[str]:
    """Return the set of dataset URNs currently held in this resolver's schema cache."""
    return {urn for urn in self._schema_cache}

def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
# TODO: Validate that this is the correct 2/3 layer hierarchy for the platform.

Expand Down Expand Up @@ -390,8 +393,6 @@ def convert_graphql_schema_metadata_to_info(
)
}

# TODO add a method to load all from graphql

def close(self) -> None:
self._schema_cache.close()

Expand Down
Loading