diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
index af99faf6e6396..d5aaa857e4d6e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
@@ -23,6 +23,7 @@
     SnowflakeConfig,
 )
 from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsageConfig
+from datahub.utilities.global_warning_util import add_global_warning
 
 logger = logging.Logger(__name__)
 
@@ -156,6 +157,15 @@ class SnowflakeV2Config(
         description="Format user urns as an email, if the snowflake user's email is set. If `email_domain` is provided, generates email addresses for snowflake users with unset emails, based on their username.",
     )
 
+    @validator("convert_urns_to_lowercase")
+    def validate_convert_urns_to_lowercase(cls, v):
+        if not v:
+            add_global_warning(
+                "Please use `convert_urns_to_lowercase: True`, otherwise lineage may not work correctly."
+            )
+
+        return v
+
     @validator("include_column_lineage")
     def validate_include_column_lineage(cls, v, values):
         if not values.get("include_table_lineage") and v:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
index cee3a2926520f..9a993f5774032 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
@@ -404,7 +404,9 @@ def _populate_external_lineage_map(self, discovered_tables: List[str]) -> None:
 
     # Handles the case for explicitly created external tables.
    # NOTE: Snowflake does not log this information to the access_history table.
-    def _populate_external_lineage_from_show_query(self, discovered_tables):
+    def _populate_external_lineage_from_show_query(
+        self, discovered_tables: List[str]
+    ) -> None:
         external_tables_query: str = SnowflakeQuery.show_external_tables()
         try:
             for db_row in self.query(external_tables_query):
@@ -455,7 +457,9 @@ def _populate_external_lineage_from_copy_history(
             )
             self.report_status(EXTERNAL_LINEAGE, False)
 
-    def _process_external_lineage_result_row(self, db_row, discovered_tables):
+    def _process_external_lineage_result_row(
+        self, db_row: dict, discovered_tables: List[str]
+    ) -> None:
         # key is the down-stream table name
         key: str = self.get_dataset_identifier_from_qualified_name(
             db_row["DOWNSTREAM_TABLE_NAME"]
         )
@@ -475,7 +479,7 @@ def _process_external_lineage_result_row(self, db_row, discovered_tables):
             f"ExternalLineage[Table(Down)={key}]:External(Up)={self._external_lineage_map[key]} via access_history"
         )
 
-    def _fetch_upstream_lineages_for_tables(self):
+    def _fetch_upstream_lineages_for_tables(self) -> Iterable[Dict]:
         query: str = SnowflakeQuery.table_to_table_lineage_history_v2(
             start_time_millis=int(self.start_time.timestamp() * 1000),
             end_time_millis=int(self.end_time.timestamp() * 1000),
@@ -498,7 +502,9 @@ def _fetch_upstream_lineages_for_tables(self):
             )
             self.report_status(TABLE_LINEAGE, False)
 
-    def map_query_result_upstreams(self, upstream_tables):
+    def map_query_result_upstreams(
+        self, upstream_tables: Optional[List[dict]]
+    ) -> List[UpstreamClass]:
         if not upstream_tables:
             return []
         upstreams: List[UpstreamClass] = []
@@ -510,7 +516,9 @@ def map_query_result_upstreams(self, upstream_tables):
                 logger.debug(e, exc_info=e)
         return upstreams
 
-    def _process_add_single_upstream(self, upstreams, upstream_table):
+    def _process_add_single_upstream(
+        self, upstreams: List[UpstreamClass], upstream_table: dict
+    ) -> None:
         upstream_name = self.get_dataset_identifier_from_qualified_name(
             upstream_table["upstream_object_name"]
         )
@@ -524,7 +532,9 @@ def _process_add_single_upstream(self, upstreams, upstream_table):
             )
         )
 
-    def map_query_result_fine_upstreams(self, dataset_urn, column_wise_upstreams):
+    def map_query_result_fine_upstreams(
+        self, dataset_urn: str, column_wise_upstreams: Optional[List[dict]]
+    ) -> List[FineGrainedLineage]:
         if not column_wise_upstreams:
             return []
         fine_upstreams: List[FineGrainedLineage] = []
