Task Compaction is a feature to hold a couple of tasks and decide which to process.
Decaton provides a key-based way to compact multiple tasks which are arrived within a given time. The key-based means that only the tasks have the same key are the targets of compaction.
When you don’t need to process all incoming task but pick some of them within a given time.
A location-based service collects a user’s geo data from her mobile device.
The device reports recent location many times within a short period but you only need the latest data because it is not necessary to record all of the user’s location. Suppose you are using the user’s id as a message key then you can use this feature to compact multiple geo data into one.
To use Task Compaction
, you need to do two things:
-
Instantiate
CompactionProcessor
-
Add
CompactionProcessor
to the head of your processor pipeline.
CompactionProcessor
can be seen as a selector that decides whether a task should be kept or compacted. CompactionProcessor
constructor takes two
arguments:
parameter | Description |
---|---|
|
You can think of it as a time window. When |
|
A function that takes two tasks and decides which task survived. |
Before getting started with the example, let’s create a LocationEvent
domain object which can be used to simulate the scenario we described:
public class LocationEvent {
private long timestamp; // (1)
private double latitude;
private double longitude;
// Getter and Setter
}
-
We will use
timestamp
to decide to keep which task.
Add CompactionProcessor
to the head of your processor pipeline, as shown in the following example CompactionProcessor
:
public class TaskCompactionMain {
public static void main(String[] args) {
// ...
ProcessorSubscription subscription =
SubscriptionBuilder.newBuilder("my-decaton-processor")
.processorsBuilder(
ProcessorsBuilder
.consuming("my-decaton-topic", extractor)
.thenProcess(TaskCompactionMain::createCompactionProcessor, // (1)
ProcessorScope.THREAD)
.thenProcess(LocationEventProcessor::new, // (2)
ProcessorScope.THREAD)
)
}
private static CompactionProcessor<LocationEvent> createCompactionProcessor() {
return new CompactionProcessor<>(1000L, (left, right) -> { // (3)
if (left.task().getTimestamp() == right.task().getTimestamp()) { // (4)
return CompactChoice.PICK_EITHER;
} else if (left.task().getTimestamp() > right.task().getTimestamp()) {
return CompactChoice.PICK_LEFT; // (5)
} else {
return CompactChoice.PICK_RIGHT; // (6)
}
});
}
}
-
Add
CompactionProcessor
before any other processors. -
Then your actual processing logic.
-
We set
lingerMillis
to1000L
. It means that if a task arrived and is not compacted after 1000 milliseconds then it will be processed by the following processorLocationEventProcessor
. Remember that the compactor takes two task, we name them left and right to follow the value ofCompactChoice
. This may give you a clear picture when choosing the task. -
You need to invoke
#task
to get concrete task. -
Return
CompactChoice.PICK_LEFT
means that left task survives. -
Return
CompactChoice.PICK_RIGHT
will keep the task which is newer. (i.e. arrived late)
In this section, we will briefly explain how is Task Compaction implemented.
All of the magic happened in CompactionProcessor
when a task comes to this processor the following things happen:
-
The task will be put into a in-memory window
-
When subsequent tasks arrive, if a task which has the same key as the one exists in the window,
CompactionProcessor
will choose which to preserve based on thecompactor
. The winner will be preserved in the window while loser will be marked as completed. -
The winner task will be passed to the downstream processor if the time it exists in the window exceeds
lingerMillis
.