Commit

Merge branch 'master' into origin/DownloadLineageResultsCypressTest
kkorchak authored Oct 17, 2023
2 parents 5245143 + ae5fd90 commit 6e1f2dc
Showing 16 changed files with 234 additions and 39 deletions.
20 changes: 20 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
@@ -458,6 +458,26 @@ enum FilterOperator {
Represents the relation: The field exists. If the field is an array, the field is either not present or empty.
"""
EXISTS

"""
Represents the relation greater than, e.g. ownerCount > 5
"""
GREATER_THAN

"""
Represents the relation greater than or equal to, e.g. ownerCount >= 5
"""
GREATER_THAN_OR_EQUAL_TO

"""
Represents the relation less than, e.g. ownerCount < 3
"""
LESS_THAN

"""
Represents the relation less than or equal to, e.g. ownerCount <= 3
"""
LESS_THAN_OR_EQUAL_TO
}

"""
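As an illustration of the new numeric operators, here is a minimal sketch (not part of this commit) that filters search results with GREATER_THAN through the GraphQL endpoint. The endpoint URL, the searchAcrossEntities input shape with orFilters, and the ownerCount field are assumptions; the field name simply mirrors the docstring examples above, so adjust everything to your deployment.

import requests

# Assumed local GMS GraphQL endpoint; an Authorization header may be required.
GRAPHQL_URL = "http://localhost:8080/api/graphql"

query = """
query search($input: SearchAcrossEntitiesInput!) {
  searchAcrossEntities(input: $input) {
    total
    searchResults {
      entity {
        urn
      }
    }
  }
}
"""

variables = {
    "input": {
        "query": "*",
        # New in this commit: numeric comparison operators such as GREATER_THAN.
        "orFilters": [
            {
                "and": [
                    {
                        "field": "ownerCount",  # illustrative field, per the docstrings above
                        "condition": "GREATER_THAN",
                        "values": ["5"],
                    }
                ]
            }
        ],
    }
}

response = requests.post(GRAPHQL_URL, json={"query": query, "variables": variables})
response.raise_for_status()
print(response.json())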
1 change: 1 addition & 0 deletions datahub-web-react/src/graphql/scroll.graphql
@@ -408,6 +408,7 @@ fragment downloadScrollAcrossLineageResult on ScrollAcrossLineageResults {
count
total
searchResults {
degree
entity {
...downloadSearchResults
}
25 changes: 25 additions & 0 deletions metadata-ingestion/examples/library/create_dataproduct.py
@@ -0,0 +1,25 @@
from datahub.api.entities.dataproduct.dataproduct import DataProduct
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))

data_product = DataProduct(
id="pet_of_the_week",
display_name="Pet of the Week Campaign",
domain="urn:li:domain:ef39e99a-9d61-406d-b4a8-c70b16380206",
description="This campaign includes Pet of the Week data.",
assets=[
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD)",
"urn:li:dashboard:(looker,baz)",
"urn:li:dataFlow:(airflow,dag_abc,PROD)",
],
owners=[{"id": "urn:li:corpuser:jdoe", "type": "BUSINESS_OWNER"}],
terms=["urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance"],
tags=["urn:li:tag:adoption"],
properties={"lifecycle": "production", "sla": "7am every day"},
external_url="https://en.wikipedia.org/wiki/Sloth",
)

for mcp in data_product.generate_mcp(upsert=False):
graph.emit(mcp)
3 changes: 1 addition & 2 deletions metadata-ingestion/setup.py
@@ -38,7 +38,6 @@
"progressbar2",
"termcolor>=1.0.0",
"psutil>=5.8.0",
"ratelimiter",
"Deprecated",
"humanfriendly",
"packaging",
@@ -354,7 +353,7 @@
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redshift": sql_common | redshift_common | usage_common | {"redshift-connector"},
"redshift": sql_common | redshift_common | usage_common | sqlglot_lib | {"redshift-connector"},
"redshift-legacy": sql_common | redshift_common,
"redshift-usage-legacy": sql_common | usage_common | redshift_common,
"s3": {*s3_base, *data_lake_profiling},
11 changes: 8 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
@@ -34,21 +34,26 @@ def get_bucket_relative_path(s3_uri: str) -> str:
return "/".join(strip_s3_prefix(s3_uri).split("/")[1:])


def make_s3_urn(s3_uri: str, env: str) -> str:
def make_s3_urn(s3_uri: str, env: str, remove_extension: bool = True) -> str:
s3_name = strip_s3_prefix(s3_uri)

if s3_name.endswith("/"):
s3_name = s3_name[:-1]

name, extension = os.path.splitext(s3_name)

if extension != "":
if remove_extension and extension != "":
extension = extension[1:] # remove the dot
return f"urn:li:dataset:(urn:li:dataPlatform:s3,{name}_{extension},{env})"

return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"


def make_s3_urn_for_lineage(s3_uri: str, env: str) -> str:
# Ideally this is the implementation for all S3 URNs
# Don't feel comfortable changing `make_s3_urn` for glue, sagemaker, and athena
return make_s3_urn(s3_uri, env, remove_extension=False)


def get_bucket_name(s3_uri: str) -> str:
if not is_s3_uri(s3_uri):
raise ValueError(
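To make the effect of the new remove_extension flag concrete, here is a small sketch (not part of the commit) of the URNs the two helpers produce for the same URI, following the logic above; the bucket and key are made up.

from datahub.ingestion.source.aws.s3_util import make_s3_urn, make_s3_urn_for_lineage

uri = "s3://my-bucket/events/2023/data.parquet"

# Default behaviour folds the extension into the dataset name:
#   urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/events/2023/data_parquet,PROD)
print(make_s3_urn(uri, "PROD"))

# The new lineage helper keeps the path as-is:
#   urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/events/2023/data.parquet,PROD)
print(make_s3_urn_for_lineage(uri, "PROD"))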
@@ -4,7 +4,6 @@

from google.cloud import bigquery
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from ratelimiter import RateLimiter

from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
AuditLogEntry,
@@ -17,6 +16,7 @@
BQ_DATE_SHARD_FORMAT,
BQ_DATETIME_FORMAT,
)
from datahub.utilities.ratelimiter import RateLimiter

logger: logging.Logger = logging.getLogger(__name__)

@@ -21,7 +21,7 @@
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.snowflake.constants import (
LINEAGE_PERMISSION_ERROR,
SnowflakeEdition,
@@ -652,7 +652,9 @@ def get_external_upstreams(self, external_lineage: Set[str]) -> List[UpstreamClass]:
# For now, populate only for S3
if external_lineage_entry.startswith("s3://"):
external_upstream_table = UpstreamClass(
dataset=make_s3_urn(external_lineage_entry, self.config.env),
dataset=make_s3_urn_for_lineage(
external_lineage_entry, self.config.env
),
type=DatasetLineageTypeClass.COPY,
)
external_upstreams.append(external_upstream_table)
47 changes: 20 additions & 27 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
@@ -1,7 +1,7 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Set, Union
from typing import Iterable, MutableMapping, Optional, Union

# This import verifies that the dependencies are available.
import teradatasqlalchemy # noqa: F401
@@ -12,7 +12,6 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -34,11 +33,7 @@
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata._schema_classes import (
MetadataChangeEventClass,
SchemaMetadataClass,
ViewPropertiesClass,
)
from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
@@ -112,6 +107,11 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
description="Generate usage statistics.",
)

use_file_backed_cache: bool = Field(
default=True,
description="Whether to use a file-backed cache for the view definitions.",
)


@platform_name("Teradata")
@config_class(TeradataConfig)
@@ -142,7 +142,8 @@ class TeradataSource(TwoTierSQLAlchemySource):
and "timestamp" >= TIMESTAMP '{start_time}'
and "timestamp" < TIMESTAMP '{end_time}'
"""
urns: Optional[Set[str]]

_view_definition_cache: MutableMapping[str, str]

def __init__(self, config: TeradataConfig, ctx: PipelineContext):
super().__init__(config, ctx, "teradata")
@@ -166,7 +167,10 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
env=self.config.env,
)

self._view_definition_cache: FileBackedDict[str] = FileBackedDict()
if self.config.use_file_backed_cache:
self._view_definition_cache = FileBackedDict[str]()
else:
self._view_definition_cache = {}

@classmethod
def create(cls, config_dict, ctx):
@@ -249,24 +253,13 @@ def get_metadata_engine(self) -> Engine:
def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
# Add all schemas to the schema resolver
for wu in super().get_workunits_internal():
if isinstance(wu.metadata, MetadataChangeEventClass):
if wu.metadata.proposedSnapshot:
for aspect in wu.metadata.proposedSnapshot.aspects:
if isinstance(aspect, SchemaMetadataClass):
self.schema_resolver.add_schema_metadata(
wu.metadata.proposedSnapshot.urn,
aspect,
)
break
if isinstance(wu.metadata, MetadataChangeProposalWrapper):
if (
wu.metadata.entityUrn
and isinstance(wu.metadata.aspect, ViewPropertiesClass)
and wu.metadata.aspect.viewLogic
):
self._view_definition_cache[
wu.metadata.entityUrn
] = wu.metadata.aspect.viewLogic
urn = wu.get_urn()
schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
if schema_metadata:
self.schema_resolver.add_schema_metadata(urn, schema_metadata)
view_properties = wu.get_aspect_of_type(ViewPropertiesClass)
if view_properties and self.config.include_view_lineage:
self._view_definition_cache[urn] = view_properties.viewLogic
yield wu

if self.config.include_view_lineage:
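For context, a hedged sketch (not taken from this commit) of setting the new use_file_backed_cache flag in a programmatic Teradata ingestion run. The Pipeline API and datahub-rest sink follow standard DataHub usage, and the connection field names are assumed placeholders.

from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe; connection values are placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "teradata",
            "config": {
                "host_port": "teradata.example.com:1025",  # assumed field names
                "username": "dbc",
                "password": "dbc",
                "include_view_lineage": True,
                # New in this commit: fall back to a plain in-memory dict
                # instead of a FileBackedDict for view definitions.
                "use_file_backed_cache": False,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()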
@@ -166,6 +166,14 @@ class UnityCatalogSourceConfig(
description="Option to enable/disable lineage generation.",
)

include_external_lineage: bool = pydantic.Field(
default=True,
description=(
"Option to enable/disable lineage generation for external tables."
" Only external S3 tables are supported at the moment."
),
)

include_notebooks: bool = pydantic.Field(
default=False,
description="Ingest notebooks, represented as DataHub datasets.",
@@ -33,6 +33,7 @@
ALLOWED_STATEMENT_TYPES,
Catalog,
Column,
ExternalTableReference,
Metastore,
Notebook,
Query,
@@ -248,6 +249,13 @@ def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
)
if table_ref:
table.upstreams[table_ref] = {}
elif "fileInfo" in item:
external_ref = ExternalTableReference.create_from_lineage(
item["fileInfo"]
)
if external_ref:
table.external_upstreams.add(external_ref)

for notebook in item.get("notebookInfos") or []:
table.upstream_notebooks.add(notebook["notebook_id"])

@@ -10,6 +10,7 @@
CatalogType,
ColumnTypeName,
DataSourceFormat,
SecurableType,
TableType,
)
from databricks.sdk.service.sql import QueryStatementType
@@ -176,6 +177,35 @@ def external_path(self) -> str:
return f"{self.catalog}/{self.schema}/{self.table}"


@dataclass(frozen=True, order=True)
class ExternalTableReference:
path: str
has_permission: bool
name: Optional[str]
type: Optional[SecurableType]
storage_location: Optional[str]

@classmethod
def create_from_lineage(cls, d: dict) -> Optional["ExternalTableReference"]:
try:
securable_type: Optional[SecurableType]
try:
securable_type = SecurableType(d.get("securable_type", "").lower())
except ValueError:
securable_type = None

return cls(
path=d["path"],
has_permission=d.get("has_permission", True),
name=d.get("securable_name"),
type=securable_type,
storage_location=d.get("storage_location"),
)
except Exception as e:
logger.warning(f"Failed to create ExternalTableReference from {d}: {e}")
return None


@dataclass
class Table(CommonProperty):
schema: Schema
@@ -193,6 +223,7 @@ class Table(CommonProperty):
view_definition: Optional[str]
properties: Dict[str, str]
upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict)
external_upstreams: Set[ExternalTableReference] = field(default_factory=set)
upstream_notebooks: Set[NotebookId] = field(default_factory=set)
downstream_notebooks: Set[NotebookId] = field(default_factory=set)

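The new ExternalTableReference is built from the fileInfo entries of the Databricks lineage response (handled in the table_lineage change above) and collected on Table.external_upstreams. A minimal sketch, assuming the module path and an illustrative payload:

from datahub.ingestion.source.unity.proxy_types import ExternalTableReference  # module path assumed

# Illustrative fileInfo payload; the real shape comes from the Databricks lineage API.
file_info = {
    "path": "s3://landing-bucket/raw/orders",
    "has_permission": True,
    "securable_name": "orders_external",
    "securable_type": "TABLE",
    "storage_location": "s3://landing-bucket/raw/orders",
}

ref = ExternalTableReference.create_from_lineage(file_info)
if ref:
    print(ref.path, ref.type, ref.storage_location)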
@@ -19,6 +19,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")

num_column_lineage_skipped_column_count: int = 0
num_external_upstreams_lacking_permissions: int = 0
num_external_upstreams_unsupported: int = 0

num_queries: int = 0
num_queries_dropped_parse_failure: int = 0
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
@@ -41,6 +41,7 @@
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
@@ -455,6 +456,28 @@ def _generate_lineage_aspect(
)
)

if self.config.include_external_lineage:
for external_ref in table.external_upstreams:
if not external_ref.has_permission or not external_ref.path:
self.report.num_external_upstreams_lacking_permissions += 1
logger.warning(
f"Lacking permissions for external file upstream on {table.ref}"
)
elif external_ref.path.startswith("s3://"):
upstreams.append(
UpstreamClass(
dataset=make_s3_urn_for_lineage(
external_ref.path, self.config.env
),
type=DatasetLineageTypeClass.COPY,
)
)
else:
self.report.num_external_upstreams_unsupported += 1
logger.warning(
f"Unsupported external file upstream on {table.ref}: {external_ref.path}"
)

if upstreams:
return UpstreamLineageClass(
upstreams=upstreams,
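When include_external_lineage is enabled, only s3:// paths become COPY upstreams; permission-restricted or unsupported fileInfo entries are counted in the new report fields instead. Below is a hedged recipe sketch for turning the option on; the workspace_url and token field names are assumed rather than taken from this commit.

from datahub.ingestion.run.pipeline import Pipeline

# Hypothetical recipe; credentials and URLs are placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "unity-catalog",
            "config": {
                "workspace_url": "https://example.cloud.databricks.com",  # assumed field names
                "token": "dapi-xxxx",
                # New in this commit: emit COPY lineage from external S3 tables.
                "include_external_lineage": True,
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()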