Skip to content

Commit

Permalink
Report only confirmed log entries as matching in AppendEntriesResponse
Browse files Browse the repository at this point in the history
Summary:
For successful appends, only report up to those log entries that were actually
confirmed to match the leader's log entries in the current append rather than
all log entries. This avoids establishing a false quorum when only a prefix of the log matches.

Reviewed By: jaher

Differential Revision: D67120345

fbshipit-source-id: b6c55104547157a9c0a235671c35a0f9f0e49364
  • Loading branch information
hsun324 authored and facebook-github-bot committed Dec 12, 2024
1 parent ebc2504 commit 9870562
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 77 deletions.
12 changes: 7 additions & 5 deletions src/wa_raft_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ append(#log_view{last = Last} = View, Entries) ->
%% all log entries after the mismatching log will be replaced with the new log
%% entries provided.
-spec append(View :: view(), Start :: log_index(), Entries :: [log_entry()]) ->
{ok, LastIndex :: log_index(), NewView :: view()} | wa_raft:error().
{ok, MatchIndex :: log_index(), NewView :: view()} | wa_raft:error().
append(View, Start, _Entries) when Start =< 0 ->
?LOG_ERROR("[~p] rejecting append starting at invalid start index ~p", [log_name(View), Start], #{domain => [whatsapp, wa_raft]}),
{error, invalid_start_index};
Expand All @@ -336,8 +336,9 @@ append(#log_view{log = Log, last = Last} = View0, Start, Entries) ->
ok ->
?RAFT_COUNT('raft.log.append.ok'),
View1 = update_config_cache(View0, Start, Entries),
NewLast = max(Last, Start + length(Entries) - 1),
{ok, NewLast, View1#log_view{last = NewLast}};
NewMatch = Start + length(Entries) - 1,
NewLast = max(Last, NewMatch),
{ok, NewMatch, View1#log_view{last = NewLast}};
{mismatch, Index} ->
?RAFT_COUNT('raft.log.append.mismatch'),
case truncate(View0, Index) of
Expand All @@ -347,8 +348,9 @@ append(#log_view{log = Log, last = Last} = View0, Start, Entries) ->
ok ->
?RAFT_COUNT('raft.log.append.ok'),
View2 = update_config_cache(View1, Start, NewEntries),
NewLast = max(Index - 1, Start + length(Entries) - 1),
{ok, NewLast, View2#log_view{last = NewLast}};
NewMatch = Start + length(Entries) - 1,
NewLast = max(Index - 1, NewMatch),
{ok, NewMatch, View2#log_view{last = NewLast}};
{error, Reason} ->
?RAFT_COUNT('raft.log.append.error'),
{error, Reason}
Expand Down
92 changes: 20 additions & 72 deletions src/wa_raft_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -914,83 +914,33 @@ leader(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, ?APPE
_ -> cancel_bulk_logs_for_follower(Sender, State1)
end,

% Here, the RAFT protocol expects that the MatchIndex for the follower be set to
% the log index of the last log entry replicated by the AppendEntries RPC that
% triggered this AppendEntriesResponse RPC. However, we do not have enough state
% here to figure out that log index so instead assume that the follower's log
% matches completely after a successful AppendEntries RPC.
%
% This is perfectly valid during normal operations after the leadership has been
% stable for a while since all replication at that point occurs at the end of
% the log, and so FollowerEndIndex =:= PrevLogIndex + length(Entries). However,
% this may not be true at the start of a term.
%
% In our implementation of the RAFT protocol, the leader of a new term always
% appends a new log entry created by itself (with the new term) to the end of
% the log before starting replication (hereinafter the "initial log entry").
% We store the index of the initial log entry in FirstCurrentTermLogIndex.
% For all followers, NextIndex is initialized to FirstCurrentTermLogIndex so
% replication for the new term always starts from the initial log entry. In
% addition, the leader will refuse to commit any log entries until it finds
% a quorum that contains at least the initial log entry has been established.
%
% Note that since the initial log entry is created by the RAFT leader at the
% start of a new term, it is impossible for followers with a log at least as
% long as the leader's to match. After the first round of AppendEntries, all
% followers will either match the leader or will have a log whose last log
% index is lesser than FirstCurrentTermLogIndex.
%
% * For any followers whose log matches, the condition is trivial.
% * For any followers whose log does not match and whose log ends with a log
% entry with an index lesser than (FirstCurrentTermLogIndex - 1), the first
% AppendEntries will fail due to the previous log entry being missing.
% * For any followers whose log does not match and whose log ends with a log
% entry with an index at least (FirstCurrentTermLogIndex - 1), the first
% AppendEntries RPC will contain the initial log entry, which is guaranteed
% to not match, resulting in the log being truncated to end with the log
% entry at (FirstCurrentTermLogIndex - 1). Subsequent mismatches of this
% type will be detected by mismatching PrevLogIndex and PrevLogTerm.
%
% By the liveness of the RAFT protocol, since the AppendEntries step always
% results in a FollowerEndIndex that is less than FirstCurrentTermLogIndex
% if the follower's log does not match the leader's, we can conclude that
% once FollowerEndIndex reaches FirstCurrentTermLogIndex, the follower must
% have a matching log. Thus, once the quorum MatchIndex reaches a log index
% at least FirstCurrentTermLogIndex (a.k.a. the initial log entry), we can
% be sure that a quorum has been formed even when setting MatchIndex to
% FollowerEndIndex for all AppendEntries.

MatchIndex1 = maps:put(FollowerId, FollowerEndIndex, MatchIndex0),
OldNextIndex = maps:get(FollowerId, NextIndex0, TermStartIndex),
NextIndex1 = maps:put(FollowerId, erlang:max(OldNextIndex, FollowerEndIndex + 1), NextIndex0),

State2 = State1#raft_state{match_index = MatchIndex1, next_index = NextIndex1},
State3 = maybe_apply(FollowerEndIndex, State2),
State3 = maybe_apply(State2),
?RAFT_GATHER('raft.leader.apply.func', timer:now_diff(os:timestamp(), StartT)),
{keep_state, maybe_heartbeat(State3), ?HEARTBEAT_TIMEOUT(State3)};

%% and failures.
leader(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, false, FollowerEndIndex)),
#raft_state{name = Name, current_term = CurrentTerm, next_index = NextIndex0, match_index = MatchIndex0} = State0) ->
leader(cast, ?REMOTE(?IDENTITY_REQUIRES_MIGRATION(_, FollowerId) = Sender, ?APPEND_ENTRIES_RESPONSE(_PrevLogIndex, false, FollowerEndIndex)),
#raft_state{name = Name, current_term = CurrentTerm, next_index = NextIndex0} = State0) ->
?RAFT_COUNT('raft.leader.append.failure'),
?LOG_DEBUG("Server[~0p, term ~0p, leader] append failure for follower ~p. Follower reports local log ends at ~0p.",
?LOG_DEBUG("Server[~0p, term ~0p, leader] append failure for follower ~p. Follower reports log matches up to ~0p.",
[Name, CurrentTerm, Sender, FollowerEndIndex], #{domain => [whatsapp, wa_raft]}),

select_follower_replication_mode(FollowerEndIndex, State0) =:= snapshot andalso
request_snapshot_for_follower(FollowerId, State0),
cancel_bulk_logs_for_follower(Sender, State0),

% See comment in successful branch of AppendEntriesResponse RPC handling for
% reasoning as to why it is safe to set MatchIndex to FollowerEndIndex for this
% RAFT implementation.
MatchIndex1 = maps:put(FollowerId, FollowerEndIndex, MatchIndex0),
% We must trust the follower's last log index here because the follower may have
% applied a snapshot since the last successful heartbeat. In such case, we need
% to fast-forward the follower's next index so that we resume replication at the
% point after the snapshot.
NextIndex1 = maps:put(FollowerId, FollowerEndIndex + 1, NextIndex0),
State1 = State0#raft_state{next_index = NextIndex1, match_index = MatchIndex1},
State2 = maybe_apply(min(PrevLogIndex, FollowerEndIndex), State1),
State1 = State0#raft_state{next_index = NextIndex1},
State2 = maybe_apply(State1),
{keep_state, maybe_heartbeat(State2), ?HEARTBEAT_TIMEOUT(State2)};

%% [RequestVote RPC] We are already leader for the current term, so always decline votes (5.1, 5.2)
Expand Down Expand Up @@ -2131,15 +2081,15 @@ apply_single_node_cluster(#raft_state{name = Name, log_view = View0} = State0) -
{ok, L} -> L;
_ -> View0
end,
maybe_apply(infinity, State0#raft_state{log_view = View1});
maybe_apply(State0#raft_state{log_view = View1});
_ ->
State0
end.

%% Leader - check quorum and apply logs if necessary
-spec maybe_apply(EndIndex :: infinity | wa_raft_log:log_index(), State0 :: #raft_state{}) -> State1 :: #raft_state{}.
maybe_apply(EndIndex, #raft_state{name = Name, log_view = View, current_term = CurrentTerm,
match_index = MatchIndex, commit_index = LastCommitIndex, last_applied = LastAppliedIndex} = State0) when EndIndex > LastCommitIndex ->
-spec maybe_apply(Data :: #raft_state{}) -> #raft_state{}.
maybe_apply(#raft_state{name = Name, log_view = View, current_term = CurrentTerm,
match_index = MatchIndex, commit_index = LastCommitIndex, last_applied = LastAppliedIndex} = State0) ->
% Raft paper section 5.4.3 - Only log entries from the leader’s current term are committed
% by counting replicas; once an entry from the current term has been committed in this way,
% then all prior entries are committed indirectly because of the View Matching Property
Expand All @@ -2156,17 +2106,15 @@ maybe_apply(EndIndex, #raft_state{name = Name, log_view = View, current_term = C
{ok, Term} when Term < CurrentTerm ->
% Raft paper section 5.4.3 - as a leader, don't commit entries from previous term if no log entry of current term has applied yet
?RAFT_COUNT('raft.apply.delay.old'),
?LOG_WARNING("Server[~0p, term ~0p, leader] delays commit of log entry ~0p with old term ~0p.",
[Name, CurrentTerm, EndIndex, Term], #{domain => [whatsapp, wa_raft]}),
?LOG_WARNING("Server[~0p, term ~0p, leader] delays commit of log up to ~0p due to old term ~0p.",
[Name, CurrentTerm, CommitIndex, Term], #{domain => [whatsapp, wa_raft]}),
State0;
_ ->
State0
end;
_ ->
State0 %% no quorum yet
end;
maybe_apply(_EndIndex, State) ->
State.
end.

% Return the max index to potentially apply on the leader. This is the latest log index that
% has achieved replication on at least a quorum of nodes in the current RAFT cluster.
Expand Down Expand Up @@ -2503,17 +2451,17 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi
[Name, CurrentTerm, State, EntryCount, PrevLogIndex + 1, PrevLogIndex + EntryCount, wa_raft_log:last_index(View)], #{domain => [whatsapp, wa_raft]}),

case append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, Data0) of
{ok, Accepted, NewLastIndex, Data1} ->
send_rpc(Leader, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, Accepted, NewLastIndex), Data1),
reply(Event, ?LEGACY_APPEND_ENTRIES_RESPONSE_RPC(CurrentTerm, node(), PrevLogIndex, Accepted, NewLastIndex)),
{ok, Accepted, NewMatchIndex, Data1} ->
send_rpc(Leader, ?APPEND_ENTRIES_RESPONSE(PrevLogIndex, Accepted, NewMatchIndex), Data1),
reply(Event, ?LEGACY_APPEND_ENTRIES_RESPONSE_RPC(CurrentTerm, node(), PrevLogIndex, Accepted, NewMatchIndex)),

LocalTrimIndex = case ?RAFT_LOG_ROTATION_BY_TRIM_INDEX(App) of
true -> TrimIndex;
false -> infinity
end,
Data2 = Data1#raft_state{leader_heartbeat_ts = erlang:monotonic_time(millisecond)},
Data3 = case Accepted of
true -> apply_log(Data2, min(CommitIndex, NewLastIndex), LocalTrimIndex, undefined);
true -> apply_log(Data2, min(CommitIndex, NewMatchIndex), LocalTrimIndex, undefined);
_ -> Data2
end,
check_follower_lagging(CommitIndex, Data3),
Expand All @@ -2529,7 +2477,7 @@ handle_heartbeat(State, Event, Leader, PrevLogIndex, PrevLogTerm, Entries, Commi
%% is encountered, returns a diagnostic that can be used as a reason to
%% disable the current replica.
-spec append_entries(State :: state(), PrevLogIndex :: wa_raft_log:log_index(), PrevLogTerm :: wa_raft_log:log_term(), Entries :: [wa_raft_log:log_entry()], EntryCount :: non_neg_integer(), Data :: #raft_state{}) ->
{ok, Accepted :: boolean(), NewLastIndex :: wa_raft_log:log_index(), NewData :: #raft_state{}} | {fatal, Reason :: term()}.
{ok, Accepted :: boolean(), NewMatchIndex :: wa_raft_log:log_index(), NewData :: #raft_state{}} | {fatal, Reason :: term()}.
append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_state{name = Name, log_view = View, last_applied = LastApplied, current_term = CurrentTerm, leader_id = LeaderId} = Data) ->
% Inspect the locally stored term associated with the previous log entry to discern if
% appending the provided range of log entries is allowed.
Expand All @@ -2538,8 +2486,8 @@ append_entries(State, PrevLogIndex, PrevLogTerm, Entries, EntryCount, #raft_stat
% If the term of the log entry previous the entries to be applied matches the term stored
% with the previous log entry in the local RAFT log, then this follower can proceed with
% appending to the log.
{ok, NewLastIndex, NewView} = wa_raft_log:append(View, PrevLogIndex + 1, Entries),
{ok, true, NewLastIndex, Data#raft_state{log_view = NewView}};
{ok, NewMatchIndex, NewView} = wa_raft_log:append(View, PrevLogIndex + 1, Entries),
{ok, true, NewMatchIndex, Data#raft_state{log_view = NewView}};
{ok, LocalPrevLogTerm} ->
% If the term of the log entry proceeding the entries to be applied does not match the log
% entry stored with the previous log entry in the local RAFT log, then we need to truncate
Expand Down

0 comments on commit 9870562

Please sign in to comment.