Skip to content

Commit

Permalink
MongoDB: Improve MongoDBCrateDBConverter.decode_extended_json
Browse files Browse the repository at this point in the history
MongoDBCrateDBConverter now accepts three new options:

- timestamp_to_epoch
- timestamp_to_iso8601
- timestamp_use_milliseconds
  • Loading branch information
amotl committed Sep 24, 2024
1 parent f97c040 commit 5370381
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Zyp/Moksha/jq: `to_object` function now respects a `zap` option, that
removes the element altogether if it's empty
- Zyp/Moksha/jq: Improve error reporting at `MokshaTransformation.apply`
- MongoDB: Improved `MongoDBCrateDBConverter.decode_extended_json`

## 2024/09/22 v0.0.17
- MongoDB: Fixed edge case when decoding MongoDB Extended JSON elements
Expand Down
101 changes: 71 additions & 30 deletions src/commons_codec/transform/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datetime as dt
import logging
import typing as t
from functools import cached_property
from typing import Iterable

import bson
Expand All @@ -24,18 +25,6 @@
logger = logging.getLogger(__name__)


def date_converter(value):
if isinstance(value, int):
return value
elif isinstance(value, (str, bytes)):
datetime = dateparser.parse(value)
elif isinstance(value, dt.datetime):
datetime = value
else:
raise ValueError(f"Unable to convert datetime value: {value}")
return calendar.timegm(datetime.utctimetuple()) * 1000


@define
class MongoDBCrateDBConverter:
"""
Expand All @@ -44,6 +33,9 @@ class MongoDBCrateDBConverter:
Extracted from cratedb-toolkit, earlier migr8.
"""

timestamp_to_epoch: bool = False
timestamp_to_iso8601: bool = False
timestamp_use_milliseconds: bool = False
transformation: t.Any = None

def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]:
Expand Down Expand Up @@ -73,7 +65,7 @@ def decode_value(self, value: t.Any) -> t.Any:
if isinstance(value, dict):
# Decode item in BSON CANONICAL format.
if len(value) == 1 and next(iter(value)).startswith("$"):
return self.decode_canonical(value)
return self.decode_extended_json(value)

# Custom adjustments to compensate shape anomalies in source data.
# TODO: Review if it can be removed or refactored.
Expand Down Expand Up @@ -111,33 +103,82 @@ def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]:
"""
return _json_convert(item)

@staticmethod
def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any:
def decode_extended_json(self, value: t.Dict[str, t.Any]) -> t.Any:
"""
Decode MongoDB Extended JSON CANONICAL representation.
Decode MongoDB Extended JSON representation, canonical and legacy variants.
"""
type_ = list(value.keys())[0]

out: t.Any

# Special handling for datetime representation in NUMBERLONG format (emulated depth-first).
type_ = next(iter(value)) # Get key of first item in dictionary.
is_date_numberlong = type_ == "$date" and "$numberLong" in value["$date"]
if is_date_numberlong:
return int(object_hook(value["$date"]))
out = dt.datetime.fromtimestamp(int(value["$date"]["$numberLong"]) / 1000, tz=dt.timezone.utc)
else:
value = object_hook(value)
is_bson = type(value).__module__.startswith("bson")

out: t.Any = value
if isinstance(value, bson.Binary) and value.subtype == bson.UUID_SUBTYPE:
out = value.as_uuid()
elif isinstance(value, bson.Binary):
out = base64.b64encode(value).decode()
elif isinstance(value, bson.Timestamp):
out = value.as_datetime()
out = object_hook(value)

is_bson = isinstance(out, self.all_bson_types)

# Decode BSON types.
if isinstance(out, bson.Binary) and out.subtype == bson.UUID_SUBTYPE:
out = out.as_uuid()
elif isinstance(out, bson.Binary):
out = base64.b64encode(out).decode()
elif isinstance(out, bson.Timestamp):
out = out.as_datetime()

# Decode Python types.
if isinstance(out, dt.datetime):
return date_converter(out)
if self.timestamp_to_epoch:
out = self.convert_epoch(out)
if self.timestamp_use_milliseconds:
out *= 1000
return out
elif self.timestamp_to_iso8601:
return self.convert_iso8601(out)

# Wrap up decoded BSON types as strings.
if is_bson:
return str(out)

# Return others converted as-is.
return out

@cached_property
def all_bson_types(self) -> t.Tuple[t.Type, ...]:
_types: t.List[t.Type] = []
for _typ in bson._ENCODERS:
if hasattr(_typ, "_type_marker"):
_types.append(_typ)
return tuple(_types)

@staticmethod
def convert_epoch(value: t.Any) -> float:
if isinstance(value, int):
return value
elif isinstance(value, dt.datetime):
datetime = value
elif isinstance(value, (str, bytes)):
datetime = dateparser.parse(value)
else:
raise ValueError(f"Unable to convert datetime value: {value}")
return calendar.timegm(datetime.utctimetuple())

@staticmethod
def convert_iso8601(value: t.Any) -> str:
if isinstance(value, str):
return value
elif isinstance(value, dt.datetime):
datetime = value
elif isinstance(value, bytes):
return value.decode("utf-8")
elif isinstance(value, int):
datetime = dt.datetime.fromtimestamp(value, tz=dt.timezone.utc)
else:
raise ValueError(f"Unable to convert datetime value: {value}")
return datetime.isoformat()

def apply_special_treatments(self, value: t.Any):
"""
Apply special treatments to value that can't be described otherwise up until now.
Expand Down Expand Up @@ -176,7 +217,7 @@ class MongoDBTranslatorBase:

