Zyp: Add Zyp, a compact transformation engine
A data model and implementation for a compact transformation engine written
in Python.

- Based on JSON Pointer (RFC 6901), JMESPath, and transon
- Implemented using `attrs` and `cattrs`
- Includes built-in transformation functions `to_datetime` and
  `to_unixtime`
- Ability to marshal and unmarshal its representation to/from JSON and YAML
amotl committed Aug 11, 2024
1 parent e9f17e9 commit 383d3f4
Showing 32 changed files with 1,600 additions and 6 deletions.
64 changes: 63 additions & 1 deletion .github/workflows/tests.yml
@@ -104,7 +104,7 @@ jobs:
          pip install "setuptools>=64" --upgrade
          # Install package in editable mode.
-         pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb]
+         pip install --use-pep517 --prefer-binary --editable=.[mongodb,develop,test]
      - name: Run linters and software tests
        run: poe check
@@ -120,3 +120,65 @@ jobs:
          env_vars: OS,PYTHON
          name: codecov-umbrella
          fail_ci_if_error: true

  test-zyp:
    name: "
      Zyp: Python ${{ matrix.python-version }}
    "
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        os: ['ubuntu-latest']
        python-version: ['3.8', '3.9', '3.12']

    env:
      OS: ${{ matrix.os }}
      PYTHON: ${{ matrix.python-version }}

    steps:

      - name: Acquire sources
        uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
          architecture: x64
          cache: 'pip'
          cache-dependency-path: pyproject.toml

      - name: Set up project
        run: |
          # `setuptools 0.64.0` adds support for editable install hooks (PEP 660).
          # https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400
          pip install "setuptools>=64" --upgrade
          # Install package in editable mode.
          pip install --use-pep517 --prefer-binary --editable=.[zyp,develop,test]

      - name: Set timezone
        uses: szenius/[email protected]
        with:
          timezoneLinux: "Europe/Berlin"
          timezoneMacos: "Europe/Berlin"
          timezoneWindows: "European Standard Time"

      - name: Run linters and software tests
        run: poe check

      # https://github.com/codecov/codecov-action
      - name: Upload coverage results to Codecov
        uses: codecov/codecov-action@v4
        env:
          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
        with:
          files: ./coverage.xml
          flags: zyp
          env_vars: OS,PYTHON
          name: codecov-umbrella
          fail_ci_if_error: true
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Added `BucketTransformation`, a minimal transformation engine
based on JSON Pointer (RFC 6901).

## 2024/08/05 v0.0.3
- Added transformer for AWS DMS to CrateDB SQL
3 changes: 3 additions & 0 deletions docs/backlog.md
@@ -12,3 +12,6 @@
- [ ] MongoDB: Implement stream resumption using `start_after`
- [ ] Feature: Filter by events, e.g. Ignore "delete" events?
- [ ] Integration Testing the "example" programs?
- [ ] Improve capabilities of DMS translator
https://github.com/daq-tools/commons-codec/issues/11
- https://github.com/supabase/pg_replicate
48 changes: 48 additions & 0 deletions docs/zyp/backlog.md
@@ -0,0 +1,48 @@
# Zyp Backlog

## Iteration +1
- Refactor module namespace to `zyp`
- Documentation
- CLI interface
- Apply to MongoDB Table Loader in CrateDB Toolkit

## Iteration +2
Demonstrate!
- math expressions
- omit key (recursively)
- combine keys
- filter on keys and/or values
- Pathological cases like "Not defined" in typed fields like `TIMESTAMP`
- Use simpleeval, like Meltano, and provide the same built-in functions
- https://sdk.meltano.com/en/v0.39.1/stream_maps.html#other-built-in-functions-and-names
- https://github.com/MeltanoLabs/meltano-map-transform/pull/255
- https://github.com/MeltanoLabs/meltano-map-transform/issues/252
- Use JSONPath, see https://sdk.meltano.com/en/v0.39.1/code_samples.html#use-a-jsonpath-expression-to-extract-the-next-page-url-from-a-hateoas-response

## Iteration +3
- Moksha transformations on Buckets
- Investigate using JSON Schema
- Fluent API interface
- https://github.com/Halvani/alphabetic
- Mappers do not support external API lookups.
  To add external API lookups, you can either (a) land all your data and then
  perform joins using a transformation tool like dbt, or (b) create a custom
  mapper plugin with inline lookup logic.
=> Example from Luftdatenpumpe, using a reverse geocoder
- [ ] Define schema
https://sdk.meltano.com/en/latest/typing.html
- https://docs.meltano.com/guide/v2-migration/#migrate-to-an-adapter-specific-dbt-transformer
- https://github.com/meltano/sdk/blob/v0.39.1/singer_sdk/mapper.py

## Fluent API Interface

```python
from zyp.model.fluent import FluentTransformation

transformation = (
    FluentTransformation()
    .jmes("records[?starts_with(location, 'B')]")
    .rename_fields({"_id": "id"})
    .convert_values({"/id": "int", "/value": "float"}, type="pointer-python")
    .jq(".[] |= (.value /= 100)")
)
```
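Although the fluent API is still on the backlog, the builder pattern it follows can be sketched generically in plain Python. The `Pipeline` class below is hypothetical and not part of Zyp; each method records a step and returns `self`, which is what makes the calls chainable.

```python
# Minimal sketch of a fluent/chainable pipeline: each builder method
# appends a processing step and returns `self`, so calls can be chained.
class Pipeline:
    def __init__(self):
        self.steps = []

    def add(self, func):
        self.steps.append(func)
        return self  # returning `self` enables method chaining

    def apply(self, data):
        for step in self.steps:
            data = step(data)
        return data


result = (
    Pipeline()
    .add(lambda items: [i for i in items if i is not None])  # drop null records
    .add(lambda items: [i * 2 for i in items])               # transform values
    .apply([1, None, 3])
)
# result == [2, 6]
```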
186 changes: 186 additions & 0 deletions docs/zyp/index.md
@@ -0,0 +1,186 @@
# Zyp Transformations

## About
A data model and implementation for a compact transformation engine written
in [Python], based on [JSON Pointer] (RFC 6901), [JMESPath], and [transon],
implemented using [attrs] and [cattrs].

## Ideas
:Conciseness:
Define a multistep data refinement process with as little code as possible.
:Low Footprint:
Doesn't need any infrastructure or pipeline framework. It's just a little library.
:Interoperability:
Marshal transformation recipe definition to/from text-only representations (JSON,
YAML), in order to encourage implementations in other languages.
:Performance:
  Well, it is written in Python. Fragments can be rewritten in Rust where applicable.
:Immediate:
Other ETL frameworks and concepts often need to first land your data in the target
  system before applying subsequent transformations. Zyp works directly within
  the data pipeline, before data is inserted into the target system.

## Synopsis I
A basic transformation example for individual data records.

```python
from zyp.model.bucket import BucketTransformation, FieldRenamer, ValueConverter

# Consider a slightly messy collection of records.
data_in = [
    {"_id": "123", "name": "device-foo", "reading": "42.42"},
    {"_id": "456", "name": "device-bar", "reading": -84.01},
]

# Define a transformation that renames the `_id` field to `id`,
# casts its value to `int`, and casts the `reading` field to `float`.
transformation = BucketTransformation(
    names=FieldRenamer().add(old="_id", new="id"),
    values=ValueConverter()
    .add(pointer="/id", transformer="builtins.int")
    .add(pointer="/reading", transformer="builtins.float"),
)

for record in data_in:
    print(transformation.apply(record))
```
The result is a transformed data collection.
```json
[
  {"id": 123, "name": "device-foo", "reading": 42.42},
  {"id": 456, "name": "device-bar", "reading": -84.01}
]
```
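For comparison, the same rename-and-cast behavior can be sketched in a few lines of plain Python. This is only an illustrative, hypothetical equivalent of the transformation above, not how Zyp implements it:

```python
# Plain-Python sketch: rename `_id` to `id`, cast `id` to int,
# and cast `reading` to float, mirroring the BucketTransformation above.
def transform(record: dict) -> dict:
    record = dict(record)  # shallow copy, keep the input untouched
    record["id"] = int(record.pop("_id"))
    record["reading"] = float(record["reading"])
    return record

data_in = [
    {"_id": "123", "name": "device-foo", "reading": "42.42"},
    {"_id": "456", "name": "device-bar", "reading": -84.01},
]
data_out = [transform(record) for record in data_in]
# data_out[0] == {"name": "device-foo", "reading": 42.42, "id": 123}
```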

## Synopsis II
A more advanced transformation example for a collection of data records.

Consider a messy collection of input data.
- The actual collection is nested within the top-level `records` item.
- `_id` fields are conveyed in string format.
- `value` fields include both integer and string values.
- `value` fields are fixed-point values, using a scaling factor of `100`.
- The collection includes invalid `null` records.
Those records usually trip processing when, for example, filtering on object items.
```python
data_in = {
    "message-source": "system-3000",
    "message-type": "eai-warehouse",
    "records": [
        {"_id": "12", "meta": {"name": "foo", "location": "B"}, "data": {"value": "4242"}},
        None,
        {"_id": "34", "meta": {"name": "bar", "location": "BY"}, "data": {"value": -8401}},
        {"_id": "56", "meta": {"name": "baz", "location": "NI"}, "data": {"value": 2323}},
        {"_id": "78", "meta": {"name": "qux", "location": "NRW"}, "data": {"value": -580}},
        None,
        None,
    ],
}
```

After applying a corresponding transformation, the expected outcome is a
collection of valid records, optionally filtered, with values adjusted according
to relevant type hints and other conversions.
```python
data_out = [
    {"id": 12, "meta": {"name": "foo", "location": "B"}, "data": {"value": 42.42}},
    {"id": 34, "meta": {"name": "bar", "location": "BY"}, "data": {"value": -84.01}},
]
```

Let's come up with relevant pre-processing rules to cleanse and mangle the shape of the
input collection. In order to make this example more exciting, let's include two special
needs:
- Filter input collection by value of nested element.
- Rename top-level fields starting with underscore `_`.

Other than those special rules, the fundamental ones to re-shape the data are:
- Unwrap `records` attribute from container dictionary into actual collection.
- Filter collection, both by omitting invalid/empty records, and by applying query
  constraints.
- On each record, rename the top-level `_id` field to `id`.
- On each record, adjust the data types of the `id` and `value` fields.
- Postprocess collection, applying a custom scaling factor to the `value` field.

Zyp lets you write those rules down concisely, using the Python language.

```python
from zyp.model.bucket import BucketTransformation, FieldRenamer, ValueConverter
from zyp.model.collection import CollectionTransformation
from zyp.model.moksha import MokshaTransformation

transformation = CollectionTransformation(
    pre=MokshaTransformation().jmes("records[?not_null(meta.location) && !starts_with(meta.location, 'N')]"),
    bucket=BucketTransformation(
        names=FieldRenamer().add(old="_id", new="id"),
        values=ValueConverter()
        .add(pointer="/id", transformer="builtins.int")
        .add(pointer="/data/value", transformer="builtins.float"),
    ),
    post=MokshaTransformation().jq(".[] |= (.data.value /= 100)"),
)

data_out = transformation.apply(data_in)
```
Alternatively, serialize the `zyp-collection` transformation description,
for example into YAML format.
```python
print(transformation.to_yaml())
```
```yaml
meta:
  version: 1
  type: zyp-collection
pre:
  rules:
  - expression: records[?not_null(meta.location) && !starts_with(meta.location, 'N')]
    type: jmes
bucket:
  names:
    rules:
    - new: id
      old: _id
  values:
    rules:
    - args: []
      pointer: /id
      transformer: builtins.int
    - args: []
      pointer: /data/value
      transformer: builtins.float
post:
  rules:
  - expression: .[] |= (.data.value /= 100)
    type: jq
```
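To illustrate how such a serialized recipe can be executed, here is a minimal, hypothetical interpreter for the `bucket.values` rules above. It resolves RFC 6901 JSON Pointers by hand (ignoring the `~0`/`~1` escape sequences for brevity) and looks up transformer functions by their dotted path; this is a sketch, not the library's actual implementation:

```python
import importlib


def resolve_parent(document: dict, pointer: str):
    """Walk an RFC 6901 JSON Pointer, returning the parent container and final key."""
    parts = pointer.lstrip("/").split("/")
    for part in parts[:-1]:
        document = document[part]
    return document, parts[-1]


def apply_value_rules(record: dict, rules: list) -> dict:
    """Apply `pointer`/`transformer` rules like those in the YAML recipe above."""
    for rule in rules:
        parent, key = resolve_parent(record, rule["pointer"])
        # Resolve a dotted path like "builtins.int" to a callable.
        module, _, name = rule["transformer"].rpartition(".")
        transformer = getattr(importlib.import_module(module), name)
        parent[key] = transformer(parent[key], *rule.get("args", []))
    return record


rules = [
    {"pointer": "/id", "transformer": "builtins.int", "args": []},
    {"pointer": "/data/value", "transformer": "builtins.float", "args": []},
]
record = {"id": "12", "data": {"value": "4242"}}
apply_value_rules(record, rules)
# record == {"id": 12, "data": {"value": 4242.0}}
```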
## Prior Art
- [Singer Transformer]
- [PipelineWise Transformations]
- [singer-transform]
- [Meltano Inline Data Mapping]
- [Meltano Inline Stream Maps]
- [AWS DMS source filter rules]
- [AWS DMS table selection and transformation rules]
- ... and many more. Thanks for the inspirations.
## Etymology
With kudos to [Kris Zyp] for conceiving [JSON Pointer].
[attrs]: https://www.attrs.org/
[AWS DMS source filter rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.Filters.html
[AWS DMS table selection and transformation rules]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.html
[cattrs]: https://catt.rs/
[Kris Zyp]: https://github.com/kriszyp
[JMESPath]: https://jmespath.org/
[JSON Pointer]: https://datatracker.ietf.org/doc/html/rfc6901
[Meltano Inline Data Mapping]: https://docs.meltano.com/guide/mappers/
[Meltano Inline Stream Maps]: https://sdk.meltano.com/en/latest/stream_maps.html
[PipelineWise Transformations]: https://transferwise.github.io/pipelinewise/user_guide/transformations.html
[Python]: https://en.wikipedia.org/wiki/Python_(programming_language)
[Singer Transformer]: https://github.com/singer-io/singer-python/blob/master/singer/transform.py
[singer-transform]: https://github.com/dkarzon/singer-transform
[transon]: https://transon-org.github.io/
24 changes: 24 additions & 0 deletions docs/zyp/research.md
@@ -0,0 +1,24 @@
# Transformer Ingredients Research

## Toolbox
- jq, jsonpointer, jmespath, funcy, morph, boltons, toolz
- json-spec, jdata, jolt, json-document-transforms, transon


## Resources
- https://pypi.org/project/json-spec/
- https://pypi.org/project/transon/
- https://pypi.org/project/jdata/
- https://github.com/microsoft/json-document-transforms
- https://github.com/Microsoft/json-document-transforms/wiki
- https://github.com/bazaarvoice/jolt
- https://stackoverflow.com/questions/76303733/exploring-jolt-functions-for-json-to-json-transformations-an-overview
- https://github.com/microsoft/JsonToJsonMapper
- https://pypi.org/project/jdt/
- https://github.com/videntity/json-data-tools
- https://github.com/datavis-tech/json-templates
- https://github.com/google/jsonnet
- https://github.com/jsonata-js/jsonata
- https://github.com/pacifica/python-jsonpath2
- https://github.com/reagento/adaptix
- https://blog.panoply.io/best-data-transformation-tools