Skip to content

Commit

Permalink
Simplify participant locking and expose current GPBFT progress
Browse files Browse the repository at this point in the history
The host implementation needs to know the progress of the current
instance for certificate storage and catch up purposes. As a result the
current instance must be safe to read from multiple goroutines. This was
achieved by a dedicated mutex that synchronised all access to the
current instanceID.

Separately, validation logic needs to know how far the current
instance has progressed in order to effectively validate incoming
messages.

The changes here simplify the locking by unifying the logic for checking
the progress of an instance: both validator and host can now get the
latest progress from the participant that is safe for concurrent use.
This is achieved by moving the progress observer mechanism out of
the validator and into the participant, while introducing the concept of
`gpbft.Progress`, a function that returns the current instance, round and
phase.

As part of this change, Lotus integration can now get the current round
and phase as well as current instance, which is useful for self
equivocation checking as well as debugging purposes.

Fixes #658
Relates to #599
  • Loading branch information
masih committed Sep 26, 2024
1 parent 12c0356 commit e30c8aa
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 106 deletions.
21 changes: 9 additions & 12 deletions gpbft/gpbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ type instance struct {
// independently of protocol phases/rounds.
decision *quorumState
// tracer traces logic logs for debugging and simulation purposes.
tracer Tracer
progress ProgressObserver
tracer Tracer
}

func newInstance(
Expand All @@ -222,8 +221,7 @@ func newInstance(
data *SupplementalData,
powerTable *PowerTable,
aggregateVerifier Aggregate,
beacon []byte,
progress ProgressObserver) (*instance, error) {
beacon []byte) (*instance, error) {
if input.IsZero() {
return nil, fmt.Errorf("input is empty")
}
Expand Down Expand Up @@ -253,7 +251,6 @@ func newInstance(
},
decision: newQuorumState(powerTable),
tracer: participant.tracer,
progress: progress,
}, nil
}

Expand Down Expand Up @@ -486,7 +483,7 @@ func (i *instance) beginQuality() error {
}
// Broadcast input value and wait to receive from others.
i.phase = QUALITY_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.participant.progression.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()
i.broadcast(i.round, QUALITY_PHASE, i.proposal, false, nil)
Expand Down Expand Up @@ -541,7 +538,7 @@ func (i *instance) beginConverge(justification *Justification) {
}

i.phase = CONVERGE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.participant.progression.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()

