Skip to content

Latest commit

 

History

History
421 lines (305 loc) · 20 KB

planners.adoc

File metadata and controls

421 lines (305 loc) · 20 KB

Planners guide

A planner defines which mutations should be applied to an output for a given set of data. Envelope uses planners to isolate the logic of applying data to an output from the process that creates the data and from the output itself.

For application developers who are working with Envelope, planners allow them to write data to an output in various, and sometimes complex, ways without having to code that logic themselves. This is particularly useful in Envelope pipelines because these various methods are often very generic, and just need some basic configuration to drive their behavior for a particular data model. If Envelope does not provide a planner for a desired set of mutation logic there is a pluggable API for custom planners to be provided instead.

The data that is provided to a planner comes from the data of the step that the planner is specified in. That data can come from an input or from a deriver. This data that is coming into the planner from the step is considered the 'arriving' data that needs to be applied to the output.

In some cases a planner additionally needs the corresponding 'existing' data of the arriving data to generate all of the required mutations. This is handled automatically by Envelope.

Planners make heavy use of the concept of time — when did something occur in the real world, when did something get recorded in an output, data arriving out of order, etc. — and so they need to know how to read various representations of time on arriving records. By default the provided planners assume that time values are modeled as a long field of the number of milliseconds since epoch. To use other time models, see the below section Handling time.

There are a set of standardized mutation types, e.g. INSERT and UPDATE, that planners emit, and that allow planners and outputs to be completely independent of each other. Each planner specifies which of the mutation types it can emit, and typically a planner will not emit all of the mutation types. Conversely, each output specifies which of the mutation types that it can apply, and typically an output will not support applying all of the mutation types. For a planner and an output to be compatible all mutation types that the planner may emit must be supported by the output. There is a compatibility matrix for planners and outputs at the end of this guide.

One example could be for a step that contains the kafka output. The Kafka component does not have capability to update or delete previously created (i.e. existing) records in place, so the only mutation type that it supports is INSERT. For the step to write to Kafka it must then include a planner that only emits INSERTs. The only planner currently bundled with Envelope that meets this criteria is the append planner. The step may then look something like:

steps {
  ...
  writetokafka {
    deriver {
      ...
    }
    planner {
      type = append
    }
    output {
      type = kafka
      ...
    }
  }
  ...
}

Envelope-provided planners

There are a number of planners bundled with Envelope.

Append

The append planner simply takes all the arriving records and plans them collectively as an INSERT. This is useful for writing to outputs that can not update records in place, such as kafka and hive.

Upsert

The upsert planner simply takes all the arriving records and plans them collectively as an UPSERT. This is useful for writing to outputs that can update records in place, such as kudu, and where there can not be duplicate records for the same key.

The use of upsert logic is known as maintaining a Type 1 slowly changing dimension.

Note that the definition of the UPSERT mutation type does not consider the timestamp on the record, so using UPSERT out of event time (real world) order can lead the output to contain stale data. For event-time upsert logic, use the eventtimeupsert planner.

Overwrite

The overwrite planner simply takes all the arriving records and plans them collectively as an OVERWRITE. This is useful for indirectly doing updates to outputs that can not update records in place, but which can overwrite higher-level containers such as tables and partitions, such as hive and filesystem.

Delete

The delete planner simply takes all the arriving records and plans them collectively as a DELETE. This is useful for removing data in cases such as cleaning up intermediate data, or for implementing a retention policy.

Event-time upsert

The eventtimeupsert planner considers the timestamp on the arriving record, and the timestamp on the existing record (if one exists), to plan the mutations that achieve an upsert in event time. This allows records to arrive out of order and the output to always contain the freshest (by event time) data. An existing record is defined by one with the same natural key (as provided to the planner configuration) as the arriving record.

