fix(ingest/tableau): fix tableau native CLL for snowflake
mayurinehate committed Sep 4, 2023
1 parent 801208e commit 1d26c77
Showing 5 changed files with 129 additions and 91 deletions.

Changed file 1 of 5
@@ -23,6 +23,7 @@
     SnowflakeConfig,
 )
 from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsageConfig
+from datahub.utilities.global_warning_util import add_global_warning
 
 logger = logging.Logger(__name__)
 
@@ -156,6 +157,15 @@ class SnowflakeV2Config(
         description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is provided, generates email addresses for snowflake users with unset emails, based on their username.",
     )
 
+    @validator("convert_urns_to_lowercase")
+    def validate_convert_urns_to_lowercase(cls, v):
+        if not v:
+            add_global_warning(
+                "Please use `convert_urns_to_lowercase: True`, otherwise lineage may not work correctly."
+            )
+
+        return v
+
     @validator("include_column_lineage")
     def validate_include_column_lineage(cls, v, values):
         if not values.get("include_table_lineage") and v:
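
For context, the new `validate_convert_urns_to_lowercase` validator above warns instead of rejecting the config. Below is a minimal standalone sketch of that warn-don't-fail pattern (not part of this commit): `ExampleConfig` and `collect_warning` are illustrative stand-ins for `SnowflakeV2Config` and `add_global_warning`, assuming pydantic v1-style validators.

from typing import List

from pydantic import BaseModel, validator

# Illustrative stand-in for datahub.utilities.global_warning_util.add_global_warning.
_GLOBAL_WARNINGS: List[str] = []


def collect_warning(message: str) -> None:
    _GLOBAL_WARNINGS.append(message)


class ExampleConfig(BaseModel):
    convert_urns_to_lowercase: bool = True

    @validator("convert_urns_to_lowercase")
    def warn_if_not_lowercased(cls, v: bool) -> bool:
        # Record a warning instead of raising, so ingestion still runs
        # with the discouraged setting.
        if not v:
            collect_warning(
                "Please use `convert_urns_to_lowercase: True`, "
                "otherwise lineage may not work correctly."
            )
        return v


if __name__ == "__main__":
    ExampleConfig(convert_urns_to_lowercase=False)
    print(_GLOBAL_WARNINGS)  # warning collected; validation still succeeds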

Changed file 2 of 5
@@ -404,7 +404,9 @@ def _populate_external_lineage_map(self, discovered_tables: List[str]) -> None:
 
     # Handles the case for explicitly created external tables.
     # NOTE: Snowflake does not log this information to the access_history table.
-    def _populate_external_lineage_from_show_query(self, discovered_tables):
+    def _populate_external_lineage_from_show_query(
+        self, discovered_tables: List[str]
+    ) -> None:
         external_tables_query: str = SnowflakeQuery.show_external_tables()
         try:
             for db_row in self.query(external_tables_query):
@@ -455,7 +457,9 @@ def _populate_external_lineage_from_copy_history(
             )
             self.report_status(EXTERNAL_LINEAGE, False)
 
-    def _process_external_lineage_result_row(self, db_row, discovered_tables):
+    def _process_external_lineage_result_row(
+        self, db_row: dict, discovered_tables: List[str]
+    ) -> None:
         # key is the down-stream table name
         key: str = self.get_dataset_identifier_from_qualified_name(
             db_row["DOWNSTREAM_TABLE_NAME"]
@@ -475,7 +479,7 @@ def _process_external_lineage_result_row(self, db_row, discovered_tables):
             f"ExternalLineage[Table(Down)={key}]:External(Up)={self._external_lineage_map[key]} via access_history"
         )
 
-    def _fetch_upstream_lineages_for_tables(self):
+    def _fetch_upstream_lineages_for_tables(self) -> Iterable[Dict]:
         query: str = SnowflakeQuery.table_to_table_lineage_history_v2(
             start_time_millis=int(self.start_time.timestamp() * 1000),
             end_time_millis=int(self.end_time.timestamp() * 1000),
@@ -498,7 +502,9 @@ def _fetch_upstream_lineages_for_tables(self):
             )
             self.report_status(TABLE_LINEAGE, False)
 
-    def map_query_result_upstreams(self, upstream_tables):
+    def map_query_result_upstreams(
+        self, upstream_tables: Optional[List[dict]]
+    ) -> List[UpstreamClass]:
         if not upstream_tables:
             return []
         upstreams: List[UpstreamClass] = []
@@ -510,7 +516,9 @@ def map_query_result_upstreams(self, upstream_tables):
                 logger.debug(e, exc_info=e)
         return upstreams
 
-    def _process_add_single_upstream(self, upstreams, upstream_table):
+    def _process_add_single_upstream(
+        self, upstreams: List[UpstreamClass], upstream_table: dict
+    ) -> None:
         upstream_name = self.get_dataset_identifier_from_qualified_name(
             upstream_table["upstream_object_name"]
         )
@@ -524,7 +532,9 @@ def _process_add_single_upstream(self, upstreams, upstream_table):
             )
         )
 
-    def map_query_result_fine_upstreams(self, dataset_urn, column_wise_upstreams):
+    def map_query_result_fine_upstreams(
+        self, dataset_urn: str, column_wise_upstreams: Optional[List[dict]]
+    ) -> List[FineGrainedLineage]:
         if not column_wise_upstreams:
             return []
         fine_upstreams: List[FineGrainedLineage] = []
@@ -539,8 +549,11 @@ def map_query_result_fine_upstreams(self, dataset_urn, column_wise_upstreams):
         return fine_upstreams
 
     def _process_add_single_column_upstream(
-        self, dataset_urn, fine_upstreams, column_with_upstreams
-    ):
+        self,
+        dataset_urn: str,
+        fine_upstreams: List[FineGrainedLineage],
+        column_with_upstreams: Dict,
+    ) -> None:
         column_name = column_with_upstreams["column_name"]
         upstream_jobs = column_with_upstreams["upstreams"]
         if column_name and upstream_jobs:
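
The typed signatures above feed Snowflake's column-level lineage into `UpstreamClass` and `FineGrainedLineage` objects. For reference, a minimal sketch (not part of this commit) of how one table-level plus one column-level edge can be expressed, using the `*Class` variants from `datahub.metadata.schema_classes` rather than this module's exact imports; the table and column names are made up.

from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

# Hypothetical upstream/downstream tables (lowercased, as convert_urns_to_lowercase expects).
upstream_urn = make_dataset_urn("snowflake", "db1.schema1.orders", "PROD")
downstream_urn = make_dataset_urn("snowflake", "db1.schema1.orders_summary", "PROD")

# Table-level lineage edge.
upstream = UpstreamClass(dataset=upstream_urn, type=DatasetLineageTypeClass.TRANSFORMED)

# Column-level edge: downstream column "total" derives from upstream column "amount".
cll = FineGrainedLineageClass(
    upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
    upstreams=[make_schema_field_urn(upstream_urn, "amount")],
    downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
    downstreams=[make_schema_field_urn(downstream_urn, "total")],
)

lineage_aspect = UpstreamLineageClass(upstreams=[upstream], fineGrainedLineages=[cll])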

Changed file 3 of 5
@@ -245,7 +245,9 @@ def _get_workunits_internal(
 
             yield from self.build_usage_statistics_for_dataset(dataset_identifier, row)
 
-    def build_usage_statistics_for_dataset(self, dataset_identifier, row):
+    def build_usage_statistics_for_dataset(
+        self, dataset_identifier: str, row: dict
+    ) -> Iterable[MetadataWorkUnit]:
         try:
             stats = DatasetUsageStatistics(
                 timestampMillis=int(row["BUCKET_START_TIME"].timestamp() * 1000),
@@ -357,7 +359,7 @@ def _make_operations_query(self) -> str:
         end_time = int(self.end_time.timestamp() * 1000)
         return SnowflakeQuery.operational_data_for_time_window(start_time, end_time)
 
-    def _check_usage_date_ranges(self) -> Any:
+    def _check_usage_date_ranges(self) -> None:
         with PerfTimer() as timer:
             try:
                 results = self.query(SnowflakeQuery.get_access_history_date_range())
@@ -477,7 +479,7 @@ def _process_snowflake_history_row(
                 f"Failed to parse operation history row {event_dict}, {e}",
             )
 
-    def parse_event_objects(self, event_dict):
+    def parse_event_objects(self, event_dict: Dict) -> None:
         event_dict["BASE_OBJECTS_ACCESSED"] = [
             obj
             for obj in json.loads(event_dict["BASE_OBJECTS_ACCESSED"])

Changed file 4 of 5
@@ -1165,7 +1165,7 @@ def gen_schema_metadata(
 
         foreign_keys: Optional[List[ForeignKeyConstraint]] = None
         if isinstance(table, SnowflakeTable) and len(table.foreign_keys) > 0:
-            foreign_keys = self.build_foreign_keys(table, dataset_urn, foreign_keys)
+            foreign_keys = self.build_foreign_keys(table, dataset_urn)
 
         schema_metadata = SchemaMetadata(
             schemaName=dataset_name,
@@ -1211,7 +1211,9 @@ def gen_schema_metadata(
 
         return schema_metadata
 
-    def build_foreign_keys(self, table, dataset_urn, foreign_keys):
+    def build_foreign_keys(
+        self, table: SnowflakeTable, dataset_urn: str
+    ) -> List[ForeignKeyConstraint]:
         foreign_keys = []
         for fk in table.foreign_keys:
             foreign_dataset = make_dataset_urn(
@@ -1428,7 +1430,7 @@ def get_fk_constraints_for_table(
         # Access to table but none of its constraints - is this possible ?
         return constraints.get(table_name, [])
 
-    def add_config_to_report(self):
+    def add_config_to_report(self) -> None:
         self.report.cleaned_account_id = self.config.get_account()
         self.report.ignore_start_time_lineage = self.config.ignore_start_time_lineage
         self.report.upstream_lineage_in_report = self.config.upstream_lineage_in_report
@@ -1481,7 +1483,9 @@ def inspect_session_metadata(self) -> None:
     # that would be expensive, hence not done. To compensate for the possibility
     # of some null values in the collected sample, we fetch extra (20% more)
     # rows than the configured sample_size.
-    def get_sample_values_for_table(self, table_name, schema_name, db_name):
+    def get_sample_values_for_table(
+        self, table_name: str, schema_name: str, db_name: str
+    ) -> pd.DataFrame:
         # Create a cursor object.
         logger.debug(
             f"Collecting sample values for table {db_name}.{schema_name}.{table_name}"
@@ -1562,7 +1566,7 @@ def get_snowsight_base_url(self) -> Optional[str]:
             )
         return None
 
-    def is_standard_edition(self):
+    def is_standard_edition(self) -> bool:
         try:
             self.query(SnowflakeQuery.show_tags())
             return False
@@ -1571,7 +1575,7 @@ def is_standard_edition(self):
                 return True
             raise
 
-    def _snowflake_clear_ocsp_cache(self):
+    def _snowflake_clear_ocsp_cache(self) -> None:
         # Because of some issues with the Snowflake Python connector, we wipe the OCSP cache.
         #
         # Why is this necessary: