Skip to content

Commit

Permalink
raft: export LogSlice
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Oct 14, 2024
1 parent a0f39e7 commit ed7709b
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 103 deletions.
2 changes: 1 addition & 1 deletion pkg/raft/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
// TODO(tbg): remove StartNode and give the application the right tools to
// bootstrap the initial membership in a cleaner way.
rn.raft.becomeFollower(1, None)
app := logSlice{term: 1, entries: make([]pb.Entry, 0, len(peers))}
app := LogSlice{term: 1, entries: make([]pb.Entry, 0, len(peers))}
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
data, err := cc.Marshal()
Expand Down
18 changes: 9 additions & 9 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type LogSnapshot struct {
// storage contains the stable log entries.
storage LogStorage
// unstable contains the unstable log entries.
unstable logSlice
unstable LogSlice
// logger gives access to logging errors.
logger raftlogger.Logger
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func (l *raftLog) accTerm() uint64 {
// the log (so this log slice is insufficient to make our log consistent with
// the leader log), the slice is out of bounds (appending it would introduce a
// gap), or a.term is outdated.
func (l *raftLog) maybeAppend(a logSlice) bool {
func (l *raftLog) maybeAppend(a LogSlice) bool {
match, ok := l.match(a)
if !ok {
return false
Expand All @@ -179,7 +179,7 @@ func (l *raftLog) maybeAppend(a logSlice) bool {
//
// Returns false if the operation can not be done: entry a.prev does not match
// the lastEntryID of this log, or a.term is outdated.
func (l *raftLog) append(a logSlice) bool {
func (l *raftLog) append(a LogSlice) bool {
return l.unstable.append(a)
}

Expand All @@ -191,8 +191,8 @@ func (l *raftLog) append(a logSlice) bool {
//
// All the entries up to the returned index are already present in the log, and
// do not need to be rewritten. The caller can safely fast-forward the appended
// logSlice to this index.
func (l *raftLog) match(s logSlice) (uint64, bool) {
// LogSlice to this index.
func (l *raftLog) match(s LogSlice) (uint64, bool) {
if !l.matchTerm(s.prev) {
return 0, false
}
Expand Down Expand Up @@ -558,13 +558,13 @@ func (l LogSnapshot) LogSlice(lo, hi uint64, maxSize uint64) (LogSlice, error) {
if err != nil {
// The log is probably compacted at index > lo (err == ErrCompacted), or it
// can be a custom storage error.
return logSlice{}, err
return LogSlice{}, err
}
ents, err := l.slice(lo, hi, entryEncodingSize(maxSize))
if err != nil {
return logSlice{}, err
return LogSlice{}, err
}
return logSlice{
return LogSlice{
term: l.unstable.term,
prev: entryID{term: prevTerm, index: lo},
entries: ents,
Expand Down Expand Up @@ -659,7 +659,7 @@ func (l *raftLog) snap(storage LogStorage) LogSnapshot {
return LogSnapshot{
first: l.firstIndex(),
storage: storage,
unstable: l.unstable.logSlice,
unstable: l.unstable.LogSlice,
logger: l.logger,
}
}
26 changes: 13 additions & 13 deletions pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestMatch(t *testing.T) {
ids[i] = entryID{term: init.termAt(uint64(i)), index: uint64(i)}
}
for _, tt := range []struct {
sl logSlice
sl LogSlice
notOk bool
want uint64
}{
Expand All @@ -42,7 +42,7 @@ func TestMatch(t *testing.T) {
{sl: entryID{term: 4, index: 1}.append(4, 4), notOk: true},
{sl: entryID{term: 5, index: 2}.append(5, 6), notOk: true},
// no conflict, empty slice
{sl: logSlice{}, want: 0},
{sl: LogSlice{}, want: 0},
// no conflict
{sl: ids[0].append(1, 2, 3), want: 3},
{sl: ids[1].append(2, 3), want: 3},
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestFindConflictByTerm(t *testing.T) {
noSnap := entryID{}
snap10 := entryID{term: 3, index: 10}
for _, tt := range []struct {
sl logSlice
sl LogSlice
index uint64
term uint64
want uint64
Expand Down Expand Up @@ -155,12 +155,12 @@ func TestIsUpToDate(t *testing.T) {
func TestAppend(t *testing.T) {
init := entryID{}.append(1, 2, 2)
for _, tt := range []struct {
app logSlice
want logSlice
app LogSlice
want LogSlice
notOk bool
}{
// appends not at the end of the log
{app: logSlice{}, notOk: true},
{app: LogSlice{}, notOk: true},
{app: entryID{term: 1, index: 1}.append(3), notOk: true},
{app: entryID{term: 2, index: 4}.append(3), notOk: true},
// appends at the end of the log
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestLogMaybeAppend(t *testing.T) {
commit := uint64(1)

for _, tt := range []struct {
app logSlice
app LogSlice
want []pb.Entry
notOk bool
panic bool
Expand All @@ -222,7 +222,7 @@ func TestLogMaybeAppend(t *testing.T) {
{app: last.append(4), want: index(1).terms(1, 2, 3, 4)},
{app: last.append(4, 4), want: index(1).terms(1, 2, 3, 4, 4)},
// appends from before the end of the log
{app: logSlice{}, want: init.entries},
{app: LogSlice{}, want: init.entries},
{app: entryID{term: 1, index: 1}.append(4), want: index(1).terms(1, 4)},
{app: entryID{term: 1, index: 1}.append(4, 4), want: index(1).terms(1, 4, 4)},
{app: entryID{term: 2, index: 2}.append(4), want: index(1).terms(1, 2, 4)},
Expand Down Expand Up @@ -507,7 +507,7 @@ func TestAppliedTo(t *testing.T) {
// the entries correctly, before and after making them stable.
func TestNextUnstableEnts(t *testing.T) {
init := entryID{}.append(1, 2)
for _, tt := range []logSlice{
for _, tt := range []LogSlice{
init.lastEntryID().append(),
init.lastEntryID().append(2, 2),
init.lastEntryID().append(3, 4, 5, 6),
Expand Down Expand Up @@ -590,7 +590,7 @@ func TestStableToWithSnap(t *testing.T) {
snapID := entryID{term: 2, index: 5}
snap := pb.Snapshot{Metadata: pb.SnapshotMetadata{Term: snapID.term, Index: snapID.index}}
for _, tt := range []struct {
sl logSlice
sl LogSlice
to LogMark
want uint64 // prev.index
}{
Expand Down Expand Up @@ -953,15 +953,15 @@ func (i index) termRange(from, to uint64) []pb.Entry {
return i.terms(intRange(from, to)...)
}

// append generates a valid logSlice of entries appended after the given entry
// append generates a valid LogSlice of entries appended after the given entry
// ID, at indices [id.index+1, id.index+len(terms)], with the given terms of
// each entry. Terms must be >= id.term, and non-decreasing.
func (id entryID) append(terms ...uint64) logSlice {
func (id entryID) append(terms ...uint64) LogSlice {
term := id.term
if ln := len(terms); ln != 0 {
term = terms[ln-1]
}
ls := logSlice{
ls := LogSlice{
term: term,
prev: id,
entries: index(id.index + 1).terms(terms...),
Expand Down
50 changes: 25 additions & 25 deletions pkg/raft/log_unstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import (
// "log" can be represented by a snapshot, and/or a contiguous slice of entries.
//
// The possible states:
// 1. Both the snapshot and the entries logSlice are empty. This means the log
// is fully in Storage. The logSlice.prev is the lastEntryID of the log.
// 2. The snapshot is empty, and the logSlice is non-empty. The state up to
// (including) logSlice.prev is in Storage, and the logSlice is pending.
// 3. The snapshot is non-empty, and the logSlice is empty. The snapshot
// 1. Both the snapshot and the entries LogSlice are empty. This means the log
// is fully in Storage. The LogSlice.prev is the lastEntryID of the log.
// 2. The snapshot is empty, and the LogSlice is non-empty. The state up to
// (including) LogSlice.prev is in Storage, and the LogSlice is pending.
// 3. The snapshot is non-empty, and the LogSlice is empty. The snapshot
// overrides the entire log in Storage.
// 4. Both the snapshot and logSlice are non-empty. The snapshot immediately
// precedes the entries, i.e. logSlice.prev == snapshot.lastEntryID. This
// 4. Both the snapshot and LogSlice are non-empty. The snapshot immediately
// precedes the entries, i.e. LogSlice.prev == snapshot.lastEntryID. This
// state overrides the entire log in Storage.
//
// The type serves two roles. First, it holds on to the latest snapshot / log
Expand All @@ -50,7 +50,7 @@ import (
// no strict requirement on the order of acknowledgement delivery.
//
// TODO(pav-kv): describe the order requirements in more detail when accTerm
// (logSlice.term) is integrated into the protocol.
// (LogSlice.term) is integrated into the protocol.
//
// Note that the in-memory prefix of the log can contain entries at indices less
// than Storage.LastIndex(). This means that the next write to storage might
Expand All @@ -64,31 +64,31 @@ type unstable struct {
// snapshot is the pending unstable snapshot, if any.
//
// Invariant: snapshot == nil ==> !snapshotInProgress
// Invariant: snapshot != nil ==> snapshot.lastEntryID == logSlice.prev
// Invariant: snapshot != nil ==> snapshot.lastEntryID == LogSlice.prev
//
// The last invariant enforces the order of handling a situation when there is
// both a snapshot and entries. The snapshot write must be acknowledged first,
// before entries are acknowledged and the logSlice moves forward.
// before entries are acknowledged and the LogSlice moves forward.
snapshot *pb.Snapshot

// logSlice is the suffix of the raft log that is not yet written to storage.
// LogSlice is the suffix of the raft log that is not yet written to storage.
// If all the entries are written, or covered by the pending snapshot, then
// logSlice.entries is empty.
// LogSlice.entries is empty.
//
// Invariant: snapshot != nil ==> logSlice.prev == snapshot.lastEntryID
// Invariant: snapshot == nil ==> logSlice.prev is in Storage
// Invariant: logSlice.lastEntryID() is the end of the log at all times
// Invariant: snapshot != nil ==> LogSlice.prev == snapshot.lastEntryID
// Invariant: snapshot == nil ==> LogSlice.prev is in Storage
// Invariant: LogSlice.lastEntryID() is the end of the log at all times
//
// Invariant: logSlice.term, a.k.a. the "last accepted term", is the term of
// Invariant: LogSlice.term, a.k.a. the "last accepted term", is the term of
// the leader whose append (either entries or snapshot) we accepted last. Our
// state is consistent with the leader log at this term.
logSlice
LogSlice

// snapshotInProgress is true if the snapshot is being written to storage.
//
// Invariant: snapshotInProgress ==> snapshot != nil
snapshotInProgress bool
// entryInProgress is the index of the last entry in logSlice already present
// entryInProgress is the index of the last entry in LogSlice already present
// in, or being written to storage.
//
// Invariant: prev.index <= entryInProgress <= lastIndex()
Expand All @@ -103,7 +103,7 @@ type unstable struct {
}

func newUnstable(last entryID, logger raftlogger.Logger) unstable {
// To initialize the last accepted term (logSlice.term) correctly, we make
// To initialize the last accepted term (LogSlice.term) correctly, we make
// sure its invariant is true: the log is a prefix of the term's leader's log.
// This can be achieved by conservatively initializing to the term of the last
// log entry.
Expand All @@ -119,7 +119,7 @@ func newUnstable(last entryID, logger raftlogger.Logger) unstable {
// leader Term) gives us more information about the log, and then allows
// bumping its commit index sooner than when the next MsgApp arrives.
return unstable{
logSlice: logSlice{term: last.term, prev: last},
LogSlice: LogSlice{term: last.term, prev: last},
entryInProgress: last.index,
logger: logger,
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (u *unstable) stableTo(mark LogMark) {
u.logger.Panicf("mark %+v acked earlier than the snapshot(in-progress=%t): %s",
mark, u.snapshotInProgress, DescribeSnapshot(*u.snapshot))
}
u.logSlice = u.forward(mark.Index)
u.LogSlice = u.forward(mark.Index)
// TODO(pav-kv): why can mark.index overtake u.entryInProgress? Probably bugs
// in tests using the log writes incorrectly, e.g. TestLeaderStartReplication
// takes nextUnstableEnts() without acceptInProgress().
Expand Down Expand Up @@ -246,15 +246,15 @@ func (u *unstable) restore(s snapshot) bool {
term := max(u.term, s.term)

u.snapshot = &s.snap
u.logSlice = logSlice{term: term, prev: s.lastEntryID()}
u.LogSlice = LogSlice{term: term, prev: s.lastEntryID()}
u.snapshotInProgress = false
u.entryInProgress = u.prev.index
return true
}

// append adds the given log slice to the end of the log. Returns false if this
// can not be done.
func (u *unstable) append(a logSlice) bool {
func (u *unstable) append(a LogSlice) bool {
if a.term < u.term {
return false // append from an outdated log
} else if a.prev != u.lastEntryID() {
Expand All @@ -265,7 +265,7 @@ func (u *unstable) append(a logSlice) bool {
return true
}

func (u *unstable) truncateAndAppend(a logSlice) bool {
func (u *unstable) truncateAndAppend(a LogSlice) bool {
if a.term < u.term {
return false // append from an outdated log
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func (u *unstable) truncateAndAppend(a logSlice) bool {
// Truncate the log and append new entries. Regress the entryInProgress mark
// to reflect that the truncated entries are no longer considered in progress.
if a.prev.index <= u.prev.index {
u.logSlice = a // replace the entire logSlice with the latest append
u.LogSlice = a // replace the entire LogSlice with the latest append
// TODO(pav-kv): clean up the logging message. It will change all datadriven
// test outputs, so do it in a contained PR.
u.logger.Infof("replace the unstable entries from index %d", a.prev.index+1)
Expand Down
Loading

0 comments on commit ed7709b

Please sign in to comment.