Skip to content

Commit

Permalink
Merge branch 'master' into pluginv2
Browse files Browse the repository at this point in the history
  • Loading branch information
vishalkSimplify authored Jul 25, 2023
2 parents 1e1d116 + eac003c commit 9e45dfb
Show file tree
Hide file tree
Showing 74 changed files with 5,680 additions and 3,036 deletions.
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
certain column-level metrics. Instead, set `profile_table_level_only` to `false` and
individually enable / disable desired field metrics.
- #8451: The `bigquery-beta` and `snowflake-beta` source aliases have been dropped. Use `bigquery` and `snowflake` as the source type instead.
- #8472: Ingestion runs created with `Pipeline.create` will show up in the DataHub ingestion tab as CLI-based runs. To revert to the previous behavior of not showing these runs in DataHub, pass `no_default_report=True`.

### Potential Downtime

Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/examples/library/create_mlmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@
description="my feature",
groups=model_group_urns,
mlFeatures=feature_urns,
trainingMetrics=[
models.MLMetricClass(
name="accuracy", description="accuracy of the model", value="1.0"
)
],
hyperParams=[
models.MLHyperParamClass(
name="hyper_1", description="hyper_1", value="0.102"
)
],
),
)

Expand Down
3 changes: 2 additions & 1 deletion metadata-ingestion/src/datahub/cli/check_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None:
"type": "file",
"config": {"filename": out_file.name},
},
}
},
no_default_report=True,
)

pipeline.run()
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/docker_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ def ingest_sample_data(path: Optional[str], token: Optional[str]) -> None:
if token is not None:
recipe["sink"]["config"]["token"] = token

pipeline = Pipeline.create(recipe)
pipeline = Pipeline.create(recipe, no_default_report=True)
pipeline.run()
ret = pipeline.pretty_print_summary()
sys.exit(ret)
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def mcps(path: str) -> None:
},
}

pipeline = Pipeline.create(recipe)
pipeline = Pipeline.create(recipe, no_default_report=True)
pipeline.run()
ret = pipeline.pretty_print_summary()
sys.exit(ret)
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor

platform_instance: Optional[str] = None
if isinstance(config, PlatformInstanceConfigMixin) and config.platform_instance:
platform_instance = platform_instance
platform_instance = config.platform_instance

