Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/lookml): emit dummy sql condition for lookml custom condition tag #11008

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from functools import lru_cache
from typing import ClassVar, Optional, TextIO, cast
from typing import ClassVar, Optional, TextIO

from liquid import Environment
from liquid.ast import Node
Expand All @@ -25,18 +25,9 @@ def __init__(self, tok: Token, sql_or_lookml_reference: str, filter_name: str):
self.filter_name = filter_name

def render_to_output(self, context: Context, buffer: TextIO) -> Optional[bool]:
    """Render the Looker ``condition`` tag as a dummy SQL equality predicate.

    Rather than looking up a user-supplied filter value (and raising when it
    is absent), always emit ``<sql_or_lookml_reference>='dummy_value'``.
    This guarantees the surrounding LookML SQL stays syntactically valid so
    the downstream SQL parser can extract lineage from it.

    :param context: liquid render context (unused; kept for interface parity
        with ``liquid.ast.Node.render_to_output``)
    :param buffer: output stream the rendered SQL fragment is written to
    :return: ``True`` to signal that the tag produced output
    """
    # This implementation will make sure that sql parse work correctly if looker condition tag
    # is used in lookml sql field
    buffer.write(f"{self.sql_or_lookml_reference}='dummy_value'")

    return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
CustomTagException,
create_template,
)
from datahub.ingestion.source.looker.lookml_config import DERIVED_VIEW_PATTERN
from datahub.ingestion.source.looker.str_functions import (
remove_extra_spaces_and_newlines,
)
Expand Down Expand Up @@ -94,6 +95,24 @@ def resolve_liquid_variable(text: str, liquid_variable: Dict[Any, Any]) -> str:
return text


def _complete_incomplete_sql(raw_view: dict, sql: str) -> str:
    """Turn a Looker SQL fragment into a complete, parseable SELECT statement.

    Looker allows derived-table ``sql`` fields to omit the SELECT and/or FROM
    clauses; this pads them back in so the SQL parser used for lineage can
    handle the query.  Also strips the ``${`` ``}`` wrappers from derived-view
    references via DERIVED_VIEW_PATTERN.

    :param raw_view: raw LookML view dict; ``raw_view["name"]`` is used as the
        fallback FROM target
    :param sql: the (possibly partial) SQL fragment from the view definition
    :return: a completed SQL string with derived-view markers unwrapped
    """

    # Looker supports sql fragments that omit the SELECT and FROM parts of the query
    # Add those in if we detect that it is missing
    sql_query: str = sql

    if not re.search(r"SELECT\s", sql_query, flags=re.I):
        # add a SELECT clause at the beginning
        sql_query = f"SELECT {sql}"

    if not re.search(r"FROM\s", sql_query, flags=re.I):
        # add a FROM clause at the end
        sql_query = f"{sql_query} FROM {raw_view['name']}"

    # Drop ${ and }
    return re.sub(DERIVED_VIEW_PATTERN, r"\1", sql_query)