Some of the planner rules are:

  • If there is no existing record, plan an INSERT.

  • If the arriving record has a timestamp before the existing record, do nothing (because the arriving record is stale).

  • If the arriving record has a timestamp the same or after the existing record, and the values on the record are different, plan an UPDATE.

  • If there are multiple arriving records at once for the same key, only the latest by timestamp is used.

History

The history planner maintains a history of all records of a key. Every unique state of the key becomes a record in the output, with metadata columns that include marking the range of event time that the record was active/effective/current for. The planner can accept records that are out of event time order, or that are replayed multiple times, and continue to maintain the history accurately.

The use of history tracking logic is known as maintaining a Type 2 slowly changing dimension. A Type 2 SCD is very efficient and easy to query for the current and for point-in-time records of a key. In many cases it is strongly recommended to use a Type 2 SCD instead of appending record in the history with only a single timestamp.

History planner example

An example could be for arriving records that contained the fields key, timestamp, and value. The corresponding output table columns could then be key, timestamp, value, startdate, enddate, currentflag. For the purpose of the example the output starts out with no existing records, and we will consider timestamps to be very short integers to keep the example simple.

First, an arriving record comes to the planner:

key timestamp value

A

10

hello

The planner generates the mutations that are applied to the output so that the output then looks like:

key timestamp value startdate enddate currentflag

A

10

hello

10

999999

Y

The use of 999999 represents a far future date that will never be reached. The planner actually uses '31-DEC-9999' but we will use '999999' to keep the example simple. The currentflag = Y represents that this record is the current record for the key.

Next, another arriving record comes to the planner:

key timestamp value

A

20

world

The planner again generates the mutations that are applied to the output so that the output then looks like:

key timestamp value startdate enddate currentflag

A

10

hello

10

19

N

A

20

world

20

999999

Y

The two records of the key have been maintained in the table. The range of time that the timestamp = 10 record was active has been altered to one timestamp before the new record, and its current flag has been set to N.

Finally, another arriving record comes to the planner, this time out of event time order:

key timestamp value

A

15

foo

The planner again generates the mutations that are applied to the output so that the output then looks like:

key timestamp value startdate enddate currentflag

A

10

hello

10

14

N

A

15

foo

15

19

N

A

20

world

20

999999

Y

From an end-user perspective, the table is easy to query for the active records:

SELECT key, timestamp, value FROM my_history WHERE current_flag = 'Y';
key timestamp value

A

20

world

And to query for any time in the past:

SELECT key, timestamp, value FROM my_history WHERE 17 BETWEEN startdate AND enddate;
key timestamp value

A

15

foo

Bi-temporal

The bitemporal planner is similar to the history planner, but instead it maintains the history of the records of a key in both event time and system time (i.e. bi-temporality). This allows end users to query the output for how the key changed over time in the real world (event time), and over time in the output table (system time), which may not be the same.

Event time and system time can be different for a number of reasons:

  • There is always some latency between when an event happens in the real world and when it is finally visible to end user queries.

  • Records can arrive to Envelope out of event time order, perhaps due to race conditions, or due to input replays.

  • Records can arrive to Envelope with the same key and timestamp as an existing record, but with different values, which would lead to a non-system-time-tracking planner to overwrite the old state of the timestamp with the correction.

Bi-temporal planner example

Similarly to the history planner example, an example could be for arriving records that contained the fields key, timestamp, and value. The corresponding output table columns could then be key, timestamp, value, eventtime_startdate, eventtime_enddate, systemtime_startdate, systemtime_enddate currentflag. For the purpose of the example the output starts out with no existing records, and we will consider timestamps to be very short integers to keep the example simple.

First, an arriving record comes to the planner at system time 13:

key timestamp value

A

10

hello

The planner generates the mutations that are applied to the output so that the output then looks like:

key timestamp value eventtime_startdate eventtime_enddate systemtime_startdate systemtime_enddate currentflag

A

10

hello

10

999999

13

999999

Y

The use of 999999 represents a far future date that will never be reached. The planner actually uses '31-DEC-9999' but we will use '999999' to keep the example simple. The currentflag = Y represents that this record is the current record for the key in both event time and system time.

