Skip to content

Commit

Permalink
rac2: use the real raft LogSlice
Browse files Browse the repository at this point in the history
Epic: none
Release note: none
  • Loading branch information
pav-kv committed Oct 14, 2024
1 parent ed7709b commit 4479113
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_test(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/raft/tracker",
"//pkg/roachpb",
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -144,7 +145,7 @@ type RaftInterface interface {
//
// If it returns true, all the entries in the slice are in the message, and
// Next is advanced to be equal to end.
SendMsgAppRaftMuLocked(replicaID roachpb.ReplicaID, slice RaftLogSlice) (raftpb.Message, bool)
SendMsgAppRaftMuLocked(replicaID roachpb.ReplicaID, slice raft.LogSlice) (raftpb.Message, bool)
}

// RaftLogSnapshot abstract raft.LogSnapshot.
Expand All @@ -163,11 +164,9 @@ type RaftLogSnapshot interface {
//
// NB: the [start, end) interval is different from RawNode.LogSlice which
// accepts an open-closed interval.
LogSlice(start, end uint64, maxSize uint64) (RaftLogSlice, error)
LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error)
}

type RaftLogSlice interface{}

// RaftMsgAppMode specifies how Raft (at the leader) generates MsgApps. In
// both modes, Raft knows that (Match(i), Next(i)) are in-flight for a
// follower i.
Expand Down
27 changes: 16 additions & 11 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -422,31 +423,35 @@ func (r *testingRCRange) ScheduleControllerEvent(rangeID roachpb.RangeID) {
r.mu.scheduleControllerEventCount++
}

func (r *testingRCRange) LogSlice(start, end uint64, maxSize uint64) (RaftLogSlice, error) {
func (r *testingRCRange) LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error) {
if start >= end {
panic("start >= end")
}
msg := raftpb.Message{
Type: raftpb.MsgApp,
}
var size uint64
var entries []raftpb.Entry
for _, entry := range r.entries {
if entry.Index >= start && entry.Index < end {
msg.Entries = append(msg.Entries, entry)
entries = append(entries, entry)
size += uint64(len(entry.Data))
if size > maxSize {
break
}
}
}
return msg, nil
// TODO(pav-kv): use a real LogSnapshot and construct a correct LogSlice.
return raft.MakeLogSlice(entries), nil
}

func (r *testingRCRange) SendMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, slice RaftLogSlice,
replicaID roachpb.ReplicaID, ls raft.LogSlice,
) (raftpb.Message, bool) {
msg := slice.(raftpb.Message)
msg.To = raftpb.PeerID(replicaID)
// TODO(pav-kv): populate the message correctly.
// TODO(pav-kv): use the real RawNode instead of fakes.
msg := raftpb.Message{
Type: raftpb.MsgApp,
To: raftpb.PeerID(replicaID),
Entries: ls.Entries(),
}
r.mu.Lock()
defer r.mu.Unlock()
testR, ok := r.mu.r.replicaSet[replicaID]
Expand Down Expand Up @@ -1512,8 +1517,8 @@ func testingFirst(args ...interface{}) interface{} {

type testLogSnapshot struct{}

func (testLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (RaftLogSlice, error) {
return nil, nil
func (testLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error) {
return raft.LogSlice{}, nil
}

func TestRaftEventFromMsgStorageAppendAndMsgAppsBasic(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_test(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/raftlog",
"//pkg/raft",
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/settings/cluster",
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -94,7 +95,7 @@ func (rn *testRaftNode) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {
}

func (rn *testRaftNode) SendMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, slice rac2.RaftLogSlice,
_ roachpb.ReplicaID, _ raft.LogSlice,
) (raftpb.Message, bool) {
panic("unimplemented")
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/raft_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ func (rn raftNodeForRACv2) SendPingRaftMuLocked(to roachpb.ReplicaID) bool {

// SendMsgAppRaftMuLocked implements rac2.RaftInterface.
func (rn raftNodeForRACv2) SendMsgAppRaftMuLocked(
replicaID roachpb.ReplicaID, slice rac2.RaftLogSlice,
replicaID roachpb.ReplicaID, ls raft.LogSlice,
) (raftpb.Message, bool) {
ls := slice.(raft.LogSlice)
rn.r.MuLock()
defer rn.r.MuUnlock()
return rn.RawNode.SendMsgApp(raftpb.PeerID(replicaID), ls)
Expand All @@ -79,6 +78,6 @@ type RaftLogSnapshot raft.LogSnapshot
var _ rac2.RaftLogSnapshot = RaftLogSnapshot{}

// LogSlice implements rac2.RaftLogSnapshot.
func (l RaftLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (rac2.RaftLogSlice, error) {
func (l RaftLogSnapshot) LogSlice(start, end uint64, maxSize uint64) (raft.LogSlice, error) {
return (raft.LogSnapshot(l)).LogSlice(start-1, end-1, maxSize)
}
15 changes: 15 additions & 0 deletions pkg/raft/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,21 @@ type LogSlice struct {
entries []pb.Entry
}

// MakeLogSlice creates a fake log slice containing the supplied entries. Only
// for testing.
//
// TODO(pav-kv): this is not a correct LogSlice. Remove this function, and help
// construct a correct one.
func MakeLogSlice(entries []pb.Entry) LogSlice {
return LogSlice{entries: entries}
}

// Entries returns the log entries covered by this slice. The returned slice
// must not be mutated.
func (s LogSlice) Entries() []pb.Entry {
return s.entries
}

// lastIndex returns the index of the last entry in this log slice. Returns
// prev.index if there are no entries.
func (s LogSlice) lastIndex() uint64 {
Expand Down

0 comments on commit 4479113

Please sign in to comment.