diff --git a/src/riak_dt.erl b/src/riak_dt.erl index 2a13699..07a3bd4 100644 --- a/src/riak_dt.erl +++ b/src/riak_dt.erl @@ -43,12 +43,13 @@ %% the clock and the crdt, and if relevant, returns to crdt with the %% given clock as it's own. -callback parent_clock(riak_dt_vclock:vclock(), crdt()) -> - crdt(). + crdt(). +-callback get_deferred(crdt()) -> [context()]. -callback merge(crdt(), crdt()) -> crdt(). -callback equal(crdt(), crdt()) -> boolean(). -callback to_binary(crdt()) -> binary(). -callback to_binary(TargetVers :: pos_integer(), crdt()) -> - binary(). + binary(). -callback from_binary(binary()) -> crdt(). -callback from_binary(TargetVers :: pos_integer(), binary()) -> crdt(). @@ -76,7 +77,6 @@ to_binary(Term) -> from_binary(Binary) -> binary_to_term(Binary). - %% @private -spec dict_to_orddict(dict()) -> orddict:orddict(). dict_to_orddict(Dict) -> diff --git a/src/riak_dt_disable_flag.erl b/src/riak_dt_disable_flag.erl index 6447185..9a48fc6 100644 --- a/src/riak_dt_disable_flag.erl +++ b/src/riak_dt_disable_flag.erl @@ -30,7 +30,7 @@ -behaviour(riak_dt). -export([new/0, value/1, value/2, update/3, merge/2, equal/2, from_binary/1, to_binary/1, stats/1, stat/2]). --export([update/4, parent_clock/2]). +-export([update/4, parent_clock/2, get_deferred/1]). -export([to_binary/2, from_binary/2]). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -65,7 +65,7 @@ update(disable, _Actor, _Flag) -> {ok, off}. -spec update(disable_flag_op(), riak_dt:actor(), disable_flag(), riak_dt:context()) -> - {ok, disable_flag()}. + {ok, disable_flag()}. update(Op, Actor, Flag, _Ctx) -> update(Op, Actor, Flag). @@ -82,6 +82,9 @@ equal(FA,FB) -> parent_clock(_Clock, Flag) -> Flag. +-spec get_deferred(disable_flag()) -> []. +get_deferred(_Flag) -> []. + -spec from_binary(binary()) -> disable_flag(). from_binary(<>) -> off; from_binary(<>) -> on. diff --git a/src/riak_dt_emcntr.erl b/src/riak_dt_emcntr.erl index f559b8a..88ef61e 100644 --- a/src/riak_dt_emcntr.erl +++ b/src/riak_dt_emcntr.erl @@ -50,7 +50,7 @@ -export([to_binary/1, from_binary/1]). -export([to_binary/2, from_binary/2]). -export([stats/1, stat/2]). --export([parent_clock/2, update/4]). +-export([update/4, parent_clock/2, get_deferred/1]). %% EQC API -ifdef(EQC). @@ -85,6 +85,8 @@ new() -> parent_clock(Clock, {_, Cntr}) -> {Clock, Cntr}. +get_deferred(_CRDT) -> []. + %% @doc the current integer value of the counter -spec value(emcntr()) -> integer(). value({_Clock, PNCnt}) -> diff --git a/src/riak_dt_enable_flag.erl b/src/riak_dt_enable_flag.erl index 29a41dc..a3d7966 100644 --- a/src/riak_dt_enable_flag.erl +++ b/src/riak_dt_enable_flag.erl @@ -30,7 +30,7 @@ -behaviour(riak_dt). -export([new/0, value/1, value/2, update/3, merge/2, equal/2, from_binary/1, to_binary/1, stats/1, stat/2]). --export([update/4, parent_clock/2]). +-export([update/4, parent_clock/2, get_deferred/1]). -export([to_binary/2, from_binary/2]). -ifdef(EQC). @@ -74,6 +74,9 @@ update(enable, _Actor, _Flag, _Ctx) -> parent_clock(_Clock, Flag) -> Flag. +-spec get_deferred(enable_flag()) -> []. +get_deferred(_CRDT) -> []. + -spec merge(enable_flag(), enable_flag()) -> enable_flag(). merge(FA, FB) -> flag_or(FA, FB). diff --git a/src/riak_dt_gcounter.erl b/src/riak_dt_gcounter.erl index 544ddd9..3eca767 100644 --- a/src/riak_dt_gcounter.erl +++ b/src/riak_dt_gcounter.erl @@ -37,7 +37,7 @@ -module(riak_dt_gcounter). -behaviour(riak_dt). -export([new/0, new/2, value/1, value/2, update/3, merge/2, equal/2, to_binary/1, from_binary/1, stats/1, stat/2]). --export([update/4, parent_clock/2]). +-export([update/4, parent_clock/2, get_deferred/1]). -export([to_binary/2, from_binary/2]). %% EQC API @@ -95,6 +95,9 @@ update(Op, Actor, GCnt, _Ctx) -> parent_clock(_Clock, GCnt) -> GCnt. +-spec get_deferred(gcounter()) -> []. +get_deferred(_CRDT) -> []. + %% @doc Merge two `gcounter()'s to a single `gcounter()'. This is the Least Upper Bound %% function described in the literature. -spec merge(gcounter(), gcounter()) -> gcounter(). diff --git a/src/riak_dt_gset.erl b/src/riak_dt_gset.erl index ee79d7f..f8c044c 100644 --- a/src/riak_dt_gset.erl +++ b/src/riak_dt_gset.erl @@ -35,7 +35,7 @@ %% API -export([new/0, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1, value/2, stats/1, stat/2]). --export([update/4, parent_clock/2]). +-export([update/4, parent_clock/2, get_deferred/1]). -export([to_binary/2, from_binary/2]). -ifdef(EQC). @@ -92,6 +92,9 @@ update(Op, Actor, GSet, _Ctx) -> parent_clock(_Clock, GSet) -> GSet. +-spec get_deferred(gset()) -> []. +get_deferred(_CRDT) -> []. + -spec merge(gset(), gset()) -> gset(). merge(GSet1, GSet2) -> ordsets:union(GSet1, GSet2). diff --git a/src/riak_dt_lwwreg.erl b/src/riak_dt_lwwreg.erl index 4f7c470..53c4571 100644 --- a/src/riak_dt_lwwreg.erl +++ b/src/riak_dt_lwwreg.erl @@ -33,7 +33,7 @@ -export([new/0, value/1, value/2, update/3, merge/2, equal/2, to_binary/1, from_binary/1, stats/1, stat/2]). --export([parent_clock/2, update/4]). +-export([update/4, parent_clock/2, get_deferred/1]). -export([to_binary/2, from_binary/2]). %% EQC API @@ -63,6 +63,9 @@ new() -> parent_clock(_Clock, Reg) -> Reg. +-spec get_deferred(lwwreg()) -> []. +get_deferred(_CRDT) -> []. + %% @doc The single total value of a `gcounter()'. -spec value(lwwreg()) -> term(). value({Value, _TS}) -> diff --git a/src/riak_dt_map.erl b/src/riak_dt_map.erl index 8e970d4..112e9b9 100644 --- a/src/riak_dt_map.erl +++ b/src/riak_dt_map.erl @@ -180,7 +180,7 @@ -export([merge/2, equal/2, to_binary/1, from_binary/1]). -export([to_binary/2, from_binary/2]). -export([precondition_context/1, stats/1, stat/2]). --export([parent_clock/2]). +-export([parent_clock/2, get_deferred/1]). %% EQC API -ifdef(EQC). @@ -192,57 +192,59 @@ -type binary_map() :: binary(). %% A binary that from_binary/1 will accept -type map() :: {riak_dt_vclock:vclock(), entries(), deferred()}. -type entries() :: dict(field_name(), field_value()). --type field() :: {field_name(), field_value()}. -type field_name() :: {Name :: binary(), CRDTModule :: crdt_mod()}. --type field_value() :: {crdts(), tombstone()}. - --type crdts() :: [entry()]. --type entry() :: {riak_dt:dot(), crdt()}. - -%% Only for present fields, ensures removes propogate --type tombstone() :: crdt(). +-type field_meta() :: {riak_dt_vclock:vclock(), seen(), tombstone()}. +-type field_value() :: {field_meta(), crdt()}. %% Only field removals can be deferred. CRDTs stored in the map may %% have contexts and deferred operations, but as these are part of the %% state, they are stored under the field as an update like any other. --type deferred() :: dict(context(), [field()]). +-type seen() :: dots(). +-type deferred() :: dict(riak_dt:context(), [field_name()]). +-type tombstone() :: riak_dt_vclock:vclock(). -%% used until we move to erlang 17 and can use dict:dict/2 --type dict(_A, _B) :: dict(). +-type dots() :: [dot()]. +-type dot() :: riak_dt:dot(). %% limited to only those mods that support both a shared causal %% context, and by extension, the reset-remove semantic. -type crdt_mod() :: riak_dt_emcntr | riak_dt_lwwreg | - riak_dt_od_flag | - riak_dt_map | riak_dt_orswot. +riak_dt_od_flag | riak_dt_map | riak_dt_orswot. -type crdt() :: riak_dt_emcntr:emcntr() | riak_dt_od_flag:od_flag() | - riak_dt_lwwreg:lwwreg() | - riak_dt_orswot:orswot() | - riak_dt_map:map(). +riak_dt_lwwreg:lwwreg() | riak_dt_orswot:orswot() | riak_dt_map:map(). -type map_op() :: {update, [map_field_update() | map_field_op()]}. --type map_field_op() :: {remove, field()}. --type map_field_update() :: {update, field(), crdt_op()}. +-type map_field_op() :: {remove, field_name()}. +-type map_field_update() :: {update, field_name(), crdt_op()}. -type crdt_op() :: riak_dt_emcntr:emcntr_op() | - riak_dt_lwwreg:lwwreg_op() | - riak_dt_orswot:orswot_op() | riak_dt_od_flag:od_flag_op() | - riak_dt_map:map_op(). - --type context() :: riak_dt_vclock:vclock() | undefined. +riak_dt_lwwreg:lwwreg_op() | +riak_dt_orswot:orswot_op() | riak_dt_od_flag:od_flag_op() | +riak_dt_map:map_op() | riak_dt_map:map_op(). -type values() :: [value()]. --type value() :: {field(), riak_dt_map:values() | integer() | [term()] | boolean() | term()}. --type precondition_error() :: {error, {precondition, {not_present, field()}}}. +-type value() :: {field_name(), riak_dt_map:values() | integer() | [term()] | boolean() | term()}. +-type precondition_error() :: {error, {precondition, {not_present, field_name()}}}. + +-define(FRESH_CLOCK, riak_dt_vclock:fresh()). + +-ifdef(EQC). +-define(DICT, orddict). +-type dict(_A, _B) :: orddict:orddict(). + +-else. +%% used until we move to erlang 17 and can use dict:dict/2 +-type dict(_A, _B) :: dict(). -define(DICT, dict). +-endif. %% @doc Create a new, empty Map. -spec new() -> map(). new() -> - {riak_dt_vclock:fresh(), ?DICT:new(), ?DICT:new()}. + {?FRESH_CLOCK, ?DICT:new(), ?DICT:new()}. %% @doc sets the clock in the map to that `Clock'. Used by a %% containing Map for sub-CRDTs @@ -250,33 +252,49 @@ new() -> parent_clock(Clock, {_MapClock, Values, Deferred}) -> {Clock, Values, Deferred}. +%% @doc get all deferred operations for the map. +%% Does not evaluate recursively - I think this is not necessary right now, +%% because we do not compose map with other data-types than maps. +-spec get_deferred(map()) -> [riak_dt:context()]. +get_deferred({_, _, Deferred}) -> + lists:map(fun({Key, _}) -> Key end, ?DICT:to_list(Deferred)). + %% @doc get the current set of values for this Map -spec value(map()) -> values(). -value({_Clock, Values, _Deferred}) -> - lists:sort(?DICT:fold(fun({Name, Type}, CRDTs, Acc) -> - Merged = merge_crdts(Type, CRDTs), - [{{Name, Type}, Type:value(Merged)} | Acc] end, - [], - Values)). - -%% @private merge entry for field, if present, or return new if not -merge_field({_Name, Type}, error) -> - Type:new(); -merge_field({_Name, Type}, {ok, CRDTs}) -> - merge_crdts(Type, CRDTs); -merge_field(Field, Values) -> - merge_field(Field, ?DICT:find(Field, Values)). - -%% @private merge the CRDTs of a type -merge_crdts(Type, {CRDTs, TS}) -> - V = ?DICT:fold(fun(_Dot, CRDT, CRDT0) -> - Type:merge(CRDT0, CRDT) end, - Type:new(), - CRDTs), - %% Merge with the tombstone to drop any removed dots - Type:merge(TS, V). +value({Clock, Values, _Deferred}) -> + %%Selects the visible elements from the map + lists:sort(?DICT:fold( + fun({Name, Type}, {{Dots, _, Tombstone}, CRDT0}, Acc) -> + CRDT = Type:parent_clock(Clock, CRDT0), + case Tombstone of + [] -> [{{Name, Type}, Type:value(CRDT)} | Acc]; + _ -> + case riak_dt_vclock:descends(Tombstone, Dots) of + %No new dots in this branch + true -> + Acc; + %Dots in the branch + false -> + case Type of + %Recursive search + riak_dt_map -> + SubTree = value(CRDT), + case SubTree of + [] -> + Acc; + _ -> [{{Name, Type}, Type:value(CRDT)} | Acc] + end; + _ -> + [{{Name, Type}, Type:value(CRDT)} | Acc] + end + end + end + end, + [], + Values)). %% @doc query map (not implemented yet) +%% -spec value(term(), map()) -> values(). value(_, Map) -> value(Map). @@ -298,7 +316,7 @@ value(_, Map) -> %% %% Atomic, all of `Ops' are performed successfully, or none are. -spec update(map_op(), riak_dt:actor() | riak_dt:dot(), map()) -> - {ok, map()} | precondition_error(). + {ok, map()} | precondition_error(). update(Op, ActorOrDot, Map) -> update(Op, ActorOrDot, Map, undefined). @@ -309,7 +327,7 @@ update(Op, ActorOrDot, Map) -> %% %% @see parent_clock/2 -spec update(map_op(), riak_dt:actor() | riak_dt:dot(), map(), riak_dt:context()) -> - {ok, map()}. + {ok, map()}. update({update, Ops}, ActorOrDot, {Clock0, Values, Deferred}, Ctx) -> {Dot, Clock} = update_clock(ActorOrDot, Clock0), apply_ops(Ops, Dot, {Clock, Values, Deferred}, Ctx). @@ -318,7 +336,7 @@ update({update, Ops}, ActorOrDot, {Clock0, Values, Deferred}, Ctx) -> %% means that field removals increment the clock too. -spec update_clock(riak_dt:actor() | riak_dt:dot(), riak_dt_vclock:vclock()) -> - {riak_dt:dot(), riak_dt_vclock:vclock()}. + {riak_dt:dot(), riak_dt_vclock:vclock()}. update_clock(Dot, Clock) when is_tuple(Dot) -> NewClock = riak_dt_vclock:merge([[Dot], Clock]), {Dot, NewClock}; @@ -327,25 +345,33 @@ update_clock(Actor, Clock) -> Dot = {Actor, riak_dt_vclock:get_counter(Actor, NewClock)}, {Dot, NewClock}. +-spec get_entry(field_name(), entries(), riak_dt_vclock:vclock()) -> + field_value(). +get_entry({_Name, Type}=Field, Fields, Clock) -> + {{Dots, _S, Tombstone}, CRDT} = case ?DICT:find(Field, Fields) of + {ok, Entry} -> + Entry; + error -> + {{[], [], ?FRESH_CLOCK}, Type:new()} + end, + {{Dots, _S, Tombstone}, Type:parent_clock(Clock, CRDT)}. + %% @private -spec apply_ops([map_field_update() | map_field_op()], riak_dt:dot(), - {riak_dt_vclock:vclock(), entries() , deferred()}, context()) -> - {ok, map()} | precondition_error(). + {riak_dt_vclock:vclock(), entries() , deferred()}, riak_dt:context()) -> + {ok, map()} | precondition_error(). apply_ops([], _Dot, Map, _Ctx) -> {ok, Map}; -apply_ops([{update, {_Name, Type}=Field, Op} | Rest], Dot, {Clock, Values, Deferred}, Ctx) -> - CRDT = merge_field(Field, Values), - CRDT1 = Type:parent_clock(Clock, CRDT), - case Type:update(Op, Dot, CRDT1, Ctx) of - {ok, Updated} -> - NewValues = ?DICT:store(Field, {?DICT:store(Dot, Updated, ?DICT:new()), - %% old tombstone was - %% merged into current - %% value so create a new - %% empty one - Type:new()} - , Values), - apply_ops(Rest, Dot, {Clock, NewValues, Deferred}, Ctx); +apply_ops([{update, {_Name, Type}=Field, Op} | Rest], Dot, {Clock, Values, Deferred}=_ALL, Ctx) -> + {{_,_, Tombstone}, CRDT} = get_entry(Field, Values, Clock), + case Type:update(Op, Dot, CRDT, Ctx) of + {ok, Updated0} -> + Updated = Type:parent_clock(?FRESH_CLOCK, Updated0), + NewValues = ?DICT:store(Field, {{[Dot], [], Tombstone}, Updated}, Values), + %% Propagate previous remove operations that were tombstoned. + %% This is expensive. + UpdtCRDT = apply_deferred({Clock, NewValues, Deferred}), + apply_ops(Rest, Dot, UpdtCRDT, Ctx); Error -> Error end; @@ -369,8 +395,8 @@ apply_ops([{remove, Field} | Rest], Dot, Map, Ctx) -> %% %% @see defer_remove/4 for handling of removes of fields that are %% _not_ present --spec remove_field(field(), map(), context()) -> - {ok, map()} | precondition_error(). +-spec remove_field(field_name(), map(), riak_dt:context()) -> + {ok, map()} | precondition_error(). remove_field(Field, {Clock, Values, Deferred}, undefined) -> case ?DICT:find(Field, Values) of error -> @@ -378,21 +404,34 @@ remove_field(Field, {Clock, Values, Deferred}, undefined) -> {ok, _Removed} -> {ok, {Clock, ?DICT:erase(Field, Values), Deferred}} end; -%% Context removes -remove_field(Field, {Clock, Values, Deferred0}, Ctx) -> + +remove_field({_,_Type}=Field, {Clock, Values, Deferred0}, Ctx) -> Deferred = defer_remove(Clock, Ctx, Field, Deferred0), - NewValues = case ctx_rem_field(Field, Values, Ctx, Clock) of - empty -> - ?DICT:erase(Field, Values); - CRDTs -> - ?DICT:store(Field, CRDTs, Values) + {DefCtx, UpdtValues} = propagate_remove(Field, Values, Clock, Ctx), + NewValues = case ?DICT:find(Field, UpdtValues) of + %Element is removed but has deferred operations + {ok, empty} when DefCtx =/= no_deferred -> + ?DICT:update(Field, fun(Found) -> + case Found of + {{Dots, _S, Tombstone}, CRDT} -> + Tombstone = riak_dt_vclock:merge([DefCtx, Tombstone]), + {{Dots, _S, Tombstone}, CRDT}; + error -> + UpdtValues + end + end,UpdtValues); + {ok, empty} -> + ?DICT:erase(Field, UpdtValues); + {ok, CRDT} -> + ?DICT:store(Field, CRDT, UpdtValues); + error -> + UpdtValues end, {ok, {Clock, NewValues, Deferred}}. %% @private drop dominated fields -ctx_rem_field(_Field, error, _Ctx_, _Clock) -> - empty; -ctx_rem_field({_, Type}, {ok, {CRDTs, TS0}}, Ctx, MapClock) -> +-spec ctx_rem_field(field_name(), field_value(), riak_dt:context(), riak_dt_vclock:vclock()) -> empty | field_value(). +ctx_rem_field({_, Type}, {{Dots, _S, Tombstone}, CRDT}, Ctx, MapClock) -> %% Drop dominated fields, and update the tombstone. %% %% If the context is removing a field at dot {a, 1} and the @@ -403,19 +442,77 @@ ctx_rem_field({_, Type}, {ok, {CRDTs, TS0}}, Ctx, MapClock) -> %% TombstoneClock = riak_dt_vclock:glb(Ctx, MapClock), %% GLB is events seen by both clocks only TS = Type:parent_clock(TombstoneClock, Type:new()), - Remaining = ?DICT:filter(fun(Dot, _CRDT) -> - is_dot_unseen(Dot, Ctx) - end, - CRDTs), - case ?DICT:size(Remaining) of - 0 -> %% Ctx remove removed all dots for field + SurvivingDots = riak_dt_vclock:subtract_dots(Dots, Ctx), + case SurvivingDots of + [] -> %% Ctx remove removed all dots for field empty; _ -> %% Update the tombstone with the GLB clock - {Remaining, Type:merge(TS, TS0)} + CRDT2 = Type:merge(TS, Type:parent_clock(MapClock, CRDT)), + %% Always reset to empty clock so we don't duplicate storage + {{SurvivingDots, _S, Tombstone}, Type:parent_clock(?FRESH_CLOCK, CRDT2)} + end. + +%% Value is a map: +%% Remove fields that don't have deferred operations; +%% Compute the removal tombstone for this field. +-spec propagate_remove(field_name(), entries() | field_value(), riak_dt_vclock:vclock(), riak_dt:context()) -> + {riak_dt_vclock:vclock(), entries() | empty}. +propagate_remove({_, riak_dt_map}, {{Dots, _S, Tombstone}, {Clock, Value0, Deferred} = CRDT}, MapClock, Ctx)-> + {SubMergedDef, SubEntries} = + ?DICT:fold(fun(K, V, {UpdtClock, UpdtEntries}) -> + case propagate_remove(K, V, MapClock, Ctx) of + {_, empty} -> + {UpdtClock, UpdtEntries}; + {TombstoneClock, Value} -> + %%Some deferred operation in subtree + %% - keep entry, update tombstones + {riak_dt_vclock:merge([TombstoneClock, UpdtClock]), + ?DICT:store(K, Value, UpdtEntries)} + end + end, {?FRESH_CLOCK, ?DICT:new()}, Value0), + UncoveredDeferred = lists:filter(fun(DefOp) -> + riak_dt_vclock:descends(Clock, DefOp) + end, get_deferred(CRDT)), + Descends = riak_dt_vclock:descends(Ctx, Dots), + %Clear map if all entries are empty + case ?DICT:size(SubEntries) of + 0 when length(UncoveredDeferred) == 0, Descends -> + {SubMergedDef, empty}; + _ -> + {SubMergedDef, {{Dots, _S, riak_dt_vclock:merge([SubMergedDef, Tombstone])}, {Clock, SubEntries, Deferred}}} end; -ctx_rem_field(Field, Values, Ctx, MapClock) -> - ctx_rem_field(Field, ?DICT:find(Field, Values), Ctx, MapClock). + +%% Value is a leaf: +%% Merge deferred operations' context with Value clock (Tombstone) and send it upstream +propagate_remove({_, Type}=Field, {{Dots, _S, TombstoneIn}, CRDT}, MapClock, Ctx) -> + case Type:get_deferred(CRDT) of + [] when length(Dots) > 0 -> + {[], ctx_rem_field(Field, {{Dots, _S, TombstoneIn}, CRDT}, Ctx, MapClock)}; + Deferred -> + Intersection = riak_dt_vclock:glb(MapClock,Ctx), + Tombstone = riak_dt_vclock:merge([Intersection, TombstoneIn | Deferred]), + + %% Clear CRDT + TombstoneClock = riak_dt_vclock:glb(MapClock, Ctx), + TS = Type:parent_clock(TombstoneClock, Type:new()), + ClearedCRDT = Type:merge(TS, Type:parent_clock(TombstoneClock, CRDT)), + + {Tombstone, {{Dots, _S, Tombstone}, Type:parent_clock(?FRESH_CLOCK, ClearedCRDT)}} + end; + +propagate_remove(Field, Values, MapClock, Ctx) -> + case ?DICT:find(Field, Values) of + {ok,Value} -> + {UpdtClock,UpdtValue} = propagate_remove(Field, Value, MapClock, Ctx), + case UpdtValue of + empty -> + {UpdtClock, ?DICT:erase(Field, Values)}; + _ -> + {UpdtClock, ?DICT:store(Field, UpdtValue, Values)} + end; + error -> {MapClock, Values} + end. %% @private If we're asked to remove something we don't have (or have, %% but maybe not all 'updates' for it), is it because we've not seen @@ -430,162 +527,134 @@ ctx_rem_field(Field, Values, Ctx, MapClock) -> %% result in deferred operations on the parent Map. This simulates %% causal delivery, in that an `update' must be seen before it can be %% `removed'. --spec defer_remove(riak_dt_vclock:vclock(), riak_dt_vclock:vclock(), field(), deferred()) -> - deferred(). +-spec defer_remove(riak_dt_vclock:vclock(), riak_dt_vclock:vclock(), field_name(), deferred()) -> + deferred(). defer_remove(Clock, Ctx, Field, Deferred) -> case riak_dt_vclock:descends(Clock, Ctx) of %% no need to save this remove, we're done true -> Deferred; false -> ?DICT:update(Ctx, - fun(Fields) -> - ordsets:add_element(Field, Fields) end, - ordsets:add_element(Field, ordsets:new()), - Deferred) + fun(Fields) -> + ordsets:add_element(Field, Fields) end, + ordsets:add_element(Field, ordsets:new()), + Deferred) end. - -%% @doc merge two `map()'s. -spec merge(map(), map()) -> map(). -merge(Map, Map) -> - Map; -%% @TODO is there a way to optimise this, based on clocks maybe? -merge({LHSClock, LHSEntries, LHSDeferred}, {RHSClock, RHSEntries, RHSDeferred}) -> +merge({LHSClock0, LHSEntries0, LHSDeferred0}, {RHSClock0, RHSEntries0, RHSDeferred0}) -> + %% Clear entries that do not use dots to identify updates + {LHSClock, LHSEntries, LHSDeferred} = apply_deferred({LHSClock0, LHSEntries0, RHSDeferred0}), + {RHSClock, RHSEntries, RHSDeferred} = apply_deferred({RHSClock0, RHSEntries0, LHSDeferred0}), + Clock = riak_dt_vclock:merge([LHSClock, RHSClock]), - {CommonKeys, LHSUnique, RHSUnique} = key_sets(LHSEntries, RHSEntries), - Acc0 = filter_unique(LHSUnique, LHSEntries, RHSClock, ?DICT:new()), - Acc1 = filter_unique(RHSUnique, RHSEntries, LHSClock, Acc0), - Entries = merge_common(CommonKeys, LHSEntries, RHSEntries, LHSClock, RHSClock, Acc1), - Deferred = merge_deferred(RHSDeferred, LHSDeferred), - apply_deferred(Clock, Entries, Deferred). - -%% @private filter the set of fields that are on one side of a merge -%% only. --spec filter_unique(set(), entries(), riak_dt_vclock:vclock(), entries()) -> entries(). -filter_unique(FieldSet, Entries, Clock, Acc) -> - sets:fold(fun({_Name, Type}=Field, Keep) -> - {Dots, TS} = ?DICT:fetch(Field, Entries), - KeepDots = ?DICT:filter(fun(Dot, _CRDT) -> - is_dot_unseen(Dot, Clock) - end, - Dots), - - case ?DICT:size(KeepDots) of - 0 -> - Keep; - _ -> - %% create a tombstone since the - %% otherside does not have this field, - %% it either removed it, or never had - %% it. If it never had it, the removing - %% dots in the tombstone will have no - %% impact on the value, if the otherside - %% removed it, then the removed dots - %% will be propogated by the tombstone. - Tombstone = Type:merge(TS, Type:parent_clock(Clock, Type:new())), - ?DICT:store(Field, {KeepDots, Tombstone}, Keep) - end - end, - Acc, - FieldSet). - -%% @private predicate function, `true' if the provided `dot()' is -%% concurrent with the clock, `false' if the clock has seen the dot. --spec is_dot_unseen(riak_dt:dot(), riak_dt_vclock:vclock()) -> boolean(). -is_dot_unseen(Dot, Clock) -> - not riak_dt_vclock:descends(Clock, [Dot]). - -%% @doc Get the keys from an ?DICT as a set --spec key_set(?DICT()) -> set(). -key_set(Dict) -> - sets:from_list(?DICT:fetch_keys(Dict)). - -%% @doc break the keys from an two ?DICTs out into three sets, the -%% common keys, those unique to one, and those unique to the other. --spec key_sets(?DICT(), ?DICT()) -> {set(), set(), set()}. -key_sets(LHS, RHS) -> - LHSet = key_set(LHS), - RHSet = key_set(RHS), - {sets:intersection(LHSet, RHSet), - sets:subtract(LHSet, RHSet), - sets:subtract(RHSet, LHSet)}. - - -%% @private for a set of dots (that are unique to one side) decide -%% whether to keep, or drop each. --spec filter_dots(set(), ?DICT(), riak_dt_vclock:vclock()) -> entries(). -filter_dots(Dots, CRDTs, Clock) -> - DotsToKeep = sets:filter(fun(Dot) -> - is_dot_unseen(Dot, Clock) - end, - Dots), - - ?DICT:filter(fun(Dot, _CRDT) -> - sets:is_element(Dot, DotsToKeep) - end, - CRDTs). - -%% @private merge the common fields into a set of surviving dots and a -%% tombstone per field. If a dot is on both sides, keep it. If it is -%% only on one side, drop it if dominated by the otheride's clock. -merge_common(FieldSet, LHS, RHS, LHSClock , RHSClock, Acc) -> - sets:fold(fun({_, Type}=Field, Keep) -> - {LHSDots, LHTS} = ?DICT:fetch(Field, LHS), - {RHSDots, RHTS} = ?DICT:fetch(Field, RHS), - {CommonDots, LHSUniqe, RHSUnique} = key_sets(LHSDots, RHSDots), - TS = Type:merge(RHTS, LHTS), - - CommonSurviving = sets:fold(fun(Dot, Common) -> - L = ?DICT:fetch(Dot, LHSDots), - ?DICT:store(Dot, L, Common) - end, - ?DICT:new(), - CommonDots), - - LHSSurviving = filter_dots(LHSUniqe, LHSDots, RHSClock), - RHSSurviving = filter_dots(RHSUnique, RHSDots, LHSClock), - - Dots = ?DICT:from_list(lists:merge([?DICT:to_list(CommonSurviving), - ?DICT:to_list(LHSSurviving), - ?DICT:to_list(RHSSurviving)])), - - case ?DICT:size(Dots) of - 0 -> - Keep; - _ -> - ?DICT:store(Field, {Dots, TS}, Keep) - end - - end, - Acc, - FieldSet). + Fields = lists:umerge(?DICT:fetch_keys(LHSEntries), ?DICT:fetch_keys(RHSEntries)), + Entries = lists:foldl(fun({_Name, Type}=Field, Acc) -> + {{LHSDots, LHSS, LHDTomb}, LHSCRDT} = get_entry(Field, LHSEntries, LHSClock), + {{RHSDots, RHSS, RHDTomb}, RHSCRDT} = get_entry(Field, RHSEntries, RHSClock), + case keep_dots(LHSDots, RHSDots, LHSClock, RHSClock) of + [] -> + Acc; + Dots -> + MergedCRDT = Type:merge(LHSCRDT, RHSCRDT), + MergedTombstone = riak_dt_vclock:merge([LHDTomb,RHDTomb]), + MergedSeen = lists:umerge(LHSS, RHSS), + %% Yes! Reset the clock, again + ?DICT:store(Field, {{Dots, MergedSeen, MergedTombstone}, Type:parent_clock(?FRESH_CLOCK, MergedCRDT)}, Acc) + end + end, + ?DICT:new(), + Fields), + + Deferred = merge_deferred(LHSDeferred, RHSDeferred), + CRDT = apply_deferred({Clock, Entries, Deferred}), + %% What should be done first: clear tombstones or apply deferred? + %% Can we remove this apply deferred? + clear_tombstones(CRDT). + +-spec keep_dots(riak_dt_vclock:vclock(), riak_dt_vclock:vclock(), riak_dt_vclock:vclock(), riak_dt_vclock:vclock()) -> riak_dt_vclock:vclock(). +keep_dots(LHSDots, RHSDots, LHSClock, RHSClock) -> + CommonDots = sets:intersection(sets:from_list(LHSDots), sets:from_list(RHSDots)), + LHSUnique = sets:to_list(sets:subtract(sets:from_list(LHSDots), CommonDots)), + RHSUnique = sets:to_list(sets:subtract(sets:from_list(RHSDots), CommonDots)), + LHSKeep = riak_dt_vclock:subtract_dots(LHSUnique, RHSClock), + RHSKeep = riak_dt_vclock:subtract_dots(RHSUnique, LHSClock), + riak_dt_vclock:merge([sets:to_list(CommonDots), LHSKeep, RHSKeep]). + %% @private -spec merge_deferred(deferred(), deferred()) -> deferred(). merge_deferred(LHS, RHS) -> ?DICT:merge(fun(_K, LH, RH) -> - ordsets:union(LH, RH) end, - LHS, RHS). + ordsets:union(LH, RH) end, + LHS, RHS). %% @private apply those deferred field removals, if they're %% preconditions have been met, that is. --spec apply_deferred(riak_dt_vclock:vclock(), entries(), deferred()) -> - {riak_dt_vclock:vclock(), entries(), deferred()}. -apply_deferred(Clock, Entries, Deferred) -> +-spec apply_deferred(map()) -> map(). +apply_deferred({Clock, Entries, Deferred}) -> ?DICT:fold(fun(Ctx, Fields, Map) -> - remove_all(Fields, Map, Ctx) + lists:foldl(fun(Field, {Ci,Ei,Defi}=Mapi) -> + case ?DICT:is_key(Field, Ei) of + true -> + {ok, Res} = remove_field(Field, Mapi, Ctx), + Res; + false -> + %%If there is no key, applying the deferred operation + %%would not affect the state of the object. + Def = defer_remove(Clock, Ctx, Field, Defi), + {Ci,Ei,Def} + end + end, Map, Fields) end, {Clock, Entries, ?DICT:new()}, Deferred). %% @private --spec remove_all([field()], map(), context()) -> - map(). -remove_all(Fields, Map, Ctx) -> - lists:foldl(fun(Field, MapAcc) -> - {ok, MapAcc2}= remove_field(Field, MapAcc, Ctx), - MapAcc2 - end, - Map, - Fields). +%%Eliminates the tombstone if it has been integrated in the object's clock. +-spec clear_tombstones(map()) -> map(). +clear_tombstones({Clock, Entries, Deferred}) -> + FilteredEntries = + ?DICT:fold(fun(Field_i, Value_i, FilteredEntriesAcc) -> + clear_tombstones_handle(Field_i, Value_i, FilteredEntriesAcc, Clock) + end, ?DICT:new(), Entries), + {Clock, FilteredEntries, Deferred}. + +clear_tombstones_handle({_, riak_dt_map}=Field, {{Dots, _S, Tombstone}, {Clock, CRDT, Deferred}}, NewMap, MapClock) -> + FilteredEntries = + ?DICT:fold(fun(Field_i, Value_i, FilteredEntriesAcc) -> + clear_tombstones_handle(Field_i, Value_i, FilteredEntriesAcc, MapClock) + end, ?DICT:new(), CRDT), + %%Distinguish between empty map and removed map. --- this was changed, maybe do the same to map. + TombstoneCovered = riak_dt_vclock:descends(MapClock, Tombstone), + case ?DICT:size(FilteredEntries) == 0 of + true when length(Tombstone) > 0 andalso TombstoneCovered -> + % No childs, a tombstone was set (remove executed) and the clock dominates tombstone + NewMap; + _ -> + case TombstoneCovered of + true -> + % New entries were added to the map - keep entries, clear tomb + ?DICT:store(Field, {{Dots, _S, []}, {Clock, FilteredEntries, Deferred}}, NewMap); + false -> + % A tombstone was set, but the tombstone is still newer + % No childs, but no tombstonte was set (empty field) - keep all + ?DICT:store(Field, {{Dots, _S, Tombstone}, {Clock, FilteredEntries, Deferred}}, NewMap) + end + end; + +clear_tombstones_handle(Field, {{Dots, _S, Tombstone}, CRDT}=Value, NewMap, MapClock) -> + TombstoneCovered = riak_dt_vclock:descends(MapClock, Tombstone), + case TombstoneCovered of + true -> + ReceivedUpdates = riak_dt_vclock:subtract_dots(Dots, Tombstone), + case ReceivedUpdates of + [] -> + NewMap; + _ -> + ?DICT:store(Field, {{Dots, _S, []}, CRDT}, NewMap) + end; + false -> + ?DICT:store(Field, Value, NewMap) + end. %% @doc compare two `map()'s for equality of structure Both schemas %% and value list must be equal. Performs a pariwise equals for all @@ -593,30 +662,22 @@ remove_all(Fields, Map, Ctx) -> -spec equal(map(), map()) -> boolean(). equal({Clock1, Values1, Deferred1}, {Clock2, Values2, Deferred2}) -> riak_dt_vclock:equal(Clock1, Clock2) andalso - Deferred1 == Deferred2 andalso - pairwise_equals(lists:sort(?DICT:to_list(Values1)), - lists:sort(?DICT:to_list(Values2))). + Deferred1 == Deferred2 andalso + pairwise_equals(lists:sort(?DICT:to_list(Values1)), + lists:sort(?DICT:to_list(Values2))). --spec pairwise_equals([field()], [field()]) -> boolean(). +-spec pairwise_equals(entries() | [], entries() | []) -> boolean(). pairwise_equals([], []) -> true; -pairwise_equals([{{Name, Type}, {Dots1, TS1}}| Rest1], [{{Name, Type}, {Dots2, TS2}}|Rest2]) -> - %% Tombstones don't need to be equal. When we merge with a map - %% where one side is absent, we take the absent sides clock, when - %% we merge where both sides have a field, we merge the - %% tombstones, and apply deferred. The deferred remove uses a glb - %% of the context and the clock, meaning we get a smaller - %% tombstone. Both are correct when it comes to determining the - %% final value. As long as tombstones are not conflicting (that is - %% A == B | A > B | B > A) - case {?DICT:fetch_keys(Dots1) == ?DICT:fetch_keys(Dots2), Type:equal(TS1, TS2)} of - {true, true} -> +pairwise_equals([{{Name, Type}, {{Dots1, S1, Tombstone1}, CRDT1}}|Rest1], [{{Name, Type}, {{Dots2, S2, Tombstone2}, CRDT2}}|Rest2]) -> + case {riak_dt_vclock:equal(Dots1, Dots2), S1 =:= S2, Type:equal(CRDT1, CRDT2), riak_dt_vclock:equal(Tombstone1, Tombstone2)} of + {true, true, true, true} -> pairwise_equals(Rest1, Rest2); _ -> false end; -pairwise_equals(_, _) -> - false. +pairwise_equals(_, _) -> false. + %% @doc an opaque context that can be passed to `update/4' to ensure %% that only seen fields are removed. If a field removal operation has @@ -636,25 +697,18 @@ precondition_context({Clock, _Field, _Deferred}) -> %% of lag/staleness. -spec stats(map()) -> [{atom(), integer()}]. stats(Map) -> - [ {S, stat(S, Map)} || S <- [actor_count, field_count, duplication, deferred_length]]. + [ {S, stat(S, Map)} || S <- [actor_count, field_count, deferred_length]]. -spec stat(atom(), map()) -> number() | undefined. stat(actor_count, {Clock, _, _}) -> length(Clock); stat(field_count, {_, Fields, _}) -> ?DICT:size(Fields); -stat(duplication, {_, Fields, _}) -> - %% Number of duplicated fields - {FieldCnt, Duplicates} = ?DICT:fold(fun(_Field, {Dots ,_}, {FCnt, DCnt}) -> - {FCnt+1, DCnt + ?DICT:size(Dots)} - end, - {0, 0}, - Fields), - Duplicates - FieldCnt; stat(deferred_length, {_, _, Deferred}) -> ?DICT:size(Deferred); stat(_,_) -> undefined. + -include("riak_dt_tags.hrl"). -define(TAG, ?DT_MAP_TAG). -define(V1_VERS, 1). @@ -685,9 +739,9 @@ to_binary(?V1_VERS, Map0) -> %% @private transpose a v1 map (orddicts) to a v2 (dicts) -spec to_v2({riak_dt_vclock:vclock(), orddict:orddict() | dict(), orddict:orddict() | dict()}) -> - {riak_dt_vclock:vclock(), dict(), dict()}. + {riak_dt_vclock:vclock(), dict(), dict()}. to_v2({Clock, Fields0, Deferred0}) when is_list(Fields0), - is_list(Deferred0) -> + is_list(Deferred0) -> Fields = ?DICT:from_list(Fields0), Deferred = ?DICT:from_list(Deferred0), {Clock, Fields, Deferred}; @@ -697,9 +751,9 @@ to_v2(S) -> %% @private transpose a v2 map (dicts) to a v1 (orddicts) -spec to_v1({riak_dt_vclock:vclock(), orddict:orddict() | dict(), orddict:orddict() | dict()}) -> - {riak_dt_vclock:vclock(), orddict:orddict(), orddict:orddict()}. + {riak_dt_vclock:vclock(), orddict:orddict(), orddict:orddict()}. to_v1({_Clock, Fields0, Deferred0}=S) when is_list(Fields0), - is_list(Deferred0) -> + is_list(Deferred0) -> S; to_v1({Clock, Fields0, Deferred0}) -> %% Must be dicts, there is no is_dict test though @@ -736,11 +790,229 @@ from_binary(?V2_VERS, <>) -> Map= riak_dt:from_binary(B), to_v2(Map). + %% =================================================================== %% EUnit tests %% =================================================================== -ifdef(TEST). +-define(FIELD, {'X', riak_dt_map}). +-define(FIELD_Y, {'Y', riak_dt_map}). +-define(FIELD_X, {'X', riak_dt_od_flag}). +-define(FIELD_A, {'X.A', riak_dt_od_flag}). +-define(FIELD_B, {'X.B', riak_dt_od_flag}). +-define(FIELD_S, {'X.S', riak_dt_orswot}). + +-define(ENABLE_FLAG_A, {update, [{update, ?FIELD, {update, [{update, ?FIELD_A, enable}]}}]}). +-define(ENABLE_FLAG_B, {update, [{update, ?FIELD, {update, [{update, ?FIELD_B, enable}]}}]}). +-define(DISABLE_FLAG_A, {update, [{update, ?FIELD, {update, [{update, ?FIELD_A, disable}]}}]}). +-define(DISABLE_FLAG_B, {update, [{update, ?FIELD, {update, [{update, ?FIELD_B, disable}]}}]}). + +-define(REMOVE_FIELD_X,{update, [{remove, ?FIELD}]}). +-define(REMOVE_FIELD_XA, {update, [{update, ?FIELD, {update, [{remove, ?FIELD_A}]}}]}). +-define(REMOVE_FIELD_XB, {update, [{update, ?FIELD, {update, [{remove, ?FIELD_B}]}}]}). + +-define(ENABLE_FLAG_XYA, {update, [{update, ?FIELD, {update, [{update, ?FIELD_Y, {update, [{update, ?FIELD_A, enable}]}}]}}]}). + + + +%% Issue 99 test case +keep_deferred_test() -> + InitialState = new(), + + %Update at node A + {ok, {CtxA1, _, _}=StateA1} = update({update, [{update, ?FIELD_X, enable}]}, a, InitialState), + %Update with the context of node A on node B generates a deferred. + {ok, {CtxB1, _, _}=StateB1} = update({update, [{update, ?FIELD_X, disable}]}, b, InitialState, CtxA1), + %Remove field in B is causal with the disable op - it removes the field and the deferred. + {ok, StateB2} = update({update, [{remove, ?FIELD_X}]}, b, StateB1, CtxB1), + StateAB = merge(StateA1,StateB2), + ?assertEqual([],value(StateAB)). + +%% Test that the field is preserved if a deferred arrives after a remove +keep_multiple_deferred_test() -> + InitialState = new(), + + %Update at node A + {ok, {CtxA1, _, _}=StateA1} = update({update, [{update, ?FIELD_X, enable}]}, a, InitialState), + %Update with the context of node A on node B generates a deferred. + {ok, {CtxB1, _, _}=StateB1} = update({update, [{update, ?FIELD_X, disable}]}, b, InitialState, CtxA1), + %Remove field in B is causal with the disable op - it removes the field and the deferred. + {ok, StateB2} = update({update, [{remove, ?FIELD_X}]}, b, StateB1, CtxB1), + {ok, StateB3} = update({update, [{update, ?FIELD_X, disable}]}, b, StateB2, [{c,1}]), + StateAB = merge(StateA1,StateB3), + ?assertEqual([{{'X',riak_dt_od_flag},false}],value(StateAB)). + +%% Concurrently enable the flag +keep_deferred_with_concurrent_add_test() -> + InitialState = new(), + + %Update at node A + {ok, {CtxA1, _, _}=StateA1} = update({update, [{update, ?FIELD_X, enable}]}, a, InitialState), + %Update with the context of node A on node B generates a deferred. + {ok, {_, _, _}=StateA2} = update({update, [{update, ?FIELD_X, enable}]}, a, StateA1, CtxA1), + {ok, {CtxB1, _, _}=StateB1} = update({update, [{update, ?FIELD_X, disable}]}, b, InitialState, CtxA1), + %Remove field in B is causal with the disable op - it removes the field and the deferred. + {ok, StateB2} = update({update, [{remove, ?FIELD_X}]}, b, StateB1, CtxB1), + StateAB = merge(StateA2,StateB2), + ?assertEqual([{{'X',riak_dt_od_flag},true}],value(StateAB)). + +%% Remove using a context that does not descend from the object. +%% Remove concurrent with deferred delete --- preserve remove +keep_deferred_context_test() -> + InitialState = new(), + {ok, {CtxA1, _, _}=StateA1} = update({update, [{update, ?FIELD_X, enable}]}, a, InitialState), + {ok, {_, _, _}=StateB1} = update({update, [{update, ?FIELD_X, disable}]}, b, InitialState, CtxA1), + {ok, StateB2} = update({update, [{remove, ?FIELD_X}]}, b, StateB1, CtxA1), + ?assertEqual([{{'X',riak_dt_od_flag},false}],value(StateB2)), + StateAB = merge(StateA1,StateB2), + ?assertEqual([{{'X',riak_dt_od_flag},false}],value(StateAB)). + +%% Remove a field that is a map with a deferred operation. +%% Map should be removed in the end +remove_subtree_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, StateB2} = update(?REMOVE_FIELD_X, b, StateB1, CtxB1), + StateAB = merge(StateA1,StateB2), + ?assertEqual([],value(StateAB)). + +%% Remove a field inside a map that has a deferred operation. +remove_entry_in_subtree_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, StateB2} = update(?REMOVE_FIELD_XA, b, StateB1, CtxB1), + StateAB = merge(StateA1,StateB2), + ?assertEqual([{{'X',riak_dt_map},[]}],value(StateAB)). + +%% Remove a field X that is a map that has a entry with a deferred +%% and a flag that is enable after remove X but before receiving the deferred. +%% The idea is to test that the flag is preserved after clearing the tombstones +remove_entry_in_subtree_2_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, {CtxB2,_,_}=StateB2} = update(?REMOVE_FIELD_X, b, StateB1, CtxB1), + {ok, StateB3} = update(?ENABLE_FLAG_B, b, StateB2, CtxB2), + StateAB = merge(StateA1,StateB3), + ?assertEqual([{{'X',riak_dt_map}, [{{'X.B',riak_dt_od_flag},true}]}],value(StateAB)). + +%% The same as before, but the enable flag is a remote operation. +remove_entry_in_subtree_3_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, {_,_,_}=StateB2} = update(?REMOVE_FIELD_X, b, StateB1, CtxB1), + {ok, StateA2} = update(?ENABLE_FLAG_B, a, StateA1, CtxA1), + StateAB = merge(StateA2,StateB2), + ?assertEqual([{{'X',riak_dt_map}, [{{'X.B',riak_dt_od_flag},true}]}],value(StateAB)). + +two_deferred_entries_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {CtxC1,_,_}=StateC1} = update(?ENABLE_FLAG_B, c, InitialState), + {ok, {_CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, {CtxB2,_,_}=StateB2} = update(?DISABLE_FLAG_B, b, StateB1, CtxC1), + {ok, {CtxB3,_,_}=StateB3} = update(?REMOVE_FIELD_XA, b, StateB2, CtxB2), + {ok, {_CtxB4,_,_}=StateB4} = update(?REMOVE_FIELD_XB, b, StateB3, CtxB3), + {_,Map,_}=StateAB = merge(StateA1,StateB4), + + %%Check that the element is there + {ok, {{_,_,_},{_,X,_}}} = ?DICT:find(?FIELD,Map), + ?assertEqual(true,?DICT:is_key(?FIELD_B,X)), + StateABC = merge(StateAB,StateC1), + ?assertEqual([{{'X',riak_dt_map},[]}],value(StateABC)). + +two_deferred_entries_2_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {CtxC1,_,_}=StateC1} = update(?ENABLE_FLAG_B, c, InitialState), + {ok, {CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, {_CtxB2,_,_}=StateB2} = update(?REMOVE_FIELD_XA, b, StateB1, CtxB1), + {ok, {CtxB3,_,_}=StateB3} = update(?DISABLE_FLAG_B, b, StateB2, CtxC1), + StateAB = merge(StateA1,StateB3), + {ok, {_,_,_}=StateAB1} = update(?REMOVE_FIELD_XB, b, StateAB, CtxB3), + StateABC = merge(StateAB1,StateC1), + ?assertEqual([{{'X',riak_dt_map},[]}],value(StateABC)). + +clear_invisible_after_merge_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {_,_,_}=StateA2} = update(?ENABLE_FLAG_A, a, StateA1), + {ok, {CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, {CtxB2,_,_}=StateB2} = update(?ENABLE_FLAG_B, b, StateB1, CtxB1), + {ok, {_,_,_}=StateB3} = update(?REMOVE_FIELD_X, b, StateB2, CtxB2), + StateAB1 = merge(StateB3,StateA1), + StateAB2 = merge(StateB3,StateA2), + ?assertEqual([], value(StateAB1)), + ?assertEqual([{{'X',riak_dt_map},[{{'X.A',riak_dt_od_flag},true}]}], value(StateAB2)). + +clear_invisible_after_merge_2_test() -> + InitialState = new(), + {ok, {CtxA1,_,_}=_StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {_CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_B, b, InitialState,CtxA1), + {ok, {CtxB2,Map,_}=StateB2} = update(?ENABLE_FLAG_XYA, b, StateB1), + + %%Check that the element is there + {ok, {{_,_,_},{_,X,_}}} = ?DICT:find(?FIELD,Map), + {ok, {{_,_,_},{_,Y,_}}} = ?DICT:find(?FIELD_Y, X), + ?assertEqual(true,?DICT:is_key(?FIELD_A,Y)), + {ok, {_,_,_}=StateB3} = update(?REMOVE_FIELD_X, b, StateB2, CtxB2), + ?assertEqual([], value(StateB3)). + +clear_invisible_after_merge_set_test() -> + AddElemToS = fun(Elem) -> + {update, [{update, ?FIELD, {update, [{update, ?FIELD_S, {add, Elem}}]}}]} + end, + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(?ENABLE_FLAG_A, a, InitialState), + {ok, {CtxB1,_,_}=StateB1} = update(?DISABLE_FLAG_A, b, InitialState, CtxA1), + {ok, {CtxB2,_,_}=StateB2} = update(AddElemToS(0), b, StateB1, CtxB1), + {ok, {CtxB3,_,_}=StateB3} = update(?REMOVE_FIELD_X, b, StateB2, CtxB2), + {ok, {_,_,_}=StateB4} = update(AddElemToS(1), b, StateB3, CtxB3), + StateAB = merge(StateA1,StateB4), + ?assertEqual([{{'X',riak_dt_map}, [{{'X.S',riak_dt_orswot},[1]}]}], value(StateAB)). + +clear_invisible_after_merge_set_2_test() -> + AddElemToS = fun(Elem) -> + {update, [{update, ?FIELD, {update, [{update, ?FIELD_S, {add, Elem}}]}}]} + end, + RemElemFromS = fun(Elem) -> + {update, [{update, ?FIELD, {update, [{update, ?FIELD_S, {remove, Elem}}]}}]} + end, + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(AddElemToS(0), a, InitialState), + {ok, {CtxB1,_,_}=StateB1} = update(RemElemFromS(0), b, InitialState, CtxA1), + {ok, {CtxB2,_,_}=StateB2} = update(AddElemToS(1), b, StateB1, CtxB1), + {ok, {CtxB3,_,_}=StateB3} = update(?REMOVE_FIELD_X, b, StateB2, CtxB2), + {ok, {_,_,_}=StateB4} = update(AddElemToS(2), b, StateB3, CtxB3), + StateAB = merge(StateA1,StateB4), + ?assertEqual([{{'X',riak_dt_map}, [{{'X.S',riak_dt_orswot},[2]}]}], value(StateAB)). + +transaction_1_test() -> + Updt1 = {update, [{update, ?FIELD, {update, [{update, ?FIELD_A, enable}, {update, ?FIELD_B, enable}]}}]}, + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(Updt1, a, InitialState), + ?assertEqual([{{'X',riak_dt_map},[{{'X.A',riak_dt_od_flag},true}, + {{'X.B',riak_dt_od_flag},true}]}], value(StateA1)), + {ok, StateB1} = update(?REMOVE_FIELD_X, b, InitialState, CtxA1), + StateAB = merge(StateA1,StateB1), + ?assertEqual([], value(StateAB)). + +transaction_2_test() -> + Updt1 = {update, [{update, ?FIELD, {update, [{update, ?FIELD_A, enable}, {update, ?FIELD_B, enable}]}}]}, + InitialState = new(), + {ok, {CtxA1,_,_}=StateA1} = update(Updt1, a, InitialState), + {ok, StateB1} = update(?REMOVE_FIELD_XA, b, InitialState, CtxA1), + StateAB = merge(StateA1,StateB1), + ?assertEqual([{{'X',riak_dt_map},[{{'X.B',riak_dt_od_flag},true}]}], value(StateAB)), + {ok, StateA2} = update(?ENABLE_FLAG_A, a, StateA1), + StateA2B = merge(StateA2,StateB1), + ?assertEqual([{{'X',riak_dt_map},[{{'X.A',riak_dt_od_flag},true}, + {{'X.B',riak_dt_od_flag},true}]}], value(StateA2B)). + %% This fails on previous version of riak_dt_map assoc_test() -> Field = {'X', riak_dt_orswot}, @@ -886,18 +1158,14 @@ stat_test() -> {ok, Map2} = update({update, [{update, {l, riak_dt_lwwreg}, {assign, <<"foo">>, 3}}]}, a2, Map1), {ok, Map3} = update({update, [{update, {l, riak_dt_lwwreg}, {assign, <<"bar">>, 4}}]}, a3, Map1), Map4 = merge(Map2, Map3), - ?assertEqual([{actor_count, 0}, {field_count, 0}, {duplication, 0}, {deferred_length, 0}], stats(Map)), + ?assertEqual([{actor_count, 0}, {field_count, 0}, {deferred_length, 0}], stats(Map)), ?assertEqual(3, stat(actor_count, Map4)), ?assertEqual(5, stat(field_count, Map4)), ?assertEqual(undefined, stat(waste_pct, Map4)), - ?assertEqual(1, stat(duplication, Map4)), {ok, Map5} = update({update, [{update, {l3, riak_dt_lwwreg}, {assign, <<"baz">>, 5}}]}, a3, Map4), ?assertEqual(6, stat(field_count, Map5)), - ?assertEqual(1, stat(duplication, Map5)), - %% Updating field {l, riak_dt_lwwreg} merges the duplicates to a single field %% @see apply_ops {ok, Map6} = update({update, [{update, {l, riak_dt_lwwreg}, {assign, <<"bim">>, 6}}]}, a2, Map5), - ?assertEqual(0, stat(duplication, Map6)), {ok, Map7} = update({update, [{remove, {l, riak_dt_lwwreg}}]}, a1, Map6), ?assertEqual(5, stat(field_count, Map7)). @@ -910,24 +1178,6 @@ equals_test() -> ?assert(equal(C, D)), ?assert(equal(A, A)). -%% up/downgrade tests -v1_v2_test() -> - {ok, Map} = update({update, [{update, {<<"set">>, riak_dt_orswot}, {add_all, [<<"bar">>, <<"baz">>]}}]}, a, new()), - V2Bin = to_binary(?V2_VERS, Map), - V1Bin = to_binary(?V1_VERS, Map), - Map2 = from_binary(?V2_VERS, V1Bin), - V1Map = from_binary(?V1_VERS, V1Bin), - V1MapFromV2 = from_binary(?V1_VERS, V2Bin), - ?assertMatch(<>, V1Bin), - ?assertMatch(<>, V2Bin), - ?assert(equal(Map, Map2)), - ?assertEqual(V1MapFromV2, V1Map), - ?assertEqual(V2Bin, to_binary(Map)), - ?assertEqual(V1Bin, to_binary(?V1_VERS, V1Map)), - ?assert(equal(Map, from_binary(V2Bin))), - ?assert(equal(Map, from_binary(V1Bin))), - ?assert(equal(Map, from_binary(?V2_VERS, V2Bin))). - -ifdef(EQC). -define(NUMTESTS, 1000). @@ -940,7 +1190,7 @@ size(Map) -> byte_size(term_to_binary(Map)) div 10. generate() -> - ?LET({Ops, Actors}, {non_empty(list(gen_op())), non_empty(list(bitstring(16*8)))}, + ?LET({Ops, Actors}, {non_empty(list(gen_op())), non_empty(list(bitstring(16*8)))}, lists:foldl(fun(Op, Map) -> Actor = case length(Actors) of 1 -> hd(Actors); @@ -971,12 +1221,12 @@ gen_field() -> gen_field(Size) -> {growingelements(['A', 'B', 'C', 'X', 'Y', 'Z']) %% Macro? Bigger? - , elements([ - riak_dt_emcntr, - riak_dt_orswot, - riak_dt_lwwreg, - riak_dt_od_flag - ] ++ [riak_dt_map || Size > 0])}. + , elements([ + riak_dt_emcntr, + riak_dt_orswot, + %% riak_dt_lwwreg, + riak_dt_od_flag + ] ++ [?MODULE || Size > 0])}. gen_field_op({_Name, Type}, Size) -> Type:gen_op(Size). diff --git a/src/riak_dt_od_flag.erl b/src/riak_dt_od_flag.erl index 327d007..ca4469a 100644 --- a/src/riak_dt_od_flag.erl +++ b/src/riak_dt_od_flag.erl @@ -30,7 +30,7 @@ -export([to_binary/1, stats/1, stat/2]). -export([to_binary/2, from_binary/2]). -export([precondition_context/1]). --export([parent_clock/2]). +-export([parent_clock/2, get_deferred/1, clear/1, clear/2]). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -58,6 +58,9 @@ new() -> parent_clock(Clock, {_SetClock, Flag , Deferred}) -> {Clock, Flag, Deferred}. +-spec get_deferred(od_flag()) -> [riak_dt:context()]. +get_deferred({_, _, Deferred}) -> Deferred. + -spec value(od_flag()) -> boolean(). value({_, [], _}) -> false; value({_, _, _}) -> true. @@ -148,6 +151,11 @@ apply_deferred(Clock, Flag, Deferred) -> {Clock, Flag, []}, Deferred). +%No longer needed +clear(Flag) -> disable(Flag, undefined). + +clear(Flag, Ctx) -> disable(Flag, Ctx). + -spec equal(od_flag(), od_flag()) -> boolean(). equal({C1,D1, Def1},{C2,D2, Def2}) -> riak_dt_vclock:equal(C1,C2) andalso diff --git a/src/riak_dt_oe_flag.erl b/src/riak_dt_oe_flag.erl index 50ad0cf..0cc95e6 100644 --- a/src/riak_dt_oe_flag.erl +++ b/src/riak_dt_oe_flag.erl @@ -26,7 +26,7 @@ -behaviour(riak_dt). -export([new/0, value/1, value/2, update/3, merge/2, equal/2, from_binary/1, to_binary/1, stats/1, stat/2]). --export([update/4, parent_clock/2]). +-export([update/4, parent_clock/2, get_deferred/1]). -export([to_binary/2, from_binary/2]). -ifdef(EQC). @@ -72,6 +72,9 @@ update(Op, Actor, Flag, _Ctx) -> parent_clock(_Clock, Flag) -> Flag. +-spec get_deferred(oe_flag()) -> []. +get_deferred(_CRDT) -> []. + -spec merge(oe_flag(), oe_flag()) -> oe_flag(). merge({C1, F}, {C2,F}) -> %% When they are the same result (true or false), just merge the diff --git a/src/riak_dt_orset.erl b/src/riak_dt_orset.erl index 4a4afa2..e637050 100644 --- a/src/riak_dt_orset.erl +++ b/src/riak_dt_orset.erl @@ -27,7 +27,7 @@ %% API -export([new/0, value/1, update/3, merge/2, equal/2, to_binary/1, from_binary/1, value/2, precondition_context/1, stats/1, stat/2]). --export([update/4, parent_clock/2]). +-export([update/4, parent_clock/2, get_deferred/1]). -export([to_binary/2, from_binary/2]). -ifdef(EQC). @@ -118,6 +118,9 @@ update(Op, Actor, ORDict, _Ctx) -> parent_clock(_Clock, ORSet) -> ORSet. +-spec get_deferred(orset()) -> []. +get_deferred(_CRDT) -> []. + -spec merge(orset(), orset()) -> orset(). merge(ORDictA, ORDictB) -> orddict:merge(fun(_Elem,TokensA,TokensB) -> diff --git a/src/riak_dt_orswot.erl b/src/riak_dt_orswot.erl index 006dbc4..7eacb08 100644 --- a/src/riak_dt_orswot.erl +++ b/src/riak_dt_orswot.erl @@ -80,7 +80,7 @@ -export([to_binary/1, from_binary/1]). -export([to_binary/2, from_binary/2]). -export([precondition_context/1, stats/1, stat/2]). --export([parent_clock/2]). +-export([parent_clock/2, get_deferred/1, clear/1, clear/2]). %% EQC API -ifdef(EQC). @@ -132,6 +132,10 @@ new() -> parent_clock(Clock, {_SetClock, Entries, Deferred}) -> {Clock, Entries, Deferred}. +-spec get_deferred(orswot()) -> [riak_dt:context()]. +get_deferred({_, _, Deferred}) -> + lists:map(fun({Key, _}) -> Key end, ?DICT:to_list(Deferred)). + -spec value(orswot()) -> [member()]. value({_Clock, Entries, _Deferred}) -> lists:sort([K || {K, _Dots} <- ?DICT:to_list(Entries)]). @@ -375,6 +379,15 @@ remove_elem({ok, _VClock}, Elem, {Clock, Dict, Deferred}) -> remove_elem(_, Elem, _ORSet) -> {error, {precondition, {not_present, Elem}}}. +%No longer needed +clear({_, Entries, _}=ORSet) -> + Elems = ?DICT:fold(fun(Elem, _, Acc) -> [Elem | Acc] end, [], Entries), + remove_all(Elems, undefined, ORSet). + +clear({_, Entries, _}=ORSet, Ctx) -> + Elems = ?DICT:fold(fun(Elem, _, Acc) -> [Elem | Acc] end, [], Entries), + remove_all(Elems, undefined, ORSet, Ctx). + %% @doc the precondition context is a fragment of the CRDT that %% operations requiring certain pre-conditions can be applied with. %% Especially useful for hybrid op/state systems where the context of diff --git a/src/riak_dt_pncounter.erl b/src/riak_dt_pncounter.erl index cf5ffb5..a2d8c82 100644 --- a/src/riak_dt_pncounter.erl +++ b/src/riak_dt_pncounter.erl @@ -38,7 +38,7 @@ -export([new/0, new/2, value/1, value/2, update/3, merge/2, equal/2, to_binary/1, from_binary/1, stats/1, stat/2]). -export([to_binary/2, from_binary/2, current_version/1, change_versions/3]). --export([parent_clock/2, update/4]). +-export([update/4, parent_clock/2, get_deferred/1]). %% EQC API -ifdef(EQC). @@ -81,6 +81,9 @@ new(_Actor, _Zero) -> parent_clock(_Clock, Cntr) -> Cntr. +-spec get_deferred(pncounter()) -> []. +get_deferred(_CRDT) -> []. + %% @doc The single, total value of a `pncounter()' -spec value(pncounter()) -> integer(). value(PNCnt) ->