Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
TonyOuyangGit committed Oct 27, 2023
1 parent 09b45f5 commit 3c811e2
Show file tree
Hide file tree
Showing 2 changed files with 4,132 additions and 4,196 deletions.
68 changes: 22 additions & 46 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
from pymongo.mongo_client import MongoClient

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -72,7 +74,9 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase):
class MongoDBConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
# See the MongoDB authentication docs for details and examples.
# https://pymongo.readthedocs.io/en/stable/examples/authentication.html
connect_uri: str = Field(
Expand Down Expand Up @@ -103,10 +107,6 @@ class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngesti
# mongodb only supports 16MB as max size for documents. However, if we try to retrieve a larger document it
# errors out with "16793600" as the maximum size supported.
maxDocumentSize: Optional[PositiveInt] = Field(default=16793600, description="")
ingest_data_platform_instance_aspect: Optional[bool] = Field(
default=False,
description="Option to enable/disable ingestion of the data platform instance aspect and if platform instance is included in a dataset's urn.",
)

database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
Expand Down Expand Up @@ -224,11 +224,8 @@ def construct_schema_pymongo(
@platform_name("MongoDB")
@config_class(MongoDBConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@capability(
SourceCapability.PLATFORM_INSTANCE,
"Disabled by default",
)
@dataclass
class MongoDBSource(StatefulIngestionSourceBase):
"""
Expand Down Expand Up @@ -337,8 +334,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
platform = "mongodb"

database_names: List[str] = self.mongo_client.list_database_names()
# Only ingest the DPI aspect if the flag ingest_data_platform_instance_aspect is true
data_platform_instance = self._create_data_platform_instance_aspect(platform)

# traverse databases in sorted order so output is consistent
for database_name in sorted(database_names):
Expand All @@ -359,7 +354,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.report_dropped(dataset_name)
continue

dataset_urn = self._create_dataset_urn(platform, dataset_name)
dataset_urn = make_dataset_urn_with_platform_instance(
platform=platform,
name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance,
)

if self.config.platform_instance:
data_platform_instance = DataPlatformInstanceClass(
platform=make_data_platform_urn(platform),
instance=make_dataplatform_instance_urn(
platform, self.config.platform_instance
),
)

dataset_properties = DatasetPropertiesClass(
tags=[],
Expand Down Expand Up @@ -473,35 +481,3 @@ def get_report(self) -> MongoDBSourceReport:
def close(self):
self.mongo_client.close()
super().close()

def _create_dataset_urn(self, platform: str, dataset_name: str) -> str:
if (
self.config.ingest_data_platform_instance_aspect
and self.config.platform_instance
):
dataset_urn = make_dataset_urn_with_platform_instance(
platform=platform,
platform_instance=self.config.platform_instance,
name=dataset_name,
)
else:
dataset_urn = make_dataset_urn(
platform=platform,
name=dataset_name,
)
return dataset_urn

def _create_data_platform_instance_aspect(
self, platform: str
) -> Optional[DataPlatformInstanceClass]:
if (
self.config.ingest_data_platform_instance_aspect
and self.config.platform_instance
):
return DataPlatformInstanceClass(
platform=make_data_platform_urn(platform),
instance=make_dataplatform_instance_urn(
platform, self.config.platform_instance
),
)
return None
Loading

0 comments on commit 3c811e2

Please sign in to comment.