Basic Clustering #38

Draft: wants to merge 2 commits into `main`
`text/0000-clustering.md` (218 additions, 0 deletions)
- Feature Name: clustering
- Start Date: 2020-12-16
- Tremor Issue: [tremor-rs/tremor-runtime#0000](https://github.com/tremor-rs/tremor-runtime/issues/0000)
- RFC PR: [tremor-rs/tremor-rfcs#0000](https://github.com/tremor-rs/tremor-rfcs/pull/0000)

# Summary
[summary]: #summary

TODO expand

Implement basic clustering for Tremor.

# Motivation
[motivation]: #motivation

TODO expand more

Add clustering to Tremor, so that it becomes a true distributed system. This
will add resiliency for the pipelines deployed under Tremor, in terms of both
host failures and application logic (e.g. correct distributed throttling for
logs/metrics even when some Tremor nodes go down). Moreover, it allows for
denser Tremor deployments with regard to the total host footprint (thus saving
more on hardware resources), and also eases the propagation/synchronization of
changes that need to touch all hosts, opening up nicer ways of doing tremor
pipeline deployments.

# Guide-level explanation
[guide-level-explanation]: #guide-level-explanation

Tremor can be operated in a clustered mode, which allows operators to use a
group of tremor nodes collaboratively for their event processing needs, giving
the illusion of a single running tremor instance to the outside world.

A tremor cluster is composed of two sets of nodes: `coordinators` and `workers`.

Coordinator nodes are the ones coordinating the addition and removal of nodes
to and from the cluster. They also intercept all changes to a running cluster
(basically new workload submissions, or changes to the tremor [repository and
registry state](https://docs.tremor.rs/operations/configuration/#introduction))
and ensure that all nodes eventually apply the changes -- anytime the [tremor
api](https://docs.tremor.rs/api/) is used to submit changes from a node, the
request is routed first to the coordinator node (if the node accepting the
request is not already a coordinator). Thus, the group of coordinator nodes can
be thought of as the brain of the cluster. For the initial clustering
implementation, the allowed number of dedicated coordinator nodes is 3.

If coordinators form the brain, worker nodes are the brawn of the cluster,
hosting the actual running instances of the deployed sources, pipelines and
sinks. Worker nodes accept changes from the coordinator nodes for this purpose
and communicate directly with the world outside the cluster as their workload
demands (i.e. as part of the sources/sinks running on the node). There is no
set limit on the number of worker nodes in the cluster, though in practice it
will be bounded by the network constraints of cluster-wide communication.

An example of starting a tremor server process in these two modes:

```sh
# assuming this command is run on host1 and the 3 coordinator nodes are on
# host1, host2, host3 (host1 can be left out here if desired)
tremor server run --coordinator --peers "host1:8139,host2:8139,host3:8139"

# assuming this command is run on host4 (all 3 coordinator nodes have to be specified)
tremor server run --worker --coordinators "host1:8139,host2:8139,host3:8139"
```

> **Review comment:** Add a discovery mechanism rather than explicit host:port
> pairs. A simple static lookup is a perfectly valid discovery implementation,
> but we may also require integration with consul, etcd, etc. for some
> deployments.

> **Review comment:** For simplicity, and for small clusters (mostly for
> testing), wouldn't it make sense to allow a node to be coordinator and worker
> at the same time? This way we can form a simple cluster with 3 nodes where
> either 1 or all 3 are coordinators.
>
> **Reply:** I agree, it would. That would also remove an extra mainline for a
> non-clustered version (it'd effectively be a cluster of 1).

The static host list specified above is the current means of cluster discovery
(i.e. how a node finds the tremor cluster to join). The use of the
`--coordinator` and `--worker` flags mandates the use of `--peers` and
`--coordinators` respectively -- the server will exit with an error during
initialization if one is used without the other. The default port for cluster
communication is 8139 (if the port is left out of the host details above, that
port is assumed), and it can be overridden by a `--cluster-port` flag for both
coordinators and workers. Example:

```sh
# cluster-port flag is relevant only when coordinator or worker flag is specified
tremor server run --coordinator --peers "host1:8139,host2:8139,host3:8139" --cluster-port 4242
```

When started without these flags, the server process simulates the behavior of
tremor from its pre-clustering days (i.e. multiple tremor server instances in
this mode behave independently of each other). You can also think of it as a
[cluster of one](https://www.youtube.com/watch?v=wW9RCrmOOkI), behaving as both
a coordinator and a worker:

```sh
# party of one
tremor server run
```

> **Review comment:** Let's revisit this and see if overloading `server run`
> makes sense versus adding `tremor cluster run` later. Maintaining the
> standalone mode of operation is well captured and important for some use
> cases.

Each worker node runs all the sources, sinks and pipelines deployed in the
cluster. The incoming event stream is distributed among the available worker
nodes for processing, after the initial interception by the running source
(TODO document the various distribution strategies here -- round-robin and
consistent hashing to start with -- and how to specify this from Troy). This is
made possible by the way pipelines are hosted: a tremor cluster starts with a
pre-defined number of vnodes (or virtual nodes) that the coordinator nodes try
to distribute evenly among the available physical worker nodes, and for a
pipeline in a given deployment mapping, instances are spawned as part of each
vnode.

Taking a typical source -> pipeline -> sink flow as an example, a source
instance running on a physical node forwards events to pipeline instances
running on the vnodes, where the actual location of a vnode may or may not be
the same physical node hosting the source instance (depending on the number of
vnodes and their distribution among the available nodes). From the pipeline
vnode, events end up in the sink instance hosted on the same physical node as
the vnode. Thus, the unit of pipeline processing in a tremor cluster is a
vnode, and the number of vnodes in the cluster effectively determines the
overall number of pipeline instances running at a time for a given mapping
(irrespective of how many worker nodes there are!).
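To make the vnode indirection more concrete, below is a minimal, purely
illustrative sketch of hashing an event's partition key to one of a fixed set
of vnodes and looking up the worker that currently hosts that vnode. The
function and variable names (`vnode_for_event`, the worker names, the fixed
assignment) are hypothetical and not part of this proposal:

```rust
use std::collections::{hash_map::DefaultHasher, HashMap};
use std::hash::{Hash, Hasher};

/// Total number of vnodes is fixed at cluster start (e.g. via `--no-of-vnodes`).
const VNODE_COUNT: u64 = 64;

/// Pick the vnode that should process an event by consistently hashing a
/// partition key (one of the possible distribution strategies mentioned above).
fn vnode_for_event(partition_key: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    partition_key.hash(&mut hasher);
    hasher.finish() % VNODE_COUNT
}

fn main() {
    // Hypothetical vnode -> worker assignment maintained by the coordinators:
    // 64 vnodes spread evenly over 4 workers.
    let workers = ["worker-1", "worker-2", "worker-3", "worker-4"];
    let assignment: HashMap<u64, &str> = (0..VNODE_COUNT)
        .map(|v| (v, workers[v as usize % workers.len()]))
        .collect();

    let vnode = vnode_for_event("customer-42");
    // The source forwards the event to the pipeline instance hosted by this
    // vnode, which may or may not live on the same physical node as the source.
    println!("event routed to vnode {vnode} on {}", assignment[&vnode]);
}
```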

The number of vnodes in the cluster can be set at tremor server startup time
(TODO a sane default to be determined; this could be made mandatory too). It's
best to size the cluster so that the vnode count is the same (or around the
same) on all nodes (i.e. the vnode count is a multiple of the total number of
worker nodes). The cluster will continue to work even if there are fewer vnodes
than the actual number of worker nodes -- it just means the extra worker nodes
won't be involved in the pipeline processing.

```sh
# if there are 16 worker nodes, each worker node here gets 4 vnodes
tremor server run --worker --coordinators "host1:8139,host2:8139,host3:8139" --no-of-vnodes 64
```

The cluster can handle the failure of 1 coordinator node (out of the set of 3),
and if fewer than 2 of those nodes are available, the whole cluster is deemed
unavailable. If a worker node fails (or is deliberately removed from the
cluster), its pipeline workload (i.e. its vnodes) is distributed among the
remaining nodes, and once the node comes online again, it receives the workload
back. Adding new worker nodes triggers a similar rebalancing of the vnodes in
the cluster.
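As a rough illustration of the rebalancing described above, here is a sketch
only (the helper `assign_vnodes` and the host names are assumptions): the real
assignment would be driven by the coordinators and would aim to minimize vnode
movement (e.g. via a hash ring) as well as hand off in-flight state.

```rust
use std::collections::BTreeMap;

/// Evenly (re)assign vnodes 0..vnode_count over the currently live workers.
/// Illustrative only: a real implementation would minimize vnode movement
/// and hand off in-flight state between nodes.
fn assign_vnodes(vnode_count: u64, workers: &[&str]) -> BTreeMap<u64, String> {
    (0..vnode_count)
        .map(|v| (v, workers[v as usize % workers.len()].to_string()))
        .collect()
}

fn main() {
    let before = assign_vnodes(8, &["host4", "host5", "host6"]);
    // host5 fails (or is removed): its vnodes move to the remaining workers.
    let after = assign_vnodes(8, &["host4", "host6"]);
    for v in 0..8 {
        if before[&v] != after[&v] {
            println!("vnode {v}: {} -> {}", before[&v], after[&v]);
        }
    }
}
```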

In principle, the cluster can handle the failure of all worker nodes (if there
is no worker node, the cluster is just in a stand-by mode, accepting workload
submissions and waiting for a worker to become available), but in practice the
actual functionality of the cluster will be limited by the total resources
available in the cluster and the kind of workloads that were running there
before the node failures (and if your workload is such that only one worker is
needed, you are better off using a stand-alone tremor node, without the
division of worker vs coordinator nodes).

# Reference-level explanation
[reference-level-explanation]: #reference-level-explanation

TODO expand

* microring of coordinator nodes using Raft for consensus and a KV store for
cluster state
* macroring of worker nodes
* strongly consistent microring vs highly available macroring
* communication between coordinator and workers, and eventual consistency for
worker nodes
* hinted handoffs during node failures
* testing focus
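To anchor the terminology above, here is a purely illustrative sketch of the
kind of state the coordinator microring might keep in its Raft-replicated KV
store. The type and field names are assumptions for illustration only, not a
proposed API:

```rust
use std::collections::BTreeMap;

/// Illustrative shape of the strongly consistent cluster state held by the
/// coordinator microring (replicated via Raft); not a proposed API.
struct ClusterState {
    /// Tremor repository/registry content, eventually propagated to all workers.
    repository: BTreeMap<String, String>,
    /// Current vnode -> worker assignment for the macroring of workers.
    vnode_assignment: BTreeMap<u64, String>,
    /// Hinted handoffs: vnodes whose workload is temporarily held on behalf of
    /// a failed worker, keyed by vnode with the original owner as the value.
    hinted_handoffs: BTreeMap<u64, String>,
}

fn main() {
    // The state would be mutated only through the coordinators' consensus log.
    let _state = ClusterState {
        repository: BTreeMap::new(),
        vnode_assignment: BTreeMap::new(),
        hinted_handoffs: BTreeMap::new(),
    };
}
```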

# Drawbacks
[drawbacks]: #drawbacks

TODO

* complexity and the curse of distributed systems (both for implementation and
  operation)
* difficulty of integrating with the current tremor feature set and semantics
* performance implications for current use cases

# Rationale and alternatives
[rationale-and-alternatives]: #rationale-and-alternatives

TODO

* paxos for consensus
* non-ring topologies

# Prior art
[prior-art]: #prior-art

TODO

(Old) prototype exploring our architecture:

* https://github.com/tremor-rs/uring

Inspirations:

* https://github.com/async-raft/async-raft
* https://github.com/tikv/raft-rs
* https://github.com/basho/riak_core
* https://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf

# Unresolved questions
[unresolved-questions]: #unresolved-questions

TODO

* Troy overlap, especially for data distribution strategies to the pipeline vnodes
* Network protocol overlap

* Default for number of vnodes (or should we make it mandatory)
* Make number of coordinator nodes configurable (eg: 3, 5, 7) even in the initial clustering offering
* Introduce a configuration file to ease setting cluster details at startup (mode, peers/coordinators, port, vnode count)
* Add cluster name configuration
* Alternate terms for `coordinator` and `worker` nodes

# Future possibilities
[future-possibilities]: #future-possibilities

TODO

* Restrict deployments by node (as opposed to having everything available
everywhere)
* Store repository only on the coordinator nodes
* Dynamic means of cluster discovery, without having to hardcode them everywhere
(especially for workers which are the ones that will mostly see node changes)
* gdrl, load balancing and other smarter use cases
* Migratable sources/sinks
* Replication for pipelines
* Option for strong consistency in worker nodes
* Ability to specify other microrings (eg: for running source or sink specific
logic)