Skip to content

Commit

Permalink
Zyp: Translate special treatments to jq-based MokshaTransformation again
Browse files Browse the repository at this point in the history
A few operations can be performed more efficiently on the whole
collection at once when using Moksha/jq.

- Flattening nested lists
- Pruning lists of objects
- Converting to objects, lists, and strings

To support those operations, a jq support library was added, including
a few helper functions written in jqlang itself. The helper functions
are:

- to_array
- to_object
- is_array_of_objects
- del_array_of_objects
- prune_array_of_objects
- prune_null
  • Loading branch information
amotl committed Sep 19, 2024
1 parent 9034076 commit cf9716e
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- MongoDB: Complete and verify BSON data type mapping end-to-end
- MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator`
- Dependencies: Make MongoDB subsystem not strictly depend on Zyp
- Zyp: Translate a few special treatments to jq-based `MokshaTransformation` again

## 2024/09/10 v0.0.15
- Added Zyp Treatments, a slightly tailored transformation subsystem
Expand Down
5 changes: 5 additions & 0 deletions doc/zyp/backlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
- Documentation
- CLI interface
- Apply to MongoDB Table Loader in CrateDB Toolkit
- Document `jq` functions
- `builtin.jq`: https://github.com/jqlang/jq/blob/master/src/builtin.jq
- `function.jq`

## Iteration +2
Demonstrate!
Expand All @@ -18,6 +21,8 @@ Demonstrate!
- https://github.com/MeltanoLabs/meltano-map-transform/pull/255
- https://github.com/MeltanoLabs/meltano-map-transform/issues/252
- Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response
- Is `jqpy` better than `jq`?
https://baterflyrity.github.io/jqpy/

## Iteration +3
- Moksha transformations on Buckets
Expand Down
74 changes: 37 additions & 37 deletions src/commons_codec/transform/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]:
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
data = map(self.decode_bson, data)
data = map(self.decode_value, data)
# TODO: This is currently untyped. Types are defined in Zyp, see `zyp.model.base`.
if self.transformation is not None:
Expand All @@ -59,7 +60,7 @@ def decode_document(self, data: Document) -> Document:
"""
Decode MongoDB Extended JSON, considering CrateDB specifics.
"""
return self.decode_value(data)
return self.decode_value(self.decode_bson(data))

def decode_value(self, value: t.Any) -> t.Any:
"""
Expand All @@ -82,6 +83,38 @@ def decode_value(self, value: t.Any) -> t.Any:

return value

@staticmethod
def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]:
"""
Convert MongoDB Extended JSON to vanilla Python dictionary.
https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
Example:
{
"_id": ObjectId("669683c2b0750b2c84893f3e"),
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
}
IN (top-level stripped):
"fullDocument": {
"_id": ObjectId("669683c2b0750b2c84893f3e"),
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
}
OUT:
{"_id": {"$oid": "669683c2b0750b2c84893f3e"},
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
}
"""
return _json_convert(item)

@staticmethod
def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any:
"""
Expand Down Expand Up @@ -154,38 +187,6 @@ def sql_ddl(self):
f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));"
)

@staticmethod
def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]:
"""
Convert MongoDB Extended JSON to vanilla Python dictionary.
https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
Example:
{
"_id": ObjectId("669683c2b0750b2c84893f3e"),
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
}
IN (top-level stripped):
"fullDocument": {
"_id": ObjectId("669683c2b0750b2c84893f3e"),
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
}
OUT:
{"_id": {"$oid": "669683c2b0750b2c84893f3e"},
"id": "5F9E",
"data": {"temperature": 42.42, "humidity": 84.84},
"meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
}
"""
return _json_convert(item)


class MongoDBFullLoadTranslator(MongoDBTranslatorBase):
"""
Expand Down Expand Up @@ -213,8 +214,7 @@ def to_sql(self, data: t.Union[Document, t.List[Document]]) -> SQLOperation:

# Converge multiple MongoDB documents into SQL parameters for `executemany` operation.
parameters: t.List[Document] = []
for document in data:
record = self.converter.decode_document(self.decode_bson(document))
for record in self.converter.decode_documents(data):
oid: str = self.get_document_key(record)
parameters.append({"oid": oid, "record": record})

Expand Down Expand Up @@ -266,7 +266,7 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:
if operation_type == "insert":
oid: str = self.get_document_key(event)
document = self.get_full_document(event)
record = self.converter.decode_document(self.decode_bson(document))
record = self.converter.decode_document(document)
sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);"
parameters = {"oid": oid, "record": record}

Expand All @@ -275,7 +275,7 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:
# https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations
elif operation_type in ["update", "replace"]:
document = self.get_full_document(event)
record = self.converter.decode_document(self.decode_bson(document))
record = self.converter.decode_document(document)
where_clause = self.where_clause(event)
sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};"
parameters = {"record": record}
Expand Down
40 changes: 40 additions & 0 deletions src/zyp/function.jq
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Support functions for Zyp Transformations
# https://commons-codec.readthedocs.io/zyp/

def to_array:
  # Coerce the input to an array: arrays pass through unchanged,
  # any other value (including null) is wrapped into a one-element array.
  if . | type == "array" then
    .
  else
    [.]
  end;

def to_object(options):
  # Wrap a non-object input into an object keyed by `options.key`;
  # objects pass through unchanged.
  # Example: 123 | to_object({"key": "id"})  =>  {"id": 123}
  if . | type == "object" then
    .
  else
    {(options.key): .}
  end;

def is_array_of_objects:
  # True when the input is an array whose first element is an object.
  # NOTE: only `.[0]` is inspected, so a mixed array counts as soon as it
  # starts with an object, and an empty array yields false (`.[0]` is null).
  if (. | type == "array") and (.[0] | type == "object") then
    true
  else
    false
  end;

def del_array_of_objects:
  # If the input is an array containing objects, delete it: `del(.)` removes
  # the root path, leaving null in its place. Any other value passes through
  # unchanged via the implicit `else .` of jq's if-without-else.
  if is_array_of_objects then
    del(.)
  end;

def prune_array_of_objects:
  # Recursively drop arrays of objects: `walk` applies
  # `del_array_of_objects` to every sub-value of the document.
  walk(del_array_of_objects);

def prune_null:
  # Recursively delete `null` values: the `values` builtin emits its input
  # only when it is not null, so null sub-values produce an empty result
  # inside `walk`.
  # NOTE(review): this relies on walk's handling of a filter that emits
  # nothing — confirm behavior for nested containers holding nulls.
  walk(values);
4 changes: 3 additions & 1 deletion src/zyp/model/moksha.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def evaluate(self, data: DictOrList) -> DictOrList:
if isinstance(self.transformer, jmespath.parser.ParsedResult):
return self.transformer.search(data, options=jmespath.Options(dict_cls=collections.OrderedDict))
elif isinstance(self.transformer, jq._Program):
return self.transformer.input_value(data).first()
if isinstance(data, map):
data = list(data)
return self.transformer.transform(data)
elif isinstance(self.transformer, transon.Transformer):
return self.transformer.transform(data)
else:
Expand Down
9 changes: 8 additions & 1 deletion src/zyp/util/expression.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import importlib
import importlib.resources
import typing as t

import jmespath
Expand All @@ -6,12 +8,17 @@

from zyp.model.bucket import MokshaTransformer, TransonTemplate

# TODO: Is there a better way to configure jq using a custom search path
# instead of injecting the `include` statement each time again?
jq_functions_path = importlib.resources.files("src") / "zyp"
jq_functions_import = f'include "function" {{"search": "{jq_functions_path}"}};'


def compile_expression(type: str, expression: t.Union[str, TransonTemplate]) -> MokshaTransformer: # noqa: A002
if type == "jmes":
return jmespath.compile(expression)
elif type == "jq":
return jq.compile(expression)
return jq.compile(f"{jq_functions_import} {expression}")
elif type == "transon":
return transon.Transformer(expression)
else:
Expand Down
25 changes: 25 additions & 0 deletions tests/transform/mongodb/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,28 @@
"uuid": "73ffd264-44b3-4c69-90e8-e7d1dfc035d4",
},
}


# Input document containing structural "anomalies": value shapes that the
# jq-based Zyp transformation under test needs to normalize before the
# record can be inserted into CrateDB.
RECORD_IN_ANOMALIES = {
    "_id": {
        # MongoDB Extended JSON representation of an ObjectId.
        "$oid": "56027fcae4b09385a85f9344",
    },
    "python": {
        # Nested list, to be flattened.
        "list_of_nested_list": [1, [2, 3], 4],
        # List of objects, to be pruned.
        "list_of_objects": [{}],
        # Scalar values, to be converted to object, list, and string.
        "to_dict": 123,
        "to_list": 123,
        "to_string": 123,
    },
}

# Expected outcome after applying the jq-based transformations to
# `RECORD_IN_ANOMALIES`.
RECORD_OUT_ANOMALIES = {
    # ObjectId unwrapped to its plain string representation.
    "_id": "56027fcae4b09385a85f9344",
    "python": {
        # Flattened by jq `flatten`.
        "list_of_nested_list": [1, 2, 3, 4],
        # Replaced with null by `prune_array_of_objects`.
        "list_of_objects": None,
        # Converted by `to_object({"key": "id"})`, `to_array`, `tostring`.
        "to_dict": {"id": 123},
        "to_list": [123],
        "to_string": "123",
    },
}
65 changes: 54 additions & 11 deletions tests/transform/mongodb/test_mongodb_full.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,81 @@
# ruff: noqa: E402
from copy import deepcopy

import pytest

from zyp.model.collection import CollectionTransformation
from zyp.model.moksha import MokshaTransformation

pytestmark = pytest.mark.mongodb

from commons_codec.model import SQLOperation
from commons_codec.transform.mongodb import MongoDBFullLoadTranslator
from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES
from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator
from tests.transform.mongodb.data import (
RECORD_IN_ALL_TYPES,
RECORD_IN_ANOMALIES,
RECORD_OUT_ALL_TYPES,
RECORD_OUT_ANOMALIES,
)

testdata = [
(RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES, "all-types"),
(RECORD_IN_ANOMALIES, RECORD_OUT_ANOMALIES, "anomalies"),
]
testdata_ids = [
"all-types",
"anomalies",
]


def test_sql_ddl():
    """Verify the DDL statement a plain translator emits for table `foo`."""
    ddl = MongoDBFullLoadTranslator(table_name="foo").sql_ddl
    assert ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));"


def test_to_sql_operation():
def make_translator(kind: str) -> MongoDBFullLoadTranslator:
    """
    Build a `MongoDBFullLoadTranslator` for the test suite.

    For the "anomalies" dataset, attach a Zyp collection transformation whose
    jq expressions normalize the anomalous value shapes; any other kind gets
    a converter without a transformation.
    """
    if kind == "anomalies":
        moksha = (
            MokshaTransformation()
            .jq(".[] |= (.python.list_of_nested_list |= flatten)")
            .jq(".[] |= (.python.list_of_objects |= prune_array_of_objects)")
            .jq('.[] |= (.python.to_dict |= to_object({"key": "id"}))')
            .jq(".[] |= (.python.to_list |= to_array)")
            .jq(".[] |= (.python.to_string |= tostring)")
        )
        transformation = CollectionTransformation(pre=moksha)
    else:
        transformation = None
    converter = MongoDBCrateDBConverter(transformation=transformation)
    return MongoDBFullLoadTranslator(table_name="from.mongodb", converter=converter)


@pytest.mark.parametrize("data_in, data_out, kind", testdata, ids=testdata_ids)
def test_to_sql_operation(data_in, data_out, kind):
"""
Verify outcome of `MongoDBFullLoadTranslator.to_sql` operation.
"""
translator = MongoDBFullLoadTranslator(table_name="foo")
assert translator.to_sql([RECORD_IN_ALL_TYPES]) == SQLOperation(
statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);",
parameters=[{"oid": "56027fcae4b09385a85f9344", "record": RECORD_OUT_ALL_TYPES}],
# Create translator component.
translator = make_translator(kind)

# Compute CrateDB operation (SQL+parameters) from MongoDB document.
operation = translator.to_sql(deepcopy([data_in]))
assert operation == SQLOperation(
statement='INSERT INTO "from".mongodb (oid, data) VALUES (:oid, :record);',
parameters=[{"oid": "56027fcae4b09385a85f9344", "record": data_out}],
)


@pytest.mark.integration
def test_to_sql_cratedb(caplog, cratedb):
@pytest.mark.parametrize("data_in, data_out, kind", testdata, ids=testdata_ids)
def test_to_sql_cratedb(caplog, cratedb, data_in, data_out, kind):
"""
Verify writing converted MongoDB document to CrateDB.
"""

# Create translator component.
translator = make_translator(kind)

# Compute CrateDB operation (SQL+parameters) from MongoDB document.
translator = MongoDBFullLoadTranslator(table_name="from.mongodb")
operation = translator.to_sql(RECORD_IN_ALL_TYPES)
operation = translator.to_sql(deepcopy(data_in))

# Insert into CrateDB.
cratedb.database.run_sql(translator.sql_ddl)
Expand All @@ -44,4 +87,4 @@ def test_to_sql_cratedb(caplog, cratedb):
assert cratedb.database.count_records("from.mongodb") == 1

results = cratedb.database.run_sql('SELECT * FROM "from".mongodb;', records=True) # noqa: S608
assert results[0]["data"] == RECORD_OUT_ALL_TYPES
assert results[0]["data"] == data_out
Loading

0 comments on commit cf9716e

Please sign in to comment.