Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-19454: Fixes broken looker lineage #19456

Merged
merged 4 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,18 @@
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Set, Type, Union, cast
from typing import (
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
Type,
Union,
cast,
get_args,
)

import giturlparse
import lkml
Expand Down Expand Up @@ -91,7 +102,7 @@
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
Expand All @@ -108,7 +119,6 @@
LookMlView,
ViewName,
)
from metadata.ingestion.source.dashboard.looker.parser import LkmlParser
from metadata.ingestion.source.dashboard.looker.utils import _clone_repo
from metadata.readers.file.api_reader import ReadersCredentials
from metadata.readers.file.base import Reader
Expand All @@ -121,7 +131,6 @@

logger = ingestion_logger()


LIST_DASHBOARD_FIELDS = ["id", "title"]
IMPORTED_PROJECTS_DIR = "imported_projects"

Expand Down Expand Up @@ -178,7 +187,7 @@ def __init__(
self._explores_cache = {}
self._repo_credentials: Optional[ReadersCredentials] = None
self._reader_class: Optional[Type[Reader]] = None
self._project_parsers: Optional[Dict[str, LkmlParser]] = None
self._project_parsers: Optional[Dict[str, BulkLkmlParser]] = None
self._main_lookml_repo: Optional[LookMLRepo] = None
self._main__lookml_manifest: Optional[LookMLManifest] = None
self._view_data_model: Optional[DashboardDataModel] = None
Expand Down Expand Up @@ -260,7 +269,7 @@ def prepare(self):
self._main__lookml_manifest = self.__read_manifest(credentials)

@property
def parser(self) -> Optional[Dict[str, LkmlParser]]:
def parser(self) -> Optional[Dict[str, BulkLkmlParser]]:
if self.repository_credentials:
return self._project_parsers
return None
Expand All @@ -282,7 +291,7 @@ def parser(self, all_lookml_models: Sequence[LookmlModel]) -> None:
"""
if self.repository_credentials:
all_projects: Set[str] = {model.project_name for model in all_lookml_models}
self._project_parsers: Dict[str, LkmlParser] = {
self._project_parsers: Dict[str, BulkLkmlParser] = {
project_name: BulkLkmlParser(
reader=self.reader(Path(self._main_lookml_repo.path))
)
Expand Down Expand Up @@ -325,7 +334,7 @@ def repository_credentials(self) -> Optional[ReadersCredentials]:
"""
if not self._repo_credentials:
if self.service_connection.gitCredentials and isinstance(
self.service_connection.gitCredentials, ReadersCredentials
self.service_connection.gitCredentials, get_args(ReadersCredentials)
):
self._repo_credentials = self.service_connection.gitCredentials

Expand Down Expand Up @@ -572,10 +581,12 @@ def add_view_lineage(

if view.sql_table_name:
sql_table_name = self._render_table_name(view.sql_table_name)
source_table_name = self._clean_table_name(sql_table_name)

# View to the source is only there if we are informing the dbServiceNames
for db_service_name in db_service_names or []:
dialect = self._get_db_dialect(db_service_name)
source_table_name = self._clean_table_name(sql_table_name, dialect)

Comment on lines +587 to +588
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I f i understand correctly this is so the query is compiled to the correct dialect, right?

Copy link
Contributor Author

@olof-nn olof-nn Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleaning of the table name already occurs, this is just adding the functionality of making `project_id.dataset_id.table_id` -> project_id.dataset_id.table_id. This is common to enclose table references like this in BigQuery. Our looker views are defined like:
sql_table_name: `project_id.dataset_id.table_id` ;;. The Looker ingestion then tries to find lineage references by searching for:

db: `project_id
schema: dataset_id
table: table_id`

Thus we find no lineage between looker <-> BQ

This addition in the _clean_table_name method will address that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I.e. it is not compiling the query. That is happening only if the table ref is a derived table in Looker. That scenario is already handled in the elif statement

# View to the source is only there if we are informing the dbServiceNames
yield self.build_lineage_request(
source=source_table_name,
db_service_name=db_service_name,
Expand All @@ -587,15 +598,9 @@ def add_view_lineage(
if not sql_query:
return
for db_service_name in db_service_names or []:
db_service = self.metadata.get_by_name(
DatabaseService, db_service_name
)

lineage_parser = LineageParser(
sql_query,
ConnectionTypeDialectMapper.dialect_of(
db_service.connection.config.type.value
),
self._get_db_dialect(db_service_name),
timeout_seconds=30,
)
if lineage_parser.source_tables:
Expand All @@ -615,6 +620,12 @@ def add_view_lineage(
)
)

def _get_db_dialect(self, db_service_name) -> Dialect:
db_service = self.metadata.get_by_name(DatabaseService, db_service_name)
return ConnectionTypeDialectMapper.dialect_of(
db_service.connection.config.type.value
)

def get_dashboards_list(self) -> List[DashboardBase]:
"""
Get List of all dashboards
Expand Down Expand Up @@ -718,15 +729,18 @@ def get_project_name(self, dashboard_details: LookerDashboard) -> Optional[str]:
return None

@staticmethod
def _clean_table_name(table_name: str) -> str:
def _clean_table_name(table_name: str, dialect: Dialect = Dialect.ANSI) -> str:
"""
sql_table_names might be renamed when defining
an explore. E.g., customers as cust
:param table_name: explore table name
:return: clean table name
"""

return table_name.lower().split(" as ")[0].strip()
clean_table_name = table_name.lower().split(" as ")[0].strip()
if dialect == Dialect.BIGQUERY:
clean_table_name = clean_table_name.strip("`")
return clean_table_name

@staticmethod
def _render_table_name(table_name: str) -> str:
Expand Down
29 changes: 25 additions & 4 deletions ingestion/tests/unit/topology/dashboard/test_looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from metadata.generated.schema.type.usageDetails import UsageDetails, UsageStats
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardUsage
from metadata.ingestion.source.dashboard.looker.metadata import LookerSource
Expand Down Expand Up @@ -292,13 +293,33 @@ def test_clean_table_name(self):
"""
Check table cleaning
"""
self.assertEqual(self.looker._clean_table_name("MY_TABLE"), "my_table")
self.assertEqual(
self.looker._clean_table_name("MY_TABLE", Dialect.MYSQL), "my_table"
)

self.assertEqual(self.looker._clean_table_name(" MY_TABLE "), "my_table")
self.assertEqual(
self.looker._clean_table_name(" MY_TABLE ", Dialect.REDSHIFT), "my_table"
)

self.assertEqual(self.looker._clean_table_name(" my_table"), "my_table")
self.assertEqual(
self.looker._clean_table_name(" my_table", Dialect.SNOWFLAKE), "my_table"
)

self.assertEqual(self.looker._clean_table_name("TABLE AS ALIAS"), "table")
self.assertEqual(
self.looker._clean_table_name("TABLE AS ALIAS", Dialect.BIGQUERY), "table"
)

self.assertEqual(
self.looker._clean_table_name(
"`project_id.dataset_id.table_id` AS ALIAS", Dialect.BIGQUERY
),
"project_id.dataset_id.table_id",
)

self.assertEqual(
self.looker._clean_table_name("`db.schema.table`", Dialect.POSTGRES),
"`db.schema.table`",
)

def test_render_table_name(self):
"""
Expand Down
Loading