This document shows the basic use of the Echo library, and explains its core concepts.
A site is an actor in a distributed system. It owns a log of events, which stores all the actions performed by all the sites in the distributed system. A CRDT is a data structure which can be replicated, and has a commutative, idempotent and associative merge function.
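To make the three merge properties concrete, here is a minimal sketch in plain Kotlin. The `merge` function is a hypothetical stand-in for a g-counter merge and is not part of the Echo API.

```kotlin
// Hypothetical stand-in for the merge function of a g-counter; not an Echo API.
fun merge(a: Int, b: Int): Int = maxOf(a, b)

fun main() {
    val (x, y, z) = Triple(3, 7, 5)
    println(merge(x, y) == merge(y, x))                     // commutative, prints: true
    println(merge(x, x) == x)                               // idempotent, prints: true
    println(merge(merge(x, y), z) == merge(x, merge(y, z))) // associative, prints: true
}
```

Because the order and the number of merges do not matter, two replicas that eventually see the same values end up with the same result.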
The Echo library provides multiple APIs to create and replicate CRDTs. It can be used to implement non-standard replicated data types, such as replicated growable arrays or trees. This short tutorial walks you through some simple and advanced use-cases of the replication library.
Run the following code to create and replicate your first CRDT:
// a g-counter CRDT implementation
val GCounter = OneWayProjection { model: Int, _: EventIdentifier, event: Int ->
    maxOf(model, event)
}
suspend fun main() {
    // create two distinct actors that manage a g-counter
    val alice = mutableSite(Random.nextSiteIdentifier(), 0, GCounter, strategy = SyncStrategy.Once)
    val bob = mutableSite(Random.nextSiteIdentifier(), 0, GCounter)
    alice.event { // this : EventScope<Int>
        yield(42) // emit a new value to the g-counter
    }
    println("alice ${alice.value.value}, bob ${bob.value.value}")
    sync(alice, bob) // let both sites converge
    println("synced !")
    println("alice ${alice.value.value}, bob ${bob.value.value}")
}
Check the sample out here.
You will see the following result:
alice 42, bob 0
synced !
alice 42, bob 42
Let's look together at what this code does.
- GCounter is a one-way projection. It aggregates a local state and events in a commutative, idempotent and associative fashion. Here, it implements the semantics of a G-Counter, a CRDT which always takes the maximum value between the local state and a remote state. That's why we take the maxOf of the local model and the remote event.
- alice and bob are sites. We created both of them using the mutableSite site builder. Site builders specify a unique identifier for the site, an initial state, a projection, and a sync strategy.
- alice.event { yield(42) } emits a new event with value 42 on the site alice.
- event { ... } is a suspending function: it will not resume until the events have been stored on the site. When the call resumes, alice.value will have been updated by the projection with the new event. This is why "alice 42, bob 0" is printed: bob still has its initial value, and alice has aggregated the event.
- sync is a suspending function that takes two sites and replicates their states. It makes use of a simple protocol that ensures that all the events on the site alice are sent to the site bob, and vice-versa. Because alice specified that it uses SyncStrategy.Once, the sync call will resume once all the events present in both sites when the sync started have been exchanged.
Finally, bob has received all the events generated by alice. This is why "alice 42, bob 42" gets printed after "synced !".
A SyncStrategy implements the replication protocol across multiple sites. Out of the box, two implementations are available: Once and Continuous. The same protocol is used in both implementations; however, the Once strategy stops automatically after all the events that were available when the sync started have been sent, whereas the Continuous strategy only stops when the coroutine is cancelled.
The strategy of a site is specified in the site builder. This means that syncing a Continuous site with a Once site will terminate, whereas syncing two Continuous sites will suspend the sync() call until cancellation.
val alice = mutableSite(Random.nextSiteIdentifier(), 0, GCounter, strategy = SyncStrategy.Once)
val bob = mutableSite(Random.nextSiteIdentifier(), 0, GCounter, strategy = SyncStrategy.Continuous)
val carol =
mutableSite(Random.nextSiteIdentifier(), 0, GCounter) // Defaults to SyncStrategy.Continuous
When working with continuous sync, you will probably be interested in knowing when the aggregated value of a site changes. All sites expose a value, which is a StateFlow<M> of the aggregate of type M.
You can then collect this state to observe the changes:
val bob = mutableSite(Random.nextSiteIdentifier(), 0, GCounter)
val carol = mutableSite(Random.nextSiteIdentifier(), 0, GCounter)
launch {
    bob.value.collect { model ->
        println("Current model is $model") // called with the latest state
    }
}
// somewhere else, in a different coroutine.
sync(bob, carol)
Check the sample out here.
As we've seen in the g-counter example, one can emit events on instances of MutableSite. It's possible to emit multiple events at once.
val alice = mutableSite(Random.nextSiteIdentifier(), 0, GCounter)
alice.event { state ->
    println("current $state")
    val (seqno, site) = yield(state + 1) // the unique identifier of the event
    // we can emit multiple events at once too
    // yieldAll(emptyList())
}
Check the sample out here.
All the events emitted in a single event { } block are added atomically to the site. This means that no remote events will be received while the block runs, even if the site is currently syncing. It's therefore possible to call yield() multiple times in a single block, which is useful if you want to perform multiple fine-grained operations (say, create a tree node, move it and name it).
Additionally, the current state aggregate of the site is provided as a parameter. In the above example, the state value is atomically incremented by one, implementing a proper g-counter.
When an event is emitted, its identifier is returned. This identifier is globally unique, and corresponds to the Lamport timestamp of emission of the event.
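The way such identifiers can be generated is sketched below with a small Lamport clock. `EventId` and `LamportClock` are hypothetical names chosen for this illustration; Echo's own identifier type and its internals may differ.

```kotlin
// Hypothetical types for illustration; Echo's EventIdentifier may be shaped differently.
data class EventId(val seqno: Int, val site: String)

class LamportClock(private val site: String) {
    private var last = 0 // highest sequence number seen so far

    // Called when the site emits a new event.
    fun tick(): EventId {
        last += 1
        return EventId(last, site)
    }

    // Called when an event emitted by another site is received.
    fun receive(id: EventId) {
        last = maxOf(last, id.seqno)
    }
}

fun main() {
    val alice = LamportClock("alice")
    val bob = LamportClock("bob")

    val first = alice.tick()  // EventId(seqno=1, site=alice)
    bob.receive(first)
    val second = bob.tick()   // EventId(seqno=2, site=bob)
    println("$first happened before $second")
}
```

Pairing the sequence number with the site name keeps identifiers unique even when two sites pick the same sequence number.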
Each time an event is received, it is appended to a local log of events, and the projection then updates the aggregate.
Each event has a unique identifier, composed of a sequence number and a site identifier. All the events are sorted by their unique identifier in the event log:
flowchart LR
ev1a([Event 1:A])
ev2a([Event 2:A])
ev2b([Event 2:B])
ev3a([Event 3:A])
ev1a --> ev2a
ev2a --> ev2b
ev2b --> ev3a
rest[etc.]
ev3a -.-> rest
style rest fill:transparent,stroke-width:0px
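The ordering shown in the chart above can be sketched with a comparable identifier. This is an assumption-based illustration (sequence number first, site second, as the chart suggests); the `EventId` class is hypothetical and not the library's type.

```kotlin
// Hypothetical identifier type, ordered by sequence number first, then by site.
data class EventId(val seqno: Int, val site: String) : Comparable<EventId> {
    override fun compareTo(other: EventId): Int =
        compareValuesBy(this, other, EventId::seqno, EventId::site)
}

fun main() {
    // Events may arrive in an arbitrary order ...
    val received = listOf(EventId(3, "A"), EventId(2, "B"), EventId(1, "A"), EventId(2, "A"))
    // ... but the log keeps them sorted by identifier.
    val log = received.sorted()
    println(log.joinToString(" --> ") { "${it.seqno}:${it.site}" })
    // prints: 1:A --> 2:A --> 2:B --> 3:A
}
```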
Additionally, a site owns an aggregated model, which points to the latest known event.
flowchart LR
ev1a([Event 1:A])
ev2a([Event 2:A])
ev2b([Event 2:B])
ev3a([Event 3:A])
ev4a([Event 4:A])
ev1a --> ev2a
ev2a --> ev2b
ev2b --> ev3a
ev3a --> ev4a
model(Model)
model -. "(Aggregate)" .-> ev4a
When a new event is generated locally or received via the sync protocol, it is inserted at the right position in the event log, and the model is updated accordingly. However, the behavior differs slightly depending on the kind of projection you use.
In a one-way projection, events are required to be commutative, associative and idempotent. Therefore, when the event gets inserted, all the subsequent events are applied again in the projection:
1. We want to insert the event.
+---------+   +---------+   +---------+   +---------+            +---------+
| Ev. 1@a |-->| Ev. 2@a |-->| Ev. 3@a |-->| Ev. 4@a |  (insert)  | Ev. 2@b |
+---------+   +---------+   +---------+   +---------+            +---------+
                                               ^
                                               |
                                               |
                                          Model = M4
                                          (Aggregate)
2. The event is inserted at its position in the log, and the projection applies it, followed by all the subsequent events, to the model.
+---------+   +---------+   +---------+   +---------+   +---------+
| Ev. 1@a |-->| Ev. 2@a |-->| Ev. 2@b |-->| Ev. 3@a |-->| Ev. 4@a |
+---------+   +---------+   +---------+   +---------+   +---------+
                                 ^
                                 |
                                 |
                        Updated Model = M4'
                            (Aggregate)
+---------+   +---------+   +---------+   +---------+   +---------+
| Ev. 1@a |-->| Ev. 2@a |-->| Ev. 2@b |-->| Ev. 3@a |-->| Ev. 4@a |
+---------+   +---------+   +---------+   +---------+   +---------+
                                               ^
                                               |
                                               |
                                      Updated Model = M4'
                                          (Aggregate)
+---------+   +---------+   +---------+   +---------+   +---------+
| Ev. 1@a |-->| Ev. 2@a |-->| Ev. 2@b |-->| Ev. 3@a |-->| Ev. 4@a |
+---------+   +---------+   +---------+   +---------+   +---------+
                                                             ^
                                                             |
                                                             |
                                                    Updated Model = M4'
                                                        (Aggregate)
In this example, it's clear that the projection applies some events multiple times.
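The insertion described above can be sketched in plain Kotlin as follows. `Event`, `order` and `OneWayLog` are hypothetical names for this illustration and do not reflect how Echo stores its log; the sketch only assumes a log sorted by (sequence number, site) and a forward function that is safe to re-apply.

```kotlin
// Hypothetical, simplified model of a one-way projection over a sorted log.
data class Event(val seqno: Int, val site: String, val value: Int)

val order = compareBy<Event>({ it.seqno }, { it.site })

class OneWayLog(initial: Int, private val forward: (Int, Event) -> Int) {
    private val events = mutableListOf<Event>()
    var model = initial
        private set

    fun insert(event: Event) {
        // Find the sorted position of the event; ignore it if it is already known.
        val found = events.binarySearch(event, order)
        if (found >= 0) return
        val index = -(found + 1)
        events.add(index, event)
        // Apply the new event, then re-apply every later event on the current model.
        for (i in index until events.size) model = forward(model, events[i])
    }
}

fun main() {
    val log = OneWayLog(0) { model, event -> maxOf(model, event.value) }
    log.insert(Event(1, "a", 1))
    log.insert(Event(2, "a", 2))
    log.insert(Event(3, "a", 3))
    log.insert(Event(4, "a", 4))
    log.insert(Event(2, "b", 9)) // lands between 2@a and 3@a
    println(log.model) // prints: 9
}
```

Events 3@a and 4@a are applied twice here, which is harmless only because taking the maximum is idempotent.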
A OneWayProjection is therefore well suited to model any CRDT, such as replicated growable arrays, which are relevant for text editing. Here is a simple example with a grow-only set:
class GSet<T> : OneWayProjection<Set<T>, T> {
    override fun forward(model: Set<T>, identifier: EventIdentifier, event: T): Set<T> {
        return model + event
    }
}
Check the sample out here.
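Why a set works here while other structures would not can be checked with two plain functions. Both `setForward` and `listForward` are hypothetical helpers that mimic only the forward step, without any Echo types.

```kotlin
// Plain functions standing in for the forward step of a projection (hypothetical names).
fun setForward(model: Set<String>, event: String): Set<String> = model + event
fun listForward(model: List<String>, event: String): List<String> = model + event

fun main() {
    // Applying the same event twice leaves a set unchanged ...
    println(setForward(setForward(emptySet(), "a"), "a"))    // prints: [a]
    // ... but not a list, so a list append would not be safe to re-apply.
    println(listForward(listForward(emptyList(), "a"), "a")) // prints: [a, a]
}
```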
Sometimes, it's not possible to model a replicated data type with commutative, idempotent and associative events. However, we can still store the events in an ordered replicated log, and use a projection to aggregate these events. When an event gets inserted in the past of the log, we'll simply have to "reverse" the events up to the insertion point, and then "replay" the future events:
1. We want to insert the event.
+---------+ c1 +---------+ c2 +---------+ c3 +---------+ +---------+
| Ev. 1@a |----->| Ev. 2@a |----->| Ev. 3@a |----->| Ev. 4@a | (insert) | Ev. 2@b |
+---------+ +---------+ +---------+ +---------+ +---------+
^
|
|
Model = M3
2. We use the changes c[.] to revert the model to its previous state, until we reach the insertion point.
+---------+ c1 +---------+ c2 +---------+ +---------+ +---------+
| Ev. 1@a |----->| Ev. 2@a |----->| Ev. 3@a |----->| Ev. 4@a | (insert) | Ev. 2@b |
+---------+ +---------+ +---------+ +---------+ +---------+
^
|
|
Model = M2
3. We're at the insertion point, add the event and update the model.
+---------+ c1 +---------+ c2 +---------+ +---------+ +---------+
| Ev. 1@a |----->| Ev. 2@a |----->| Ev. 2@b |----->| Ev. 3@a |----->| Ev. 4@a |
+---------+ +---------+ +---------+ +---------+ +---------+
^
|
|
Model = M2'
+---------+ c1 +---------+ c2 +---------+ c3' +---------+ +---------+
| Ev. 1@a |----->| Ev. 2@a |----->| Ev. 2@b |----->| Ev. 3@a |----->| Ev. 4@a |
+---------+ +---------+ +---------+ +---------+ +---------+
^
|
|
Model = M3'
+---------+ c1 +---------+ c2 +---------+ c3' +---------+ c4' +---------+
| Ev. 1@a |----->| Ev. 2@a |----->| Ev. 2@b |----->| Ev. 3@a |----->| Ev. 4@a |
+---------+ +---------+ +---------+ +---------+ +---------+
^
|
|
Model = M4'
Doesn't this look extremely similar to what we did with one-way projections? Yes! That's because the two-way projection is just a generalization of the one-way projection case. Each time an event is aggregated, the changes to the model are recorded, so they can be reverted if an event has to be inserted before the end of the event log.
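The reverse-then-replay steps can be sketched in plain Kotlin with a model that is not commutative: text built by appending. `Event` and `TwoWayLog` are hypothetical names, and the recorded change (the number of appended characters) is an assumption made for this illustration only.

```kotlin
// Hypothetical, simplified model of a two-way projection: every applied event
// records the change it made, so that the change can be undone later.
data class Event(val seqno: Int, val site: String, val text: String)

class TwoWayLog {
    private val order = compareBy<Event>({ it.seqno }, { it.site })
    private val events = mutableListOf<Event>()
    private val changes = mutableListOf<Int>() // change i = characters appended by event i
    var model = ""
        private set

    private fun forward(event: Event) {
        model += event.text
        changes.add(event.text.length)
    }

    private fun backward() {
        model = model.dropLast(changes.removeAt(changes.lastIndex))
    }

    fun insert(event: Event) {
        val found = events.binarySearch(event, order)
        if (found >= 0) return // already known
        val index = -(found + 1)
        // Reverse the changes of every event located after the insertion point.
        repeat(events.size - index) { backward() }
        // Insert the event, then replay it and all the later events.
        events.add(index, event)
        for (i in index until events.size) forward(events[i])
    }
}

fun main() {
    val log = TwoWayLog()
    log.insert(Event(1, "a", "H"))
    log.insert(Event(2, "a", "e"))
    log.insert(Event(3, "a", "l"))
    log.insert(Event(4, "a", "o"))
    log.insert(Event(2, "b", "l")) // inserted in the past, between 2@a and 3@a
    println(log.model) // prints: Hello
}
```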
Nothing in this model forbids us from emitting zero, one, or multiple changes when the projection applies an event. This is reflected in the TwoWayProjection API, where the forward function may emit as many changes as it wants, which will be reversed in the backward function when new events get inserted in the past:
class TwoWaySet<T> : TwoWayProjection<MutableSet<T>, T, T> {
    override fun ChangeScope<T>.forward(
        model: MutableSet<T>,
        id: EventIdentifier,
        event: T
    ): MutableSet<T> {
        if (model.add(event)) push(event) // record the change; in this case, the inserted event
        return model
    }
    override fun backward(
        model: MutableSet<T>,
        id: EventIdentifier,
        event: T,
        change: T
    ): MutableSet<T> {
        return model.apply { remove(change) } // revert the change
    }
}
Check the sample out here.
The TwoWayProjection guarantees eventual convergence of all the sites, because all the events will end up being applied in the same order on all the sites. In fact, the replicated event log can be seen as a CRDT, whose value is simply the result of applying the projection to all the events in a deterministic order guaranteed by the Lamport timestamps.
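The convergence argument can be checked with a small sketch, assuming the aggregate is a fold over the events sorted by identifier. `Event` and `aggregate` are hypothetical names and not part of the Echo API.

```kotlin
data class Event(val seqno: Int, val site: String, val text: String)

// Hypothetical aggregate: sort the events by identifier, then apply them in that order.
fun aggregate(received: List<Event>): String =
    received.sortedWith(compareBy<Event>({ it.seqno }, { it.site }))
        .fold("") { model, event -> model + event.text }

fun main() {
    val events = listOf(Event(1, "a", "x"), Event(1, "b", "y"), Event(2, "a", "z"))
    val alice = aggregate(events)            // received in emission order
    val bob = aggregate(events.reversed())   // received in the opposite order
    println("$alice $bob") // prints: xyz xyz
}
```

Even though appending text is not commutative, both sites compute the same value because they agree on the order of the events.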
You'll probably have noticed that in the g-counter example, the whole state of the CRDT is essentially stored in each event, and these past events could be deleted to reduce the meta-data overhead of the log.
The Echo core library offers two helper classes to let you do that: AbstractMutableEventLog and AbstractMutableHistory. A mutable history is just an event log with an aggregate. By extending either of these two classes, you'll be able to override the partialInsert() method, and optionally call partialRemove() to remove previously added events.
class MyHistory :
    AbstractMutableHistory<MutableMarkdownParty>(
        initial = ...,
        projection = ...,
    ) {
    override fun partialInsert(
        id: EventIdentifier,
        array: ByteArray,
        from: Int,
        until: Int,
    ) {
        // optionally call super.partialRemove(seqno, site) here
        // you can also store additional state in the history if needed
        // finally, you can actually insert the operation, if appropriate
        super.partialInsert(id, array, from, until)
    }
}
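The compaction idea itself can be sketched without the Echo classes. The `CompactingLog` below is a hypothetical, stand-alone model of a g-counter log: it does not extend AbstractMutableHistory and makes no assumption about the real partialInsert() or partialRemove() signatures.

```kotlin
// Hypothetical compacting log for a g-counter; it does not use the Echo classes.
data class Event(val seqno: Int, val site: String, val value: Int)

class CompactingLog {
    private val events = mutableListOf<Event>()

    val size get() = events.size
    val value get() = events.maxOfOrNull { it.value } ?: 0

    fun insert(event: Event) {
        // Events with a smaller or equal value no longer affect the maximum: remove them.
        events.removeAll { it.value <= event.value }
        // Only keep the new event if nothing left in the log dominates it.
        if (events.none { it.value >= event.value }) events.add(event)
    }
}

fun main() {
    val log = CompactingLog()
    log.insert(Event(1, "a", 1))
    log.insert(Event(2, "a", 5))
    log.insert(Event(2, "b", 3)) // dominated by the event with value 5, never stored
    println("${log.size} event(s), value ${log.value}") // prints: 1 event(s), value 5
}
```

The log never holds more than one event, yet its value is the same as if every event had been kept.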
You can find a real-life use-case here, where this API is used to compact cursor move events in markdown.party.