Next, another arriving record comes to the planner at system time 22:

key timestamp value

A

20

world

The planner again generates the mutations that are applied to the output so that the output then looks like:

key timestamp value eventtime_startdate eventtime_enddate systemtime_startdate systemtime_enddate currentflag

A

10

hello

10

999999

13

21

N

A

10

hello

10

19

22

999999

N

A

20

world

20

999999

22

999999

Y

The latest state of the two event time versions of the key, and the old state of the first event time version of the key, have been maintained in the output. This is done by:

  • Not updating the existing state of timestamp = 10, because this is needed for tracking this state over system time. Instead we only update the system time end date and the current flag.

  • Inserting the new state of timestamp = 10, so that with both states we can query how this key at this timestamp existed in the table over system time.

  • Inserting the only state we have seen for timestamp = 20.

Finally, another arriving record comes to the planner at system time 29, and this time as a correction to the state of the key at event timestamp 10:

key timestamp value

A

10

foo

The planner again generates the mutations that are applied to the output so that the output then looks like:

key timestamp value eventtime_startdate eventtime_enddate systemtime_startdate systemtime_enddate currentflag

A

10

hello

10

999999

13

21

N

A

10

hello

10

19

22

28

N

A

10

foo

10

19

29

999999

N

A

20

world

20

999999

22

999999

Y

There are now three states for how the key at event timestamp 10 has been represented in the output.

From an end-user perspective, the table is easy to query for the active records:

-- What is the latest known state for each key?
SELECT key, timestamp, value FROM my_history WHERE current_flag = 'Y';
key timestamp value

A

20

world

And to query for our latest known state of an arbitrary event time in the past:

-- What is the latest known state for each key at real world time 17?
-- Assuming that NOW() returns an integer > 29 (because we are querying after the last record was applied) and < 999999
SELECT key, timestamp, value FROM my_history WHERE 17 BETWEEN eventtime_startdate AND eventtime_enddate AND NOW() BETWEEN systemtime_startdate AND systemtime_enddate;
key timestamp value

A

10

foo

And to query for what was at the previous system time our latest known state of an arbitrary event time in the past:

-- What was the state of each key for real world time 17 in this output at system time 24?
SELECT key, timestamp, value FROM my_history WHERE 17 BETWEEN eventtime_startdate AND eventtime_enddate AND 24 BETWEEN systemtime_startdate AND systemtime_enddate;
key timestamp value

A

10

hello

User-provided planners

Custom developed planners can be provided by giving the fully-qualified class name (or alias—​see below) of the planner to the type configuration. The class must implement BulkPlanner or RandomPlanner.

Using aliases

To use an alias in configuration files, Envelope needs to be able to find your class. First, your class will need to implement the ProvidesAlias interface. Next, place the implementation’s fully qualified class name in a META-INF/services/com.cloudera.labs.envelope.planner.Planner file on the class path - the usual method is to package the file with your JAR.

Bulk vs random planners

Under the hood each planner is either a bulk or random planner.

A bulk planner applies the same mutation type to all mutations of the plan (e.g. all records of the step become UPSERTs). These planners do not require the existing records of the arriving records, and so typically plan very quickly.

A random planner applies individual mutation types to the mutations of the plan (e.g. one record may be an INSERT and another may be an UPDATE), and in a specific order for the output to apply them in. These planners do require the existing records of the arriving records, and so typically plan more slowly because they first require a read from the output (this is done automatically by Envelope and not by the planner) and also tend to have more complex mutation logic.

Planner Type

append

Bulk

upsert

Bulk

overwrite

Bulk

delete

Bulk

eventtimeupsert

Random

history

Random

bitemporal

Random

Handling time

Planners interpret values that encode time using 'time models'. By default Envelope-provided planners use the longmillis time model which reads a single long field as the number of milliseconds since epoch. In total there are five time models that are provided by Envelope. Custom time models can be provided in much the same way as a custom planner. Time models can handle a time value that is encoded over a single field or over multiple fields.

