diff --git a/CHANGES.md b/CHANGES.md
index b8d368d..1e6ff98 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@
 - MongoDB: Complete and verify BSON data type mapping end-to-end
 - MongoDB: Use improved decoding machinery also for `MongoDBCDCTranslator`
 - Dependencies: Make MongoDB subsystem not strictly depend on Zyp
+- Zyp: Translate a few special treatments to jq-based `MokshaTransformation` again
 
 ## 2024/09/10 v0.0.15
 - Added Zyp Treatments, a slightly tailored transformation subsystem
diff --git a/doc/zyp/backlog.md b/doc/zyp/backlog.md
index dda82f1..d3c7d0b 100644
--- a/doc/zyp/backlog.md
+++ b/doc/zyp/backlog.md
@@ -5,6 +5,9 @@
 - Documentation
 - CLI interface
 - Apply to MongoDB Table Loader in CrateDB Toolkit
+- Document `jq` functions
+  - `builtin.jq`: https://github.com/jqlang/jq/blob/master/src/builtin.jq
+  - `function.jq`
 
 ## Iteration +2
 Demonstrate!
@@ -18,6 +21,8 @@ Demonstrate!
 - https://github.com/MeltanoLabs/meltano-map-transform/pull/255
 - https://github.com/MeltanoLabs/meltano-map-transform/issues/252
 - Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response
+- Is `jqpy` better than `jq`?
+  https://baterflyrity.github.io/jqpy/
 
 ## Iteration +3
 - Moksha transformations on Buckets
diff --git a/src/commons_codec/transform/mongodb.py b/src/commons_codec/transform/mongodb.py
index e7cf450..3f19e85 100644
--- a/src/commons_codec/transform/mongodb.py
+++ b/src/commons_codec/transform/mongodb.py
@@ -49,6 +49,7 @@ def decode_documents(self, data: t.Iterable[Document]) -> Iterable[Document]:
         """
         Decode MongoDB Extended JSON, considering CrateDB specifics.
         """
+        data = map(self.decode_bson, data)
         data = map(self.decode_value, data)
         # TODO: This is currently untyped. Types are defined in Zyp, see `zyp.model.base`.
         if self.transformation is not None:
@@ -59,7 +60,7 @@ def decode_document(self, data: Document) -> Document:
         """
         Decode MongoDB Extended JSON, considering CrateDB specifics.
         """
-        return self.decode_value(data)
+        return self.decode_value(self.decode_bson(data))
 
     def decode_value(self, value: t.Any) -> t.Any:
         """
@@ -82,6 +83,38 @@ def decode_value(self, value: t.Any) -> t.Any:
 
         return value
 
+    @staticmethod
+    def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]:
+        """
+        Convert a BSON document into a vanilla Python dictionary, using MongoDB Extended JSON representations.
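+
+        Note: This delegates to `bson.json_util._json_convert`, a private
+        PyMongo helper. Should it ever vanish, a rough public-API equivalent
+        would be `json.loads(bson.json_util.dumps(item))`, modulo `json_options`.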
+
+        https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
+
+        Example:
+        {
+            "_id": ObjectId("669683c2b0750b2c84893f3e"),
+            "id": "5F9E",
+            "data": {"temperature": 42.42, "humidity": 84.84},
+            "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
+        }
+
+        IN (top-level stripped):
+        "fullDocument": {
+            "_id": ObjectId("669683c2b0750b2c84893f3e"),
+            "id": "5F9E",
+            "data": {"temperature": 42.42, "humidity": 84.84},
+            "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
+        }
+
+        OUT:
+        {"_id": {"$oid": "669683c2b0750b2c84893f3e"},
+         "id": "5F9E",
+         "data": {"temperature": 42.42, "humidity": 84.84},
+         "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
+        }
+        """
+        return _json_convert(item)
+
     @staticmethod
     def decode_canonical(value: t.Dict[str, t.Any]) -> t.Any:
         """
@@ -154,38 +187,6 @@ def sql_ddl(self):
             f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));"
         )
 
-    @staticmethod
-    def decode_bson(item: t.Mapping[str, t.Any]) -> t.Mapping[str, t.Any]:
-        """
-        Convert MongoDB Extended JSON to vanilla Python dictionary.
-
-        https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
-
-        Example:
-        {
-            "_id": ObjectId("669683c2b0750b2c84893f3e"),
-            "id": "5F9E",
-            "data": {"temperature": 42.42, "humidity": 84.84},
-            "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
-        }
-
-        IN (top-level stripped):
-        "fullDocument": {
-            "_id": ObjectId("669683c2b0750b2c84893f3e"),
-            "id": "5F9E",
-            "data": {"temperature": 42.42, "humidity": 84.84},
-            "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"},
-        }
-
-        OUT:
-        {"_id": {"$oid": "669683c2b0750b2c84893f3e"},
-         "id": "5F9E",
-         "data": {"temperature": 42.42, "humidity": 84.84},
-         "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"},
-        }
-        """
-        return _json_convert(item)
-
 
 class MongoDBFullLoadTranslator(MongoDBTranslatorBase):
     """
@@ -213,8 +214,7 @@ def to_sql(self, data: t.Union[Document, t.List[Document]]) -> SQLOperation:
 
         # Converge multiple MongoDB documents into SQL parameters for `executemany` operation.
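+        # `decode_documents` chains BSON decoding, Extended JSON value
+        # decoding, and the optional Zyp transformation over the whole batch.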
         parameters: t.List[Document] = []
-        for document in data:
-            record = self.converter.decode_document(self.decode_bson(document))
+        for record in self.converter.decode_documents(data):
             oid: str = self.get_document_key(record)
             parameters.append({"oid": oid, "record": record})
 
@@ -266,7 +266,7 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:
         if operation_type == "insert":
             oid: str = self.get_document_key(event)
             document = self.get_full_document(event)
-            record = self.converter.decode_document(self.decode_bson(document))
+            record = self.converter.decode_document(document)
             sql = f"INSERT INTO {self.table_name} " f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " "VALUES (:oid, :record);"
             parameters = {"oid": oid, "record": record}
 
@@ -275,7 +275,7 @@ def to_sql(self, event: t.Dict[str, t.Any]) -> t.Union[SQLOperation, None]:
         # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations
         elif operation_type in ["update", "replace"]:
             document = self.get_full_document(event)
-            record = self.converter.decode_document(self.decode_bson(document))
+            record = self.converter.decode_document(document)
             where_clause = self.where_clause(event)
             sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = :record " f"WHERE {where_clause};"
             parameters = {"record": record}
diff --git a/src/zyp/function.jq b/src/zyp/function.jq
new file mode 100644
index 0000000..c6a7342
--- /dev/null
+++ b/src/zyp/function.jq
@@ -0,0 +1,40 @@
+# Support functions for Zyp Transformations
+# https://commons-codec.readthedocs.io/zyp/
+
+def to_array:
+    # Convert element to array if it isn't an array already.
+    if . | type == "array" then
+        .
+    else
+        [.]
+    end;
+
+def to_object(options):
+    # Wrap element into object with given key if it isn't an object already.
+    if . | type == "object" then
+        .
+    else
+        {(options.key): .}
+    end;
+
+def is_array_of_objects:
+    # Check if element is an array containing objects.
+    if (. | type == "array") and (.[0] | type == "object") then
+        true
+    else
+        false
+    end;
+
+def del_array_of_objects:
+    # If element is an array containing objects, replace it with null.
+    if is_array_of_objects then
+        del(.)
+    end;
+
+def prune_array_of_objects:
+    # Recursively replace arrays of objects with null.
+    walk(del_array_of_objects);
+
+def prune_null:
+    # Recursively delete `null` values.
+    walk(values);
diff --git a/src/zyp/model/moksha.py b/src/zyp/model/moksha.py
index 97c7637..6cb638a 100644
--- a/src/zyp/model/moksha.py
+++ b/src/zyp/model/moksha.py
@@ -30,7 +30,9 @@ def evaluate(self, data: DictOrList) -> DictOrList:
         if isinstance(self.transformer, jmespath.parser.ParsedResult):
             return self.transformer.search(data, options=jmespath.Options(dict_cls=collections.OrderedDict))
         elif isinstance(self.transformer, jq._Program):
-            return self.transformer.input_value(data).first()
+            if isinstance(data, map):
+                data = list(data)
+            return self.transformer.transform(data)
         elif isinstance(self.transformer, transon.Transformer):
             return self.transformer.transform(data)
         else:
diff --git a/src/zyp/util/expression.py b/src/zyp/util/expression.py
index af00231..8d3c0ce 100644
--- a/src/zyp/util/expression.py
+++ b/src/zyp/util/expression.py
@@ -1,3 +1,5 @@
+import importlib
+import importlib.resources
 import typing as t
 
 import jmespath
@@ -6,12 +8,17 @@
 from zyp.model.bucket import MokshaTransformer, TransonTemplate
 
+# TODO: Is there a better way to configure jq with a custom search path,
+#       instead of injecting the `include` statement into each program?
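+#       The jq CLI covers this with `-L <directory>`, but jq.py's `compile()`
+#       does not appear to expose an equivalent search-path option, hence the
+#       inline `include` prelude assembled below.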
+jq_functions_path = importlib.resources.files("zyp")
+jq_functions_import = f'include "function" {{"search": "{jq_functions_path}"}};'
+
 
 def compile_expression(type: str, expression: t.Union[str, TransonTemplate]) -> MokshaTransformer:  # noqa: A002
     if type == "jmes":
         return jmespath.compile(expression)
     elif type == "jq":
-        return jq.compile(expression)
+        return jq.compile(f"{jq_functions_import} {expression}")
     elif type == "transon":
         return transon.Transformer(expression)
     else:
diff --git a/tests/transform/mongodb/data.py b/tests/transform/mongodb/data.py
index ac29876..ddcff7a 100644
--- a/tests/transform/mongodb/data.py
+++ b/tests/transform/mongodb/data.py
@@ -215,3 +215,28 @@
         "uuid": "73ffd264-44b3-4c69-90e8-e7d1dfc035d4",
     },
 }
+
+
+RECORD_IN_ANOMALIES = {
+    "_id": {
+        "$oid": "56027fcae4b09385a85f9344",
+    },
+    "python": {
+        "list_of_nested_list": [1, [2, 3], 4],
+        "list_of_objects": [{}],
+        "to_dict": 123,
+        "to_list": 123,
+        "to_string": 123,
+    },
+}
+
+RECORD_OUT_ANOMALIES = {
+    "_id": "56027fcae4b09385a85f9344",
+    "python": {
+        "list_of_nested_list": [1, 2, 3, 4],
+        "list_of_objects": None,
+        "to_dict": {"id": 123},
+        "to_list": [123],
+        "to_string": "123",
+    },
+}
diff --git a/tests/transform/mongodb/test_mongodb_full.py b/tests/transform/mongodb/test_mongodb_full.py
index 5151a3e..9c7bd6a 100644
--- a/tests/transform/mongodb/test_mongodb_full.py
+++ b/tests/transform/mongodb/test_mongodb_full.py
@@ -1,11 +1,30 @@
 # ruff: noqa: E402
+from copy import deepcopy
+
 import pytest
 
+from zyp.model.collection import CollectionTransformation
+from zyp.model.moksha import MokshaTransformation
+
 pytestmark = pytest.mark.mongodb
 
 from commons_codec.model import SQLOperation
-from commons_codec.transform.mongodb import MongoDBFullLoadTranslator
-from tests.transform.mongodb.data import RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES
+from commons_codec.transform.mongodb import MongoDBCrateDBConverter, MongoDBFullLoadTranslator
+from tests.transform.mongodb.data import (
+    RECORD_IN_ALL_TYPES,
+    RECORD_IN_ANOMALIES,
+    RECORD_OUT_ALL_TYPES,
+    RECORD_OUT_ANOMALIES,
+)
+
+testdata = [
+    (RECORD_IN_ALL_TYPES, RECORD_OUT_ALL_TYPES, "all-types"),
+    (RECORD_IN_ANOMALIES, RECORD_OUT_ANOMALIES, "anomalies"),
+]
+testdata_ids = [
+    "all-types",
+    "anomalies",
+]
 
 
 def test_sql_ddl():
@@ -13,26 +32,50 @@ def test_sql_ddl():
     translator = MongoDBFullLoadTranslator(table_name="foo")
     assert translator.sql_ddl == "CREATE TABLE IF NOT EXISTS foo (oid TEXT, data OBJECT(DYNAMIC));"
 
 
-def test_to_sql_operation():
+def make_translator(kind: str) -> MongoDBFullLoadTranslator:
+    transformation = None
+    if kind == "anomalies":
+        transformation = CollectionTransformation(
+            pre=MokshaTransformation()
+            .jq(".[] |= (.python.list_of_nested_list |= flatten)")
+            .jq(".[] |= (.python.list_of_objects |= prune_array_of_objects)")
+            .jq('.[] |= (.python.to_dict |= to_object({"key": "id"}))')
+            .jq(".[] |= (.python.to_list |= to_array)")
+            .jq(".[] |= (.python.to_string |= tostring)")
+        )
+    converter = MongoDBCrateDBConverter(transformation=transformation)
+    translator = MongoDBFullLoadTranslator(table_name="from.mongodb", converter=converter)
+    return translator
+
+
+@pytest.mark.parametrize("data_in, data_out, kind", testdata, ids=testdata_ids)
+def test_to_sql_operation(data_in, data_out, kind):
     """
     Verify outcome of `MongoDBFullLoadTranslator.to_sql` operation.
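+
+    The input document is passed as a deep copy, since decoding and the Zyp
+    transformation may mutate it in place.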
""" - translator = MongoDBFullLoadTranslator(table_name="foo") - assert translator.to_sql([RECORD_IN_ALL_TYPES]) == SQLOperation( - statement="INSERT INTO foo (oid, data) VALUES (:oid, :record);", - parameters=[{"oid": "56027fcae4b09385a85f9344", "record": RECORD_OUT_ALL_TYPES}], + # Create translator component. + translator = make_translator(kind) + + # Compute CrateDB operation (SQL+parameters) from MongoDB document. + operation = translator.to_sql(deepcopy([data_in])) + assert operation == SQLOperation( + statement='INSERT INTO "from".mongodb (oid, data) VALUES (:oid, :record);', + parameters=[{"oid": "56027fcae4b09385a85f9344", "record": data_out}], ) @pytest.mark.integration -def test_to_sql_cratedb(caplog, cratedb): +@pytest.mark.parametrize("data_in, data_out, kind", testdata, ids=testdata_ids) +def test_to_sql_cratedb(caplog, cratedb, data_in, data_out, kind): """ Verify writing converted MongoDB document to CrateDB. """ + # Create translator component. + translator = make_translator(kind) + # Compute CrateDB operation (SQL+parameters) from MongoDB document. - translator = MongoDBFullLoadTranslator(table_name="from.mongodb") - operation = translator.to_sql(RECORD_IN_ALL_TYPES) + operation = translator.to_sql(deepcopy(data_in)) # Insert into CrateDB. cratedb.database.run_sql(translator.sql_ddl) @@ -44,4 +87,4 @@ def test_to_sql_cratedb(caplog, cratedb): assert cratedb.database.count_records("from.mongodb") == 1 results = cratedb.database.run_sql('SELECT * FROM "from".mongodb;', records=True) # noqa: S608 - assert results[0]["data"] == RECORD_OUT_ALL_TYPES + assert results[0]["data"] == data_out diff --git a/tests/zyp/test_moksha.py b/tests/zyp/test_moksha.py index 5393ffd..532dbcc 100644 --- a/tests/zyp/test_moksha.py +++ b/tests/zyp/test_moksha.py @@ -1,21 +1,143 @@ +""" +Exercise a few transformation recipes using `jq`. +https://github.com/jqlang/jq/blob/master/src/builtin.jq +""" + +from copy import deepcopy + import pytest from jmespath.exceptions import ParseError from zyp.model.moksha import MokshaRule, MokshaTransformation -def test_moksha_jq_compute_nested(): +def test_moksha_jq_idempotency(): + """ + Idempotent transformations should not modify data. + """ + data = [{"foo": "bar"}, {"baz": "qux"}] + transformation = MokshaTransformation().jq(".") + assert transformation.apply(deepcopy(data)) == data + + +def test_moksha_jq_select_pick_keys(): + """ + Verify selecting elements with moksha/jq. + """ + data_in = [{"meta": {"id": "Hotzenplotz", "timestamp": 123456789}, "data": {"abc": 123, "def": 456}}] + data_out = [{"meta": {"id": "Hotzenplotz"}, "data": {"abc": 123}}] + transformation = MokshaTransformation().jq(".[] |= pick(.meta.id, .data.abc)") + assert transformation.apply(data_in) == data_out + + +def test_moksha_jq_select_pick_indices(): + """ + Verify selecting elements with moksha/jq. + """ + data_in = [{"data": [1, {"foo": "bar"}, 2]}] + data_out = [{"data": [1, 2]}] + transformation = MokshaTransformation().jq(".[] |= (pick(.data.[0], .data.[2]) | prune_null)") + assert transformation.apply(data_in) == data_out + + +def test_moksha_jq_select_drop_keys(): """ - Verify updating deeply nested field with value, using moksha/jq. + Verify selecting elements with moksha/jq. 
+    """
+    data_in = [{"meta": {"id": "Hotzenplotz", "timestamp": 123456789}, "data": {"abc": 123, "def": 456}}]
+    data_out = [{"meta": {"id": "Hotzenplotz"}, "data": {"abc": 123}}]
+    transformation = MokshaTransformation().jq(".[] |= del(.meta.timestamp, .data.def)")
+    assert transformation.apply(data_in) == data_out
+
+
+def test_moksha_jq_select_drop_indices():
+    """
+    Verify deleting elements with moksha/jq.
+    """
+    data_in = [{"data": [1, {"foo": "bar"}, 2]}]
+    data_out = [{"data": [1, 2]}]
+    transformation = MokshaTransformation().jq(".[] |= del(.data.[1])")
+    assert transformation.apply(data_in) == data_out
+
+
+def test_moksha_jq_compute_scalar():
+    """
+    Verify updating deeply nested field with value.
 
     https://stackoverflow.com/a/65822084
     """
+    data_in = [{"data": {"abc": 123}}]
+    data_out = [{"data": {"abc": 246}}]
     transformation = MokshaTransformation().jq(".[] |= (.data.abc *= 2)")
-    assert transformation.apply([{"data": {"abc": 123}}]) == [{"data": {"abc": 246}}]
+    assert transformation.apply(data_in) == data_out
 
 
-def test_moksha_jq_flatten_list():
+def test_moksha_jq_cast_string():
     """
-    Verify flattening nested list, using moksha/jq.
+    Verify type casting using moksha/jq.
+    """
+    data_in = [{"data": {"abc": 123}}]
+    data_out = [{"data": {"abc": "123"}}]
+    transformation = MokshaTransformation().jq(".[] |= (.data.abc |= tostring)")
+    assert transformation.apply(data_in) == data_out
+
+    data_in = [{"data": [{"abc": 123}, {"abc": "123"}]}]
+    data_out = [{"data": [{"abc": "123"}, {"abc": "123"}]}]
+    transformation = MokshaTransformation().jq(".[] |= (.data[].abc |= tostring)")
+    assert transformation.apply(data_in) == data_out
+
+
+def test_moksha_jq_cast_array_exact():
+    """
+    Verify type casting using moksha/jq.
+    """
+    transformation = MokshaTransformation().jq(".[] |= (.data.abc |= to_array)")
+    assert transformation.apply([{"data": {"abc": 123}}]) == [{"data": {"abc": [123]}}]
+    assert transformation.apply([{"data": {"abc": [123]}}]) == [{"data": {"abc": [123]}}]
+
+
+def test_moksha_jq_cast_array_iterable():
+    """
+    Verify type casting using moksha/jq.
+    """
+
+    data_in = [{"data": [{"abc": 123}, {"abc": [456]}]}]
+    data_out = [{"data": [{"abc": [123]}, {"abc": [456]}]}]
+
+    transformation = MokshaTransformation().jq(".[] |= (.data[].abc |= to_array)")
+    assert transformation.apply(data_in) == data_out
+
+    transformation = MokshaTransformation().jq(".[] |= (.data[] |= (.abc |= to_array))")
+    assert transformation.apply(data_in) == data_out
+
+    transformation = MokshaTransformation().jq(".[] |= (.data[].abc |= (foreach . as $item (0; $item; to_array)))")
+    assert transformation.apply(data_in) == data_out
+
+
+def test_moksha_jq_cast_dict():
+    """
+    Verify type casting using moksha/jq.
+    """
+
+    data_in = [{"data": [{"abc": 123}, {"abc": 456}, {"abc": {"id": 789}}]}]
+    data_out = [{"data": [{"abc": {"id": 123}}, {"abc": {"id": 456}}, {"abc": {"id": 789}}]}]
+
+    transformation = MokshaTransformation().jq('.[] |= (.data[].abc |= to_object({"key": "id"}))')
+    assert transformation.apply(data_in) == data_out
+
+
+def test_moksha_jq_prune_array_of_objects():
+    """
+    Verify replacing arrays of objects with null, recursively.
+    """
+    data_in = [{"data": {"abc": [{"foo": 1}], "def": [42.42]}}]
+    data_out = [{"data": {"abc": None, "def": [42.42]}}]
+    transformation = MokshaTransformation().jq(".[] |= prune_array_of_objects")
+    assert transformation.apply(data_in) == data_out
+
+
+def test_moksha_jq_flatten_array():
+    """
+    Verify flattening nested arrays.
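+
+    Note: jq's `flatten` recurses through all levels by default; use
+    `flatten(depth)` to limit the flattening depth.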
""" data_in = [{"data": {"abc": [{"foo": 1}, [{"foo": 2}, {"foo": 3}]]}}] data_out = [{"data": {"abc": [{"foo": 1}, {"foo": 2}, {"foo": 3}]}}] @@ -27,16 +149,19 @@ def test_transon_duplicate_records(): """ Verify record duplication works well. """ + data_in = [{"foo": "bar", "baz": "qux"}] + data_out = [{"foo": "bar", "baz": "qux"}] * 42 transformation = MokshaTransformation().transon({"$": "expr", "op": "mul", "value": 42}) - assert transformation.apply([{"foo": "bar", "baz": "qux"}]) == [{"foo": "bar", "baz": "qux"}] * 42 + assert transformation.apply(data_in) == data_out def test_transon_idempotency(): """ - Verify record duplication works well. + Idempotent transformations should not modify data. """ + data = [{"foo": "bar"}, {"baz": "qux"}] transformation = MokshaTransformation().transon({"$": "this"}) - assert transformation.apply([{"foo": "bar"}, {"baz": "qux"}]) == [{"foo": "bar"}, {"baz": "qux"}] + assert transformation.apply(deepcopy(data)) == data def test_moksha_rule(): diff --git a/tests/zyp/test_util.py b/tests/zyp/test_util.py index 3955a86..50d8ab0 100644 --- a/tests/zyp/test_util.py +++ b/tests/zyp/test_util.py @@ -37,7 +37,7 @@ def test_compile_expression_jmes(): def test_compile_expression_jq(): transformer: jq._Program = compile_expression(type="jq", expression=".") - assert transformer.program_string == "." + assert transformer.program_string.endswith(".") def test_compile_expression_transon():