def __init__(self, table_name: str, converter: t.Union[MongoDBCrateDBConverter, None] = None):
self.table_name = quote_relation_name(table_name)
self.converter = converter or MongoDBCrateDBConverter()
self.converter = converter or MongoDBCrateDBConverter(timestamp_to_epoch=True, timestamp_use_milliseconds=True)

@property
def sql_ddl(self):
Expand Down
131 changes: 119 additions & 12 deletions tests/transform/mongodb/test_mongodb_convert.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,67 @@
# ruff: noqa: E402
import datetime as dt
import typing as t

import pytest
from attrs import define

pytestmark = pytest.mark.mongodb

from commons_codec.transform.mongodb import MongoDBCrateDBConverter, date_converter
from commons_codec.transform.mongodb import MongoDBCrateDBConverter
from zyp.model.bucket import BucketTransformation, ValueConverter
from zyp.model.collection import CollectionTransformation
from zyp.model.treatment import Treatment

convert_epoch = MongoDBCrateDBConverter.convert_epoch
convert_iso8601 = MongoDBCrateDBConverter.convert_iso8601


def test_date_converter_int():
def test_epoch_ms_converter_int():
"""
Datetime values encoded as integer values will be returned unmodified.
"""
assert date_converter(1443004362000) == 1443004362000
assert convert_epoch(1443004362) == 1443004362
assert convert_epoch(1443004362987) == 1443004362987


def test_date_converter_iso8601():
def test_epoch_ms_converter_iso8601():
"""
Datetime values encoded as ISO8601 values will be parsed.
"""
assert date_converter("2015-09-23T10:32:42.33Z") == 1443004362000
assert date_converter(b"2015-09-23T10:32:42.33Z") == 1443004362000
assert convert_epoch("2015-09-23T10:32:42.33Z") == 1443004362
assert convert_epoch(b"2015-09-23T10:32:42.33Z") == 1443004362


def test_date_converter_invalid():
def test_epoch_ms_converter_invalid():
"""
Incorrect datetime values will not be parsed.
"""
with pytest.raises(ValueError) as ex:
date_converter(None)
convert_epoch(None)
assert ex.match("Unable to convert datetime value: None")


def test_iso8601_converter_int():
"""
Datetime values encoded as integer values will be returned unmodified.
"""
assert convert_iso8601(1443004362) == "2015-09-23T10:32:42+00:00"


def test_iso8601_converter_iso8601():
"""
Datetime values encoded as ISO8601 values will be parsed.
"""
assert convert_iso8601("2015-09-23T10:32:42.33Z") == "2015-09-23T10:32:42.33Z"
assert convert_iso8601(b"2015-09-23T10:32:42.33Z") == "2015-09-23T10:32:42.33Z"


def test_iso8601_converter_invalid():
"""
Incorrect datetime values will not be parsed.
"""
with pytest.raises(ValueError) as ex:
convert_iso8601(None)
assert ex.match("Unable to convert datetime value: None")


Expand All @@ -56,6 +88,65 @@ def test_convert_basic():
assert list(converter.decode_documents([data_in])) == [data_out]


@define
class DateConversionCase:
converter: t.Callable
data_in: t.Any
data_out: t.Any


