diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index ce2b9ce2981e0..2aa8b1d37d477 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -1,5 +1,6 @@ import logging from dataclasses import dataclass, field +from enum import Enum from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView import bson.timestamp @@ -74,6 +75,12 @@ DENY_DATABASE_LIST = set(["admin", "config", "local"]) +class HostingEnvironment(Enum): + SELF_HOSTED = "SELF_HOSTED" + ATLAS = "ATLAS" + AWS_DOCUMENTDB = "AWS_DOCUMENTDB" + + class MongoDBConfig( PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase ): @@ -108,6 +115,11 @@ class MongoDBConfig( # errors out with "16793600" as the maximum size supported. maxDocumentSize: Optional[PositiveInt] = Field(default=16793600, description="") + hostingEnvironment: Optional[HostingEnvironment] = Field( + default=HostingEnvironment.SELF_HOSTED, + description="Hosting environment of MongoDB, default is SELF_HOSTED, currently support `SELF_HOSTED`, `ATLAS`, `AWS_DOCUMENTDB`", + ) + database_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description="regex patterns for databases to filter in ingestion.", @@ -176,7 +188,7 @@ def construct_schema_pymongo( delimiter: str, use_random_sampling: bool, max_document_size: int, - is_version_gte_4_4: bool, + should_add_document_size_filter: bool, sample_size: Optional[int] = None, ) -> Dict[Tuple[str, ...], SchemaDescription]: """ @@ -191,15 +203,19 @@ def construct_schema_pymongo( the PyMongo collection delimiter: string to concatenate field names by + use_random_sampling: + boolean to indicate if random sampling should be added to aggregation + max_document_size: + maximum size of the document that will be considered for generating the schema. 
+ should_add_document_size_filter: + boolean to indicate if document size filter should be added to aggregation sample_size: number of items in the collection to sample (reads entire collection if not provided) - max_document_size: - maximum size of the document that will be considered for generating the schema. """ aggregations: List[Dict] = [] - if is_version_gte_4_4: + if should_add_document_size_filter: doc_size_field = "temporary_doc_size_field" # create a temporary field to store the size of the document. filter on it and then remove it. aggregations = [ @@ -381,7 +397,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: delimiter=".", use_random_sampling=self.config.useRandomSampling, max_document_size=self.config.maxDocumentSize, - is_version_gte_4_4=self.is_server_version_gte_4_4(), + should_add_document_size_filter=self.should_add_document_size_filter(), sample_size=self.config.schemaSamplingSize, ) @@ -475,6 +491,18 @@ def is_server_version_gte_4_4(self) -> bool: return False + def is_hosted_on_aws_documentdb(self) -> bool: + return self.config.hostingEnvironment == HostingEnvironment.AWS_DOCUMENTDB + + def should_add_document_size_filter(self) -> bool: + # the $bsonSize operator is only available in server version 4.4 or later + # and is not supported by AWS DocumentDB, so we should only add this operation to the + # aggregation when mongodb does not run on AWS DocumentDB and its version is 4.4 or later + # https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html + return ( + self.is_server_version_gte_4_4() and not self.is_hosted_on_aws_documentdb() + ) + def get_report(self) -> MongoDBSourceReport: return self.report