Skip to content


Convert orswot to use maps
Browse files Browse the repository at this point in the history
  • Loading branch information
sargun committed Nov 11, 2016
1 parent 26247f2 commit e1968e0
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 45 deletions.
1 change: 1 addition & 0 deletions c_src/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Based on from by Loic Hoguin <[email protected]>

.PHONY: all
CURDIR := $(shell pwd)
BASEDIR := $(abspath $(CURDIR)/..)

Expand Down
117 changes: 72 additions & 45 deletions src/riak_dt_orswot.erl
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@


-export_type([orswot/0, orswot_op/0, binary_orswot/0]).
-export_type([legacy_orswot/0, orswot/0, orswot_op/0, binary_orswot/0]).

-opaque legacy_orswot() :: {riak_dt_vclock:vclock(), legacy_entries(), deferred()}.
-opaque orswot() :: {riak_dt_vclock:vclock(), entries(), deferred()}.
%% Only removes can be deferred, so a list of members to be removed
%% per context.
Expand All @@ -108,7 +109,8 @@
%% a dict of member() -> minimal_clock() mappings. The
%% `minimal_clock()' is a more effecient way of storing knowledge
%% about adds / removes than a UUID per add.
-type entries() :: [{member(), minimal_clock()}].
-type legacy_entries() ::[{member(), minimal_clock()}].
-type entries() :: #{member() => minimal_clock()}.

%% a minimal clock is just the dots for the element, each dot being an
%% actor and event counter for when the element was added.
Expand All @@ -120,35 +122,41 @@

-spec new() -> orswot().
new() ->
{riak_dt_vclock:fresh(), orddict:new(), orddict:new()}.
{riak_dt_vclock:fresh(), maps:new(), orddict:new()}.

%% @doc sets the clock in the Set to that `Clock'. Used by a
%% containing Map for sub-CRDTs
-spec parent_clock(riak_dt_vclock:vclock(), orswot()) -> orswot().
parent_clock(Clock, {_SetClock, Entries, Deferred}) ->
{Clock, Entries, Deferred}.

-spec value(orswot()) -> [member()].
-spec value(orswot() | legacy_orswot()) -> [member()].
value({_Clock, Entries, _Deferred}) when is_list(Entries) ->
[K || {K, _Dots} <- orddict:to_list(Entries)];
value({_Clock, Entries, _Deferred}) ->
[K || {K, _Dots} <- orddict:to_list(Entries)].
[K || {K, _Dots} <- maps:to_list(Entries)].

-spec value(orswot_q(), orswot()) -> term().
-spec value(orswot_q(), orswot() | legacy_orswot()) -> term().
value(size, {_Clock, Entries, _Deferred}) when is_map(Entries) ->
value(size, ORset) ->
value({contains, Elem}, ORset) ->
lists:member(Elem, value(ORset)).