def resolve_liquid_variable_in_view_dict(
raw_view: dict, liquid_variable: Dict[Any, Any]
) -> None:
Expand All @@ -102,14 +121,18 @@ def resolve_liquid_variable_in_view_dict(

for view in raw_view["views"]:
if "sql_table_name" in view:
view["sql_table_name"] = resolve_liquid_variable(
view["datahub_transformed_sql_table_name"] = resolve_liquid_variable(
text=remove_extra_spaces_and_newlines(view["sql_table_name"]),
liquid_variable=liquid_variable,
)
) # keeping original sql_table_name as is to avoid any visualization issue later

if "derived_table" in view and "sql" in view["derived_table"]:
# In sql we don't need to remove the extra spaces as sql parser takes care of extra spaces and \n
# while generating URN from sql
view["derived_table"]["sql"] = resolve_liquid_variable(
view["derived_table"]["datahub_transformed_sql"] = resolve_liquid_variable(
text=view["derived_table"]["sql"], liquid_variable=liquid_variable
) # keeping original sql as is, so that on UI sql will be shown same is it is visible on looker portal

view["derived_table"]["datahub_transformed_sql"] = _complete_incomplete_sql(
raw_view=view, sql=view["derived_table"]["datahub_transformed_sql"]
)
Original file line number Diff line number Diff line change
Expand Up @@ -266,15 +266,25 @@ def sql_table_name(self) -> str:
sql_table_name: Optional[str] = self._get_sql_table_name_field()
# if sql_table_name field is not set then the table name is equal to view-name
if sql_table_name is None:
return self.raw_view[NAME].lower()
sql_table_name = self.raw_view[NAME].lower()

return sql_table_name

def datahub_transformed_sql_table_name(self) -> str:
    """Return the liquid-resolved ``sql_table_name``, normalized for urn use.

    Prefers the ``datahub_transformed_sql_table_name`` key written during
    liquid-variable resolution; falls back to the untransformed
    ``sql_table_name`` when no transformation was recorded.  The result is
    lower-cased with quote characters removed.

    :return: normalized table name suitable for fully-qualified-name / urn
        generation
    """
    table_name: Optional[str] = self.raw_view.get(
        "datahub_transformed_sql_table_name"
    )

    # No transformed value recorded — fall back to the original field.
    if not table_name:
        table_name = self.sql_table_name()

    # sql_table_name is in the format "${view-name}.SQL_TABLE_NAME"
    # remove extra characters
    if self._is_dot_sql_table_name_present():
        table_name = re.sub(DERIVED_VIEW_PATTERN, r"\1", table_name)

    # Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes
    return table_name.replace('"', "").replace("`", "").lower()

def derived_table(self) -> Dict[Any, Any]:
"""
Expand All @@ -296,30 +306,21 @@ def explore_source(self) -> Dict[Any, Any]:

return derived_table["explore_source"]

def sql(self) -> str:
    """Return the raw (untransformed) sql attribute of the derived table.

    This function should only be called if is_sql_based_derived_case return true.
    """
    derived_table = self.derived_table()

    return derived_table["sql"]

def datahub_transformed_sql(self) -> str:
    """Return the liquid-resolved and SELECT/FROM-completed derived-table sql.

    This is the value written under ``datahub_transformed_sql`` during view
    preprocessing; the original ``sql`` is kept untouched for display.
    This function should only be called if is_sql_based_derived_case return true.
    """
    derived_table = self.derived_table()

    return derived_table["datahub_transformed_sql"]

def name(self) -> str:
return self.raw_view[NAME]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
from datahub.configuration.git import GitInfo
from datahub.configuration.source_common import EnvConfigMixin
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.source.looker.looker_config import LookerCommonConfig
from datahub.ingestion.source.looker.looker_connection import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_config import (
LookerCommonConfig,
LookerConnectionDefinition,
)
from datahub.ingestion.source.looker.looker_lib_wrapper import (
LookerAPI,
LookerAPIConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ def from_looker_dict(
view_logic = view_context.view_file.raw_file_content[:max_file_snippet_length]

if view_context.is_sql_based_derived_case():
view_logic = view_context.sql(transformed=False)
# Parse SQL to extract dependencies.
view_logic = view_context.sql()
view_details = ViewProperties(
materialized=False,
viewLogic=view_logic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,21 @@ class AbstractViewUpstream(ABC):
view_context: LookerViewContext
looker_view_id_cache: LookerViewIdCache
config: LookMLSourceConfig
reporter: LookMLSourceReport
ctx: PipelineContext

def __init__(
    self,
    view_context: LookerViewContext,
    looker_view_id_cache: LookerViewIdCache,
    config: LookMLSourceConfig,
    reporter: LookMLSourceReport,
    ctx: PipelineContext,
):
    """Store the shared collaborators used by every upstream resolver.

    :param view_context: parsed LookML view being processed
    :param looker_view_id_cache: cache mapping view names to LookerViewId
    :param config: source-level LookML ingestion configuration
    :param reporter: report sink for warnings (e.g. SQL parse failures)
    :param ctx: pipeline context (provides the DataHub graph for SQL parsing)
    """
    self.view_context = view_context
    self.looker_view_id_cache = looker_view_id_cache
    self.config = config
    self.reporter = reporter
    self.ctx = ctx

@abstractmethod
Expand All @@ -244,9 +247,10 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)
# These are the function where we need to catch the response once calculated
self._get_spr = lru_cache(maxsize=1)(self.__get_spr)
self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
Expand All @@ -259,25 +263,14 @@ def __get_spr(self) -> Optional[SqlParsingResult]:
return None

spr = create_lineage_sql_parsed_result(
query=self.view_context.sql(),
query=self.view_context.datahub_transformed_sql(),
default_schema=self.view_context.view_connection.default_schema,
default_db=self.view_context.view_connection.default_db,
platform=self.view_context.view_connection.platform,
platform_instance=self.view_context.view_connection.platform_instance,
env=self.view_context.view_connection.platform_env or self.config.env,
graph=self.ctx.graph,
)

if (
spr.debug_info.table_error is not None
or spr.debug_info.column_error is not None
):
logging.debug(
f"Failed to parsed the sql query. table_error={spr.debug_info.table_error} and "
f"column_error={spr.debug_info.column_error}"
)
return None

return spr

def __get_upstream_dataset_urn(self) -> List[Urn]:
Expand All @@ -286,6 +279,15 @@ def __get_upstream_dataset_urn(self) -> List[Urn]:
if sql_parsing_result is None:
return []

if sql_parsing_result.debug_info.table_error is not None:
self.reporter.report_warning(
title="Table Level Lineage Missing",
message="Error in parsing derived sql",
context=f"View-name: {self.view_context.name()}. "
f"Error: {sql_parsing_result.debug_info.table_error}",
)
return []

upstream_dataset_urns: List[str] = [
_drop_hive_dot(urn) for urn in sql_parsing_result.in_tables
]
Expand All @@ -306,6 +308,15 @@ def create_fields(self) -> List[ViewField]:
if spr is None:
return []

if spr.debug_info.column_error is not None:
self.reporter.report_warning(
title="Column Level Lineage Missing",
message="Error in parsing derived sql for CLL",
context=f"View-name: {self.view_context.name()}. "
f"Error: {spr.debug_info.column_error}",
)
return []

fields: List[ViewField] = []

column_lineages: List[ColumnLineageInfo] = (
Expand Down Expand Up @@ -336,6 +347,15 @@ def get_upstream_column_ref(
if sql_parsing_result is None:
return []

if sql_parsing_result.debug_info.column_error is not None:
self.reporter.report_warning(
title="Column Level Lineage Missing",
message="Error in parsing derived sql for CLL",
context=f"View-name: {self.view_context.name()}. "
f"Error: {sql_parsing_result.debug_info.column_error}",
)
return []

upstreams_column_refs: List[ColumnRef] = []
if sql_parsing_result.column_lineage:
for cll in sql_parsing_result.column_lineage:
Expand Down Expand Up @@ -384,9 +404,11 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)

self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
self.__get_upstream_dataset_urn
)
Expand All @@ -402,7 +424,7 @@ def __get_upstream_dataset_urn(self) -> List[str]:
base_folder_path=self.view_context.base_folder_path,
)

# Current view will always be present in cache. The assert will silence the lint
# Current view will always be present in cache. assert will silence the lint
assert current_view_id

# We're creating a "LookerExplore" just to use the urn generator.
Expand Down Expand Up @@ -467,9 +489,10 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)
self.upstream_dataset_urn = None

self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
Expand All @@ -478,9 +501,9 @@ def __init__(

def __get_upstream_dataset_urn(self) -> Urn:
# In regular case view's upstream dataset is either same as view-name or mentioned in "sql_table_name" field
# view_context.sql_table_name() handle this condition to return dataset name
# view_context.datahub_transformed_sql_table_name() handle this condition to return dataset name
qualified_table_name: str = _generate_fully_qualified_name(
sql_table_name=self.view_context.sql_table_name(),
sql_table_name=self.view_context.datahub_transformed_sql_table_name(),
connection_def=self.view_context.view_connection,
reporter=self.view_context.reporter,
)
Expand Down Expand Up @@ -522,20 +545,21 @@ def __init__(
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
):
super().__init__(view_context, looker_view_id_cache, config, ctx)
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)
self.upstream_dataset_urn = []

self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
self.__get_upstream_dataset_urn
)

def __get_upstream_dataset_urn(self) -> List[Urn]:
# In this case view_context.sql_table_name() refers to derived view name
# In this case view_context.datahub_transformed_sql_table_name() refers to derived view name
looker_view_id = get_derived_looker_view_id(
qualified_table_name=_generate_fully_qualified_name(
self.view_context.sql_table_name(),
self.view_context.datahub_transformed_sql_table_name(),
self.view_context.view_connection,
self.view_context.reporter,
),
Expand Down Expand Up @@ -591,6 +615,7 @@ def create_view_upstream(
return RegularViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -599,6 +624,7 @@ def create_view_upstream(
return DotSqlTableNameViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -610,6 +636,7 @@ def create_view_upstream(
return SqlBasedDerivedViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -618,6 +645,7 @@ def create_view_upstream(
return NativeDerivedViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Expand All @@ -631,6 +659,7 @@ def create_view_upstream(
return EmptyImplementation(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
)
Loading
Loading