diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index 4cf1858227f9..64bf13d38f7d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -24,7 +24,18 @@ import traceback from datetime import datetime from pathlib import Path -from typing import Dict, Iterable, List, Optional, Sequence, Set, Type, Union, cast +from typing import ( + Dict, + Iterable, + List, + Optional, + Sequence, + Set, + Type, + Union, + cast, + get_args, +) import giturlparse import lkml @@ -91,7 +102,7 @@ from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException -from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper +from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import ( @@ -108,7 +119,6 @@ LookMlView, ViewName, ) -from metadata.ingestion.source.dashboard.looker.parser import LkmlParser from metadata.ingestion.source.dashboard.looker.utils import _clone_repo from metadata.readers.file.api_reader import ReadersCredentials from metadata.readers.file.base import Reader @@ -121,7 +131,6 @@ logger = ingestion_logger() - LIST_DASHBOARD_FIELDS = ["id", "title"] IMPORTED_PROJECTS_DIR = "imported_projects" @@ -178,7 +187,7 @@ def __init__( self._explores_cache = {} self._repo_credentials: Optional[ReadersCredentials] = None self._reader_class: Optional[Type[Reader]] = None - self._project_parsers: Optional[Dict[str, LkmlParser]] = None + self._project_parsers: Optional[Dict[str, BulkLkmlParser]] = None self._main_lookml_repo: Optional[LookMLRepo] = None self._main__lookml_manifest: Optional[LookMLManifest] = None self._view_data_model: Optional[DashboardDataModel] = None @@ -260,7 +269,7 @@ def prepare(self): self._main__lookml_manifest = self.__read_manifest(credentials) @property - def parser(self) -> Optional[Dict[str, LkmlParser]]: + def parser(self) -> Optional[Dict[str, BulkLkmlParser]]: if self.repository_credentials: return self._project_parsers return None @@ -282,7 +291,7 @@ def parser(self, all_lookml_models: Sequence[LookmlModel]) -> None: """ if self.repository_credentials: all_projects: Set[str] = {model.project_name for model in all_lookml_models} - self._project_parsers: Dict[str, LkmlParser] = { + self._project_parsers: Dict[str, BulkLkmlParser] = { project_name: BulkLkmlParser( reader=self.reader(Path(self._main_lookml_repo.path)) ) @@ -325,7 +334,7 @@ def repository_credentials(self) -> Optional[ReadersCredentials]: """ if not self._repo_credentials: if self.service_connection.gitCredentials and isinstance( - self.service_connection.gitCredentials, ReadersCredentials + self.service_connection.gitCredentials, get_args(ReadersCredentials) ): self._repo_credentials = self.service_connection.gitCredentials @@ -572,10 +581,12 @@ def add_view_lineage( if view.sql_table_name: sql_table_name = self._render_table_name(view.sql_table_name) - source_table_name = self._clean_table_name(sql_table_name) - # View to the source is only there if we are informing the dbServiceNames for db_service_name in db_service_names or []: + dialect = self._get_db_dialect(db_service_name) + source_table_name = self._clean_table_name(sql_table_name, dialect) + + # View to the source is only there if we are informing the dbServiceNames yield self.build_lineage_request( source=source_table_name, db_service_name=db_service_name, @@ -587,15 +598,9 @@ def add_view_lineage( if not sql_query: return for db_service_name in db_service_names or []: - db_service = self.metadata.get_by_name( - DatabaseService, db_service_name - ) - lineage_parser = LineageParser( sql_query, - ConnectionTypeDialectMapper.dialect_of( - db_service.connection.config.type.value - ), + self._get_db_dialect(db_service_name), timeout_seconds=30, ) if lineage_parser.source_tables: @@ -615,6 +620,12 @@ def add_view_lineage( ) ) + def _get_db_dialect(self, db_service_name) -> Dialect: + db_service = self.metadata.get_by_name(DatabaseService, db_service_name) + return ConnectionTypeDialectMapper.dialect_of( + db_service.connection.config.type.value + ) + def get_dashboards_list(self) -> List[DashboardBase]: """ Get List of all dashboards @@ -718,7 +729,7 @@ def get_project_name(self, dashboard_details: LookerDashboard) -> Optional[str]: return None @staticmethod - def _clean_table_name(table_name: str) -> str: + def _clean_table_name(table_name: str, dialect: Dialect = Dialect.ANSI) -> str: """ sql_table_names might be renamed when defining an explore. E.g., customers as cust @@ -726,7 +737,10 @@ def _clean_table_name(table_name: str) -> str: :return: clean table name """ - return table_name.lower().split(" as ")[0].strip() + clean_table_name = table_name.lower().split(" as ")[0].strip() + if dialect == Dialect.BIGQUERY: + clean_table_name = clean_table_name.strip("`") + return clean_table_name @staticmethod def _render_table_name(table_name: str) -> str: diff --git a/ingestion/tests/unit/topology/dashboard/test_looker.py b/ingestion/tests/unit/topology/dashboard/test_looker.py index b5a2b2b0ab46..9acdc55111f8 100644 --- a/ingestion/tests/unit/topology/dashboard/test_looker.py +++ b/ingestion/tests/unit/topology/dashboard/test_looker.py @@ -47,6 +47,7 @@ from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.models import Dialect from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage from metadata.ingestion.source.dashboard.looker.metadata import LookerSource @@ -292,13 +293,33 @@ def test_clean_table_name(self): """ Check table cleaning """ - self.assertEqual(self.looker._clean_table_name("MY_TABLE"), "my_table") + self.assertEqual( + self.looker._clean_table_name("MY_TABLE", Dialect.MYSQL), "my_table" + ) - self.assertEqual(self.looker._clean_table_name(" MY_TABLE "), "my_table") + self.assertEqual( + self.looker._clean_table_name(" MY_TABLE ", Dialect.REDSHIFT), "my_table" + ) - self.assertEqual(self.looker._clean_table_name(" my_table"), "my_table") + self.assertEqual( + self.looker._clean_table_name(" my_table", Dialect.SNOWFLAKE), "my_table" + ) - self.assertEqual(self.looker._clean_table_name("TABLE AS ALIAS"), "table") + self.assertEqual( + self.looker._clean_table_name("TABLE AS ALIAS", Dialect.BIGQUERY), "table" + ) + + self.assertEqual( + self.looker._clean_table_name( + "`project_id.dataset_id.table_id` AS ALIAS", Dialect.BIGQUERY + ), + "project_id.dataset_id.table_id", + ) + + self.assertEqual( + self.looker._clean_table_name("`db.schema.table`", Dialect.POSTGRES), + "`db.schema.table`", + ) def test_render_table_name(self): """