diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index cb4e15c6ede5c..a8753509ea019 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -142,6 +142,9 @@
     SubTypesClass,
     ViewPropertiesClass,
 )
+from datahub.sql_parsing.sql_parsing_result_utils import (
+    transform_parsing_result_to_in_tables_schemas,
+)
 from datahub.sql_parsing.sqlglot_lineage import (
     ColumnLineageInfo,
     SqlParsingResult,
@@ -375,6 +378,17 @@ class TableauConfig(
         description="[Experimental] Whether to extract lineage from unsupported custom sql queries using SQL parsing",
     )
 
+    force_extraction_of_lineage_from_custom_sql_queries: bool = Field(
+        default=False,
+        description="[Experimental] Force extraction of lineage from custom sql queries using SQL parsing, ignoring Tableau metadata",
+    )
+
+    sql_parsing_disable_schema_awareness: bool = Field(
+        default=False,
+        description="[Experimental] Ignore pre-ingested table schemas during SQL query parsing "
+        "(works around ingestion errors when the pre-ingested schemas and the queries are out of sync)",
+    )
+
     # pre = True because we want to take some decision before pydantic initialize the configuration to default values
     @root_validator(pre=True)
     def projects_backward_compatibility(cls, values: Dict) -> Dict:
@@ -432,21 +446,43 @@ class DatabaseTable:
     """
 
     urn: str
-    id: str
-    num_cols: Optional[int]
+    id: Optional[
+        str
+    ] = None  # is set only for tables that came from Tableau metadata
+    num_cols: Optional[int] = None
 
-    paths: Set[str]  # maintains all browse paths encountered for this table
+    paths: Optional[
+        Set[str]
+    ] = None  # maintains all browse paths encountered for this table
+
+    parsed_columns: Optional[
+        Set[str]
+    ] = None  # maintains all columns encountered for this table while parsing SQL queries
 
     def update_table(
-        self, id: str, num_tbl_cols: Optional[int], path: Optional[str]
+        self,
+        id: Optional[str] = None,
+        num_tbl_cols: Optional[int] = None,
+        path: Optional[str] = None,
+        parsed_columns: Optional[Set[str]] = None,
     ) -> None:
-        if path and path not in self.paths:
-            self.paths.add(path)
+        if path:
+            if self.paths:
+                self.paths.add(path)
+            else:
+                self.paths = {path}
+
         # the new instance of table has columns information, prefer its id.
         if not self.num_cols and num_tbl_cols:
            self.id = id
            self.num_cols = num_tbl_cols
 
+        if parsed_columns:
+            if self.parsed_columns:
+                self.parsed_columns.update(parsed_columns)
+            else:
+                self.parsed_columns = parsed_columns
+
 
 class TableauSourceReport(StaleEntityRemovalSourceReport):
     get_all_datasources_query_failed: bool = False
@@ -1137,9 +1173,16 @@ def get_upstream_columns_of_fields_in_datasource(
                     and upstream_table_id in table_id_to_urn.keys()
                 ):
                     parent_dataset_urn = table_id_to_urn[upstream_table_id]
-                    if self.is_snowflake_urn(parent_dataset_urn):
+                    if (
+                        self.is_snowflake_urn(parent_dataset_urn)
+                        and not self.config.ingest_tables_external
+                    ):
                         # This is required for column level lineage to work correctly as
                         # DataHub Snowflake source lowercases all field names in the schema.
+                        #
+                        # This should not be done if the Snowflake tables are not pre-ingested but are
+                        # instead parsed from SQL queries or ingested from Tableau metadata; in that case
+                        # it just breaks case-sensitive table-level lineage
                         name = name.lower()
                 input_columns.append(
                     builder.make_schema_field_urn(
@@ -1299,6 +1342,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
                     custom_sql_filter,
                 )
             )
+
         unique_custom_sql = get_unique_custom_sql(custom_sql_connection)
 
         for csql in unique_custom_sql:
@@ -1364,33 +1408,46 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
 
             project = self._get_project_browse_path_name(datasource)
 
-            tables = csql.get(c.TABLES, [])
+            # the if condition is needed because GraphQL returns "columns": None
+            columns: List[Dict[Any, Any]] = (
+                cast(List[Dict[Any, Any]], csql.get(c.COLUMNS))
+                if c.COLUMNS in csql and csql.get(c.COLUMNS) is not None
+                else []
+            )
 
-            if tables:
-                # lineage from custom sql -> datasets/tables #
-                yield from self._create_lineage_to_upstream_tables(
-                    csql_urn, tables, datasource
-                )
-            elif self.config.extract_lineage_from_unsupported_custom_sql_queries:
-                logger.debug("Extracting TLL & CLL from custom sql")
-                # custom sql tables may contain unsupported sql, causing incomplete lineage
-                # we extract the lineage from the raw queries
+            # The Tableau SQL parser is much weaker than our sqlglot-based parser,
+            # so relying on metadata that Tableau parsed out of SQL queries can be
+            # less accurate. This option allows us to ignore Tableau's parser and
+            # only use our own.
+            if self.config.force_extraction_of_lineage_from_custom_sql_queries:
+                logger.debug("Extracting TLL & CLL from custom sql (forced)")
                 yield from self._create_lineage_from_unsupported_csql(
-                    csql_urn, csql
+                    csql_urn, csql, columns
                 )
+            else:
+                tables = csql.get(c.TABLES, [])
+
+                if tables:
+                    # lineage from custom sql -> datasets/tables #
+                    yield from self._create_lineage_to_upstream_tables(
+                        csql_urn, tables, datasource
+                    )
+                elif (
+                    self.config.extract_lineage_from_unsupported_custom_sql_queries
+                ):
+                    logger.debug("Extracting TLL & CLL from custom sql")
+                    # custom sql tables may contain unsupported sql, causing incomplete lineage
+                    # we extract the lineage from the raw queries
+                    yield from self._create_lineage_from_unsupported_csql(
+                        csql_urn, csql, columns
+                    )
+
             # Schema Metadata
-            # if condition is needed as graphQL return "cloumns": None
-            columns: List[Dict[Any, Any]] = (
-                cast(List[Dict[Any, Any]], csql.get(c.COLUMNS))
-                if c.COLUMNS in csql and csql.get(c.COLUMNS) is not None
-                else []
-            )
             schema_metadata = self.get_schema_metadata_for_custom_sql(columns)
             if schema_metadata is not None:
                 dataset_snapshot.aspects.append(schema_metadata)
 
             # Browse path
-
             if project and datasource_name:
                 browse_paths = BrowsePathsClass(
                     paths=[f"{self.dataset_browse_prefix}/{project}/{datasource_name}"]
@@ -1585,6 +1642,33 @@ def _create_lineage_to_upstream_tables(
             aspect=upstream_lineage,
         )
 
+    @staticmethod
+    def _clean_tableau_query_parameters(query: str) -> str:
+        if not query:
+            return query
+
+        #
+        # It replaces all of the following occurrences with 1,
+        # which is enough to fix the syntax of the SQL query
+        # and make the sqlglot parser happy:
+        #
+        #  <[Parameters].[SomeParameterName]>
+        #  <Parameters.[SomeParameterName]>
+        #  <[Parameters].SomeParameterName>
+        #  <[Parameters].SomeParameter Name>
+        #  <Parameters.SomeParameterName>
+        #
+        # Afterwards, it unescapes the comparison operators that Tableau escapes:
+        #  >> to >
+        #  << to <
+        #
+        return (
+            re.sub(r"\<\[?[Pp]arameters\]?\.(\[[^\]]+\]|[^\>]+)\>", "1", query)
+            .replace("<<", "<")
+            .replace(">>", ">")
+            .replace("\n\n", "\n")
+        )
+
     def parse_custom_sql(
         self,
         datasource: dict,
@@ -1604,9 +1688,15 @@ def parse_custom_sql(
             ]
         ],
     ) -> Optional["SqlParsingResult"]:
-        database_info = datasource.get(c.DATABASE) or {}
+        database_info = datasource.get(c.DATABASE) or {
+            c.NAME: c.UNKNOWN.lower(),
+            c.CONNECTION_TYPE: "databricks",
+        }
 
-        if datasource.get(c.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False):
+        if (
+            datasource.get(c.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False)
+            and not self.config.force_extraction_of_lineage_from_custom_sql_queries
+        ):
             logger.debug(f"datasource {datasource_urn} is not created from custom sql")
             return None
@@ -1622,6 +1712,7 @@ def parse_custom_sql(
                 f"raw sql query is not available for datasource {datasource_urn}"
             )
             return None
+        query = self._clean_tableau_query_parameters(query)
 
         logger.debug(f"Parsing sql={query}")
@@ -1647,10 +1738,33 @@ def parse_custom_sql(
             platform_instance=platform_instance,
             env=env,
             graph=self.ctx.graph,
+            schema_aware=not self.config.sql_parsing_disable_schema_awareness,
         )
 
+    def _enrich_database_tables_with_parsed_schemas(
+        self, parsing_result: SqlParsingResult
+    ) -> None:
+
+        in_tables_schemas: Dict[
+            str, Set[str]
+        ] = transform_parsing_result_to_in_tables_schemas(parsing_result)
+
+        if not in_tables_schemas:
+            logger.info("Unable to extract table schema from parsing result")
+            return
+
+        for table_urn, columns in in_tables_schemas.items():
+            if table_urn in self.database_tables:
+                self.database_tables[table_urn].update_table(
+                    table_urn, parsed_columns=columns
+                )
+            else:
+                self.database_tables[table_urn] = DatabaseTable(
+                    urn=table_urn, parsed_columns=columns
+                )
+
     def _create_lineage_from_unsupported_csql(
-        self, csql_urn: str, csql: dict
+        self, csql_urn: str, csql: dict, out_columns: List[Dict[Any, Any]]
     ) -> Iterable[MetadataWorkUnit]:
         parsed_result = self.parse_custom_sql(
             datasource=csql,
@@ -1667,6 +1781,8 @@ def _create_lineage_from_unsupported_csql(
             )
             return
 
+        self._enrich_database_tables_with_parsed_schemas(parsed_result)
+
         upstream_tables = make_upstream_class(parsed_result)
 
         logger.debug(f"Upstream tables = {upstream_tables}")
@@ -1675,7 +1791,7 @@ def _create_lineage_from_unsupported_csql(
         if self.config.extract_column_level_lineage:
             logger.info("Extracting CLL from custom sql")
             fine_grained_lineages = make_fine_grained_lineage_class(
-                parsed_result, csql_urn
+                parsed_result, csql_urn, out_columns
             )
 
         upstream_lineage = UpstreamLineage(
@@ -1811,7 +1927,11 @@ def emit_datasource(
         dataset_snapshot.aspects.append(dataset_props)
 
         # Upstream Tables
-        if datasource.get(c.UPSTREAM_TABLES) or datasource.get(c.UPSTREAM_DATA_SOURCES):
+        if (
+            datasource.get(c.UPSTREAM_TABLES)
+            or datasource.get(c.UPSTREAM_DATA_SOURCES)
+            or datasource.get(c.FIELDS)
+        ):
             # datasource -> db table relations
             (
                 upstream_tables,
@@ -1906,32 +2026,58 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
             yield from self.emit_datasource(datasource)
 
     def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:
-        database_table_id_to_urn_map: Dict[str, str] = dict()
+        tableau_database_table_id_to_urn_map: Dict[str, str] = dict()
         for urn, tbl in self.database_tables.items():
-            database_table_id_to_urn_map[tbl.id] = urn
-        tables_filter = (
-            f"{c.ID_WITH_IN}: {json.dumps(list(database_table_id_to_urn_map.keys()))}"
-        )
+            # only tables that came from Tableau metadata have an id
+            if tbl.id:
+                tableau_database_table_id_to_urn_map[tbl.id] = urn
+
+        tables_filter = f"{c.ID_WITH_IN}: {json.dumps(list(tableau_database_table_id_to_urn_map.keys()))}"
 
-        for table in self.get_connection_objects(
+        # Emitting tables that came from Tableau metadata
+        for tableau_table in self.get_connection_objects(
             database_tables_graphql_query,
             c.DATABASE_TABLES_CONNECTION,
             tables_filter,
         ):
-            yield from self.emit_table(table, database_table_id_to_urn_map)
+            database_table = self.database_tables[
+                tableau_database_table_id_to_urn_map[tableau_table[c.ID]]
+            ]
+            tableau_columns = tableau_table.get(c.COLUMNS, [])
+            is_embedded = tableau_table.get(c.IS_EMBEDDED) or False
+            if not is_embedded and not self.config.ingest_tables_external:
+                logger.debug(
+                    f"Skipping external table {database_table.urn} as ingest_tables_external is set to False"
+                )
+                continue
+
+            yield from self.emit_table(database_table, tableau_columns)
+
+        # Emitting tables that were purely parsed from SQL queries
+        for database_table in self.database_tables.values():
+            # Only tables that were purely parsed from SQL queries lack an id
+            if database_table.id:
+                logger.debug(
+                    f"Skipping table {database_table.urn}, it should have already been ingested from Tableau metadata"
+                )
+                continue
+
+            if not self.config.ingest_tables_external:
+                logger.debug(
+                    f"Skipping external table {database_table.urn} as ingest_tables_external is set to False"
+                )
+                continue
+
+            yield from self.emit_table(database_table, None)
 
     def emit_table(
-        self, table: dict, database_table_id_to_urn_map: Dict[str, str]
+        self,
+        database_table: DatabaseTable,
+        tableau_columns: Optional[List[Dict[str, Any]]],
     ) -> Iterable[MetadataWorkUnit]:
-        database_table = self.database_tables[database_table_id_to_urn_map[table[c.ID]]]
-        columns = table.get(c.COLUMNS, [])
-        is_embedded = table.get(c.IS_EMBEDDED) or False
-        if not is_embedded and not self.config.ingest_tables_external:
-            logger.debug(
-                f"Skipping external table {database_table.urn} as ingest_tables_external is set to False"
-            )
-            return
-
+        logger.debug(
+            f"Emitting external table {database_table} tableau_columns {tableau_columns}"
+        )
         dataset_snapshot = DatasetSnapshot(
             urn=database_table.urn,
             aspects=[],
@@ -1948,19 +2094,25 @@ def emit_table(
         else:
             logger.debug(f"Browse path not set for table {database_table.urn}")
 
-        schema_metadata = self.get_schema_metadata_for_table(columns or [])
+        schema_metadata = self.get_schema_metadata_for_table(
+            tableau_columns, database_table.parsed_columns
+        )
         if schema_metadata is not None:
             dataset_snapshot.aspects.append(schema_metadata)
 
         yield self.get_metadata_change_event(dataset_snapshot)
 
     def get_schema_metadata_for_table(
-        self, columns: List[dict]
+        self,
+        tableau_columns: Optional[List[Dict[str, Any]]],
+        parsed_columns: Optional[Set[str]] = None,
     ) -> Optional[SchemaMetadata]:
         schema_metadata: Optional[SchemaMetadata] = None
-        if columns:
-            fields = []
-            for field in columns:
+
+        fields = []
+
+        if tableau_columns:
+            for field in tableau_columns:
                 if field.get(c.NAME) is None:
                     self.report.num_table_field_skipped_no_name += 1
                     logger.warning(
@@ -1979,6 +2131,24 @@
                 fields.append(schema_field)
 
+        if parsed_columns:
+            remaining_columns = (
+                parsed_columns.difference(map(lambda x: x.get(c.NAME), tableau_columns))
+                if tableau_columns
+                else parsed_columns
+            )
+            remaining_schema_fields = [
+                SchemaField(
+                    fieldPath=col,
+                    type=SchemaFieldDataType(type=NullTypeClass()),
+                    description="",
+                    nativeDataType=c.UNKNOWN,
+                )
+                for col in remaining_columns
+            ]
+            fields.extend(remaining_schema_fields)
+
+        if fields:
             schema_metadata = SchemaMetadata(
                 schemaName="test",
                 platform=f"urn:li:dataPlatform:{self.platform}",
@@ -1988,6 +2158,10 @@ def get_schema_metadata_for_table(
                 platformSchema=OtherSchema(rawSchema=""),
             )
 
+        # TODO: optionally add logic that looks up the current table schema from DataHub
+        # and merges it with what was inferred during the current run; this would allow
+        # incrementally ingesting different Tableau projects that share the same tables
+
         return schema_metadata
 
     def get_sheetwise_upstream_datasources(self, sheet: dict) -> set:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
index 289f638c5cb98..e547934bc4a2d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
@@ -2,7 +2,7 @@
 import logging
 from dataclasses import dataclass
 from functools import lru_cache
-from typing import Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple
 
 from pydantic.fields import Field
 from tableauserverclient import Server
@@ -762,8 +762,19 @@ def make_upstream_class(
 
 
 def make_fine_grained_lineage_class(
-    parsed_result: Optional[SqlParsingResult], dataset_urn: str
+    parsed_result: Optional[SqlParsingResult],
+    dataset_urn: str,
+    out_columns: List[Dict[Any, Any]],
 ) -> List[FineGrainedLineage]:
+    # 1) fine-grained lineage links are case sensitive
+    # 2) the parsed-out columns are always lowercased
+    # 3) the corresponding Custom SQL output columns can be in any case (lower/upper/mixed)
+    #
+    # we need a map between 2) and 3) that is used below when building column-level lineage links
+    out_columns_map = {
+        col.get(c.NAME, "").lower(): col.get(c.NAME, "") for col in out_columns
+    }
+
     fine_grained_lineages: List[FineGrainedLineage] = []
 
     if parsed_result is None:
@@ -775,7 +786,15 @@ def make_fine_grained_lineage_class(
 
     for cll_info in cll:
         downstream = (
-            [builder.make_schema_field_urn(dataset_urn, cll_info.downstream.column)]
+            [
+                builder.make_schema_field_urn(
+                    dataset_urn,
+                    out_columns_map.get(
+                        cll_info.downstream.column.lower(),
+                        cll_info.downstream.column,
+                    ),
+                )
+            ]
             if cll_info.downstream is not None
             and cll_info.downstream.column is not None
             else []
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_result_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_result_utils.py
new file mode 100644
index 0000000000000..117bfaf663a7d
--- /dev/null
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_result_utils.py
@@ -0,0 +1,23 @@
+from typing import Dict, Set
+
+from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult, Urn
+
+
+def transform_parsing_result_to_in_tables_schemas(
+    parsing_result: SqlParsingResult,
+) -> Dict[Urn, Set[str]]:
+    table_urn_to_schema_map: Dict[str, Set[str]] = (
+        {it: set() for it in parsing_result.in_tables}
+        if parsing_result.in_tables
+        else {}
+    )
+
+    if parsing_result.column_lineage:
+        for cli in parsing_result.column_lineage:
+            for upstream in cli.upstreams:
+                if upstream.table in table_urn_to_schema_map:
+                    table_urn_to_schema_map[upstream.table].add(upstream.column)
+                else:
+                    table_urn_to_schema_map[upstream.table] = {upstream.column}
+
+    return table_urn_to_schema_map
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
index d45d35e3903d6..5565c8947261f 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
@@ -529,6 +529,9 @@ def _schema_aware_fuzzy_column_resolve(
 
             # Parse the column name out of the node name.
             # Sqlglot calls .sql(), so we have to do the inverse.
+            if node.name == "*":
+                continue
+
             normalized_col = sqlglot.parse_one(node.name).this.name
             if node.subfield:
                 normalized_col = f"{normalized_col}.{node.subfield}"
@@ -834,6 +837,7 @@ def _sqlglot_lineage_inner(
     # Fetch schema info for the relevant tables.
     table_name_urn_mapping: Dict[_TableName, str] = {}
     table_name_schema_mapping: Dict[_TableName, SchemaInfo] = {}
+
     for table in tables | modified:
         # For select statements, qualification will be a no-op. For other statements, this
         # is where the qualification actually happens.
@@ -1016,8 +1020,9 @@ def create_lineage_sql_parsed_result(
     env: str,
     default_schema: Optional[str] = None,
     graph: Optional[DataHubGraph] = None,
+    schema_aware: bool = True,
 ) -> SqlParsingResult:
-    if graph:
+    if graph and schema_aware:
         needs_close = False
         schema_resolver = graph._make_schema_resolver(
             platform=platform,
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
index a3aaa1a87db6a..bad72e6922101 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
@@ -16,6 +16,11 @@ def _get_dialect_str(platform: str) -> str:
         return "tsql"
     elif platform == "athena":
         return "trino"
+    # TODO: define a Salesforce SOQL dialect
+    # The temporary workaround is to treat SOQL as the databricks dialect;
+    # at least that allows parsing simple SQL queries and building lineage for them
+    elif platform == "salesforce":
+        return "databricks"
     elif platform in {"mysql", "mariadb"}:
         # In sqlglot v20+, MySQL is now case-sensitive by default, which is the
         # default behavior on Linux. 
However, MySQL's default case sensitivity @@ -31,6 +36,7 @@ def _get_dialect_str(platform: str) -> str: def get_dialect(platform: DialectOrStr) -> sqlglot.Dialect: if isinstance(platform, sqlglot.Dialect): return platform + return sqlglot.Dialect.get_or_raise(_get_dialect_str(platform)) diff --git a/metadata-ingestion/tests/integration/tableau/tableau_cll_mces_golden.json b/metadata-ingestion/tests/integration/tableau/tableau_cll_mces_golden.json index 4f83aa2ba43d1..3f481207a03ea 100644 --- a/metadata-ingestion/tests/integration/tableau/tableau_cll_mces_golden.json +++ b/metadata-ingestion/tests/integration/tableau/tableau_cll_mces_golden.json @@ -42870,6 +42870,38 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,demo-custom-323403.bigquery_demo.order_items,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "tableau-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,demo-custom-323403.bigquery_demo.sellers,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "tableau-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:external,sample - superstore%2C %28new%29.xls.orders,PROD)", diff --git a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py index 6deb4e84751e1..cccf79ccbd8e0 100644 --- a/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py +++ b/metadata-ingestion/tests/integration/tableau/test_tableau_ingest.py @@ -319,6 +319,8 @@ def test_tableau_cll_ingest(pytestconfig, tmp_path, mock_datahub_graph): new_pipeline_config: Dict[Any, Any] = { **config_source_default, "extract_lineage_from_unsupported_custom_sql_queries": True, + "force_extraction_of_lineage_from_custom_sql_queries": False, + "sql_parsing_disable_schema_awareness": False, "extract_column_level_lineage": True, } @@ -834,6 +836,7 @@ def test_tableau_unsupported_csql(mock_datahub_graph): "connectionType": "bigquery", }, }, + out_columns=[], ) mcp = cast(MetadataChangeProposalClass, next(iter(lineage)).metadata) diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_parsing_result_utils.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_parsing_result_utils.py new file mode 100644 index 0000000000000..928dfd26fca2c --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_parsing_result_utils.py @@ -0,0 +1,67 @@ +from datahub.sql_parsing.sql_parsing_result_utils import ( + transform_parsing_result_to_in_tables_schemas, +) +from datahub.sql_parsing.sqlglot_lineage import ( + ColumnLineageInfo, + ColumnRef, + DownstreamColumnRef, + SqlParsingResult, +) + + +def test_transform_parsing_result_to_in_tables_schemas__empty_parsing_result(): + parsing_result = SqlParsingResult(in_tables=[], out_tables=[], column_lineage=None) + + in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result) + assert not in_tables_schema + + +def test_transform_parsing_result_to_in_tables_schemas__in_tables_only(): + parsing_result = SqlParsingResult( + in_tables=["table_urn1", "table_urn2", "table_urn3"], + 
+        out_tables=[],
+        column_lineage=None,
+    )
+
+    in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
+    assert in_tables_schema == {
+        "table_urn1": set(),
+        "table_urn2": set(),
+        "table_urn3": set(),
+    }
+
+
+def test_transform_parsing_result_to_in_tables_schemas__in_tables_and_column_lineage():
+    parsing_result = SqlParsingResult(
+        in_tables=["table_urn1", "table_urn2", "table_urn3"],
+        out_tables=[],
+        column_lineage=[
+            ColumnLineageInfo(
+                downstream=DownstreamColumnRef(column="out_col1"),
+                upstreams=[
+                    ColumnRef(table="table_urn1", column="col11"),
+                ],
+            ),
+            ColumnLineageInfo(
+                downstream=DownstreamColumnRef(column="out_col2"),
+                upstreams=[
+                    ColumnRef(table="table_urn2", column="col21"),
+                    ColumnRef(table="table_urn2", column="col22"),
+                ],
+            ),
+            ColumnLineageInfo(
+                downstream=DownstreamColumnRef(column="out_col3"),
+                upstreams=[
+                    ColumnRef(table="table_urn1", column="col12"),
+                    ColumnRef(table="table_urn2", column="col23"),
+                ],
+            ),
+        ],
+    )
+
+    in_tables_schema = transform_parsing_result_to_in_tables_schemas(parsing_result)
+    assert in_tables_schema == {
+        "table_urn1": {"col11", "col12"},
+        "table_urn2": {"col21", "col22", "col23"},
+        "table_urn3": set(),
+    }
diff --git a/metadata-ingestion/tests/unit/test_tableau_source.py b/metadata-ingestion/tests/unit/test_tableau_source.py
new file mode 100644
index 0000000000000..9a2b1dd408d80
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_tableau_source.py
@@ -0,0 +1,123 @@
+import pytest
+
+from datahub.ingestion.source.tableau import TableauSource
+
+
+def test_tableau_source_unescapes_lt():
+    res = TableauSource._clean_tableau_query_parameters(
+        "select * from t where c1 << 135"
+    )
+
+    assert res == "select * from t where c1 < 135"
+
+
+def test_tableau_source_unescapes_gt():
+    res = TableauSource._clean_tableau_query_parameters(
+        "select * from t where c1 >> 135"
+    )
+
+    assert res == "select * from t where c1 > 135"
+
+
+def test_tableau_source_unescapes_gte():
+    res = TableauSource._clean_tableau_query_parameters(
+        "select * from t where c1 >>= 135"
+    )
+
+    assert res == "select * from t where c1 >= 135"
+
+
+def test_tableau_source_unescapes_lte():
+    res = TableauSource._clean_tableau_query_parameters(
+        "select * from t where c1 <<= 135"
+    )
+
+    assert res == "select * from t where c1 <= 135"
+
+
+def test_tableau_source_doesnt_touch_not_escaped():
+    res = TableauSource._clean_tableau_query_parameters(
+        "select * from t where c1 < 135 and c2 > 15"
+    )
+
+    assert res == "select * from t where c1 < 135 and c2 > 15"
+
+
+TABLEAU_PARAMS = [
+    "<Parameters.MyParam>",
+    "<Parameters.MyParam_1>",
+    "<Parameters.My Param _ 1>",
+    "<Parameters.My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
+    "<[Parameters].MyParam>",
+    "<[Parameters].MyParam_1>",
+    "<[Parameters].My Param _ 1>",
+    "<[Parameters].My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>",
+    "<Parameters.[MyParam]>",
+    "<Parameters.[MyParam_1]>",
+    "<Parameters.[My Param _ 1]>",
+    "<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
+    "<[Parameters].[MyParam]>",
+    "<[Parameters].[MyParam_1]>",
+    "<[Parameters].[My Param _ 1]>",
+    "<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<]>",
+    "<Parameters.[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
+    "<[Parameters].[My Param 1 !@\"',.#$%^:;&*()-_+={}|\\ /<>]>",
+]
+
+
+@pytest.mark.parametrize("p", TABLEAU_PARAMS)
+def test_tableau_source_cleanups_tableau_parameters_in_equi_predicates(p):
+    assert (
+        TableauSource._clean_tableau_query_parameters(
+            f"select * from t where c1 = {p} and c2 = {p} and c3 = 7"
+        )
+        == "select * from t where c1 = 1 and c2 = 1 and c3 = 7"
+    )
+
+
+@pytest.mark.parametrize("p", TABLEAU_PARAMS)
+def test_tableau_source_cleanups_tableau_parameters_in_lt_gt_predicates(p):
+    assert (
+        TableauSource._clean_tableau_query_parameters(
+            f"select * from t where c1 << {p} and c2<<{p} and c3 >> {p} and c4>>{p} or {p} >> c1 and {p}>>c2 and {p} << c3 and {p}<<c4"
+        )
+        == "select * from t where c1 < 1 and c2<1 and c3 > 1 and c4>1 or 1 > c1 and 1>c2 and 1 < c3 and 1<c4"
+    )
+
+
+@pytest.mark.parametrize("p", TABLEAU_PARAMS)
+def test_tableau_source_cleanups_tableau_parameters_in_lte_gte_predicates(p):
+    assert (
+        TableauSource._clean_tableau_query_parameters(
+            f"select * from t where c1 <<= {p} and c2<<={p} and c3 >>= {p} and c4>>={p} or {p} >>= c1 and {p}>>=c2 and {p} <<= c3 and {p}<<=c4"
+        )
+        == "select * from t where c1 <= 1 and c2<=1 and c3 >= 1 and c4>=1 or 1 >= c1 and 1>=c2 and 1 <= c3 and 1<=c4"
+    )
+
+
+@pytest.mark.parametrize("p", TABLEAU_PARAMS)
+def test_tableau_source_cleanups_tableau_parameters_in_join_predicate(p):
+    assert (
+        TableauSource._clean_tableau_query_parameters(
+            f"select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = {p} and t1.c11 = 123 + {p}"
+        )
+        == "select * from t1 inner join t2 on t1.id = t2.id and t2.c21 = 1 and t1.c11 = 123 + 1"
+    )
+
+
+@pytest.mark.parametrize("p", TABLEAU_PARAMS)
+def test_tableau_source_cleanups_tableau_parameters_in_complex_expressions(p):
+    assert (
+        TableauSource._clean_tableau_query_parameters(
+            f"select myudf1(c1, {p}, c2) / myudf2({p}) > ({p} + 3 * {p} * c5) * {p} - c4"
+        )
+        == "select myudf1(c1, 1, c2) / myudf2(1) > (1 + 3 * 1 * c5) * 1 - c4"
+    )
+
+
+@pytest.mark.parametrize("p", TABLEAU_PARAMS)
+def test_tableau_source_cleanups_tableau_parameters_in_udfs(p):
+    assert (
+        TableauSource._clean_tableau_query_parameters(f"select myudf({p}) from t")
+        == "select myudf(1) from t"
+    )
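Illustration (not part of the patch): a minimal sketch of how the new schema-from-SQL pieces fit together, using only classes and functions defined in this diff. The BigQuery URN and column names are made-up examples.

from datahub.ingestion.source.tableau import DatabaseTable
from datahub.sql_parsing.sql_parsing_result_utils import (
    transform_parsing_result_to_in_tables_schemas,
)
from datahub.sql_parsing.sqlglot_lineage import (
    ColumnLineageInfo,
    ColumnRef,
    DownstreamColumnRef,
    SqlParsingResult,
)

# A parsing result similar to what parse_custom_sql() would return for a custom SQL query.
upstream_urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,demo.orders,PROD)"  # hypothetical
parsed = SqlParsingResult(
    in_tables=[upstream_urn],
    out_tables=[],
    column_lineage=[
        ColumnLineageInfo(
            downstream=DownstreamColumnRef(column="total"),
            upstreams=[ColumnRef(table=upstream_urn, column="amount")],
        )
    ],
)

# _enrich_database_tables_with_parsed_schemas() reduces the result to a per-table column set
# and records it on DatabaseTable.parsed_columns; for tables never seen in Tableau metadata,
# those columns later become SchemaField entries in get_schema_metadata_for_table().
for urn, cols in transform_parsing_result_to_in_tables_schemas(parsed).items():
    table = DatabaseTable(urn=urn)
    table.update_table(parsed_columns=cols)
    print(table.urn, table.parsed_columns)  # -> {'amount'}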