Commit

Merge branch 'master' into snowflake_view_downstream_cll_for_platform…instance

mayurinehate authored Oct 27, 2023
2 parents dab2eed + 1ac831f commit 9a6d7b4
Showing 37 changed files with 2,292 additions and 532 deletions.
4 changes: 3 additions & 1 deletion docker/kafka-setup/kafka-setup.sh
@@ -36,7 +36,9 @@ if [[ $KAFKA_PROPERTIES_SECURITY_PROTOCOL == "SSL" ]]; then
fi
if [[ -n $KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION ]]; then
echo "ssl.truststore.location=$KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION" >> $CONNECTION_PROPERTIES_PATH
echo "ssl.truststore.password=$KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD" >> $CONNECTION_PROPERTIES_PATH
if [[ $KAFKA_PROPERTIES_SSL_TRUSTSTORE_TYPE != "PEM" ]]; then
echo "ssl.truststore.password=$KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD" >> $CONNECTION_PROPERTIES_PATH
fi
if [[ -n $KAFKA_PROPERTIES_SSL_TRUSTSTORE_TYPE ]]; then
echo "ssl.truststore.type=$KAFKA_PROPERTIES_SSL_TRUSTSTORE_TYPE" >> $CONNECTION_PROPERTIES_PATH
fi
52 changes: 28 additions & 24 deletions metadata-ingestion/setup.py
@@ -101,22 +101,36 @@
"grpcio-tools>=1.44.0,<2",
}

sql_common = {
# Required for all SQL sources.
# This is temporary lower bound that we're open to loosening/tightening as requirements show up
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
# scipy version restricted to reduce backtracking, used by great-expectations,
"scipy>=1.7.2",
# GE added handling for higher version of jinja2
# https://github.com/great-expectations/great_expectations/pull/5382/files
# datahub does not depend on traitlets directly but great expectations does.
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
"greenlet",
usage_common = {
"sqlparse",
}

sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==18.5.2.dev45",
}

sql_common = (
{
# Required for all SQL sources.
# This is temporary lower bound that we're open to loosening/tightening as requirements show up
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
# scipy version restricted to reduce backtracking, used by great-expectations,
"scipy>=1.7.2",
# GE added handling for higher version of jinja2
# https://github.com/great-expectations/great_expectations/pull/5382/files
# datahub does not depend on traitlets directly but great expectations does.
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
"greenlet",
}
| usage_common
| sqlglot_lib
)
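As a quick illustration of the refactor above: sql_common is now composed by set union, so every SQL source automatically picks up the shared usage_common and sqlglot_lib dependencies. A minimal sketch of how the "|" composition behaves (pins abbreviated):

# Illustrative only: mirrors the set-union composition above.
usage_common = {"sqlparse"}
sqlglot_lib = {"acryl-sqlglot==18.5.2.dev45"}
sql_common = {"sqlalchemy>=1.4.39, <2"} | usage_common | sqlglot_lib

print(sorted(sql_common))
# ['acryl-sqlglot==18.5.2.dev45', 'sqlalchemy>=1.4.39, <2', 'sqlparse']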

sqllineage_lib = {
"sqllineage==1.3.8",
# We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
@@ -125,12 +139,6 @@
"sqlparse==0.4.4",
}

sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==18.5.2.dev45",
}

aws_common = {
# AWS Python SDK
"boto3",
@@ -243,10 +251,6 @@

powerbi_report_server = {"requests", "requests_ntlm"}

usage_common = {
"sqlparse",
}

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
"databricks-sdk>=0.9.0",
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/common.py
@@ -283,7 +283,7 @@ class VersionedConfig(ConfigModel):

class LineageConfig(ConfigModel):
incremental_lineage: bool = Field(
default=True,
default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)
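Note that the default flip above means any source whose config inherits LineageConfig without overriding the field now re-states lineage on each run instead of emitting incremental lineage (dbt re-overrides the default back to True later in this diff). A minimal sketch of the observable effect, assuming no other overrides:

# Sketch; the import path is taken from the file shown above.
from datahub.configuration.common import LineageConfig

cfg = LineageConfig()
assert cfg.incremental_lineage is False  # was True before this commit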

@@ -106,6 +106,7 @@ def process_sql_parsing_result(
user: Optional[UserUrn] = None,
custom_operation_type: Optional[str] = None,
include_urns: Optional[Set[DatasetUrn]] = None,
include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
"""Process a single query and yield any generated workunits.
@@ -130,7 +131,9 @@
_merge_lineage_data(
downstream_urn=downstream_urn,
upstream_urns=result.in_tables,
column_lineage=result.column_lineage,
column_lineage=result.column_lineage
if include_column_lineage
else None,
upstream_edges=self._lineage_map[downstream_urn],
query_timestamp=query_timestamp,
is_view_ddl=is_view_ddl,
@@ -130,10 +130,13 @@ def auto_incremental_lineage(
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
if lineage_aspect.fineGrainedLineages:
yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
)
elif lineage_aspect.upstreams:
yield _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
yield wu
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -215,6 +215,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns

return [
auto_lowercase_dataset_urns,
auto_status_aspect,
@@ -309,6 +309,7 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
"dataset_pattern is not set but schema_pattern is set, using schema_pattern as dataset_pattern. schema_pattern will be deprecated, please use dataset_pattern instead."
)
values["dataset_pattern"] = schema_pattern
dataset_pattern = schema_pattern
elif (
dataset_pattern != AllowDenyPattern.allow_all()
and schema_pattern != AllowDenyPattern.allow_all()
@@ -20,6 +20,7 @@
from google.cloud.datacatalog import lineage_v1
from google.cloud.logging_v2.client import Client as GCPLoggingClient

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
@@ -683,8 +684,11 @@ def _create_lineage_map(
self.report.num_skipped_lineage_entries_missing_data[e.project_id] += 1
continue

if not self.config.dataset_pattern.allowed(
destination_table.table_identifier.dataset
if not is_schema_allowed(
self.config.dataset_pattern,
destination_table.table_identifier.dataset,
destination_table.table_identifier.project_id,
self.config.match_fully_qualified_names,
) or not self.config.table_pattern.allowed(
destination_table.table_identifier.get_table_name()
):
@@ -21,6 +21,7 @@

import humanfriendly

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.configuration.time_window_config import (
BaseTimeWindowConfig,
get_time_bucket,
@@ -335,10 +336,11 @@ def get_time_window(self) -> Tuple[datetime, datetime]:
def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool:
return (
table_ref is not None
and self.config.dataset_pattern.allowed(
f"{table_ref.table_identifier.project_id}.{table_ref.table_identifier.dataset}"
if self.config.match_fully_qualified_names
else table_ref.table_identifier.dataset
and is_schema_allowed(
self.config.dataset_pattern,
table_ref.table_identifier.dataset,
table_ref.table_identifier.project_id,
self.config.match_fully_qualified_names,
)
and self.config.table_pattern.allowed(str(table_ref.table_identifier))
)
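Both BigQuery call sites above now delegate the fully-qualified-name handling to the shared is_schema_allowed helper instead of building the "project.dataset" string inline. A hedged usage sketch (the allow pattern and project names are invented; the argument order follows the calls shown above):

# Sketch: with match_fully_qualified_names=True the pattern is matched against
# "<project_id>.<dataset>"; with False it is matched against the bare dataset name.
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.pattern_utils import is_schema_allowed

pattern = AllowDenyPattern(allow=["my-project\\.analytics"])

is_schema_allowed(pattern, "analytics", "my-project", True)     # True: "my-project.analytics" matches
is_schema_allowed(pattern, "analytics", "other-project", True)  # False: wrong project
is_schema_allowed(pattern, "analytics", "my-project", False)    # False: bare "analytics" does not match this allow regex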
@@ -280,6 +280,11 @@ class DBTCommonConfig(
default=False,
description="When enabled, dbt test warnings will be treated as failures.",
)
# override default value to True.
incremental_lineage: bool = Field(
default=True,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

@validator("target_platform")
def validate_target_platform_value(cls, target_platform: str) -> str:
16 changes: 13 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
@@ -11,7 +11,11 @@
from pymongo.mongo_client import MongoClient

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import EnvConfigMixin
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@@ -55,7 +59,7 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class MongoDBConfig(EnvConfigMixin):
class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
# See the MongoDB authentication docs for details and examples.
# https://pymongo.readthedocs.io/en/stable/examples/authentication.html
connect_uri: str = Field(
@@ -199,6 +203,7 @@ def construct_schema_pymongo(
@platform_name("MongoDB")
@config_class(MongoDBConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@dataclass
class MongoDBSource(Source):
@@ -320,7 +325,12 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_dropped(dataset_name)
continue

dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{self.config.env})"
dataset_urn = make_dataset_urn_with_platform_instance(
platform=platform,
name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance,
)

dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
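With the change above, the MongoDB source builds dataset URNs through make_dataset_urn_with_platform_instance so a configured platform_instance is reflected in the URN. A hedged sketch of the expected shape (the instance and collection names are invented, and the exact encoding is whatever the helper produces):

# Illustrative only; the keyword arguments mirror the call above.
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

urn = make_dataset_urn_with_platform_instance(
    platform="mongodb",
    name="mydb.purchases",
    env="PROD",
    platform_instance="core_cluster",
)
# Expected to look roughly like:
# urn:li:dataset:(urn:li:dataPlatform:mongodb,core_cluster.mydb.purchases,PROD)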
@@ -135,7 +135,6 @@ def get_workunits(
return

self._populate_external_lineage_map(discovered_tables)

if self.config.include_view_lineage:
if len(discovered_views) > 0:
yield from self.get_view_upstream_workunits(
83 changes: 71 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
@@ -1,15 +1,18 @@
import json
import logging
import re
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Union

from pydantic.class_validators import validator
from pydantic.fields import Field

# This import verifies that the dependencies are available.
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveDialect, HiveTimestamp
from sqlalchemy.engine.reflection import Inspector

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
@@ -18,8 +21,10 @@
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.sql.sql_common import register_custom_type
from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
@@ -31,6 +36,7 @@
SchemaField,
TimeTypeClass,
)
from datahub.metadata.schema_classes import ViewPropertiesClass
from datahub.utilities import config_clean
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column

@@ -90,19 +96,34 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
logger.warning(f"Failed to patch method due to {e}")


@reflection.cache # type: ignore
def get_view_names_patched(self, connection, schema=None, **kw):
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
return [row[0] for row in connection.execute(query)]


@reflection.cache # type: ignore
def get_view_definition_patched(self, connection, view_name, schema=None, **kw):
full_table = self.identifier_preparer.quote_identifier(view_name)
if schema:
full_table = "{}.{}".format(
self.identifier_preparer.quote_identifier(schema),
self.identifier_preparer.quote_identifier(view_name),
)
row = connection.execute("SHOW CREATE TABLE {}".format(full_table)).fetchone()
return row[0]


HiveDialect.get_view_names = get_view_names_patched
HiveDialect.get_view_definition = get_view_definition_patched
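With HiveDialect patched as above, SQLAlchemy's standard reflection API can enumerate Hive views and fetch their DDL, which the _process_view override added later in this file relies on. A hedged sketch (the connection URI is a placeholder):

# Sketch of the reflection calls the patched dialect now supports.
from sqlalchemy import create_engine, inspect

engine = create_engine("hive://localhost:10000/default")  # placeholder connection
inspector = inspect(engine)

for view_name in inspector.get_view_names(schema="default"):          # issues "SHOW VIEWS IN `default`"
    ddl = inspector.get_view_definition(view_name, schema="default")  # issues "SHOW CREATE TABLE ..."
    print(view_name, ddl)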


class HiveConfig(TwoTierSQLAlchemyConfig):
# defaults
scheme = Field(default="hive", hidden_from_docs=True)

# Hive SQLAlchemy connector returns views as tables.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# Disabling views helps us prevent this duplication.
include_views = Field(
default=False,
hidden_from_docs=True,
description="Hive SQLAlchemy connector returns views as tables. See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273. Disabling views helps us prevent this duplication.",
)

@validator("host_port")
def clean_host_port(cls, v):
return config_clean.remove_protocol(v)
@@ -174,3 +195,41 @@ def get_schema_fields_for_column(
return new_fields

return fields

# Hive SQLAlchemy connector returns views as tables in get_table_names.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# This override makes sure that we ingest view definitions for views
def _process_view(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
view: str,
sql_config: SQLCommonConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)

try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""

if view_definition:
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()
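For reference, a minimal sketch of the aspect the override above emits for each Hive view (the keyword arguments mirror the code in this hunk; the view DDL string is invented):

# Illustrative aspect payload for a single Hive view.
from datahub.metadata.schema_classes import ViewPropertiesClass

view_properties = ViewPropertiesClass(
    materialized=False,
    viewLanguage="SQL",
    viewLogic="CREATE VIEW `db`.`my_view` AS SELECT id, amount FROM `db`.`purchases`",
)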