@@ -539,8 +549,11 @@ def map_query_result_fine_upstreams(self, dataset_urn, column_wise_upstreams):
         return fine_upstreams
 
     def _process_add_single_column_upstream(
-        self, dataset_urn, fine_upstreams, column_with_upstreams
-    ):
+        self,
+        dataset_urn: str,
+        fine_upstreams: List[FineGrainedLineage],
+        column_with_upstreams: Dict,
+    ) -> None:
         column_name = column_with_upstreams["column_name"]
         upstream_jobs = column_with_upstreams["upstreams"]
         if column_name and upstream_jobs:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
index a64921ea01759..d041d219c4bdd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py
@@ -245,7 +245,9 @@ def _get_workunits_internal(
 
             yield from self.build_usage_statistics_for_dataset(dataset_identifier, row)
 
-    def build_usage_statistics_for_dataset(self, dataset_identifier, row):
+    def build_usage_statistics_for_dataset(
+        self, dataset_identifier: str, row: dict
+    ) -> Iterable[MetadataWorkUnit]:
         try:
             stats = DatasetUsageStatistics(
                 timestampMillis=int(row["BUCKET_START_TIME"].timestamp() * 1000),
@@ -357,7 +359,7 @@ def _make_operations_query(self) -> str:
         end_time = int(self.end_time.timestamp() * 1000)
         return SnowflakeQuery.operational_data_for_time_window(start_time, end_time)
 
-    def _check_usage_date_ranges(self) -> Any:
+    def _check_usage_date_ranges(self) -> None:
         with PerfTimer() as timer:
             try:
                 results = self.query(SnowflakeQuery.get_access_history_date_range())
@@ -477,7 +479,7 @@ def _process_snowflake_history_row(
                 f"Failed to parse operation history row {event_dict}, {e}",
             )
 
-    def parse_event_objects(self, event_dict):
+    def parse_event_objects(self, event_dict: Dict) -> None:
         event_dict["BASE_OBJECTS_ACCESSED"] = [
             obj
             for obj in json.loads(event_dict["BASE_OBJECTS_ACCESSED"])
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
index e561ed0e2d146..811ea67981e18 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
@@ -1165,7 +1165,7 @@ def gen_schema_metadata(
 
         foreign_keys: Optional[List[ForeignKeyConstraint]] = None
         if isinstance(table, SnowflakeTable) and len(table.foreign_keys) > 0:
-            foreign_keys = self.build_foreign_keys(table, dataset_urn, foreign_keys)
+            foreign_keys = self.build_foreign_keys(table, dataset_urn)
 
         schema_metadata = SchemaMetadata(
             schemaName=dataset_name,
@@ -1211,7 +1211,9 @@ def gen_schema_metadata(
 
         return schema_metadata
 
-    def build_foreign_keys(self, table, dataset_urn, foreign_keys):
+    def build_foreign_keys(
+        self, table: SnowflakeTable, dataset_urn: str
+    ) -> List[ForeignKeyConstraint]:
         foreign_keys = []
         for fk in table.foreign_keys:
             foreign_dataset = make_dataset_urn(
@@ -1428,7 +1430,7 @@ def get_fk_constraints_for_table(
         # Access to table but none of its constraints - is this possible ?
         return constraints.get(table_name, [])
 
-    def add_config_to_report(self):
+    def add_config_to_report(self) -> None:
         self.report.cleaned_account_id = self.config.get_account()
         self.report.ignore_start_time_lineage = self.config.ignore_start_time_lineage
         self.report.upstream_lineage_in_report = self.config.upstream_lineage_in_report
@@ -1481,7 +1483,9 @@ def inspect_session_metadata(self) -> None:
     # that would be expensive, hence not done. To compensale for possibility
     # of some null values in collected sample, we fetch extra (20% more)
     # rows than configured sample_size.
-    def get_sample_values_for_table(self, table_name, schema_name, db_name):
+    def get_sample_values_for_table(
+        self, table_name: str, schema_name: str, db_name: str
+    ) -> pd.DataFrame:
         # Create a cursor object.
         logger.debug(
             f"Collecting sample values for table {db_name}.{schema_name}.{table_name}"
         )
@@ -1562,7 +1566,7 @@ def get_snowsight_base_url(self) -> Optional[str]:
             )
         return None
 
-    def is_standard_edition(self):
+    def is_standard_edition(self) -> bool:
         try:
             self.query(SnowflakeQuery.show_tags())
             return False
@@ -1571,7 +1575,7 @@ def is_standard_edition(self):
                 return True
             raise
 
-    def _snowflake_clear_ocsp_cache(self):
+    def _snowflake_clear_ocsp_cache(self) -> None:
         # Because of some issues with the Snowflake Python connector, we wipe the OCSP cache.
         #
         # Why is this necessary:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index ec0af37089b1d..8d7419a56e4f1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -138,6 +138,7 @@
 )
 from datahub.utilities import config_clean
 from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult
+from datahub.utilities.urns.dataset_urn import DatasetUrn
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -504,7 +505,7 @@ def close(self) -> None:
         self.server = None
         super().close()
 
-    def _populate_usage_stat_registry(self):
+    def _populate_usage_stat_registry(self) -> None:
         if self.server is None:
             return
 
@@ -645,7 +646,7 @@ def _init_workbook_registry(self) -> None:
                 continue
             self.workbook_project_map[wb.id] = wb.project_id
 
-    def _populate_projects_registry(self):
+    def _populate_projects_registry(self) -> None:
         if self.server is None:
             return
 
@@ -666,7 +667,7 @@ def _populate_projects_registry(self):
             f"Tableau workbooks {self.workbook_project_map}",
         )
 
-    def _authenticate(self):
+    def _authenticate(self) -> None:
         try:
             self.server = self.config.make_tableau_client()
             logger.info("Authenticated to Tableau server")
@@ -841,16 +842,14 @@ def _track_custom_sql_ids(self, field: dict) -> None:
     def _create_upstream_table_lineage(
         self,
         datasource: dict,
-        browse_path: str,
+        browse_path: Optional[str],
         is_embedded_ds: bool = False,
     ) -> Tuple:
         upstream_tables: List[Upstream] = []
         fine_grained_lineages: List[FineGrainedLineage] = []
         table_id_to_urn = {}
 
-        upstream_datasources = self.get_upstream_datasources(
-            datasource, upstream_tables
-        )
+        upstream_datasources = self.get_upstream_datasources(datasource)
         upstream_tables.extend(upstream_datasources)
 
         # When tableau workbook connects to published datasource, it creates an embedded
@@ -875,7 +874,7 @@ def _create_upstream_table_lineage(
 
         # This adds an edge to upstream CustomSQLTables using `fields`.`upstreamColumns`.`table`
         csql_upstreams, csql_id_to_urn = self.get_upstream_csql_tables(
-            datasource.get(tableau_constant.FIELDS),
+            datasource.get(tableau_constant.FIELDS) or [],
         )
         upstream_tables.extend(csql_upstreams)
         table_id_to_urn.update(csql_id_to_urn)
@@ -914,7 +913,7 @@ def _create_upstream_table_lineage(
 
         return upstream_tables, fine_grained_lineages
 
-    def get_upstream_datasources(self, datasource, upstream_tables):
+    def get_upstream_datasources(self, datasource: dict) -> List[Upstream]:
         upstream_tables = []
         for ds in datasource.get(tableau_constant.UPSTREAM_DATA_SOURCES, []):
             if ds[tableau_constant.ID] not in self.datasource_ids_being_used:
@@ -933,14 +932,16 @@ def get_upstream_datasources(self, datasource, upstream_tables):
            upstream_tables.append(upstream_table)
         return upstream_tables
 
-    def get_upstream_csql_tables(self, fields):
+    def get_upstream_csql_tables(
+        self, fields: List[dict]
+    ) -> Tuple[List[Upstream], Dict[str, str]]:
         upstream_csql_urns = set()
         csql_id_to_urn = {}
 
         for field in fields:
             if not field.get(tableau_constant.UPSTREAM_COLUMNS):
                 continue
-            for upstream_col in field.get(tableau_constant.UPSTREAM_COLUMNS):
+            for upstream_col in field[tableau_constant.UPSTREAM_COLUMNS]:
                 if (
                     upstream_col
                     and upstream_col.get(tableau_constant.TABLE)
@@ -968,7 +969,13 @@ def get_upstream_csql_tables(self, fields):
             for csql_urn in upstream_csql_urns
         ], csql_id_to_urn
 
-    def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sql):
+    def get_upstream_tables(
+        self,
+        tables: List[dict],
+        datasource_name: Optional[str],
+        browse_path: Optional[str],
+        is_custom_sql: bool,
+    ) -> Tuple[List[Upstream], Dict[str, str]]:
         upstream_tables = []
         # Same table urn can be used when setting fine grained lineage,
         table_id_to_urn: Dict[str, str] = {}
@@ -1056,12 +1063,12 @@ def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sq
 
     def get_upstream_columns_of_fields_in_datasource(
         self,
-        datasource,
-        datasource_urn,
-        table_id_to_urn,
-    ):
+        datasource: dict,
+        datasource_urn: str,
+        table_id_to_urn: Dict[str, str],
+    ) -> List[FineGrainedLineage]:
         fine_grained_lineages = []
-        for field in datasource.get(tableau_constant.FIELDS):
+        for field in datasource.get(tableau_constant.FIELDS) or []:
             field_name = field.get(tableau_constant.NAME)
             # upstreamColumns lineage will be set via upstreamFields.
             # such as for CalculatedField
@@ -1086,9 +1093,12 @@ def get_upstream_columns_of_fields_in_datasource(
                     and upstream_table_id
                     and upstream_table_id in table_id_to_urn.keys()
                 ):
+                    parent_dataset_urn = table_id_to_urn[upstream_table_id]
+                    if self.is_snowflake_urn(parent_dataset_urn):
+                        name = name.lower()
                     input_columns.append(
                         builder.make_schema_field_urn(
-                            parent_urn=table_id_to_urn[upstream_table_id],
+                            parent_urn=parent_dataset_urn,
                             field_path=name,
                         )
                     )
@@ -1107,9 +1117,19 @@ def get_upstream_columns_of_fields_in_datasource(
 
         return fine_grained_lineages
 
-    def get_upstream_fields_of_field_in_datasource(self, datasource, datasource_urn):
+    def is_snowflake_urn(self, urn: str) -> bool:
+        return (
+            DatasetUrn.create_from_string(urn)
+            .get_data_platform_urn()
+            .get_platform_name()
+            == "snowflake"
+        )
+
+    def get_upstream_fields_of_field_in_datasource(
+        self, datasource: dict, datasource_urn: str
+    ) -> List[FineGrainedLineage]:
         fine_grained_lineages = []
-        for field in datasource.get(tableau_constant.FIELDS):
+        for field in datasource.get(tableau_constant.FIELDS) or []:
             field_name = field.get(tableau_constant.NAME)
             # It is observed that upstreamFields gives one-hop field
             # lineage, and not multi-hop field lineage
@@ -1205,7 +1225,7 @@ def get_upstream_fields_from_custom_sql(
 
         return fine_grained_lineages
 
-    def get_transform_operation(self, field):
+    def get_transform_operation(self, field: dict) -> str:
         field_type = field[tableau_constant.TYPE_NAME]
         if field_type in (
             tableau_constant.DATA_SOURCE_FIELD,
@@ -1381,7 +1401,7 @@ def get_schema_metadata_for_custom_sql(
             )
         return schema_metadata
 
-    def _get_published_datasource_project_luid(self, ds):
+    def _get_published_datasource_project_luid(self, ds: dict) -> Optional[str]:
         # This is fallback in case "get all datasources" query fails for some reason.
         # It is possible due to https://github.com/tableau/server-client-python/issues/1210
         if (
@@ -1430,7 +1450,7 @@ def _query_published_datasource_for_project_luid(self, ds_luid: str) -> None:
             )
             logger.debug("Error stack trace", exc_info=True)
 
-    def _get_workbook_project_luid(self, wb):
+    def _get_workbook_project_luid(self, wb: dict) -> Optional[str]:
         if wb.get(tableau_constant.LUID) and self.workbook_project_map.get(
             wb[tableau_constant.LUID]
         ):
@@ -1440,11 +1460,12 @@ def _get_workbook_project_luid(self, wb):
 
         return None
 
-    def _get_embedded_datasource_project_luid(self, ds):
+    def _get_embedded_datasource_project_luid(self, ds: dict) -> Optional[str]:
         if ds.get(tableau_constant.WORKBOOK):
             project_luid: Optional[str] = self._get_workbook_project_luid(
-                ds.get(tableau_constant.WORKBOOK)
+                ds[tableau_constant.WORKBOOK]
             )
+
             if project_luid and project_luid in self.tableau_project_registry:
                 return project_luid
@@ -1454,7 +1475,7 @@ def _get_embedded_datasource_project_luid(self, ds):
 
         return None
 
-    def _get_datasource_project_luid(self, ds):
+    def _get_datasource_project_luid(self, ds: dict) -> Optional[str]:
         # Only published and embedded data-sources are supported
         ds_type: Optional[str] = ds.get(tableau_constant.TYPE_NAME)
         if ds_type not in (
@@ -1486,7 +1507,7 @@ def _get_datasource_project_name(ds: dict) -> Optional[str]:
             return ds.get(tableau_constant.PROJECT_NAME)
         return None
 
-    def _get_project_browse_path_name(self, ds):
+    def _get_project_browse_path_name(self, ds: dict) -> Optional[str]:
         if self.config.extract_project_hierarchy is False:
             # backward compatibility. Just return the name of datasource project
             return self._get_datasource_project_name(ds)
@@ -1494,9 +1515,8 @@ def _get_project_browse_path_name(self, ds):
         # form path as per nested project structure
         project_luid = self._get_datasource_project_luid(ds)
         if project_luid is None:
-            datasource_name: str = ds.get(tableau_constant.NAME)
             logger.warning(
-                f"Could not load project hierarchy for datasource {datasource_name}. Please check permissions."
+                f"Could not load project hierarchy for datasource {ds.get(tableau_constant.NAME)}. Please check permissions."
             )
             logger.debug(f"datasource = {ds}")
             return None
@@ -1509,7 +1529,7 @@ def _create_lineage_to_upstream_tables(
         # This adds an edge to upstream DatabaseTables using `upstreamTables`
         upstream_tables, _ = self.get_upstream_tables(
             tables,
-            datasource.get(tableau_constant.NAME),
+            datasource.get(tableau_constant.NAME) or "",
             self._get_project_browse_path_name(datasource),
             is_custom_sql=True,
         )
@@ -1541,7 +1561,6 @@ def parse_custom_sql(
             ]
         ],
     ) -> Optional["SqlParsingResult"]:
-
         database_info = datasource.get(tableau_constant.DATABASE) or {}
 
         if datasource.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False):
@@ -1593,7 +1612,6 @@ def parse_custom_sql(
     def _create_lineage_from_unsupported_csql(
         self, csql_urn: str, csql: dict
     ) -> Iterable[MetadataWorkUnit]:
-
         parsed_result = self.parse_custom_sql(
             datasource=csql,
             datasource_urn=csql_urn,
@@ -1812,7 +1830,9 @@ def emit_datasource(
     def get_custom_props_from_dict(self, obj: dict, keys: List[str]) -> Optional[dict]:
         return {key: str(obj[key]) for key in keys if obj.get(key)} or None
 
-    def _get_datasource_container_key(self, datasource, workbook, is_embedded_ds):
+    def _get_datasource_container_key(
+        self, datasource: dict, workbook: Optional[dict], is_embedded_ds: bool
+    ) -> Optional[ContainerKey]:
         container_key: Optional[ContainerKey] = None
         if is_embedded_ds:  # It is embedded then parent is container is workbook
             if workbook is not None:
@@ -2072,20 +2092,8 @@ def emit_sheets_as_charts(
                 if wu is not None:
                     yield wu
 
-            project_luid: Optional[str] = self._get_workbook_project_luid(workbook)
-
-            if (
-                workbook is not None
-                and project_luid
-                and project_luid in self.tableau_project_registry
-                and workbook.get(tableau_constant.NAME)
-            ):
-                browse_paths = BrowsePathsClass(
-                    paths=[
-                        f"/{self.platform}/{self._project_luid_to_browse_path_name(project_luid)}"
-                        f"/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}"
-                    ]
-                )
+            browse_paths = self.get_browse_paths_aspect(workbook)
+            if browse_paths:
                 chart_snapshot.aspects.append(browse_paths)
             else:
                 logger.warning(
@@ -2221,7 +2229,7 @@ def gen_workbook_key(self, workbook_id: str) -> WorkbookKey:
             workbook_id=workbook_id,
         )
 
-    def gen_project_key(self, project_luid):
+    def gen_project_key(self, project_luid: str) -> ProjectKey:
         return ProjectKey(
             platform=self.platform,
             instance=self.config.platform_instance,
@@ -2378,32 +2386,8 @@ def emit_dashboard(
                 if wu is not None:
                     yield wu
 
-            project_luid: Optional[str] = self._get_workbook_project_luid(workbook)
-            if (
-                workbook is not None
-                and project_luid
-                and project_luid in self.tableau_project_registry
-                and workbook.get(tableau_constant.NAME)
-            ):
-                browse_paths = BrowsePathsClass(
-                    paths=[
-                        f"/{self.platform}/{self._project_luid_to_browse_path_name(project_luid)}"
-                        f"/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}"
-                    ]
-                )
-                dashboard_snapshot.aspects.append(browse_paths)
-            elif (
-                workbook is not None
-                and workbook.get(tableau_constant.PROJECT_NAME)
-                and workbook.get(tableau_constant.NAME)
-            ):
-                # browse path
-                browse_paths = BrowsePathsClass(
-                    paths=[
-                        f"/{self.platform}/{workbook[tableau_constant.PROJECT_NAME].replace('/', REPLACE_SLASH_CHAR)}"
-                        f"/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}"
-                    ]
-                )
+            browse_paths = self.get_browse_paths_aspect(workbook)
+            if browse_paths:
                 dashboard_snapshot.aspects.append(browse_paths)
             else:
                 logger.warning(
@@ -2432,6 +2416,31 @@ def emit_dashboard(
             dashboard_snapshot.urn,
         )
 
+    def get_browse_paths_aspect(
+        self, workbook: Optional[Dict]
+    ) -> Optional[BrowsePathsClass]:
+        browse_paths: Optional[BrowsePathsClass] = None
+        if workbook and workbook.get(tableau_constant.NAME):
+            project_luid: Optional[str] = self._get_workbook_project_luid(workbook)
+            if project_luid in self.tableau_project_registry:
+                browse_paths = BrowsePathsClass(
+                    paths=[
+                        f"/{self.platform}/{self._project_luid_to_browse_path_name(project_luid)}"
+                        f"/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}"
+                    ]
+                )
+
+            elif workbook.get(tableau_constant.PROJECT_NAME):
+                # browse path
+                browse_paths = BrowsePathsClass(
+                    paths=[
+                        f"/{self.platform}/{workbook[tableau_constant.PROJECT_NAME].replace('/', REPLACE_SLASH_CHAR)}"
+                        f"/{workbook[tableau_constant.NAME].replace('/', REPLACE_SLASH_CHAR)}"
+                    ]
+                )
+
+        return browse_paths
+
     def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
         datasource_filter = f"{tableau_constant.ID_WITH_IN}: {json.dumps(self.embedded_datasource_ids_being_used)}"