Dave Parfitt edited this page Oct 3, 2013 · 23 revisions

General Notes

Note: I left out all supervisors in the links to source.

Ranch

[Basho's repo](https://github.com/basho/ranch/tree/0.4.0-p1)

A socket acceptor pool for TCP protocols.

As of 1.3/1.4/2.0, we are pinned at Ranch 0.4.0-p1 to support older versions of Erlang.

Service Manager

Open sourced in Riak 1.3

Listens on a single TCP port and negotiates which protocol to start on a new connection. Ranch is used to create a connection pool and accept new socket connections. When a connection is accepted, the client sends a hello with its revision and capabilities, and the server replies in kind. The client then sends the service it wishes to use and which versions of that service it supports. The server finds the highest major version in common; if there is no major version in common, the connection fails. Minor versions do not need to match. On success, the server sends the major version, client minor version, and host minor version to the client. After that, the registered module:function/5 is called and control of the socket is passed to it.
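The negotiation above can be sketched as follows. This is an illustrative sketch only; the module name, function name, and tuple shapes are assumptions, not the actual riak_core_service_mgr API.

```erlang
-module(version_negotiation).
-export([choose_version/2]).

%% Each side advertises supported versions as {Major, Minor} tuples.
%% Pick the highest Major both sides share; Minors need not match.
choose_version(ClientVersions, HostVersions) ->
    HostMajors = [M || {M, _} <- HostVersions],
    Common = [{M, CMin} || {M, CMin} <- ClientVersions,
                           lists:member(M, HostMajors)],
    case lists:reverse(lists:keysort(1, Common)) of
        [] ->
            {error, no_common_major};
        [{Major, ClientMinor} | _] ->
            {Major, HostMinor} = lists:keyfind(Major, 1, HostVersions),
            {ok, Major, ClientMinor, HostMinor}
    end.
```

For example, a client offering versions [{1,0},{2,1}] against a host offering [{2,3},{1,4}] would settle on major version 2, with client minor 1 and host minor 3.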

Source

https://github.com/basho/riak_core/blob/develop/src/riak_core_service_mgr.erl

Connection Manager

  • connection manager (1.3 vs 1.4+); open sourced in Riak 1.3

When a connection request comes in:

  • call the locator service to get the list of {transport, {address, port}}
  • create a linked helper process to call riak_core_connection (just once) on the next available connection, ignoring blacklisted endpoints (they'll get picked up if a retry is necessary)
  • on connection, the helper transfers control of the socket back to the connection manager, casts it a success message, and exits normally.
    • on success, the connection manager increments its successful-connect counter and resets the backoff timeout for that endpoint.
    • on failure, the helper casts a failure message (error, timeout, etc.) back to the connection manager, which marks the {Transport, {Address, Port}} as blacklisted, increments the failure counter, and starts a timer for the backoff time (updating it for next time). The connection manager then tries the next non-blacklisted endpoint in the connection request list; if the list is empty, it calls the locator service again to get a new list. If all endpoints are blacklisted, it uses a send_after message to wake up and retry (perhaps with a backoff time as well).
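The endpoint-selection step above can be sketched roughly as follows. The names and shapes are hypothetical; the real logic lives in riak_core_connection_mgr.erl.

```erlang
-module(endpoint_select).
-export([next_endpoint/2]).

%% Try the next endpoint that is not blacklisted; if every endpoint is
%% blacklisted, tell the caller to retry later (e.g. via
%% erlang:send_after/3 with a backoff).
next_endpoint(Endpoints, Blacklist) ->
    case [E || E <- Endpoints, not lists:member(E, Blacklist)] of
        []         -> retry_later;
        [Next | _] -> {connect, Next}
    end.
```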

Source

(Riak 1.3.2+) https://github.com/basho/riak_repl/blob/1.3/src/riak_core_connection.erl https://github.com/basho/riak_repl/blob/1.3/src/riak_core_connection_mgr.erl https://github.com/basho/riak_repl/blob/1.3/src/riak_core_connection_mgr_stats.erl

(Riak 1.4.0+) https://github.com/basho/riak_core/blob/develop/src/riak_core_connection.erl https://github.com/basho/riak_core/blob/develop/src/riak_core_connection_mgr.erl https://github.com/basho/riak_core/blob/develop/src/riak_core_connection_mgr_stats.erl

Cluster Manager

  • closed source

A cluster manager runs on every node. It registers a service via the riak_core_service_mgr with protocol 'cluster_mgr'. The service will either answer queries (if it's the leader), or forward them to the leader (if it's not).

Every cluster manager instance (one per node in the cluster) is told who the leader is when there is a leader change. An outside agent is responsible for determining which instance of cluster manager is the leader. For example, the riak_repl2_leader server is probably a good place to do this from. Call set_leader_node(node(), pid()).

If I'm the leader, I answer local gen_server:call requests from non-leader cluster managers. I also establish out-bound connections to any IP address added via add_remote_cluster(ip_addr()), in order to resolve the name of the remote cluster and to collect any additional member addresses of that cluster. I keep a database of members per named cluster.

If I am not the leader, I proxy all requests to the actual leader because I probably don't have the latest information. I don't make outbound connections either.

The local cluster's members list is supplied by the members_fun in the register_member_fun() API call. The cluster manager will call the registered function to get a list of the local cluster members; that function should return a list of {IP,Port} tuples ordered from least "busy" to most "busy". Busy is probably proportional to the number of connections a node has for replication or handoff.

The cluster manager hands out the full list to remote cluster managers when asked for its members, except that each time it hands out the list, it rotates it so that the first "least busy" entry is moved to the end and the others move toward the front. This helps balance the load when the local connection manager asks the cluster manager for a list of IPs to connect to for a single connection request: successive calls from the connection manager will appear to round-robin through the last known list of IPs from the remote cluster. The remote clusters are occasionally polled to get a fresh list, which also helps balance the connection load on them.
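The rotation described above amounts to moving the head of the list to the tail, a minimal sketch:

```erlang
-module(member_rotate).
-export([rotate/1]).

%% Move the first ("least busy") member to the end so successive
%% requests round-robin through the remote cluster's members.
rotate([Head | Rest]) -> Rest ++ [Head];
rotate([])            -> [].
```

For example, rotate([a, b, c]) returns [b, c, a].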

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_core_cluster_mgr.erl https://github.com/basho/riak_repl/blob/develop/src/riak_core_cluster_conn.erl

Realtime Replication

A Riak postcommit hook that replicates objects from a source cluster to a sink cluster. The put on the sink cluster disables all postcommit hooks for the object being written. Connectivity is established using the cluster manager and then, at a lower level, the service manager and connection manager. Objects are queued on each node participating in the source cluster. If a node in the source cluster is gracefully shut down, it migrates all objects to other nodes in the cluster (round-robin) until the queue is empty. If the node crashes, the queue is lost, as it's stored in an ETS table. If a source cluster is connected to multiple sinks, the sinks share the same RTQ (realtime queue) on the source node. Detailed descriptions of queue overflow, overload, and queue migration are below.

### TODO (LINK TO JOHN DAILY'S PRETTY DIAGRAMS WHEN THEY ARE MERGED)

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rt.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsource_conn.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsink_conn.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsink_helper.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtsource_helper.erl

Real time queue (RTQ)

A multiple-consumer queue based on ETS (i.e., in memory only).

The queue strives to reliably pass on realtime replication, with the aim of reducing the need to fullsync. Every item in the queue is given a sequence number when pushed. Consumers register with the queue, then pull, passing in a function to receive items (executed on the queue process - it can cast/! as it desires).

Once the consumer has delivered the item, it must ack the queue with the sequence number. If multiple deliveries have taken place, an ack of the highest sequence number acknowledges all previous ones.

The queue is currently stored in a private ETS table. Once all consumers are done with an item it is removed from the table.
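The cumulative-ack behavior can be sketched as follows. The item shape is a hypothetical simplification; see riak_repl2_rtq.erl for the real per-consumer bookkeeping.

```erlang
-module(rtq_ack).
-export([ack/2]).

%% An ack with sequence number Seq acknowledges every pending item
%% whose sequence number is =< Seq; only later items remain pending.
ack(Seq, Pending) ->
    [{S, Item} || {S, Item} <- Pending, S > Seq].
```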

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq.erl

1) Realtime Queue Overflow

  • if the size of the queue in bytes exceeds rtq_max_bytes, drop the oldest messages in the queue until the queue is < rtq_max_bytes

  • rtq_max_bytes documented here
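Trimming on overflow can be sketched as dropping from the head (oldest) until the running byte count is back under the limit. The {Seq, Size, Obj} item shape here is an assumption for illustration, not the actual rtq record.

```erlang
-module(rtq_trim).
-export([trim/3]).

%% Queue items are {Seq, SizeInBytes, Obj}; Bytes is the current total.
%% Drop the oldest items until the total is at or below Max.
trim(Queue, Bytes, Max) when Bytes =< Max ->
    {Queue, Bytes};
trim([{_Seq, Size, _Obj} | Rest], Bytes, Max) ->
    trim(Rest, Bytes - Size, Max).
```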

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq.erl#L147

2) Realtime Queue Overload

Periodically check whether objects are being sent too quickly to the realtime queue. If the queue process's Erlang message queue exceeds rtq_overload_threshold (default: 2000 messages), drop and log incoming messages to prevent overloading the source node; drops are reported every rtq_drop_report_interval (default: 5000 ms). While the RTQ is in the overload state, it checks for recovery against rtq_overload_recover (default: 1000 messages).

  • if the RTQ state is overloaded and the Erlang message queue length is <= rtq_overload_recover

    • recover from the overloaded condition
  • if the RTQ state is NOT overloaded and the Erlang message queue length is > rtq_overload_threshold

    • set the RTQ state to overloaded and drop objects until recovery
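The two transitions above can be sketched as a small state function. This is illustrative only; the thresholds are message-queue lengths, and the real checks live in riak_repl2_rtq_overload_counter.erl.

```erlang
-module(rtq_overload).
-export([check/4]).

%% State is 'normal' or 'overloaded'; QueueLen is the length of the
%% queue process's Erlang message queue.
check(overloaded, QueueLen, _Threshold, Recover) when QueueLen =< Recover ->
    normal;
check(normal, QueueLen, Threshold, _Recover) when QueueLen > Threshold ->
    overloaded;
check(State, _QueueLen, _Threshold, _Recover) ->
    State.
```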

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq_overload_counter.erl

3) multiple sinks sharing the same queue

Multiple sink clusters configured on a source cluster share the same realtime queue.

4) RTQ shutdown (queue migration)

During graceful shutdown via riak stop, a node that has a realtime queue size > 0 will distribute all objects in its queue to other nodes in the same cluster before shutting down.

##### RTQ proxy

A proxy process that handles realtime messages received while and after the riak_repl application has shut down. This allows us to avoid dropping realtime messages around shutdown events.

##### RTQ Migration gen_server

A gen_server process that's spawned upon graceful shutdown of Riak. Handles distribution of objects to other nodes in the cluster (round-robin). Supports a timeout to prevent long queue migrations.

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_migration.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq_proxy.erl

RT Frame

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtframe.erl

RTQ Overload Counter

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq_overload_counter.erl

RTQ Proxy

A proxy process that handles realtime messages received while and after the riak_repl application has shut down. This allows us to avoid dropping realtime messages around shutdown events.

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rtq_proxy.erl

RT Cascading

Docs

Source

Repl hooks

Docs

Special thanks to @cv

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_util.erl#L195 https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_rt.erl#L140

Fullsync

  • proxy_get (v2 vs v3)

Fullsync Coordinator

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_fssink.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_fssource.erl

Keylist sync strategy

Does not use Merkle trees :-(

Source

AAE sync strategy

Docs

Currently a Technology Preview.

Built using existing AAE trees. If AAE isn't enabled on both the source and sink clusters, it falls back to the keylist strategy.

Source

Riak CS proxy_get

Allows Riak CS to pull CS blocks across datacenters given a CS manifest and associated cluster_id. Requests are initiated from CS and sent to a proxy_get block requester running on a sink cluster. The proxy_get block requester communicates with the proxy_get block provider, which runs on a source cluster. Note: this DOES appear backwards, but it simplifies firewall configuration, as all connections are initiated from the source cluster -> sink cluster. The source cluster must have {proxy_get, enabled} in the riak_repl section of app.config. Additionally, the user must run the riak-repl proxy-get command on the source cluster. The source cluster initially establishes communications with the sink cluster to spin up the block requester on the sink cluster.

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg_block_provider.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg_block_requester.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_pg_proxy.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_pb_get.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_cs.erl

Reduced N

A riak_kv_mutator to shrink down objects on a put, then expand them on a get. Shrinking is based either on cluster or bucket settings.

Either due to size or legal constraints, it is sometimes desirable to reduce the amount of data stored on a sink cluster. This mutator can take objects that are stored fully on another cluster and reduce the disk space used. It can then pull the full object from the source cluster if needed. In short, one is trading CPU and (possibly) bandwidth to reduce storage cost.

Like other kv_mutators, this module is used on every put and any get when the module was used on the object's put.

Configuration

When a put is done, the object is checked for the `cluster_of_record` metadata. If that data is missing or has the same name as the cluster, the object is not reduced. Beyond that, when to reduce can be configured.

By default the mutator never reduces an object. To enable reduction cluster-wide, use riak_core_metadata and set the `full_objects` key under the `{riak_repl, reduced_n}` prefix to one of `always`, `never`, or a positive integer.

The `full_objects` bucket property overrides the cluster setting, and uses the same option values.

The `always` option means always keep full objects; in other words, never reduce them. This is the default value.

A positive integer means keep that many full objects or N, whichever is smaller, where N is the usual n_val on put. If a full value cannot be found, a proxy_get is attempted.

The `never` option means all requests for the object will use proxy_get to the cluster of record. If the object cannot be retrieved from there, a not_found value is returned.

For proxy_get to work, the source cluster must enable proxy_get to the sink cluster.
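Based on the description above, enabling a cluster-wide limit of, say, 3 full objects might look like the following. This is a hedged sketch; verify the exact prefix/key shapes against riak_repl_reduced.erl before relying on them.

```erlang
%% Keep at most 3 full copies cluster-wide; further replicas are reduced.
riak_core_metadata:put({riak_repl, reduced_n}, full_objects, 3).

%% Or disable reduction entirely (the default behavior):
riak_core_metadata:put({riak_repl, reduced_n}, full_objects, always).
```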

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_reduced.erl

Replication Ring utilities

This is where replication stores persistent config info, such as rt_enabled, fs_enabled, replication modes, nat maps, and rt cascades info.

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_ring.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_ring_handler.erl

gen_leader

  • gen_leader
  • riak_repl2_leader
  • riak_repl2_leader

Source (v2 replication)

https://github.com/basho/riak_repl/blob/develop/src/gen_leader.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_leader.erl

Source (v3 replication)

https://github.com/basho/riak_repl/blob/develop/src/gen_leader.erl (shared between v2+v3) https://github.com/basho/riak_repl/blob/develop/src/riak_repl2_leader.erl

Replication stats

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_console.erl https://github.com/basho/riak_repl/blob/develop/src/riak_repl_wm_stats.erl

Command line interface

Source

https://github.com/basho/riak_repl/blob/develop/src/riak_repl_console.erl https://github.com/basho/riak_ee/blob/develop/rel/files/riak-repl

Note: The riak-repl script is in the riak_ee repo.

Per bucket replication settings

Docs

http://docs.basho.com/riakee/latest/cookbooks/Multi-Data-Center-Replication-Per-Bucket/

Clarification of cluster_id vs cluster_name