return partial(
auto_browse_path_v2,
Expand Down
12 changes: 6 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ class DatahubClientConfig(ConfigModel):
"""Configuration class for holding connectivity to datahub gms"""

server: str = "http://localhost:8080"
token: Optional[str]
timeout_sec: Optional[int]
retry_status_codes: Optional[List[int]]
retry_max_times: Optional[int]
extra_headers: Optional[Dict[str, str]]
ca_certificate_path: Optional[str]
token: Optional[str] = None
timeout_sec: Optional[int] = None
retry_status_codes: Optional[List[int]] = None
retry_max_times: Optional[int] = None
extra_headers: Optional[Dict[str, str]] = None
ca_certificate_path: Optional[str] = None
disable_ssl_verification: bool = False

_max_threads_moved_to_sink = pydantic_removed_field(
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def create(
dry_run: bool = False,
preview_mode: bool = False,
preview_workunits: int = 10,
report_to: Optional[str] = None,
report_to: Optional[str] = "datahub",
no_default_report: bool = False,
raw_config: Optional[dict] = None,
) -> "Pipeline":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
support_status,
)
from datahub.ingestion.api.source import Source
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.sagemaker_processors.common import (
SagemakerSourceConfig,
Expand Down Expand Up @@ -57,9 +56,6 @@ def create(cls, config_dict, ctx):
config = SagemakerSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# get common lineage graph
lineage_processor = LineageProcessor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source_config.csv_enricher import CSVEnricherConfig
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -590,9 +589,6 @@ def maybe_extract_owners(
]
return owners

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# As per https://stackoverflow.com/a/49150749/5004662, we want to use
# the 'utf-8-sig' encoding to handle any BOM character that may be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_boto_utils import get_s3_tags
from datahub.ingestion.source.aws.s3_util import (
Expand Down Expand Up @@ -340,9 +339,6 @@ def local_get_folders(self, path: str) -> Iterable[str]:
for folder in os.listdir(path):
yield os.path.join(path, folder)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.container_WU_creator = ContainerWUCreator(
self.source_config.platform,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
Expand Down Expand Up @@ -352,9 +351,6 @@ def create(
config = ElasticsearchSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
indices = self.client.indices.get_alias()
for index in indices:
Expand Down
5 changes: 0 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import sys

from datahub.ingestion.api.source_helpers import auto_workunit_reporter

if sys.version_info < (3, 8):
raise ImportError("Feast is only supported on Python 3.8+")

Expand Down Expand Up @@ -370,9 +368,6 @@ def create(cls, config_dict, ctx):
config = FeastRepositorySourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for feature_view in self.feature_store.list_feature_views():
for entity_name in feature_view.entities:
Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
Expand Down Expand Up @@ -611,9 +610,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = MetabaseConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_card_mces()
yield from self.emit_dashboard_mces()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,7 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import (
auto_status_aspect,
auto_workunit,
auto_workunit_reporter,
)
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.registries.domain_registry import DomainRegistry
Expand Down Expand Up @@ -503,14 +499,6 @@ def load_glossary_config(
glossary_cfg = BusinessGlossaryConfig.parse_obj(config)
return glossary_cfg

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(
self.report,
auto_status_aspect(
self.get_workunits_internal(),
),
)

def get_workunits_internal(
self,
) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass, field
from functools import partial
from typing import Any, Dict, Iterable, List, Optional

from pydantic import validator
Expand All @@ -26,8 +27,11 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
from datahub.ingestion.api.source_helpers import (
auto_status_aspect,
auto_workunit_reporter,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
FineGrainedLineageDownstreamType,
Expand Down Expand Up @@ -139,8 +143,11 @@ def load_lineage_config(file_name: str) -> LineageConfig:
lineage_config = LineageConfig.parse_obj(config)
return lineage_config

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
auto_status_aspect,
partial(auto_workunit_reporter, self.get_report()),
]

def get_workunits_internal(
self,
Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
Expand Down Expand Up @@ -797,9 +796,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = ModeConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_dashboard_mces()
yield from self.emit_chart_mces()
Expand Down
8 changes: 0 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView

import bson
import bson.dbref
import bson.int64
import bson.objectid
import bson.timestamp
import pymongo
import pymongo.collection
Expand All @@ -26,7 +22,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.schema_inference.object import (
SchemaDescription,
Expand Down Expand Up @@ -301,9 +296,6 @@ def get_field_type(

return SchemaFieldDataType(type=TypeClass())

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
platform = "mongodb"

Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
DataFlowInfoClass,
Expand Down Expand Up @@ -1024,9 +1023,6 @@ def authenticate(self):
token_response.raise_for_status()
self.session.headers.update({"Authorization": "Bearer " + token_response.text})

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
try:
self.authenticate()
Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.openapi_parser import (
clean_url,
Expand Down Expand Up @@ -213,9 +212,6 @@ def build_wu(
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
return ApiWorkUnit(id=dataset_name, mce=mce)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901
config = self.config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.powerbi_report_server.constants import (
API_ENDPOINTS,
Expand Down Expand Up @@ -533,9 +532,6 @@ def create(cls, config_dict, ctx):
config = PowerBiReportServerDashboardSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""
The DataHub ingestion framework invokes this method
Expand Down
4 changes: 0 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/redash.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
)
from datahub.ingestion.api.registry import import_path
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
Expand Down Expand Up @@ -776,9 +775,6 @@ def _emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
def add_config_to_report(self) -> None:
self.report.api_page_limit = self.config.api_page_limit

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.validate_connection()
self.add_config_to_report()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def __init__(self, config: SalesforceConfig, ctx: PipelineContext) -> None:
)
)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
sObjects = self.get_salesforce_objects()

for sObject in sObjects:
Expand Down
Loading

0 comments on commit 9e45dfb

Please sign in to comment.