Commit

Merge branch 'master' into fix-looker-tags-owner
jjoyce0510 authored Oct 11, 2023
2 parents e93efc9 + e298801 commit 626d309
Showing 8 changed files with 437 additions and 123 deletions.
@@ -3,6 +3,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
@@ -35,4 +36,20 @@ public Set<String> getOwners() {
}
return fieldResolvers.get(ResourceFieldType.OWNER).getFieldValuesFuture().join().getValues();
}

/**
* Fetch the platform instance for a Resolved Resource Spec
* @return the platform instance, or null if one does not exist.
*/
@Nullable
public String getDataPlatformInstance() {
if (!fieldResolvers.containsKey(ResourceFieldType.DATA_PLATFORM_INSTANCE)) {
return null;
}
Set<String> dataPlatformInstance = fieldResolvers.get(ResourceFieldType.DATA_PLATFORM_INSTANCE).getFieldValuesFuture().join().getValues();
if (dataPlatformInstance.size() > 0) {
return dataPlatformInstance.stream().findFirst().get();
}
return null;
}
}
@@ -19,5 +19,9 @@ public enum ResourceFieldType {
/**
* Domains of resource
*/
DOMAIN
DOMAIN,
/**
* Data platform instance of resource
*/
DATA_PLATFORM_INSTANCE
}
74 changes: 10 additions & 64 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -77,6 +77,7 @@
FIELD_TYPE_MAPPING,
MetadataQueryException,
TableauLineageOverrides,
TableauUpstreamReference,
clean_query,
custom_sql_graphql_query,
dashboard_graphql_query,
@@ -85,7 +86,6 @@
get_overridden_info,
get_unique_custom_sql,
make_fine_grained_lineage_class,
make_table_urn,
make_upstream_class,
published_datasource_graphql_query,
query_metadata,
@@ -271,7 +271,7 @@ class TableauConfig(
"You can change this if your Tableau projects contain slashes in their names, and you'd like to filter by project.",
)

default_schema_map: dict = Field(
default_schema_map: Dict[str, str] = Field(
default={}, description="Default schema to use when schema is not found."
)
ingest_tags: Optional[bool] = Field(
@@ -997,41 +997,16 @@ def get_upstream_tables(
)
continue

schema = table.get(tableau_constant.SCHEMA) or ""
table_name = table.get(tableau_constant.NAME) or ""
full_name = table.get(tableau_constant.FULL_NAME) or ""
upstream_db = (
table[tableau_constant.DATABASE][tableau_constant.NAME]
if table.get(tableau_constant.DATABASE)
and table[tableau_constant.DATABASE].get(tableau_constant.NAME)
else ""
)
logger.debug(
"Processing Table with Connection Type: {0} and id {1}".format(
table.get(tableau_constant.CONNECTION_TYPE) or "",
table.get(tableau_constant.ID) or "",
try:
ref = TableauUpstreamReference.create(
table, default_schema_map=self.config.default_schema_map
)
)
schema = self._get_schema(schema, upstream_db, full_name)
# if the schema is included within the table name we omit it
if (
schema
and table_name
and full_name
and table_name == full_name
and schema in table_name
):
logger.debug(
f"Omitting schema for upstream table {table[tableau_constant.ID]}, schema included in table name"
)
schema = ""
except Exception as e:
logger.info(f"Failed to generate upstream reference for {table}: {e}")
continue

table_urn = make_table_urn(
table_urn = ref.make_dataset_urn(
self.config.env,
upstream_db,
table.get(tableau_constant.CONNECTION_TYPE) or "",
schema,
table_name,
self.config.platform_instance_map,
self.config.lineage_overrides,
)
@@ -1052,7 +1027,7 @@ def get_upstream_tables(
urn=table_urn,
id=table[tableau_constant.ID],
num_cols=num_tbl_cols,
paths=set([table_path]) if table_path else set(),
paths={table_path} if table_path else set(),
)
else:
self.database_tables[table_urn].update_table(
@@ -2462,35 +2437,6 @@ def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
is_embedded_ds=True,
)

@lru_cache(maxsize=None)
def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str:
# For some databases, the schema attribute in tableau api does not return
# correct schema name for the table. For more information, see
# https://help.tableau.com/current/api/metadata_api/en-us/docs/meta_api_model.html#schema_attribute.
# Hence we extract schema from fullName whenever fullName is available
schema = self._extract_schema_from_fullName(fullName) if fullName else ""
if not schema:
schema = schema_provided
elif schema != schema_provided:
logger.debug(
"Correcting schema, provided {0}, corrected {1}".format(
schema_provided, schema
)
)

if not schema and database in self.config.default_schema_map:
schema = self.config.default_schema_map[database]

return schema

@lru_cache(maxsize=None)
def _extract_schema_from_fullName(self, fullName: str) -> str:
# fullName is observed to be in format [schemaName].[tableName]
# OR simply tableName OR [tableName]
if fullName.startswith("[") and "].[" in fullName:
return fullName[1 : fullName.index("]")]
return ""

@lru_cache(maxsize=None)
def get_last_modified(
self, creator: Optional[str], created_at: bytes, updated_at: bytes
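Taken together, the interleaved hunks above reduce the per-table handling in get_upstream_tables to roughly the following consolidated sketch. This is not an excerpt from the commit; the enclosing loop and the config attributes are assumed from context:

for table in tables:
    try:
        # Database/schema/table extraction, fullName parsing, and the
        # default_schema_map fallback now happen in one place
        ref = TableauUpstreamReference.create(
            table, default_schema_map=self.config.default_schema_map
        )
    except Exception as e:
        logger.info(f"Failed to generate upstream reference for {table}: {e}")
        continue

    # Platform overrides and the platform-instance lookup now live behind
    # the reference object's URN builder
    table_urn = ref.make_dataset_urn(
        self.config.env,
        self.config.platform_instance_map,
        self.config.lineage_overrides,
    )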
162 changes: 123 additions & 39 deletions metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
@@ -1,11 +1,14 @@
import html
import logging
from dataclasses import dataclass
from functools import lru_cache
from typing import Dict, List, Optional, Tuple

from pydantic.fields import Field

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.source import tableau_constant as tc
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
@@ -31,6 +34,8 @@
)
from datahub.utilities.sqlglot_lineage import ColumnLineageInfo, SqlParsingResult

logger = logging.getLogger(__name__)


class TableauLineageOverrides(ConfigModel):
platform_override_map: Optional[Dict[str, str]] = Field(
@@ -537,12 +542,12 @@ def get_fully_qualified_table_name(
platform: str,
upstream_db: str,
schema: str,
full_name: str,
table_name: str,
) -> str:
if platform == "athena":
upstream_db = ""
database_name = f"{upstream_db}." if upstream_db else ""
final_name = full_name.replace("[", "").replace("]", "")
final_name = table_name.replace("[", "").replace("]", "")

schema_name = f"{schema}." if schema else ""

@@ -573,17 +578,123 @@ def get_fully_qualified_table_name(
return fully_qualified_table_name


def get_platform_instance(
platform: str, platform_instance_map: Optional[Dict[str, str]]
) -> Optional[str]:
if platform_instance_map is not None and platform in platform_instance_map.keys():
return platform_instance_map[platform]
@dataclass
class TableauUpstreamReference:
database: Optional[str]
schema: Optional[str]
table: str

connection_type: str

@classmethod
def create(
cls, d: dict, default_schema_map: Optional[Dict[str, str]] = None
) -> "TableauUpstreamReference":
# Values directly from `table` object from Tableau
database = t_database = d.get(tc.DATABASE, {}).get(tc.NAME)
schema = t_schema = d.get(tc.SCHEMA)
table = t_table = d.get(tc.NAME) or ""
t_full_name = d.get(tc.FULL_NAME)
t_connection_type = d[tc.CONNECTION_TYPE] # required to generate urn
t_id = d[tc.ID]

parsed_full_name = cls.parse_full_name(t_full_name)
if parsed_full_name and len(parsed_full_name) == 3:
database, schema, table = parsed_full_name
elif parsed_full_name and len(parsed_full_name) == 2:
schema, table = parsed_full_name
else:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" Did not parse full name {t_full_name}: unexpected number of values",
)

if not schema and default_schema_map and database in default_schema_map:
schema = default_schema_map[database]

if database != t_database:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing database {t_database} with {database} from full name {t_full_name}"
)
if schema != t_schema:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing schema {t_schema} with {schema} from full name {t_full_name}"
)
if table != t_table:
logger.debug(
f"Upstream urn generation ({t_id}):"
f" replacing table {t_table} with {table} from full name {t_full_name}"
)

# TODO: See if we can remove this -- made for redshift
if (
schema
and t_table
and t_full_name
and t_table == t_full_name
and schema in t_table
):
logger.debug(
f"Omitting schema for upstream table {t_id}, schema included in table name"
)
schema = ""

return cls(
database=database,
schema=schema,
table=table,
connection_type=t_connection_type,
)

@staticmethod
def parse_full_name(full_name: Optional[str]) -> Optional[List[str]]:
# fullName is observed to be in formats:
# [database].[schema].[table]
# [schema].[table]
# [table]
# table
# schema

# TODO: Validate the startswith check. Currently required for our integration tests
if full_name is None or not full_name.startswith("["):
return None

return full_name.replace("[", "").replace("]", "").split(".")

def make_dataset_urn(
self,
env: str,
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:
(
upstream_db,
platform_instance,
platform,
original_platform,
) = get_overridden_info(
connection_type=self.connection_type,
upstream_db=self.database,
lineage_overrides=lineage_overrides,
platform_instance_map=platform_instance_map,
)

table_name = get_fully_qualified_table_name(
original_platform,
upstream_db or "",
self.schema,
self.table,
)

return None
return builder.make_dataset_urn_with_platform_instance(
platform, table_name, platform_instance, env
)


def get_overridden_info(
connection_type: str,
connection_type: Optional[str],
upstream_db: Optional[str],
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
@@ -605,43 +716,16 @@ def get_overridden_info(
):
upstream_db = lineage_overrides.database_override_map[upstream_db]

platform_instance = get_platform_instance(original_platform, platform_instance_map)
platform_instance = (
platform_instance_map.get(original_platform) if platform_instance_map else None
)

if original_platform in ("athena", "hive", "mysql"): # Two tier databases
upstream_db = None

return upstream_db, platform_instance, platform, original_platform


def make_table_urn(
env: str,
upstream_db: Optional[str],
connection_type: str,
schema: str,
full_name: str,
platform_instance_map: Optional[Dict[str, str]],
lineage_overrides: Optional[TableauLineageOverrides] = None,
) -> str:

upstream_db, platform_instance, platform, original_platform = get_overridden_info(
connection_type=connection_type,
upstream_db=upstream_db,
lineage_overrides=lineage_overrides,
platform_instance_map=platform_instance_map,
)

table_name = get_fully_qualified_table_name(
original_platform,
upstream_db if upstream_db is not None else "",
schema,
full_name,
)

return builder.make_dataset_urn_with_platform_instance(
platform, table_name, platform_instance, env
)


def make_description_from_params(description, formula):
"""
Generate column description
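For illustration, a minimal sketch of the new dataclass exercised end to end. The sample dict keys mirror the Tableau metadata API field names that tableau_constant is assumed to resolve to, and the resulting URN is approximate; none of this code appears in the commit:

from datahub.ingestion.source.tableau_common import TableauUpstreamReference

# Hypothetical input, shaped like a `table` node from Tableau's metadata API
table = {
    "id": "abc-123",
    "name": "orders",
    "fullName": "[public].[orders]",
    "schema": "",  # can be empty or wrong for some databases, hence fullName parsing
    "connectionType": "postgres",
    "database": {"name": "analytics"},
}

ref = TableauUpstreamReference.create(table)
# parse_full_name("[public].[orders]") -> ["public", "orders"], so schema and
# table are corrected from fullName: database="analytics", schema="public",
# table="orders"

urn = ref.make_dataset_urn(env="PROD", platform_instance_map=None)
# -> roughly urn:li:dataset:(urn:li:dataPlatform:postgres,analytics.public.orders,PROD)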