diff --git a/datahub-graphql-core/src/main/resources/search.graphql b/datahub-graphql-core/src/main/resources/search.graphql
index 4cabdb04afe77..e0cde5a2db9f9 100644
--- a/datahub-graphql-core/src/main/resources/search.graphql
+++ b/datahub-graphql-core/src/main/resources/search.graphql
@@ -458,6 +458,26 @@ enum FilterOperator {
   Represents the relation: The field exists. If the field is an array, the field is either not present or empty.
   """
   EXISTS
+
+  """
+  Represents the relation greater than, e.g. ownerCount > 5
+  """
+  GREATER_THAN
+
+  """
+  Represents the relation greater than or equal to, e.g. ownerCount >= 5
+  """
+  GREATER_THAN_OR_EQUAL_TO
+
+  """
+  Represents the relation less than, e.g. ownerCount < 3
+  """
+  LESS_THAN
+
+  """
+  Represents the relation less than or equal to, e.g. ownerCount <= 3
+  """
+  LESS_THAN_OR_EQUAL_TO
 }
 
 """
diff --git a/datahub-web-react/src/graphql/scroll.graphql b/datahub-web-react/src/graphql/scroll.graphql
index 18274c50c2166..1031fed7b9e13 100644
--- a/datahub-web-react/src/graphql/scroll.graphql
+++ b/datahub-web-react/src/graphql/scroll.graphql
@@ -408,6 +408,7 @@ fragment downloadScrollAcrossLineageResult on ScrollAcrossLineageResults {
     count
     total
     searchResults {
+        degree
         entity {
             ...downloadSearchResults
         }
diff --git a/metadata-ingestion/examples/library/create_dataproduct.py b/metadata-ingestion/examples/library/create_dataproduct.py
new file mode 100644
index 0000000000000..245395b602480
--- /dev/null
+++ b/metadata-ingestion/examples/library/create_dataproduct.py
@@ -0,0 +1,25 @@
+from datahub.api.entities.dataproduct.dataproduct import DataProduct
+from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
+
+gms_endpoint = "http://localhost:8080"
+graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))
+
+data_product = DataProduct(
+    id="pet_of_the_week",
+    display_name="Pet of the Week Campaign",
+    domain="urn:li:domain:ef39e99a-9d61-406d-b4a8-c70b16380206",
+    description="This campaign includes Pet of the Week data.",
+    assets=[
+        "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.analytics.pet_details,PROD)",
+        "urn:li:dashboard:(looker,baz)",
+        "urn:li:dataFlow:(airflow,dag_abc,PROD)",
+    ],
+    owners=[{"id": "urn:li:corpuser:jdoe", "type": "BUSINESS_OWNER"}],
+    terms=["urn:li:glossaryTerm:ClientsAndAccounts.AccountBalance"],
+    tags=["urn:li:tag:adoption"],
+    properties={"lifecycle": "production", "sla": "7am every day"},
+    external_url="https://en.wikipedia.org/wiki/Sloth",
+)
+
+for mcp in data_product.generate_mcp(upsert=False):
+    graph.emit(mcp)
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 545cafca9d4df..7be565d51260d 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -38,7 +38,6 @@
     "progressbar2",
     "termcolor>=1.0.0",
     "psutil>=5.8.0",
-    "ratelimiter",
    "Deprecated",
     "humanfriendly",
     "packaging",
@@ -354,7 +353,7 @@
     | {"psycopg2-binary", "pymysql>=1.0.2"},
     "pulsar": {"requests"},
     "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
-    "redshift": sql_common | redshift_common | usage_common | {"redshift-connector"},
+    "redshift": sql_common | redshift_common | usage_common | sqlglot_lib | {"redshift-connector"},
     "redshift-legacy": sql_common | redshift_common,
     "redshift-usage-legacy": sql_common | usage_common | redshift_common,
     "s3": {*s3_base, *data_lake_profiling},
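Note on the new numeric operators: a client supplies them as the `condition` of a facet filter in a search input. The sketch below uses the Python `DataHubGraph` client and is illustrative only; the `ownerCount` field comes from the docstrings above, while the entity types, counts, and result handling are placeholder assumptions rather than part of this change.

```python
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Hypothetical query: datasets whose ownerCount facet is greater than 5.
query = """
query search($input: SearchAcrossEntitiesInput!) {
  searchAcrossEntities(input: $input) {
    total
    searchResults {
      entity { urn }
    }
  }
}
"""
variables = {
    "input": {
        "types": ["DATASET"],
        "query": "*",
        "start": 0,
        "count": 10,
        "orFilters": [
            {"and": [{"field": "ownerCount", "condition": "GREATER_THAN", "values": ["5"]}]}
        ],
    }
}
result = graph.execute_graphql(query, variables=variables)
print(result["searchAcrossEntities"]["total"])
```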
diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
index 501162455cc45..878b8dd1bb9a5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/aws/s3_util.py
@@ -34,21 +34,26 @@ def get_bucket_relative_path(s3_uri: str) -> str:
     return "/".join(strip_s3_prefix(s3_uri).split("/")[1:])
 
 
-def make_s3_urn(s3_uri: str, env: str) -> str:
+def make_s3_urn(s3_uri: str, env: str, remove_extension: bool = True) -> str:
     s3_name = strip_s3_prefix(s3_uri)
 
     if s3_name.endswith("/"):
         s3_name = s3_name[:-1]
 
     name, extension = os.path.splitext(s3_name)
-
-    if extension != "":
+    if remove_extension and extension != "":
         extension = extension[1:]  # remove the dot
         return f"urn:li:dataset:(urn:li:dataPlatform:s3,{name}_{extension},{env})"
 
     return f"urn:li:dataset:(urn:li:dataPlatform:s3,{s3_name},{env})"
 
 
+def make_s3_urn_for_lineage(s3_uri: str, env: str) -> str:
+    # Ideally this is the implementation for all S3 URNs
+    # Don't feel comfortable changing `make_s3_urn` for glue, sagemaker, and athena
+    return make_s3_urn(s3_uri, env, remove_extension=False)
+
+
 def get_bucket_name(s3_uri: str) -> str:
     if not is_s3_uri(s3_uri):
         raise ValueError(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py
index 03b12c61ee5c6..db552c09cd0a7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit_log_api.py
@@ -4,7 +4,6 @@
 
 from google.cloud import bigquery
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
-from ratelimiter import RateLimiter
 
 from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
     AuditLogEntry,
@@ -17,6 +16,7 @@
     BQ_DATE_SHARD_FORMAT,
     BQ_DATETIME_FORMAT,
 )
+from datahub.utilities.ratelimiter import RateLimiter
 
 logger: logging.Logger = logging.getLogger(__name__)
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
index 9a993f5774032..0a15c352fc842 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
@@ -21,7 +21,7 @@
 import datahub.emitter.mce_builder as builder
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.aws.s3_util import make_s3_urn
+from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
 from datahub.ingestion.source.snowflake.constants import (
     LINEAGE_PERMISSION_ERROR,
     SnowflakeEdition,
@@ -652,7 +652,9 @@ def get_external_upstreams(self, external_lineage: Set[str]) -> List[UpstreamCla
             # For now, populate only for S3
             if external_lineage_entry.startswith("s3://"):
                 external_upstream_table = UpstreamClass(
-                    dataset=make_s3_urn(external_lineage_entry, self.config.env),
+                    dataset=make_s3_urn_for_lineage(
+                        external_lineage_entry, self.config.env
+                    ),
                     type=DatasetLineageTypeClass.COPY,
                 )
                 external_upstreams.append(external_upstream_table)
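A quick illustration of the difference between the two helpers, using a made-up bucket and key: `make_s3_urn` folds the file extension into the dataset name, while `make_s3_urn_for_lineage` keeps the object key (including its extension) intact.

```python
from datahub.ingestion.source.aws.s3_util import make_s3_urn, make_s3_urn_for_lineage

uri = "s3://my-bucket/events/2023/records.csv"  # placeholder URI

# Extension stripped and folded into the name:
print(make_s3_urn(uri, "PROD"))
# -> urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/events/2023/records_csv,PROD)

# Extension preserved, as used for lineage edges:
print(make_s3_urn_for_lineage(uri, "PROD"))
# -> urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/events/2023/records.csv,PROD)
```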
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
index 6080cf7b371e3..e628e4dbd3446 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
@@ -1,7 +1,7 @@
 import logging
 from dataclasses import dataclass
 from datetime import datetime
-from typing import Iterable, Optional, Set, Union
+from typing import Iterable, MutableMapping, Optional, Union
 
 # This import verifies that the dependencies are available.
 import teradatasqlalchemy  # noqa: F401
@@ -12,7 +12,6 @@
 
 from datahub.configuration.common import AllowDenyPattern
 from datahub.configuration.time_window_config import BaseTimeWindowConfig
-from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
@@ -34,11 +33,7 @@
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
 from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
-from datahub.metadata._schema_classes import (
-    MetadataChangeEventClass,
-    SchemaMetadataClass,
-    ViewPropertiesClass,
-)
+from datahub.metadata._schema_classes import SchemaMetadataClass, ViewPropertiesClass
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
     BytesTypeClass,
     TimeTypeClass,
@@ -112,6 +107,11 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
         description="Generate usage statistic.",
     )
 
+    use_file_backed_cache: bool = Field(
+        default=True,
+        description="Whether to use a file backed cache for the view definitions.",
+    )
+
 
 @platform_name("Teradata")
 @config_class(TeradataConfig)
@@ -142,7 +142,8 @@ class TeradataSource(TwoTierSQLAlchemySource):
        and "timestamp" >= TIMESTAMP '{start_time}'
        and "timestamp" < TIMESTAMP '{end_time}'
        """
-    urns: Optional[Set[str]]
+
+    _view_definition_cache: MutableMapping[str, str]
 
     def __init__(self, config: TeradataConfig, ctx: PipelineContext):
         super().__init__(config, ctx, "teradata")
@@ -166,7 +167,10 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
             env=self.config.env,
         )
 
-        self._view_definition_cache: FileBackedDict[str] = FileBackedDict()
+        if self.config.use_file_backed_cache:
+            self._view_definition_cache = FileBackedDict[str]()
+        else:
+            self._view_definition_cache = {}
 
     @classmethod
     def create(cls, config_dict, ctx):
@@ -249,24 +253,13 @@ def get_metadata_engine(self) -> Engine:
     def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
         # Add all schemas to the schema resolver
         for wu in super().get_workunits_internal():
-            if isinstance(wu.metadata, MetadataChangeEventClass):
-                if wu.metadata.proposedSnapshot:
-                    for aspect in wu.metadata.proposedSnapshot.aspects:
-                        if isinstance(aspect, SchemaMetadataClass):
-                            self.schema_resolver.add_schema_metadata(
-                                wu.metadata.proposedSnapshot.urn,
-                                aspect,
-                            )
-                            break
-            if isinstance(wu.metadata, MetadataChangeProposalWrapper):
-                if (
-                    wu.metadata.entityUrn
-                    and isinstance(wu.metadata.aspect, ViewPropertiesClass)
-                    and wu.metadata.aspect.viewLogic
-                ):
-                    self._view_definition_cache[
-                        wu.metadata.entityUrn
-                    ] = wu.metadata.aspect.viewLogic
+            urn = wu.get_urn()
+            schema_metadata = wu.get_aspect_of_type(SchemaMetadataClass)
+            if schema_metadata:
+                self.schema_resolver.add_schema_metadata(urn, schema_metadata)
+            view_properties = wu.get_aspect_of_type(ViewPropertiesClass)
+            if view_properties and self.config.include_view_lineage:
+                self._view_definition_cache[urn] = view_properties.viewLogic
             yield wu
 
         if self.config.include_view_lineage:
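For reference, the new `use_file_backed_cache` flag is an ordinary source config option. A minimal programmatic recipe sketch is below; the host and credentials are placeholders, and only the cache and view-lineage options relate to this change.

```python
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "teradata",
            "config": {
                "host_port": "teradata.example.com:1025",  # placeholder
                "username": "dbc",  # placeholder
                "password": "dbc",  # placeholder
                "include_view_lineage": True,
                # Keep view definitions in an in-memory dict instead of the
                # default file-backed (SQLite) cache:
                "use_file_backed_cache": False,
            },
        },
        "sink": {"type": "datahub-rest", "config": {"server": "http://localhost:8080"}},
    }
)
pipeline.run()
pipeline.raise_from_status()
```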
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py
index a57ee39848855..16820c37d546e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py
@@ -166,6 +166,14 @@ class UnityCatalogSourceConfig(
         description="Option to enable/disable lineage generation.",
     )
 
+    include_external_lineage: bool = pydantic.Field(
+        default=True,
+        description=(
+            "Option to enable/disable lineage generation for external tables."
+            " Only external S3 tables are supported at the moment."
+        ),
+    )
+
     include_notebooks: bool = pydantic.Field(
         default=False,
         description="Ingest notebooks, represented as DataHub datasets.",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
index 9bcdb200f180e..3fb77ce512ed2 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
@@ -33,6 +33,7 @@
     ALLOWED_STATEMENT_TYPES,
     Catalog,
     Column,
+    ExternalTableReference,
     Metastore,
     Notebook,
     Query,
@@ -248,6 +249,13 @@ def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
                 )
                 if table_ref:
                     table.upstreams[table_ref] = {}
+                elif "fileInfo" in item:
+                    external_ref = ExternalTableReference.create_from_lineage(
+                        item["fileInfo"]
+                    )
+                    if external_ref:
+                        table.external_upstreams.add(external_ref)
+
                 for notebook in item.get("notebookInfos") or []:
                     table.upstream_notebooks.add(notebook["notebook_id"])
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py
index 18ac2475b51e0..315c1c0d20186 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy_types.py
@@ -10,6 +10,7 @@
     CatalogType,
     ColumnTypeName,
     DataSourceFormat,
+    SecurableType,
     TableType,
 )
 from databricks.sdk.service.sql import QueryStatementType
@@ -176,6 +177,35 @@ def external_path(self) -> str:
         return f"{self.catalog}/{self.schema}/{self.table}"
 
 
+@dataclass(frozen=True, order=True)
+class ExternalTableReference:
+    path: str
+    has_permission: bool
+    name: Optional[str]
+    type: Optional[SecurableType]
+    storage_location: Optional[str]
+
+    @classmethod
+    def create_from_lineage(cls, d: dict) -> Optional["ExternalTableReference"]:
+        try:
+            securable_type: Optional[SecurableType]
+            try:
+                securable_type = SecurableType(d.get("securable_type", "").lower())
+            except ValueError:
+                securable_type = None
+
+            return cls(
+                path=d["path"],
+                has_permission=d.get("has_permission", True),
+                name=d.get("securable_name"),
+                type=securable_type,
+                storage_location=d.get("storage_location"),
+            )
+        except Exception as e:
+            logger.warning(f"Failed to create ExternalTableReference from {d}: {e}")
+            return None
+
+
 @dataclass
 class Table(CommonProperty):
     schema: Schema
@@ -193,6 +223,7 @@ class Table(CommonProperty):
     view_definition: Optional[str]
     properties: Dict[str, str]
     upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict)
+    external_upstreams: Set[ExternalTableReference] = field(default_factory=set)
     upstream_notebooks: Set[NotebookId] = field(default_factory=set)
     downstream_notebooks: Set[NotebookId] = field(default_factory=set)
 
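Rough illustration of how a lineage `fileInfo` entry maps onto the new dataclass. The dictionary keys mirror those read in `create_from_lineage`, but the exact shape of the Databricks lineage API response is an assumption here, and the values are placeholders.

```python
from datahub.ingestion.source.unity.proxy_types import ExternalTableReference

file_info = {
    "path": "s3://my-bucket/landing/orders",  # placeholder path
    "has_permission": True,
    "securable_name": "orders_external",
    "securable_type": "EXTERNAL_LOCATION",
    "storage_location": "s3://my-bucket/landing/orders",
}

ref = ExternalTableReference.create_from_lineage(file_info)
if ref:
    print(ref.path, ref.type)
```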
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
index fa61571fa92cb..4153d9dd88eb8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
@@ -19,6 +19,8 @@ class UnityCatalogReport(IngestionStageReport, StaleEntityRemovalSourceReport):
     notebooks: EntityFilterReport = EntityFilterReport.field(type="notebook")
 
     num_column_lineage_skipped_column_count: int = 0
+    num_external_upstreams_lacking_permissions: int = 0
+    num_external_upstreams_unsupported: int = 0
 
     num_queries: int = 0
     num_queries_dropped_parse_failure: int = 0
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
index 27c1f341aa84d..b63cf65d55dc8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py
@@ -41,6 +41,7 @@
     TestConnectionReport,
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
 from datahub.ingestion.source.common.subtypes import (
     DatasetContainerSubTypes,
     DatasetSubTypes,
@@ -455,6 +456,28 @@ def _generate_lineage_aspect(
                     )
                 )
 
+        if self.config.include_external_lineage:
+            for external_ref in table.external_upstreams:
+                if not external_ref.has_permission or not external_ref.path:
+                    self.report.num_external_upstreams_lacking_permissions += 1
+                    logger.warning(
+                        f"Lacking permissions for external file upstream on {table.ref}"
+                    )
+                elif external_ref.path.startswith("s3://"):
+                    upstreams.append(
+                        UpstreamClass(
+                            dataset=make_s3_urn_for_lineage(
+                                external_ref.path, self.config.env
+                            ),
+                            type=DatasetLineageTypeClass.COPY,
+                        )
+                    )
+                else:
+                    self.report.num_external_upstreams_unsupported += 1
+                    logger.warning(
+                        f"Unsupported external file upstream on {table.ref}: {external_ref.path}"
+                    )
+
         if upstreams:
             return UpstreamLineageClass(
                 upstreams=upstreams,
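Taken in isolation, the external-lineage branch above boils down to emitting one `UpstreamClass` per readable `s3://` path. A minimal standalone sketch (the path is a placeholder):

```python
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.metadata.schema_classes import DatasetLineageTypeClass, UpstreamClass

upstream = UpstreamClass(
    dataset=make_s3_urn_for_lineage("s3://my-bucket/landing/orders", "PROD"),
    type=DatasetLineageTypeClass.COPY,
)
```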
diff --git a/metadata-ingestion/src/datahub/utilities/file_backed_collections.py b/metadata-ingestion/src/datahub/utilities/file_backed_collections.py
index c04d2138bc116..18493edded4b7 100644
--- a/metadata-ingestion/src/datahub/utilities/file_backed_collections.py
+++ b/metadata-ingestion/src/datahub/utilities/file_backed_collections.py
@@ -3,6 +3,7 @@
 import logging
 import pathlib
 import pickle
+import shutil
 import sqlite3
 import tempfile
 from dataclasses import dataclass, field
@@ -56,15 +57,15 @@ class ConnectionWrapper:
     conn: sqlite3.Connection
     filename: pathlib.Path
 
-    _temp_directory: Optional[tempfile.TemporaryDirectory]
+    _temp_directory: Optional[str]
 
     def __init__(self, filename: Optional[pathlib.Path] = None):
         self._temp_directory = None
 
         # Warning: If filename is provided, the file will not be automatically cleaned up.
         if not filename:
-            self._temp_directory = tempfile.TemporaryDirectory()
-            filename = pathlib.Path(self._temp_directory.name) / _DEFAULT_FILE_NAME
+            self._temp_directory = tempfile.mkdtemp()
+            filename = pathlib.Path(self._temp_directory) / _DEFAULT_FILE_NAME
 
         self.conn = sqlite3.connect(filename, isolation_level=None)
         self.conn.row_factory = sqlite3.Row
@@ -101,7 +102,8 @@ def executemany(
     def close(self) -> None:
         self.conn.close()
         if self._temp_directory:
-            self._temp_directory.cleanup()
+            shutil.rmtree(self._temp_directory)
+            self._temp_directory = None
 
     def __enter__(self) -> "ConnectionWrapper":
         return self
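The `ConnectionWrapper` change replaces the `TemporaryDirectory` object with a plain `mkdtemp()` path plus an explicit `shutil.rmtree` in `close()`, presumably to make cleanup explicit rather than relying on the finalizer. A standalone sketch of the same lifecycle, with a generic database filename standing in for `_DEFAULT_FILE_NAME`:

```python
import pathlib
import shutil
import sqlite3
import tempfile

temp_dir = tempfile.mkdtemp()  # caller owns cleanup from here on
db_path = pathlib.Path(temp_dir) / "sqlite.db"  # placeholder filename
conn = sqlite3.connect(db_path, isolation_level=None)
try:
    conn.execute("CREATE TABLE kv (k TEXT PRIMARY KEY, v TEXT)")
finally:
    conn.close()
    shutil.rmtree(temp_dir)  # explicit, deterministic removal
```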
diff --git a/metadata-ingestion/src/datahub/utilities/ratelimiter.py b/metadata-ingestion/src/datahub/utilities/ratelimiter.py
new file mode 100644
index 0000000000000..3d47d25e14c49
--- /dev/null
+++ b/metadata-ingestion/src/datahub/utilities/ratelimiter.py
@@ -0,0 +1,56 @@
+import collections
+import threading
+import time
+from contextlib import AbstractContextManager
+from typing import Any, Deque
+
+
+# Modified version of https://github.com/RazerM/ratelimiter/blob/master/ratelimiter/_sync.py
+class RateLimiter(AbstractContextManager):
+
+    """Provides rate limiting for an operation with a configurable number of
+    requests for a time period.
+    """
+
+    def __init__(self, max_calls: int, period: float = 1.0) -> None:
+        """Initialize a RateLimiter that allows at most max_calls operations
+        within any window of period seconds (period may be fractional).
+        """
+        if period <= 0:
+            raise ValueError("Rate limiting period should be > 0")
+        if max_calls <= 0:
+            raise ValueError("Rate limiting number of calls should be > 0")
+
+        # We're using a deque to store the last execution timestamps, not for
+        # its maxlen attribute, but to allow constant time front removal.
+        self.calls: Deque = collections.deque()
+
+        self.period = period
+        self.max_calls = max_calls
+        self._lock = threading.Lock()
+
+    def __enter__(self) -> "RateLimiter":
+        with self._lock:
+            # We want to ensure that no more than max_calls were run in the allowed
+            # period. For this, we store the last timestamps of each call and run
+            # the rate verification upon each __enter__ call.
+            if len(self.calls) >= self.max_calls:
+                until = time.time() + self.period - self._timespan
+                sleeptime = until - time.time()
+                if sleeptime > 0:
+                    time.sleep(sleeptime)
+            return self
+
+    def __exit__(self, exc_type: Any, exc: Any, traceback: Any) -> None:
+        with self._lock:
+            # Store the last operation timestamp.
+            self.calls.append(time.time())
+
+            # Pop the front of the timestamp deque (i.e. the oldest calls) until
+            # the timespan drops back below the period: a 'sliding period' window.
+            while self._timespan >= self.period:
+                self.calls.popleft()
+
+    @property
+    def _timespan(self) -> float:
+        return self.calls[-1] - self.calls[0]
diff --git a/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py b/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
new file mode 100644
index 0000000000000..0384e1f918881
--- /dev/null
+++ b/metadata-ingestion/tests/unit/utilities/test_ratelimiter.py
@@ -0,0 +1,20 @@
+from collections import defaultdict
+from datetime import datetime
+from typing import Dict
+
+from datahub.utilities.ratelimiter import RateLimiter
+
+
+def test_rate_is_limited():
+    MAX_CALLS_PER_SEC = 5
+    TOTAL_CALLS = 18
+    actual_calls: Dict[float, int] = defaultdict(lambda: 0)
+
+    ratelimiter = RateLimiter(max_calls=MAX_CALLS_PER_SEC, period=1)
+    for _ in range(TOTAL_CALLS):
+        with ratelimiter:
+            actual_calls[datetime.now().replace(microsecond=0).timestamp()] += 1
+
+    assert len(actual_calls) == round(TOTAL_CALLS / MAX_CALLS_PER_SEC)
+    assert all(calls <= MAX_CALLS_PER_SEC for calls in actual_calls.values())
+    assert sum(actual_calls.values()) == TOTAL_CALLS
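The vendored `RateLimiter` is used as a context manager, as the unit test above shows. A short usage sketch with a stand-in function (any callable being throttled would work the same way):

```python
from datahub.utilities.ratelimiter import RateLimiter

# Allow at most 5 wrapped calls per second.
limiter = RateLimiter(max_calls=5, period=1.0)


def fetch_page(page: int) -> None:  # stand-in for a real API call
    print(f"fetching page {page}")


for page in range(20):
    with limiter:
        fetch_page(page)
```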