Envelope-provided time models

Long milliseconds

The longmillis time model reads a long field as the number of milliseconds since epoch (1970-01-01 00:00:00.000 UTC).

Nanoseconds with sequence number

The nanoswithseqnum time model reads two fields — a decimal(38,0) field of the number of nanoseconds since epoch, and an integer field of the 1-indexed sequence number of the records within a single nanosecond. This can be useful for financial use cases where multiple events may occur within a single nanosecond. When applying the system time with this time model (e.g. on a last updated or system time effective from attribute) the time will only have millisecond precision.

String date

The stringdate time model reads a string field as a date with no time component. Operations with this time model are at the whole date granularity. The default format for interpreting the string value is "yyyy-MM-dd". A custom format can be provided with the format property. Date formats are defined by Java’s SimpleDateFormat.

String date-time

The stringdatetime time model reads a string field as a date with a time component. Operations with this time model are at the millisecond granularity. The default format for interpreting the string value is "yyyy-MM-dd HH:mm:ss.SSS". A custom format can be provided with the format property. Date formats are defined by Java’s SimpleDateFormat.

Timestamp

The timestamp time model reads a Spark SQL timestamp field. Operations with this time model are at the nanosecond granularity.

Example

Consider an example where the bi-temporal planner is being used in a step, and the event time on the arriving record is encoded as a string date-time with a custom format, and the system time is to be recorded on the output is a Spark SQL timestamp. We could configure the planner accordingly:

steps {
  ...
  step_name {
    deriver {
      ...
    }
    planner {
      type = bitemporal
      fields.key = [name]
      fields.timestamp = [time]
      fields.values = [score]
      fields.event.time.effective.from = [event_start_ts]
      fields.event.time.effective.to = [event_end_ts]
      fields.system.time.effective.from = [system_start_ts]
      fields.system.time.effective.to = [system_end_ts]
      field.current.flag = current_flag
      time.model {
        event {
          type = stringdatetime
          format = "dd-MMM-yyyy HH.mm.ss:SSS"
        }
        system {
          type = timestamp
        }
      }
    }
    output {
      ...
    }
  }
  ...
}

Mutation types

Envelope defines five standardized mutation types. Internally there is also a NONE mutation but this is not emitted by the provided planners.

Mutation type Definition

INSERT

Insert the mutation as new rows. Do not impact existing rows.

UPDATE

Update the matching existing rows with the values of the mutation. Do not add new rows.

UPSERT

Insert or update the mutation based on whether the key of the mutation already exists.

DELETE

Delete the existing rows that match the mutation. May contain non-key fields.

OVERWRITE

Replace all existing rows with the mutation.

Planner emitted mutation types

This table defines the mutation types that each provided planner can emit.

INSERT

UPDATE

UPSERT

DELETE

OVERWRITE

append

Yes

upsert

Yes

overwrite

Yes

delete

Yes

eventtimeupsert

Yes

Yes

history

Yes

Yes

bitemporal

Yes

Yes

Output supported mutation types

This table defines the mutation types that each provided output can support.

INSERT

UPDATE

UPSERT

DELETE

OVERWRITE

kudu

Yes

Yes

Yes

Yes

kafka

Yes

log

Yes

filesystem

Yes

Yes

hive

Yes

Yes

jdbc

Yes

hbase

Yes

Yes

Planner/output compatibility

This table defines which planners are compatible with each output. Attempting to use incompatible planners and outputs will lead Envelope to throw an error pointing out the incompatibility.

kudu

kafka

log

filesystem

hive

jdbc

hbase

append

Yes

Yes

Yes

Yes

Yes

Yes

upsert

Yes

Yes

overwrite

Yes

Yes

delete

Yes

Yes

eventtimeupsert

Yes

history

Yes

bitemporal

Yes