%% @doc take a list of Set operations and apply them to the set.
%% NOTE: either _all_ are applied, or _none_ are.
-spec update(orswot_op(), actor() | dot(), orswot()) -> {ok, orswot()} |
-spec update(orswot_op(), actor() | dot(), orswot() | legacy_orswot()) -> {ok, orswot()} |
update(Op, Actor, {Clock, Entries0, Deferred}) when is_list(Entries0) ->
Entries1 = maps:from_list(Entries0),
update(Op, Actor, {Clock, Entries1, Deferred});
update({update, Ops}, Actor, ORSet) ->
apply_ops(Ops, Actor, ORSet);
update({add, Elem}, Actor, ORSet) ->
{ok, add_elem(Actor, ORSet, Elem)};
update({remove, Elem}, _Actor, ORSet) ->
{_Clock, Entries, _Deferred} = ORSet,
remove_elem(orddict:find(Elem, Entries), Elem, ORSet);
update({remove, Elem}, _Actor, ORSet = {_Clock, Entries, _Deferred}) ->
remove_elem(maps:find(Elem, Entries), Elem, ORSet);
update({add_all, Elems}, Actor, ORSet) ->
ORSet2 = lists:foldl(fun(E, S) ->
add_elem(Actor, S, E) end,
Expand All @@ -160,8 +168,11 @@ update({add_all, Elems}, Actor, ORSet) ->
update({remove_all, Elems}, Actor, ORSet) ->
remove_all(Elems, Actor, ORSet).

-spec update(orswot_op(), actor() | dot(), orswot(), riak_dt:context()) ->
-spec update(orswot_op(), actor() | dot(), legacy_orswot() | orswot(), riak_dt:context()) ->
{ok, orswot()} | precondition_error().
update(Op, Actor, {Clock, Entries0, Deferred}, Context) when is_list(Entries0) ->
Entries1 = map:from_list(Entries0),
update(Op, Actor, {Clock, Entries1, Deferred}, Context);
update(Op, Actor, ORSet, undefined) ->
update(Op, Actor, ORSet);
update({add, Elem}, Actor, ORSet, _Ctx) ->
Expand All @@ -172,14 +183,14 @@ update({remove, Elem}, _Actor, {Clock, Entries, Deferred}, Ctx) ->
%% have this element, we can drop any dots it has that the
%% Context has seen.
Deferred2 = defer_remove(Clock, Ctx, Elem, Deferred),
case orddict:find(Elem, Entries) of
case maps:find(Elem, Entries) of
{ok, ElemClock} ->
ElemClock2 = riak_dt_vclock:subtract_dots(ElemClock, Ctx),
case ElemClock2 of
[] ->
{ok, {Clock, orddict:erase(Elem, Entries), Deferred2}};
{ok, {Clock, maps:remove(Elem, Entries), Deferred2}};
_ ->
{ok, {Clock, orddict:store(Elem, ElemClock2, Entries), Deferred2}}
{ok, {Clock, maps:put(Elem, ElemClock2, Entries), Deferred2}}
error ->
%% Do we not have the element because we removed it
Expand Down Expand Up @@ -260,16 +271,22 @@ remove_all([Elem | Rest], Actor, ORSet, Ctx) ->
{ok, ORSet2} = update({remove, Elem}, Actor, ORSet, Ctx),
remove_all(Rest, Actor, ORSet2, Ctx).

-spec merge(orswot(), orswot()) -> orswot().
-spec merge(orswot() | legacy_orswot(), orswot() | legacy_orswot()) -> orswot().
merge({LHSClock, LHSEntries0, LHSDeferred}, RHS) when is_list(LHSEntries0) ->
LHSEntries1 = maps:from_list(LHSEntries0),
merge({LHSClock, LHSEntries1, LHSDeferred}, RHS);
merge(LHS, {RHSClock, RHSEntries0, RHSDeferred}) when is_list(RHSEntries0) ->
RHSEntries1 = maps:from_list(RHSEntries0),
merge(LHS, {RHSClock, RHSEntries1, RHSDeferred});
merge({Clock, Entries, Deferred}, {Clock, Entries, Deferred}) ->
{Clock, Entries, Deferred};
merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferred}=RHS) ->
Clock = riak_dt_vclock:merge([LHSClock, RHSClock]),
%% If an element is in both dicts, merge it. If it occurs in one,
%% then see if its dots are dominated by the others whole set
%% clock. If so, then drop it, if not, keep it.
LHSKeys = sets:from_list(orddict:fetch_keys(LHSEntries)),
RHSKeys = sets:from_list(orddict:fetch_keys(RHSEntries)),
LHSKeys = sets:from_list(maps:keys(LHSEntries)),
RHSKeys = sets:from_list(maps:keys(RHSEntries)),
CommonKeys = sets:intersection(LHSKeys, RHSKeys),
LHSUnique = sets:subtract(LHSKeys, CommonKeys),
RHSUnique = sets:subtract(RHSKeys, CommonKeys),
Expand All @@ -279,7 +296,7 @@ merge({LHSClock, LHSEntries, LHSDeferred}=LHS, {RHSClock, RHSEntries, RHSDeferre
Entries = merge_disjoint_keys(RHSUnique, RHSEntries, LHSClock, Entries0),

Deffered = merge_deferred(LHSDeferred, RHSDeferred),
apply_deferred(Clock, dict:to_list(Entries), Deffered).
apply_deferred(Clock, Entries, Deffered).

%% @private merge the deffered operations for both sets.
-spec merge_deferred(deferred(), deferred()) -> deferred().
Expand All @@ -305,18 +322,17 @@ apply_deferred(Clock, Entries, Deferred) ->

%% @doc check if each element in `Entries' should be in the merged
%% set.
-spec merge_disjoint_keys(set(), orddict:orddict(),
riak_dt_vclock:vclock(), dict:dict()) -> dict:dict().
merge_disjoint_keys(Keys, Entries0, SetClock, Accumulator) ->
Entries1 = dict:from_list(Entries0),
-spec merge_disjoint_keys(set(), entries(),
riak_dt_vclock:vclock(), entries()) -> entries().
merge_disjoint_keys(Keys, Entries, SetClock, Accumulator) ->
sets:fold(fun(Key, Acc) ->
Dots = dict:fetch(Key, Entries1),
Dots = maps:get(Key, Entries),
case riak_dt_vclock:descends(SetClock, Dots) of
false ->
%% Optimise the set of stored dots to
%% include only those unseen
NewDots = riak_dt_vclock:subtract_dots(Dots, SetClock),
dict:store(Key, NewDots, Acc);
Acc#{Key => NewDots};
true ->
Expand All @@ -327,7 +343,7 @@ merge_disjoint_keys(Keys, Entries0, SetClock, Accumulator) ->
%% @doc merges the minimal clocks for the common entries in both sets.
-spec merge_common_keys(set(), {riak_dt_vclock:vclock(), entries(), deferred()},
{riak_dt_vclock:vclock(), entries(), deferred()}) ->
merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries, _}) ->

%% If both sides have the same values, some dots may still need to
Expand All @@ -338,8 +354,8 @@ merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries,
%% dominated by the other sides clock

sets:fold(fun(Key, Acc) ->
V1 = orddict:fetch(Key, LHSEntries),
V2 = orddict:fetch(Key, RHSEntries),
V1 = maps:get(Key, LHSEntries),
V2 = maps:get(Key, RHSEntries),

CommonDots = sets:intersection(sets:from_list(V1), sets:from_list(V2)),
LHSUnique = sets:to_list(sets:subtract(sets:from_list(V1), CommonDots)),
Expand All @@ -350,47 +366,58 @@ merge_common_keys(CommonKeys, {LHSClock, LHSEntries, _}, {RHSClock, RHSEntries,
%% Perfectly possible that an item in both sets should be dropped
case V of
[] ->
dict:erase(Key, Acc);
maps:remove(Key, Acc);
_ ->
dict:store(Key, V, Acc)
maps:put(Key, V, Acc)

-spec equal(orswot(), orswot()) -> boolean().
-spec equal(orswot() | legacy_orswot(), orswot() | legacy_orswot()) -> boolean().
equal({Clock, Entries0, Deferred}, Rhs) when is_list(Entries0) ->
Entries1 = maps:from_list(Entries0),
equal({Clock, Entries1, Deferred}, Rhs);
equal(Lhs, {Clock, Entries0, Deferred}) when is_list(Entries0) ->
Entries1 = maps:from_list(Entries0),
equal(Lhs, {Clock, Entries1, Deferred});
equal({Clock1, Entries1, _}, {Clock2, Entries2, _}) ->
riak_dt_vclock:equal(Clock1, Clock2) andalso
orddict:fetch_keys(Entries1) == orddict:fetch_keys(Entries2) andalso
Entries1 == Entries2 andalso
clocks_equal(Entries1, Entries2).

-spec clocks_equal(orddict:orddict(), orddict:orddict()) -> boolean().
clocks_equal([], _) ->
-spec clocks_equal(entries(), entries()) -> boolean().
clocks_equal(EntriesLhs, EntriesRhs) ->
KeysLhs = maps:keys(EntriesLhs),
clocks_equal(KeysLhs, EntriesLhs, EntriesRhs).

clocks_equal([], _, _) ->
clocks_equal([{Elem, Clock1} | Rest], Entries2) ->
Clock2 = orddict:fetch(Elem, Entries2),
case riak_dt_vclock:equal(Clock1, Clock2) of
clocks_equal([Elem|Rest], EntriesLhs, EntriesRhs) ->
ClockLhs = maps:get(Elem, EntriesLhs),
ClockRhs = maps:get(Elem, EntriesRhs),
case riak_dt_vclock:equal(ClockLhs, ClockRhs) of
true ->
clocks_equal(Rest, Entries2);
clocks_equal(Rest, EntriesLhs, EntriesRhs);
false ->

%% Private
-spec add_elem(actor() | dot(), orswot(), member()) -> orswot().
add_elem(Dot, {Clock, Entries, Deferred}, Elem) when is_tuple(Dot) ->
{riak_dt_vclock:merge([Clock, [Dot]]), orddict:store(Elem, [Dot], Entries), Deferred};
{riak_dt_vclock:merge([Clock, [Dot]]), maps:put(Elem, [Dot], Entries), Deferred};
add_elem(Actor, {Clock, Entries, Deferred}, Elem) ->
NewClock = riak_dt_vclock:increment(Actor, Clock),
Dot = [{Actor, riak_dt_vclock:get_counter(Actor, NewClock)}],
{NewClock, orddict:store(Elem, Dot, Entries), Deferred}.
{NewClock, maps:put(Elem, Dot, Entries), Deferred}.

-spec remove_elem({ok, riak_dt_vclock:vclock()} | error,
member(), {riak_dt_vclock:vclock(), orddict:orddict(), deferred()}) ->
member(), orswot()) ->
{ok, {riak_dt_vclock:vclock(), orddict:orddict(), deferred()}} |
remove_elem({ok, _VClock}, Elem, {Clock, Dict, Deferred}) ->
{ok, {Clock, orddict:erase(Elem, Dict), Deferred}};
remove_elem({ok, _VClock}, Elem, {Clock, Entries, Deferred}) ->
{ok, {Clock, maps:remove(Elem, Entries), Deferred}};
remove_elem(_, Elem, _ORSet) ->
{error, {precondition, {not_present, Elem}}}.

Expand All @@ -414,9 +441,9 @@ stats(ORSWOT) ->
stat(actor_count, {Clock, _Dict, _}) ->
stat(element_count, {_Clock, Dict, _}) ->
stat(max_dot_length, {_Clock, Dict, _}) ->
orddict:fold(fun(_K, Dots, Acc) ->
maps:fold(fun(_K, Dots, Acc) ->
max(length(Dots), Acc)
end, 0, Dict);
stat(deferred_length, {_Clock, _Dict, Deferred}) ->
Expand Down

0 comments on commit e1968e0

Please sign in to comment.