Skip to content

Commit

Permalink
ISSUE-19454: Fixes broken looker lineage (#19456)
Browse files Browse the repository at this point in the history
* ISSUE-19454: Fixes the broken lineage in looker when backticks enclosed table refs

* refactor

* use isort

* Update ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py

Co-authored-by: Mayur Singal <[email protected]>

---------

Co-authored-by: Mayur Singal <[email protected]>
  • Loading branch information
olof-nn and ulixius9 authored Jan 22, 2025
1 parent 250b406 commit fd2575d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,18 @@
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Set, Type, Union, cast
from typing import (
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
Type,
Union,
cast,
get_args,
)

import giturlparse
import lkml
Expand Down Expand Up @@ -91,7 +102,7 @@
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
Expand All @@ -108,7 +119,6 @@
LookMlView,
ViewName,
)
from metadata.ingestion.source.dashboard.looker.parser import LkmlParser
from metadata.ingestion.source.dashboard.looker.utils import _clone_repo
from metadata.readers.file.api_reader import ReadersCredentials
from metadata.readers.file.base import Reader
Expand All @@ -121,7 +131,6 @@

logger = ingestion_logger()


LIST_DASHBOARD_FIELDS = ["id", "title"]
IMPORTED_PROJECTS_DIR = "imported_projects"

Expand Down Expand Up @@ -178,7 +187,7 @@ def __init__(
self._explores_cache = {}
self._repo_credentials: Optional[ReadersCredentials] = None
self._reader_class: Optional[Type[Reader]] = None
self._project_parsers: Optional[Dict[str, LkmlParser]] = None
self._project_parsers: Optional[Dict[str, BulkLkmlParser]] = None
self._main_lookml_repo: Optional[LookMLRepo] = None
self._main__lookml_manifest: Optional[LookMLManifest] = None
self._view_data_model: Optional[DashboardDataModel] = None
Expand Down Expand Up @@ -260,7 +269,7 @@ def prepare(self):
self._main__lookml_manifest = self.__read_manifest(credentials)

@property
def parser(self) -> Optional[Dict[str, LkmlParser]]:
def parser(self) -> Optional[Dict[str, BulkLkmlParser]]:
if self.repository_credentials:
return self._project_parsers
return None
Expand All @@ -282,7 +291,7 @@ def parser(self, all_lookml_models: Sequence[LookmlModel]) -> None:
"""
if self.repository_credentials:
all_projects: Set[str] = {model.project_name for model in all_lookml_models}
self._project_parsers: Dict[str, LkmlParser] = {
self._project_parsers: Dict[str, BulkLkmlParser] = {
project_name: BulkLkmlParser(
reader=self.reader(Path(self._main_lookml_repo.path))
)
Expand Down Expand Up @@ -325,7 +334,7 @@ def repository_credentials(self) -> Optional[ReadersCredentials]:
"""
if not self._repo_credentials:
if self.service_connection.gitCredentials and isinstance(
self.service_connection.gitCredentials, ReadersCredentials
self.service_connection.gitCredentials, get_args(ReadersCredentials)
):
self._repo_credentials = self.service_connection.gitCredentials

Expand Down Expand Up @@ -572,10 +581,12 @@ def add_view_lineage(

if view.sql_table_name:
sql_table_name = self._render_table_name(view.sql_table_name)
source_table_name = self._clean_table_name(sql_table_name)

# View to the source is only there if we are informing the dbServiceNames
for db_service_name in db_service_names or []:
dialect = self._get_db_dialect(db_service_name)
source_table_name = self._clean_table_name(sql_table_name, dialect)

# View to the source is only there if we are informing the dbServiceNames
yield self.build_lineage_request(
source=source_table_name,
db_service_name=db_service_name,
Expand All @@ -587,15 +598,9 @@ def add_view_lineage(
if not sql_query:
return
for db_service_name in db_service_names or []:
db_service = self.metadata.get_by_name(
DatabaseService, db_service_name
)

lineage_parser = LineageParser(
sql_query,
ConnectionTypeDialectMapper.dialect_of(
db_service.connection.config.type.value
),
self._get_db_dialect(db_service_name),
timeout_seconds=30,
)
if lineage_parser.source_tables:
Expand All @@ -615,6 +620,12 @@ def add_view_lineage(
)
)

def _get_db_dialect(self, db_service_name) -> Dialect:
db_service = self.metadata.get_by_name(DatabaseService, db_service_name)
return ConnectionTypeDialectMapper.dialect_of(
db_service.connection.config.type.value
)

def get_dashboards_list(self) -> List[DashboardBase]:
"""
Get List of all dashboards
Expand Down Expand Up @@ -718,15 +729,18 @@ def get_project_name(self, dashboard_details: LookerDashboard) -> Optional[str]:
return None

@staticmethod
def _clean_table_name(table_name: str) -> str:
def _clean_table_name(table_name: str, dialect: Dialect = Dialect.ANSI) -> str:
"""
sql_table_names might be renamed when defining
an explore. E.g., customers as cust
:param table_name: explore table name
:return: clean table name
"""

return table_name.lower().split(" as ")[0].strip()
clean_table_name = table_name.lower().split(" as ")[0].strip()
if dialect == Dialect.BIGQUERY:
clean_table_name = clean_table_name.strip("`")
return clean_table_name

@staticmethod
def _render_table_name(table_name: str) -> str:
Expand Down
29 changes: 25 additions & 4 deletions ingestion/tests/unit/topology/dashboard/test_looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
from metadata.ingestion.source.dashboard.looker.metadata import LookerSource
Expand Down Expand Up @@ -292,13 +293,33 @@ def test_clean_table_name(self):
"""
Check table cleaning
"""
self.assertEqual(self.looker._clean_table_name("MY_TABLE"), "my_table")
self.assertEqual(
self.looker._clean_table_name("MY_TABLE", Dialect.MYSQL), "my_table"
)

self.assertEqual(self.looker._clean_table_name(" MY_TABLE "), "my_table")
self.assertEqual(
self.looker._clean_table_name(" MY_TABLE ", Dialect.REDSHIFT), "my_table"
)

self.assertEqual(self.looker._clean_table_name(" my_table"), "my_table")
self.assertEqual(
self.looker._clean_table_name(" my_table", Dialect.SNOWFLAKE), "my_table"
)

self.assertEqual(self.looker._clean_table_name("TABLE AS ALIAS"), "table")
self.assertEqual(
self.looker._clean_table_name("TABLE AS ALIAS", Dialect.BIGQUERY), "table"
)

self.assertEqual(
self.looker._clean_table_name(
"`project_id.dataset_id.table_id` AS ALIAS", Dialect.BIGQUERY
),
"project_id.dataset_id.table_id",
)

self.assertEqual(
self.looker._clean_table_name("`db.schema.table`", Dialect.POSTGRES),
"`db.schema.table`",
)

def test_render_table_name(self):
"""
Expand Down

0 comments on commit fd2575d

Please sign in to comment.