Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/mongodb): support AWS DocumentDB for MongoDB #9201

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple, Type, Union, ValuesView

import bson.timestamp
Expand Down Expand Up @@ -74,6 +75,12 @@
DENY_DATABASE_LIST = set(["admin", "config", "local"])


class HostingEnvironment(Enum):
    """Hosting environment of the MongoDB deployment being ingested.

    Each member's value is identical to its name, so a member can be looked
    up from its string form, e.g. ``HostingEnvironment("ATLAS")``.
    """

    # Regular, self-managed MongoDB deployment.
    SELF_HOSTED = "SELF_HOSTED"
    # MongoDB Atlas.
    ATLAS = "ATLAS"
    # AWS DocumentDB (MongoDB-compatible API).
    AWS_DOCUMENTDB = "AWS_DOCUMENTDB"


class MongoDBConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, StatefulIngestionConfigBase
):
Expand Down Expand Up @@ -108,6 +115,11 @@ class MongoDBConfig(
# errors out with "16793600" as the maximum size supported.
maxDocumentSize: Optional[PositiveInt] = Field(default=16793600, description="")

hostingEnvironment: Optional[HostingEnvironment] = Field(
default=HostingEnvironment.SELF_HOSTED,
description="Hosting environment of MongoDB, default is SELF_HOSTED, currently support `SELF_HOSTED`, `ATLAS`, `AWS_DOCUMENTDB`",
)

database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="regex patterns for databases to filter in ingestion.",
Expand Down Expand Up @@ -176,7 +188,7 @@ def construct_schema_pymongo(
delimiter: str,
use_random_sampling: bool,
max_document_size: int,
is_version_gte_4_4: bool,
should_add_document_size_filter: bool,
sample_size: Optional[int] = None,
) -> Dict[Tuple[str, ...], SchemaDescription]:
"""
Expand All @@ -191,15 +203,19 @@ def construct_schema_pymongo(
the PyMongo collection
delimiter:
string to concatenate field names by
use_random_sampling:
boolean to indicate if random sampling should be added to aggregation
max_document_size:
maximum size of the document that will be considered for generating the schema.
should_add_document_size_filter:
boolean to indicate if document size filter should be added to aggregation
sample_size:
number of items in the collection to sample
(reads entire collection if not provided)
max_document_size:
maximum size of the document that will be considered for generating the schema.
"""

aggregations: List[Dict] = []
if is_version_gte_4_4:
if should_add_document_size_filter:
doc_size_field = "temporary_doc_size_field"
# create a temporary field to store the size of the document. filter on it and then remove it.
aggregations = [
Expand Down Expand Up @@ -381,7 +397,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
delimiter=".",
use_random_sampling=self.config.useRandomSampling,
max_document_size=self.config.maxDocumentSize,
is_version_gte_4_4=self.is_server_version_gte_4_4(),
should_add_document_size_filter=self.should_add_document_size_filter(),
sample_size=self.config.schemaSamplingSize,
)

Expand Down Expand Up @@ -475,6 +491,18 @@ def is_server_version_gte_4_4(self) -> bool:

return False

def is_hosted_on_aws_documentdb(self) -> bool:
    """Return True when the configured hosting environment is AWS DocumentDB.

    Reads only ``self.config.hostingEnvironment`` and compares it with
    ``HostingEnvironment.AWS_DOCUMENTDB``.
    """
    return self.config.hostingEnvironment == HostingEnvironment.AWS_DOCUMENTDB

def should_add_document_size_filter(self) -> bool:
    """Decide whether the document-size filter belongs in the aggregation.

    Returns:
        True only when the source is not hosted on AWS DocumentDB and the
        server version is 4.4 or newer; False otherwise.
    """
    # The $bsonsize operation is only available in server version 4.4 and
    # newer, and it is not supported by AWS DocumentDB, so the filter is only
    # added for MongoDB that does not run on AWS DocumentDB and is >= 4.4.
    # https://docs.aws.amazon.com/documentdb/latest/developerguide/mongo-apis.html
    #
    # The hosting check is a plain config comparison, so it is evaluated
    # first; on DocumentDB the version check is then skipped entirely.
    # NOTE(review): the version check presumably queries the server — confirm.
    return (
        not self.is_hosted_on_aws_documentdb() and self.is_server_version_gte_4_4()
    )

def get_report(self) -> MongoDBSourceReport:
    """Return the report object held by this source (``self.report``)."""
    return self.report

Expand Down
Loading