testdata = [
DateConversionCase(
converter=MongoDBCrateDBConverter(),
data_in={"$date": "2015-09-23T10:32:42.123456Z"},
data_out=dt.datetime(2015, 9, 23, 10, 32, 42, 123456),
),
DateConversionCase(
converter=MongoDBCrateDBConverter(),
data_in={"$date": {"$numberLong": "1655210544987"}},
data_out=dt.datetime(2022, 6, 14, 12, 42, 24, 987000, tzinfo=dt.timezone.utc),
),
DateConversionCase(
converter=MongoDBCrateDBConverter(timestamp_to_epoch=True, timestamp_use_milliseconds=True),
data_in={"$date": "2015-09-23T10:32:42.123456Z"},
data_out=1443004362000,
),
DateConversionCase(
converter=MongoDBCrateDBConverter(timestamp_to_epoch=True, timestamp_use_milliseconds=True),
data_in={"$date": {"$numberLong": "1655210544987"}},
data_out=1655210544000,
),
DateConversionCase(
converter=MongoDBCrateDBConverter(timestamp_to_iso8601=True),
data_in={"$date": "2015-09-23T10:32:42.123456Z"},
data_out="2015-09-23T10:32:42.123456",
),
DateConversionCase(
converter=MongoDBCrateDBConverter(timestamp_to_iso8601=True),
data_in={"$date": {"$numberLong": "1655210544987"}},
data_out="2022-06-14T12:42:24.987000+00:00",
),
]


testdata_ids = [
"vanilla-$date-canonical",
"vanilla-$date-legacy",
"epochms-$date-canonical",
"epochms-$date-legacy",
"iso8601-$date-canonical",
"iso8601-$date-legacy",
]


@pytest.mark.parametrize("testcase", testdata, ids=testdata_ids)
def test_convert_timestamp_many(testcase: DateConversionCase):
"""
Verify converting timestamps using different modifiers.
"""
assert testcase.converter.decode_document(testcase.data_in) == testcase.data_out


def test_convert_with_treatment_ignore_complex_lists():
"""
The `ignore_complex_lists` treatment ignores lists of dictionaries, often having deviating substructures.
Expand Down Expand Up @@ -83,7 +174,11 @@ def test_convert_with_treatment_ignore_complex_lists():

treatment = Treatment(ignore_complex_lists=True)
transformation = CollectionTransformation(treatment=treatment)
converter = MongoDBCrateDBConverter(transformation=transformation)
converter = MongoDBCrateDBConverter(
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)
assert converter.decode_document(data_in) == data_out


Expand Down Expand Up @@ -119,7 +214,11 @@ def test_convert_with_treatment_normalize_complex_lists():

treatment = Treatment(normalize_complex_lists=True)
transformation = CollectionTransformation(treatment=treatment)
converter = MongoDBCrateDBConverter(transformation=transformation)
converter = MongoDBCrateDBConverter(
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)
assert converter.decode_document(data_in) == data_out


Expand Down Expand Up @@ -165,7 +264,11 @@ def test_convert_with_treatment_all_options():
],
)
transformation = CollectionTransformation(treatment=treatment)
converter = MongoDBCrateDBConverter(transformation=transformation)
converter = MongoDBCrateDBConverter(
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)
assert converter.decode_document(data_in) == data_out


Expand All @@ -180,6 +283,10 @@ def test_convert_transform_timestamp():
values=ValueConverter().add(pointer="/timestamp", transformer="to_unixtime"),
)
transformation = CollectionTransformation(bucket=bucket_transformation)
converter = MongoDBCrateDBConverter(transformation=transformation)
converter = MongoDBCrateDBConverter(
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)
data = converter.decode_documents(data_in)
assert data == data_out
6 changes: 5 additions & 1 deletion tests/transform/mongodb/test_mongodb_full.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ def make_translator(kind: str) -> MongoDBFullLoadTranslator:
.jq(".[] |= (.python.to_list |= to_array)")
.jq(".[] |= (.python.to_string |= tostring)")
)
converter = MongoDBCrateDBConverter(transformation=transformation)
converter = MongoDBCrateDBConverter(
timestamp_to_epoch=True,
timestamp_use_milliseconds=True,
transformation=transformation,
)
translator = MongoDBFullLoadTranslator(table_name="from.mongodb", converter=converter)
return translator

Expand Down

0 comments on commit 5370381

Please sign in to comment.