Skip to content

Commit

Permalink
simplify signature of initialize_schema_resolver_from_datahub
Browse files (browse the repository at this point in the history)
  • Loading branch information
mayurinehate committed Sep 28, 2023
1 parent 12a7bfe commit d28b157
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 11 deletions.
8 changes: 3 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dataclasses import dataclass
from datetime import datetime
from json.decoder import JSONDecodeError
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Tuple, Type

from avro.schema import RecordSchema
from deprecated import deprecated
Expand Down Expand Up @@ -1148,14 +1148,13 @@ def _make_schema_resolver(

def initialize_schema_resolver_from_datahub(
self, platform: str, platform_instance: Optional[str], env: str
) -> Tuple["SchemaResolver", Set[str]]:
) -> "SchemaResolver":
logger.info("Initializing schema resolver")
schema_resolver = self._make_schema_resolver(
platform, platform_instance, env, include_graph=False
)

logger.info(f"Fetching schemas for platform {platform}, env {env}")
urns = []
count = 0
with PerfTimer() as timer:
for urn, schema_info in self._bulk_fetch_schema_info_by_filter(
Expand All @@ -1164,7 +1163,6 @@ def initialize_schema_resolver_from_datahub(
env=env,
):
try:
urns.append(urn)
schema_resolver.add_graphql_schema_metadata(urn, schema_info)
count += 1
except Exception:
Expand All @@ -1179,7 +1177,7 @@ def initialize_schema_resolver_from_datahub(
)

logger.info("Finished initializing schema resolver")
return schema_resolver, set(urns)
return schema_resolver

def parse_sql_lineage(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ def _init_schema_resolver(self) -> SchemaResolver:
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)[0]
)
else:
logger.warning(
"Failed to load schema info from DataHub as DataHubGraph is missing.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def _init_schema_resolver(self) -> SchemaResolver:
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)[0]
)
else:
logger.warning(
"Failed to load schema info from DataHub as DataHubGraph is missing.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ def __init__(self, ctx: PipelineContext, config: SqlQueriesSourceConfig):
self.builder = SqlParsingBuilder(usage_config=self.config.usage)

if self.config.use_schema_resolver:
schema_resolver, urns = self.graph.initialize_schema_resolver_from_datahub(
schema_resolver = self.graph.initialize_schema_resolver_from_datahub(
platform=self.config.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.schema_resolver = schema_resolver
self.urns = urns
self.urns = self.schema_resolver.get_urns()
else:
self.schema_resolver = self.graph._make_schema_resolver(
platform=self.config.platform,
Expand Down
5 changes: 3 additions & 2 deletions metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ def __init__(
shared_connection=shared_conn,
)

def get_urns(self) -> Set[str]:
    """Return a snapshot of the keys currently held in the schema cache.

    The result is a newly built ``set``, so mutating it does not affect
    the underlying cache.

    NOTE(review): the cache keys are presumably dataset URNs (the method
    name and the caller's ``self.urns`` assignment suggest so) -- confirm
    against the code that populates ``_schema_cache``.
    """
    cached_keys = self._schema_cache.keys()
    return set(cached_keys)

def get_urn_for_table(self, table: _TableName, lower: bool = False) -> str:
# TODO: Validate that this is the correct 2/3 layer hierarchy for the platform.

Expand Down Expand Up @@ -390,8 +393,6 @@ def convert_graphql_schema_metadata_to_info(
)
}

# TODO add a method to load all from graphql

def close(self) -> None:
self._schema_cache.close()

Expand Down

0 comments on commit d28b157

Please sign in to comment.