General Notes

Toplevel Mindmap

Note: I left out all supervisors in the links to source.

High level description + diagrams for replication

Replication Constants


Ranch (Basho's repo)

"A socket acceptor pool for TCP protocols."

As of 1.3/1.4/2.0, we are pinned at Ranch 0.4.0-p1 to support older versions of Erlang in riak_core.

Service Manager

Open sourced in Riak 1.3

Listens on a single TCP port and negotiates which protocol to start on a new connection. Ranch is used to create a connection pool and accept new socket connections. When a connection is accepted, the client supplies a hello with its revision and capabilities, and the server replies in kind. The client then sends the service it wishes to use and the versions of that service it supports. The server finds the highest major version in common and the highest minor version in common. If there is no major version in common, the connection fails. Minor versions do not need to match. On success, the server sends the major version, client minor version, and host minor version to the client. After that, the registered module:function/5 is called and control of the socket is passed to it.
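The version negotiation described above can be modelled in a few lines. This is a minimal Python sketch, not the riak_core_service_mgr code: the function name `negotiate` and the representation of versions as (major, minor) pairs are assumptions, and it reads "minor versions do not need to match" as "each side reports its own highest minor for the agreed major".

```python
from typing import Dict, Iterable, Optional, Tuple

Version = Tuple[int, int]  # (major, minor); representation is an assumption


def _highest_minor_per_major(versions: Iterable[Version]) -> Dict[int, int]:
    """Collapse a list of (major, minor) pairs to {major: highest minor}."""
    best: Dict[int, int] = {}
    for major, minor in versions:
        best[major] = max(minor, best.get(major, minor))
    return best


def negotiate(client_versions: Iterable[Version],
              host_versions: Iterable[Version]) -> Optional[Tuple[int, int, int]]:
    """Return (major, client_minor, host_minor), or None if no major matches."""
    client = _highest_minor_per_major(client_versions)
    host = _highest_minor_per_major(host_versions)
    common_majors = set(client) & set(host)
    if not common_majors:
        return None  # no major version in common: the connection fails
    major = max(common_majors)
    return (major, client[major], host[major])
```

For example, a client offering versions 1.0 and 2.1 and a host offering 2.3 and 3.0 would agree on major version 2, with the client at minor 1 and the host at minor 3.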

Services typically run on the sink side of a connection.


Connection Manager

  • connection manager (1.3 vs 1.4+) Open sourced in Riak 1.3

When a connection request comes in:

  • call the locator service to get the list of {transport, {address, port}}
  • create a linked helper process to call riak_core_connection (just once) on the next available connection (ignoring blacklisted ones; they'll get picked up if a repeat is necessary)
  • on connection, the helper transfers control of the socket back to the connection manager, casts a success message back to the connection manager, and exits normally.
    • on success, the connection manager increments the successful-connect counter and resets the backoff timeout on that connection.
    • on failure, the helper casts a failure message (error, timeout, etc.) back to the connection manager. The connection manager marks the {Transport, {Address, Port}} as blacklisted, increments the failure counter, and starts a timer for the backoff time (and updates it for next time). It then checks for the next non-blacklisted endpoint in the connection request list to launch a new connection; if the list is empty, it calls the locator service again to get a new list. If all connections are blacklisted, it uses a send_after message to wake up and retry (perhaps with backoff time too).
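The blacklist and backoff bookkeeping in the steps above might be modelled as follows. This is a hedged Python sketch rather than the Erlang implementation: the class name `ConnectionTracker`, the doubling backoff policy, and the default delays are all assumptions made for illustration.

```python
from typing import Dict, Hashable, Iterable, Optional


class ConnectionTracker:
    """Tracks per-endpoint success/failure counts, blacklisting and backoff."""

    def __init__(self, initial_backoff: float = 1.0, max_backoff: float = 60.0):
        self.initial_backoff = initial_backoff
        self.max_backoff = max_backoff
        self.next_backoff: Dict[Hashable, float] = {}       # delay for the next failure
        self.blacklisted_until: Dict[Hashable, float] = {}  # time the blacklist expires
        self.successes: Dict[Hashable, int] = {}
        self.failures: Dict[Hashable, int] = {}

    def next_endpoint(self, endpoints: Iterable[Hashable], now: float) -> Optional[Hashable]:
        """Return the first endpoint that is not blacklisted at time `now`."""
        for endpoint in endpoints:
            if self.blacklisted_until.get(endpoint, 0.0) <= now:
                return endpoint
        return None  # everything is blacklisted: caller should sleep and retry

    def record_success(self, endpoint: Hashable) -> None:
        """Count the success and reset the backoff for this endpoint."""
        self.successes[endpoint] = self.successes.get(endpoint, 0) + 1
        self.next_backoff.pop(endpoint, None)
        self.blacklisted_until.pop(endpoint, None)

    def record_failure(self, endpoint: Hashable, now: float) -> float:
        """Blacklist the endpoint for its current backoff and grow the backoff."""
        self.failures[endpoint] = self.failures.get(endpoint, 0) + 1
        delay = self.next_backoff.get(endpoint, self.initial_backoff)
        self.blacklisted_until[endpoint] = now + delay
        # Doubling is an assumed policy; the source only says the backoff is updated.
        self.next_backoff[endpoint] = min(delay * 2, self.max_backoff)
        return delay
```

Passing `now` in explicitly keeps the sketch deterministic; a real caller would use a monotonic clock and a timer message in place of polling.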


Secures your internets.


(Riak 1.3.2+)

(Riak 1.4.0+)

Cluster Manager

closed source

A cluster manager runs on every node. It registers a service via the riak_core_service_mgr with protocol 'cluster_mgr'. The service will either answer queries (if it's the leader), or forward them to the leader (if it's not the leader).

Every cluster manager instance (one per node in the cluster) is told who the leader is when there is a leader change. An outside agent is responsible for determining which instance of cluster manager is the leader. For example, the riak_repl2_leader server is probably a good place to do this from. Call set_leader_node(node(), pid()).

If I'm the leader, I answer local gen_server:call requests from non-leader cluster managers. I also establish out-bound connections to any IP address added via add_remote_cluster(ip_addr()), in order to resolve the name of the remote cluster and to collect any additional member addresses of that cluster. I keep a database of members per named cluster.

If I am not the leader, I proxy all requests to the actual leader because I probably don't have the latest information. I don't make outbound connections either.

The local cluster's members list is supplied by the members_fun in the register_member_fun() API call. The cluster manager calls the registered function to get a list of the local cluster members; that function should return a list of {IP,Port} tuples ordered from least "busy" to most "busy". Busy is probably proportional to the number of connections the node has for replication or handoff. The cluster manager then hands out the full list to remote cluster managers when asked for its members, except that each time it hands out the list, it rotates it so that the first "least busy" entry is moved to the end and all others are pushed up toward the front. This helps balance the load when the local connection manager asks the cluster manager for a list of IPs to connect to for a single connection request. Thus, successive calls from the connection manager will appear to round-robin through the last known list of IPs from the remote cluster. The remote clusters are occasionally polled to get a fresh list, which also helps balance the connection load on them.
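The rotation described above can be illustrated with a small Python model. The class name `MemberRotation` is hypothetical, and the sketch assumes the list is handed out first and rotated afterwards; the source does not say which happens first.

```python
from collections import deque
from typing import Deque, Iterable, List, Tuple

Member = Tuple[str, int]  # (IP, Port)


class MemberRotation:
    """Hands out the member list, rotating it by one after every request."""

    def __init__(self, members: Iterable[Member]):
        # Members are expected in least-busy-first order.
        self._members: Deque[Member] = deque(members)

    def hand_out(self) -> List[Member]:
        """Return the current ordering, then move the first entry to the end."""
        current = list(self._members)
        if self._members:
            self._members.rotate(-1)  # first element goes to the back
        return current
```

With three members, three successive calls each start with a different member, which is what makes successive connection requests look round-robin.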

Service manager protocol name: cluster_mgr


Realtime Replication

A Riak postcommit hook that replicates objects from a source cluster to a sink cluster. The put on the sink cluster disables all postcommit hooks for the object being written. Connectivity is established using the cluster manager and then, at a lower level, the service manager and connection manager. Objects are queued on each node participating in the source cluster. If a node in the source cluster is shut down gracefully, it migrates all objects in its queue to other nodes in the cluster (round-robin) until the queue is empty. If the node crashes, the queue is lost, as it is stored in an ETS table. If a source cluster is connected to multiple sinks, the sinks share the same RTQ (realtime queue) on the source node. Detailed descriptions of queue overflow, overload, and queue migration are below.

-define(REPL_HOOK_BNW, {struct,
                    [{<<"mod">>, <<"riak_repl2_rt">>},
                     {<<"fun">>, <<"postcommit">>}]}).

-define(REPL_HOOK12, {struct,
                    [{<<"mod">>, <<"riak_repl_leader">>},
                     {<<"fun">>, <<"postcommit">>}]}).

-define(REPL_MODES, [{mode_repl12,?REPL_HOOK12}, {mode_repl13,?REPL_HOOK_BNW}]).

Service manager protocol: realtime


Real time queue (RTQ)

A multiple consumer queue based on ETS (ie. in memory only)

The queue strives to reliably pass on realtime replication, with the aim of reducing the need to fullsync. Every item in the queue is given a sequence number when pushed. Consumers register with the queue, then pull, passing in a function to receive items (executed on the queue process; it can cast/! as it desires).

Once the consumer has delivered the item, it must ack the queue with the sequence number. If multiple deliveries have taken place, an ack of the highest sequence number acknowledges all previous ones.

The queue is currently stored in a private ETS table. Once all consumers are done with an item it is removed from the table.
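The push/pull/ack cycle above can be sketched as a small in-memory model. This is illustrative Python, not the ETS-backed riak_repl2_rtq: the class and method names are assumptions, and so is the choice that a newly registered consumer starts after the current sequence number.

```python
from typing import Any, Callable, Dict, Optional


class RealtimeQueue:
    """Multi-consumer queue: items are removed once every consumer has acked them."""

    def __init__(self) -> None:
        self.seq = 0                          # last sequence number issued
        self.items: Dict[int, Any] = {}       # seq -> item
        self.delivered: Dict[str, int] = {}   # consumer -> highest seq delivered
        self.acked: Dict[str, int] = {}       # consumer -> highest seq acked

    def register(self, consumer: str) -> None:
        # Assumption: a new consumer only sees items pushed after it registers.
        self.delivered[consumer] = self.seq
        self.acked[consumer] = self.seq

    def push(self, item: Any) -> int:
        self.seq += 1
        self.items[self.seq] = item
        return self.seq

    def pull(self, consumer: str, deliver: Callable[[int, Any], None]) -> Optional[int]:
        """Deliver the next undelivered item via `deliver`; None if caught up."""
        nxt = self.delivered[consumer] + 1
        if nxt > self.seq or nxt not in self.items:
            return None
        deliver(nxt, self.items[nxt])
        self.delivered[consumer] = nxt
        return nxt

    def ack(self, consumer: str, seq: int) -> None:
        """Acking a sequence number acknowledges everything up to it."""
        self.acked[consumer] = max(self.acked[consumer], seq)
        self.delivered[consumer] = max(self.delivered[consumer], seq)
        lowest = min(self.acked.values())
        for done in [s for s in self.items if s <= lowest]:
            del self.items[done]  # every consumer is finished with this item
```

An item pushed while two consumers are registered therefore stays in the table until the slower of the two has acked it.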


Realtime Queue Overflow

  • if the size of the queue in bytes > rtq_max_bytes, drop the oldest messages in the queue until the queue is < rtq_max_bytes

  • rtq_max_bytes documented here
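The overflow rule can be sketched as a trim loop over an oldest-first queue. This Python model is only an illustration: the function name `trim_to_max_bytes` is hypothetical, items are assumed to be already-encoded byte strings, and the exact boundary behaviour (stopping as soon as the size is no longer above the limit) is an assumption.

```python
from collections import deque
from typing import Deque


def trim_to_max_bytes(queue: Deque[bytes], max_bytes: int) -> int:
    """Drop the oldest items until the queue fits; return how many were dropped."""
    size = sum(len(item) for item in queue)
    dropped = 0
    while queue and size > max_bytes:
        oldest = queue.popleft()  # oldest item sits at the left end
        size -= len(oldest)
        dropped += 1
    return dropped
```

A production queue would keep a running byte count instead of summing on every call; the sum here keeps the sketch self-contained.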


Multiple sinks sharing the same queue

Multiple sink clusters configured on a source cluster share the same realtime queue.

RTQ shutdown (queue migration)

During graceful shutdown via riak stop, a node that has a realtime queue size > 0 will distribute all objects in its queue to other nodes in the same cluster before shutting down.

RTQ proxy

A proxy process that handles realtime messages received while and after the riak_repl application has shut down. This allows us to avoid dropping realtime messages around shutdown events.

RTQ Migration gen_server

A gen_server process that's spawned upon graceful shutdown of Riak. Handles distribution of objects to other nodes in the cluster (round-robin). Supports a timeout to prevent long queue migrations.
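A round-robin migration with a timeout could look roughly like the following. This is a Python sketch under stated assumptions, not the gen_server itself: `migrate_queue`, the `send` callback, and the injectable `clock` are hypothetical names introduced for illustration.

```python
import itertools
import time
from typing import Any, Callable, Optional, Sequence


def migrate_queue(items: Sequence[Any],
                  peers: Sequence[str],
                  send: Callable[[str, Any], None],
                  timeout_s: Optional[float] = None,
                  clock: Callable[[], float] = time.monotonic) -> int:
    """Send each item to the next peer in turn; stop early once the timeout expires.

    Returns the number of items that were migrated.
    """
    if not peers:
        return 0  # nowhere to migrate to
    deadline = None if timeout_s is None else clock() + timeout_s
    sent = 0
    for item, peer in zip(items, itertools.cycle(peers)):
        if deadline is not None and clock() >= deadline:
            break  # give up rather than hold the shutdown indefinitely
        send(peer, item)
        sent += 1
    return sent
```

The return value lets the caller log how many objects were left behind when the timeout fires.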


RT Frame

Encodes realtime repl requests as binaries.



RTQ Overload Counter

Periodically check to see if objects are being sent too quickly to the realtime queue. If rtq_overload_threshold (default: 2000) messages are received within rtq_drop_report_interval (default: 5000 ms), drop and log incoming messages to prevent overloading the source node. If the RTQ is in the overload state, it stays there until the backlog falls to rtq_overload_recover (default: 1000). As the checks below show, both thresholds are compared against the length of the Erlang message queue, so they are message counts rather than durations.

  • if the RTQ state is overloaded and the Erlang message queue length <= rtq_overload_recover

    • recovered from the overloaded condition
  • if the RTQ is NOT overloaded and the Erlang message queue length > rtq_overload_threshold

    • set the RTQ state to overloaded and drop objects until recovery
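The two checks above form a simple hysteresis: the queue enters the overloaded state above one threshold and only leaves it once the backlog has fallen to a lower one. A minimal Python sketch, assuming both settings are message counts compared against the message queue length (the function name is hypothetical):

```python
def next_overload_state(overloaded: bool,
                        msg_queue_len: int,
                        threshold: int = 2000,
                        recover: int = 1000) -> bool:
    """Return the new overloaded flag given the current message queue length."""
    if overloaded and msg_queue_len <= recover:
        return False  # recovered from the overloaded condition
    if not overloaded and msg_queue_len > threshold:
        return True   # start dropping objects until recovery
    return overloaded  # between the two thresholds: keep the current state
```

Using two thresholds prevents the state from flapping when the backlog hovers around a single limit.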



RT Cascading



Repl hooks


Special thanks to @cv



Fullsync performs science.

Starting fullsync is idempotent, i.e. only one fullsync coordinator per sink is started.


Fullsync Coordinator

Coordinates fullsync replication parallelism. Uses three riak_repl application environment variables: fullsync_on_connect, max_fssource_cluster, and max_fssource_node (documented here).

{fullsync_on_connect, boolean()}

If true, as soon as a connection to the remote cluster is established, fullsync starts. If false, then an explicit start must be sent. Defaults to true.

{max_fssource_cluster, pos_integer()}

How many sources can be started across all nodes in the local cluster. Defaults to 5.

{max_fssource_node, pos_integer()}

How many sources can be started on a single node, provided starting one wouldn't exceed the max_fssource_cluster setting. Defaults to 1.

{max_fssink_node, pos_integer()}

Limits the number of fullsync workers allowed to run on each individual node in a sink cluster. This is a hard limit for all fullsync sources interacting with the sink cluster. Thus, multiple simultaneous source connections to the sink cluster will have to share the sink node’s number of maximum connections. Only affects nodes on the sink cluster on which this parameter is defined via the configuration file or command line.
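The interaction between the two source-side limits can be expressed as a single predicate. This is a hedged Python sketch, not the coordinator's code: `can_start_source` and the `running` map (node name to number of running sources) are assumptions made for illustration, and the defaults are the ones quoted above.

```python
from typing import Dict


def can_start_source(node: str,
                     running: Dict[str, int],
                     max_fssource_cluster: int = 5,
                     max_fssource_node: int = 1) -> bool:
    """True if starting one more fullsync source on `node` respects both limits."""
    cluster_total = sum(running.values())
    on_node = running.get(node, 0)
    return cluster_total < max_fssource_cluster and on_node < max_fssource_node
```

With the defaults, a five-node cluster can run at most one source per node and five in total, so a sixth partition has to wait for a running source to finish.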

Service manager protocol: fullsync


Fullsync Coordinator_srv

Service which replies to requests for the IP:Port of the node where a given partition lives. Responsible for determining if the node for a partition is available for use as the sink. Reservations and actual running sinks are used to determine availability. Once a reservation is issued, it is up to the keylist_client to claim it.

Service manager protocol: fs_coordinate


Node Reserver

Hold reservations for new sink processes on the node. It takes into account the running sinks and how many reservations there are for that node.

See the max_fssink_node fullsync setting above.
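The reservation accounting might be modelled like this. The sketch is illustrative Python with hypothetical names (`NodeReserver`, `reserve`, `claim`, `finish`); it assumes reservations and running sinks both count against max_fssink_node, as the description suggests, and it leaves out reservation expiry.

```python
from typing import Set


class NodeReserver:
    """Tracks reserved and running fullsync sinks against a per-node limit."""

    def __init__(self, max_fssink_node: int):
        self.max_fssink_node = max_fssink_node
        self.reserved: Set[int] = set()  # partitions with an unclaimed reservation
        self.running: Set[int] = set()   # partitions with a running sink

    def reserve(self, partition: int) -> bool:
        """Grant a reservation unless the node is already at its limit."""
        if len(self.reserved) + len(self.running) >= self.max_fssink_node:
            return False  # node is busy
        self.reserved.add(partition)
        return True

    def claim(self, partition: int) -> bool:
        """Turn a reservation into a running sink."""
        if partition not in self.reserved:
            return False
        self.reserved.remove(partition)
        self.running.add(partition)
        return True

    def finish(self, partition: int) -> None:
        """Release the slot when the sink is done."""
        self.running.discard(partition)
```

Counting unclaimed reservations keeps two sources from both being told the node is free before either has connected.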


Keylist sync strategy

Ask the sink node to build a keylist and return it to the source node. Does not use merkle trees.

  • Bloom fold capability, fallback to non-bloom if source + sink don't support it

This diagram is for v2, but still relevant for keylisting.
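The comparison step of a keylist sync can be sketched as a merge over two sorted lists. This Python model is an assumption about the general technique, not the riak_repl keylist code: it supposes each keylist is a list of (key, hash) pairs sorted by key, and the function name `keys_to_send` is hypothetical.

```python
from typing import Hashable, List, Sequence, Tuple

Entry = Tuple[str, Hashable]  # (key, object hash)


def keys_to_send(source: Sequence[Entry], sink: Sequence[Entry]) -> List[str]:
    """Return source keys that are missing on the sink or whose hash differs.

    Both inputs must be sorted by key.
    """
    diffs: List[str] = []
    j = 0
    for key, source_hash in source:
        # Skip sink keys that sort before this source key (present only on the sink).
        while j < len(sink) and sink[j][0] < key:
            j += 1
        if j < len(sink) and sink[j][0] == key:
            if sink[j][1] != source_hash:
                diffs.append(key)  # same key, different content
            j += 1
        else:
            diffs.append(key)      # key missing on the sink
    return diffs
```

Because both lists are walked once, the comparison is linear in the combined keylist size, but building the lists still requires folding over every key.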



AAE sync strategy


Currently a Technology Preview.

  • Implement the AAE exchange over TCP/IP
  • Built using existing AAE LevelDB/riak_kv trees. If AAE isn't enabled on both the source and sink clusters, fall back to the keylist strategy.
  • Different ring sizes between source and sink fall back to the keylist strategy
  • Expect compare time to be linear with %differences
  • No additional read load from a fold
  • AAE/repl is not folding over keyspace, so it doesn't have to compute the hashes by fetching each object
    • this == fast!


  • if source and sink are significantly different, then it will take more time to walk through the tree
  • if one side is empty, then extremely fast
    • useful for priming new clusters


{riak_kv, [ {anti_entropy, {on, []}}, ... ]}

{riak_repl, [ {fullsync_strategy, aae}, ... ]}

fullsync_strategy must be changed from keylist to aae on BOTH SIDES
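The fallback rules listed in this section can be collected into one decision function. This is a Python sketch of the conditions as described here, with hypothetical names; how riak_repl actually detects these conditions at connection time is not shown.

```python
def choose_fullsync_strategy(source_strategy: str,
                             sink_strategy: str,
                             source_aae_enabled: bool,
                             sink_aae_enabled: bool,
                             source_ring_size: int,
                             sink_ring_size: int) -> str:
    """Return "aae" only when every precondition holds; otherwise "keylist"."""
    if source_strategy != "aae" or sink_strategy != "aae":
        return "keylist"  # must be configured on both sides
    if not (source_aae_enabled and sink_aae_enabled):
        return "keylist"  # AAE trees must exist on both clusters
    if source_ring_size != sink_ring_size:
        return "keylist"  # differing ring sizes fall back
    return "aae"
```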


Riak CS proxy_get

Allows Riak CS to pull CS blocks across datacenters given a CS manifest and associated cluster_id. Requests are initiated from CS and sent to a proxy_get block requester running on a sink cluster. The proxy_get block requester communicates with the proxy_get block provider, which runs on a source cluster. Note that this DOES appear backwards, but it simplifies firewall configuration: all TCP connections are initiated from the source cluster to the sink cluster. The source cluster must have {proxy_get, enabled} in the riak_repl section of app.config. Additionally, the user must run the riak-repl proxy-get command on the source cluster. The source cluster initially establishes communications with the sink cluster to spin up the block requester on the sink cluster.

proxy_get diagram

  • Thanks to John Daily for the pretty diagram

  • Service manager protocol: proxy_get


pg_proxy keeps track of which node is servicing proxy_get block requester requests in the cluster. A client can always make requests to the leader pg_proxy, which will then be routed to the appropriate node in the cluster.

V2 vs V3

  • v2 proxy_get is weak and bought the farm.

    • only ran on the current source cluster leader
  • v3 proxy_get is implemented using the new "Brave New World" service/connection manager. Each node in the source cluster can provide CS blocks to a node in the sink cluster. Thus, v3 proxy_get removes the bottleneck of fetching all CS blocks through the leader.



closed source

This module is magic and one should not attempt to understand it.

This module provides tunneled proxy-get over the Erlang client API. This allows us to define custom protobuffs messages for enterprise use cases without affecting public APIs. Currently, it only provides a get and a clusterid implementation.

-define(PB_MSG_PROXY_GET, 128).
-define(PB_MSG_GET_CLUSTER_ID, 129).
-define(PB_MSG_RESP_CLUSTER_ID, 130).



Replication Ring utilities

This is where replication stores persistent config info, such as rt_enabled, fs_enabled, replication modes, nat maps, and rt cascades info.




  • gen_leader
  • riak_repl2_leader

Source (v2 replication)

Source (v3 replication) (shared between v2+v3)

Replication stats

Clarification of cluster_id vs cluster_name

  • in the context of riak_repl
    • cluster_id is a timestamp that indicates when a ring for a cluster was "born". This is a pretty darn unique value.
    • cluster_name is a string used to give a friendly name to a cluster from the riak-repl clustername command.
  • in the context of riak_core/riak_kv, cluster_name may actually indicate cluster_id
  • be careful!

Command line interface

Source Note: The riak-repl script is in the riak_ee repo.

Per bucket replication settings

A repl bucket property to control whether objects in the bucket participate in realtime, fullsync, or both.

Example of disabling:

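# The target URL is missing from the source; the bucket-properties URL below uses placeholders.
curl -v -XPUT \
  -H "Content-Type: application/json" \
  -d '{"props":{"repl":false}}' \
  "http://<host>:<port>/buckets/<bucket>/props"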

Example of enabling:

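# The target URL is missing from the source; the bucket-properties URL below uses placeholders.
curl -v -XPUT \
  -H "Content-Type: application/json" \
  -d '{"props":{"repl":true}}' \
  "http://<host>:<port>/buckets/<bucket>/props"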


Riak Enterprise 2.0 features

Reduced N

A riak_kv_mutator to shrink down objects on a put, then expand them on a get. Shrinking is based either on cluster or bucket settings.

Either due to size or legal constraints, it is sometimes desirable to reduce the amount of data stored on a sink cluster. This mutator can take objects that are stored fully on another cluster and reduce the disk space used. It can then pull the full object from the source cluster if needed. In short, one is trading CPU and (possibly) bandwidth to reduce storage cost.

Like other kv_mutators, this module is used on every put and any get when the module was used on the object's put.


When a put is done, the object is checked for the cluster_of_record metadata. If that data is missing or has the same name as the cluster, the object is not reduced. Beyond that, when to reduce can be configured.

By default the mutator never reduces an object. To enable reduction cluster-wide, use riak_core_metadata and set the full_objects key under the {riak_repl, reduced_n} prefix to one of always, never, or a positive integer.

The full_objects bucket property overrides the cluster setting and uses the same option values.

The always option means to always use full objects; in other words, never reduce the objects. This is the default value.

A positive integer means keep that many full objects or N, whichever is smaller. N is the usual N val on put. If a full value cannot be found, a proxy get is attempted.

The never option means all requests for the object will use proxy get to the cluster of record. If the object cannot be retrieved from there, a not found value is returned.

For proxy_get to work, the source cluster must enable proxy_get to the sink cluster.
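The option values described above reduce to a small calculation of how many replicas keep the full object. The following Python sketch only models that arithmetic: the function names are hypothetical, and the bucket-over-cluster precedence and the always default are taken from the description above.

```python
from typing import Optional, Union

Setting = Union[str, int]  # "always", "never", or a positive integer


def effective_full_objects(bucket_prop: Optional[Setting],
                           cluster_setting: Optional[Setting]) -> Setting:
    """Bucket property wins over the cluster setting; the default is "always"."""
    if bucket_prop is not None:
        return bucket_prop
    if cluster_setting is not None:
        return cluster_setting
    return "always"


def full_copies_to_keep(full_objects: Setting, n_val: int) -> int:
    """Number of replicas that store the full object; the rest are reduced."""
    if full_objects == "always":
        return n_val  # never reduce
    if full_objects == "never":
        return 0      # every read goes through proxy get
    if isinstance(full_objects, int) and not isinstance(full_objects, bool) and full_objects > 0:
        return min(full_objects, n_val)
    raise ValueError("full_objects must be 'always', 'never', or a positive integer")
```

For instance, with an N val of 3 and full_objects set to 1, one replica keeps the full object and the other two are reduced.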

The cluster-wide option can be configured using the riak-repl script:

$ riak-repl full_objects [always | never | <positive_integer>]

Leaving off the parameter simply prints the current setting.


all hail Micah Warren (@lordnull)


Handle decommissioned clusterid for replicated CS manifests

all hail Jon Anderson (@bowrocker)

Provide a redirection mechanism, via configuration, allowing proxy_get to be directed to an alternate cluster id from one that may be decommissioned or otherwise unreachable.

Initially, the configuration will be accomplished using "raw" cluster ids, instead of cluster names, since there is no existing, non dist-erlang API for getting a remote cluster name from a connected cluster. This will be addressed in follow-on versions.


Four riak-repl sub-commands are added:

  • show-local-cluster-id: displays the local cluster_id as a tuple, to be used in further configurations
  • add-block-provider-redirect [from-cluster-id] [to-cluster-id]
  • delete-block-provider-redirect [from-cluster-id]
  • show-block-provider-redirect

