Skip to content

Commit

Permalink
feat(ingest/mongodb): support stateful ingestion (#9118)
Browse files Browse the repository at this point in the history
  • Loading branch information
TonyOuyangGit authored Nov 1, 2023
1 parent 55f1453 commit 876de21
Show file tree
Hide file tree
Showing 2 changed files with 4,270 additions and 4,124 deletions.
74 changes: 56 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
Expand All @@ -25,14 +30,21 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.schema_inference.object import (
SchemaDescription,
construct_schema,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulIngestionConfigBase,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayTypeClass,
BooleanTypeClass,
Expand All @@ -48,7 +60,10 @@
TimeTypeClass,
UnionTypeClass,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetPropertiesClass,
)

logger = logging.getLogger(__name__)

Expand All @@ -59,7 +74,9 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
class MongoDBConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
# See the MongoDB authentication docs for details and examples.
# https://pymongo.readthedocs.io/en/stable/examples/authentication.html
connect_uri: str = Field(
Expand Down Expand Up @@ -99,6 +116,8 @@ class MongoDBConfig(PlatformInstanceConfigMixin, EnvConfigMixin):
default=AllowDenyPattern.allow_all(),
description="regex patterns for collections to filter in ingestion.",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@validator("maxDocumentSize")
def check_max_doc_size_filter_is_valid(cls, doc_size_filter_value):
Expand All @@ -108,7 +127,7 @@ def check_max_doc_size_filter_is_valid(cls, doc_size_filter_value):


@dataclass
class MongoDBSourceReport(SourceReport):
class MongoDBSourceReport(StaleEntityRemovalSourceReport):
filtered: List[str] = field(default_factory=list)

def report_dropped(self, name: str) -> None:
Expand All @@ -129,6 +148,7 @@ def report_dropped(self, name: str) -> None:
bson.timestamp.Timestamp: "timestamp",
bson.dbref.DBRef: "dbref",
bson.objectid.ObjectId: "oid",
bson.Decimal128: "numberDecimal",
"mixed": "mixed",
}

Expand All @@ -145,6 +165,7 @@ def report_dropped(self, name: str) -> None:
bson.timestamp.Timestamp: TimeTypeClass,
bson.dbref.DBRef: BytesTypeClass,
bson.objectid.ObjectId: BytesTypeClass,
bson.Decimal128: NumberTypeClass,
dict: RecordTypeClass,
"mixed": UnionTypeClass,
}
Expand Down Expand Up @@ -206,7 +227,7 @@ def construct_schema_pymongo(
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@dataclass
class MongoDBSource(Source):
class MongoDBSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:
Expand All @@ -227,7 +248,7 @@ class MongoDBSource(Source):
mongo_client: MongoClient

def __init__(self, ctx: PipelineContext, config: MongoDBConfig):
    """Initialize the MongoDB source.

    Hands the config to the stateful-ingestion base class (which expects
    the arguments in (config, ctx) order) and creates a fresh report
    object for tracking dropped/filtered entities during this run.
    """
    # NOTE: the base class takes (config, ctx), not (ctx, config).
    super().__init__(config, ctx)
    self.report = MongoDBSourceReport()
    self.config = config

Expand All @@ -254,6 +275,14 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "MongoDBSource":
config = MongoDBConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
    """Return the chain of work-unit processors for this source.

    Starts from the processors provided by the base class and appends a
    stale-entity removal processor, which soft-deletes entities that were
    ingested in a previous run but no longer exist in the source.
    """
    stale_entity_handler = StaleEntityRemovalHandler.create(
        self, self.config, self.ctx
    )
    processors: List[Optional[MetadataWorkUnitProcessor]] = list(
        super().get_workunit_processors()
    )
    processors.append(stale_entity_handler.workunit_processor)
    return processors

def get_pymongo_type_string(
self, field_type: Union[Type, str], collection_name: str
) -> str:
Expand Down Expand Up @@ -332,16 +361,18 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
platform_instance=self.config.platform_instance,
)

dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[],
)
if self.config.platform_instance:
data_platform_instance = DataPlatformInstanceClass(
platform=make_data_platform_urn(platform),
instance=make_dataplatform_instance_urn(
platform, self.config.platform_instance
),
)

dataset_properties = DatasetPropertiesClass(
tags=[],
customProperties={},
)
dataset_snapshot.aspects.append(dataset_properties)

if self.config.enableSchemaInference:
assert self.config.maxDocumentSize is not None
Expand Down Expand Up @@ -412,13 +443,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
fields=canonical_schema,
)

dataset_snapshot.aspects.append(schema_metadata)

# TODO: use list_indexes() or index_information() to get index information
# See https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.list_indexes.

mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
yield MetadataWorkUnit(id=dataset_name, mce=mce)
yield from [
mcp.as_workunit()
for mcp in MetadataChangeProposalWrapper.construct_many(
entityUrn=dataset_urn,
aspects=[
schema_metadata,
dataset_properties,
data_platform_instance,
],
)
]

def is_server_version_gte_4_4(self) -> bool:
try:
Expand Down
Loading

0 comments on commit 876de21

Please sign in to comment.