Expand Down Expand Up @@ -604,7 +601,7 @@ func (i *instance) tryConverge() error {
func (i *instance) beginPrepare(justification *Justification) {
// Broadcast preparation of value and wait for everyone to respond.
i.phase = PREPARE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.participant.progression.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()

Expand Down Expand Up @@ -645,7 +642,7 @@ func (i *instance) tryPrepare() error {

func (i *instance) beginCommit() {
i.phase = COMMIT_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.participant.progression.NotifyProgress(i.instanceID, i.round, i.phase)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()

Expand Down Expand Up @@ -722,7 +719,7 @@ func (i *instance) tryCommit(round uint64) error {

func (i *instance) beginDecide(round uint64) {
i.phase = DECIDE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.participant.progression.NotifyProgress(i.instanceID, i.round, i.phase)
i.resetRebroadcastParams()
var justification *Justification
// Value cannot be empty here.
Expand All @@ -748,7 +745,7 @@ func (i *instance) beginDecide(round uint64) {
// The provided justification must justify the value being decided.
func (i *instance) skipToDecide(value ECChain, justification *Justification) {
i.phase = DECIDE_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.participant.progression.NotifyProgress(i.instanceID, i.round, i.phase)
i.proposal = value
i.value = i.proposal
i.resetRebroadcastParams()
Expand Down Expand Up @@ -854,7 +851,7 @@ func (i *instance) addCandidate(c ECChain) bool {
func (i *instance) terminate(decision *Justification) {
i.log("✅ terminated %s during round %d", &i.value, i.round)
i.phase = TERMINATED_PHASE
i.progress.NotifyProgress(i.instanceID, i.round, i.phase)
i.participant.progression.NotifyProgress(i.instanceID, i.round, i.phase)
i.value = decision.Vote.Value
i.terminationValue = decision
i.resetRebroadcastParams()
Expand Down
90 changes: 48 additions & 42 deletions gpbft/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/filecoin-project/go-f3/internal/caching"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel/metric"
)
Expand All @@ -25,17 +26,10 @@ type Participant struct {
*options
host Host

// Mutex for detecting concurrent invocation of stateful API methods.
// To be taken at the top of public API methods, and always taken before instanceMutex,
// if both are to be taken.
// apiMutex prevents concurrent access to stateful API methods to ensure thread
// safety. This mutex should be locked at the beginning of each public API method
// that modifies state.
apiMutex sync.Mutex
// Mutex protecting currentInstance and committees cache for concurrent validation.
// Note that not every access needs to be protected:
// - writes to currentInstance, and reads from it during validation,
// - reads from or writes to committees (which is written during validation).
instanceMutex sync.Mutex
// Instance identifier for the current (or, if none, next to start) GPBFT instance.
currentInstance uint64
// Cache of committees for the current or future instances.
committeeProvider *cachedCommitteeProvider

Expand All @@ -48,7 +42,19 @@ type Participant struct {
// protocol round for which a strong quorum of COMMIT messages was observed,
// which may not be known to the participant.
terminatedDuringRound uint64
validator *cachingValidator
// progression is the atomic reference to the current GPBFT instance being
// progressed by this Participant, or the next instance to be started if no such
// instance exists.
progression *atomicProgression
// messageCache maintains a cache of unique identifiers for valid messages or
// justifications that this Participant has received since the previous instance.
// It ensures that only relevant messages or justifications are retained by
// automatically pruning entries from older instances as the participant
// progresses to the next instance.
//
// See Participant.finishCurrentInstance, Participant.validator.
messageCache *caching.GroupedSet
validator *cachingValidator
}

type validatedMessage struct {
Expand Down Expand Up @@ -83,12 +89,16 @@ func NewParticipant(host Host, o ...Option) (*Participant, error) {
return nil, err
}
ccp := newCachedCommitteeProvider(host)
messageCache := caching.NewGroupedSet(opts.maxCachedInstances, opts.maxCachedMessagesPerInstance)
progression := newAtomicProgression()
return &Participant{
options: opts,
host: host,
committeeProvider: ccp,
mqueue: newMessageQueue(opts.maxLookaheadRounds),
validator: newValidator(host, ccp, opts.committeeLookback, opts.maxCachedInstances, opts.maxCachedMessagesPerInstance),
messageCache: messageCache,
progression: progression,
validator: newValidator(host, ccp, progression.Get, messageCache, opts.committeeLookback),
}, nil
}

Expand Down Expand Up @@ -117,25 +127,19 @@ func (p *Participant) StartInstanceAt(instance uint64, when time.Time) (err erro
return err
}

func (p *Participant) CurrentRound() uint64 {
if !p.apiMutex.TryLock() {
panic("concurrent API method invocation")
}
defer p.apiMutex.Unlock()
if p.gpbft == nil {
return 0
}
return p.gpbft.round
// Progress returns the latest progress of this Participant in terms of GPBFT
// instance ID, round and phase.
//
// This API is safe for concurrent use.
func (p *Participant) Progress() (instance, round uint64, phase Phase) {
return p.progression.Get()
}

func (p *Participant) CurrentInstance() uint64 {
if !p.apiMutex.TryLock() {
panic("concurrent API method invocation")
}
defer p.apiMutex.Unlock()
p.instanceMutex.Lock()
defer p.instanceMutex.Unlock()
return p.currentInstance
// currentInstance is a convenient wrapper around Participant.Progress that returns the
// current GPBFT instance ID for internal use.
func (p *Participant) currentInstance() uint64 {
currentInstance, _, _ := p.Progress()
return currentInstance
}

// ValidateMessage checks if the given message is valid. If invalid, an error is
Expand Down Expand Up @@ -171,15 +175,16 @@ func (p *Participant) ReceiveMessage(vmsg ValidatedMessage) (err error) {
}()
msg := vmsg.Message()

currentInstance := p.currentInstance()
// Drop messages for past instances.
if msg.Vote.Instance < p.currentInstance {
if msg.Vote.Instance < currentInstance {
p.trace("dropping message from old instance %d while received in instance %d",
msg.Vote.Instance, p.currentInstance)
msg.Vote.Instance, currentInstance)
return nil
}

// If the message is for the current instance, deliver immediately.
if p.gpbft != nil && msg.Vote.Instance == p.currentInstance {
if p.gpbft != nil && msg.Vote.Instance == currentInstance {
if err := p.gpbft.Receive(msg); err != nil {
return fmt.Errorf("%w: %w", ErrReceivedInternalError, err)
}
Expand Down Expand Up @@ -217,9 +222,10 @@ func (p *Participant) ReceiveAlarm() (err error) {
}

func (p *Participant) beginInstance() error {
data, chain, err := p.host.GetProposal(p.currentInstance)
currentInstance := p.currentInstance()
data, chain, err := p.host.GetProposal(currentInstance)
if err != nil {
return fmt.Errorf("failed fetching chain for instance %d: %w", p.currentInstance, err)
return fmt.Errorf("failed fetching chain for instance %d: %w", currentInstance, err)
}
// Limit length of the chain to be proposed.
if chain.IsZero() {
Expand All @@ -230,11 +236,11 @@ func (p *Participant) beginInstance() error {
return fmt.Errorf("invalid canonical chain: %w", err)
}

comt, err := p.committeeProvider.GetCommittee(p.currentInstance)
comt, err := p.committeeProvider.GetCommittee(currentInstance)
if err != nil {
return err
}
if p.gpbft, err = newInstance(p, p.currentInstance, chain, data, comt.PowerTable, comt.AggregateVerifier, comt.Beacon, p.validator); err != nil {
if p.gpbft, err = newInstance(p, currentInstance, chain, data, comt.PowerTable, comt.AggregateVerifier, comt.Beacon); err != nil {
return fmt.Errorf("failed creating new gpbft instance: %w", err)
}
if err := p.gpbft.Start(); err != nil {
Expand Down Expand Up @@ -264,7 +270,7 @@ func (p *Participant) handleDecision() {
p.trace("failed to receive decision: %+v", err)
p.host.SetAlarm(time.Time{})
} else {
p.beginNextInstance(p.currentInstance + 1)
p.beginNextInstance(p.currentInstance() + 1)
p.host.SetAlarm(nextStart)
}
}
Expand All @@ -276,13 +282,14 @@ func (p *Participant) finishCurrentInstance() *Justification {
p.terminatedDuringRound = p.gpbft.round
}
p.gpbft = nil
p.validator.NotifyProgress(p.currentInstance, 0, INITIAL_PHASE)
if currentInstance := p.currentInstance(); currentInstance > 1 {
// Remove all cached messages that are older than the previous instance
p.messageCache.RemoveGroupsLessThan(currentInstance - 1)
}
return decision
}

func (p *Participant) beginNextInstance(nextInstance uint64) {
p.instanceMutex.Lock()
defer p.instanceMutex.Unlock()
// Clean all messages queued and for instances below the next one.
for inst := range p.mqueue.messages {
if inst < nextInstance {
Expand All @@ -294,8 +301,7 @@ func (p *Participant) beginNextInstance(nextInstance uint64) {
if nextInstance > 0 {
p.committeeProvider.EvictCommitteesBefore(nextInstance - 1)
}
p.currentInstance = nextInstance
p.validator.NotifyProgress(p.currentInstance, 0, INITIAL_PHASE)
p.progression.NotifyProgress(nextInstance, 0, INITIAL_PHASE)
}

func (p *Participant) terminated() bool {
Expand Down
5 changes: 4 additions & 1 deletion gpbft/participant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,10 @@ func (pt *participantTestSubject) expectBeginInstance() {

func (pt *participantTestSubject) requireNotStarted() {
pt.t.Helper()
require.Zero(pt.t, pt.CurrentRound())
instance, round, phase := pt.Progress()
require.Zero(pt.t, instance)
require.Zero(pt.t, round)
require.Equal(pt.t, gpbft.INITIAL_PHASE, phase)
require.Equal(pt.t, "nil", pt.Describe())
}

Expand Down
49 changes: 49 additions & 0 deletions gpbft/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package gpbft

import "sync/atomic"

// Compile-time checks that atomicProgression satisfies ProgressObserver and
// that its Get method has the signature of Progress.
var (
_ ProgressObserver = (*atomicProgression)(nil)
_ Progress = (*atomicProgression)(nil).Get
)

// Progress is a function that returns the latest GPBFT progress as the current
// instance ID, round and phase.
type Progress func() (instance, round uint64, phase Phase)

// ProgressObserver defines an interface for observing and being notified about
// the progress of GPBFT as it advances through different instances, rounds or
// phases.
type ProgressObserver interface {
// NotifyProgress is called to notify the observer about the progress of a GPBFT
// instance, round or phase.
NotifyProgress(instance, round uint64, phase Phase)
}

// atomicProgression keeps the most recently notified progress behind an atomic
// pointer: NotifyProgress stores a new snapshot and Get loads the latest one.
type atomicProgression struct {
// progression points to the latest progress snapshot. It is nil until the first
// call to NotifyProgress.
progression atomic.Pointer[progress]
}

// progress is a snapshot of GPBFT progress made up of an instance ID, a round
// and a phase.
type progress struct {
instance uint64
round uint64
phase Phase
}

// newAtomicProgression instantiates an atomicProgression with no progress
// recorded yet.
func newAtomicProgression() *atomicProgression {
	return new(atomicProgression)
}

// NotifyProgress records the given instance, round and phase as the latest
// progress, replacing whatever was recorded before.
func (a *atomicProgression) NotifyProgress(instance, round uint64, phase Phase) {
	// Each notification publishes a fresh snapshot rather than mutating the
	// previous one, so the three values are always swapped in together.
	snapshot := progress{instance: instance, round: round, phase: phase}
	a.progression.Store(&snapshot)
}

// Get returns the most recently recorded instance, round and phase. When
// nothing has been recorded yet, all three results are their zero values.
func (a *atomicProgression) Get() (instance, round uint64, phase Phase) {
	latest := a.progression.Load()
	if latest == nil {
		// Nothing stored yet: phase still holds the zero value of Phase.
		return 0, 0, phase
	}
	return latest.instance, latest.round, latest.phase
}
48 changes: 48 additions & 0 deletions gpbft/progress_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package gpbft

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

// TestAtomicProgression checks that atomicProgression starts out with zero
// values, reflects the latest notified progress, and yields a consistent
// instance/round/phase triple after concurrent notifications.
//
// The subtests share a single subject and run sequentially, each building on
// the state left behind by the previous one.
func TestAtomicProgression(t *testing.T) {
	subject := newAtomicProgression()
	t.Run("zero value", func(t *testing.T) {
		instance, round, phase := subject.Get()
		require.Equal(t, uint64(0), instance, "Expected initial instance to be 0")
		require.Equal(t, uint64(0), round, "Expected initial round to be 0")
		require.Equal(t, INITIAL_PHASE, phase, "Expected initial phase to be INITIAL_PHASE")
	})
	t.Run("notify and get", func(t *testing.T) {
		subject.NotifyProgress(1, 10, PREPARE_PHASE)
		instance, round, phase := subject.Get()
		require.Equal(t, uint64(1), instance, "Expected instance to be 1")
		require.Equal(t, uint64(10), round, "Expected round to be 10")
		require.Equal(t, PREPARE_PHASE, phase, "Expected phase to be PREPARE_PHASE")
	})
	t.Run("notify and get progresses", func(t *testing.T) {
		subject.NotifyProgress(2, 20, COMMIT_PHASE)
		instance, round, phase := subject.Get()
		require.Equal(t, uint64(2), instance, "Expected instance to be updated to 2")
		require.Equal(t, uint64(20), round, "Expected round to be updated to 20")
		require.Equal(t, COMMIT_PHASE, phase, "Expected phase to be updated to COMMIT_PHASE")
	})
	t.Run("concurrent update", func(t *testing.T) {
		var wg sync.WaitGroup
		update := func(inst, rnd uint64, ph Phase) {
			defer wg.Done()
			subject.NotifyProgress(inst, rnd, ph)
		}
		wg.Add(2)
		go update(3, 30, COMMIT_PHASE)
		go update(4, 40, DECIDE_PHASE)
		wg.Wait()

		// Either update may win the race, but the three values must all come
		// from the same update. Checking each field on its own would let a
		// torn snapshot such as (3, 40, COMMIT_PHASE) slip through.
		instance, round, phase := subject.Get()
		matchesFirst := instance == 3 && round == 30 && phase == COMMIT_PHASE
		matchesSecond := instance == 4 && round == 40 && phase == DECIDE_PHASE
		require.True(t, matchesFirst || matchesSecond,
			"Progress should match exactly one of the updates, got instance %d, round %d, phase %v",
			instance, round, phase)
	})
}
Loading

0 comments on commit e30c8aa

Please sign in to comment.