Skip to content

Commit

Permalink
Strictly require start instance to never decrease
Browse files Browse the repository at this point in the history
Require the start instance of a participation to never decrease if there
 is an existing lease by the miner.
  • Loading branch information
masih authored and Stebalien committed Oct 7, 2024
1 parent f53fdec commit d2ea23a
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 31 deletions.
11 changes: 11 additions & 0 deletions api/api_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
EF3ParticipationTicketExpired
EF3ParticipationIssuerMismatch
EF3ParticipationTooManyInstances
EF3ParticipationTicketStartBeforeExisting
)

var (
Expand All @@ -31,6 +32,9 @@ var (
// ErrF3ParticipationTooManyInstances signals that participation ticket cannot be
// issued because it asks for too many instances.
ErrF3ParticipationTooManyInstances = errF3ParticipationTooManyInstances{}
// ErrF3ParticipationTicketStartBeforeExisting signals that participation ticket
// is before the start instance of an existing lease held by the miner.
ErrF3ParticipationTicketStartBeforeExisting = errF3ParticipationTicketStartBeforeExisting{}

_ error = (*ErrOutOfGas)(nil)
_ error = (*ErrActorNotFound)(nil)
Expand All @@ -48,6 +52,7 @@ func init() {
RPCErrors.Register(EF3ParticipationTicketExpired, new(*errF3ParticipationTicketExpired))
RPCErrors.Register(EF3ParticipationIssuerMismatch, new(*errF3ParticipationIssuerMismatch))
RPCErrors.Register(EF3ParticipationTooManyInstances, new(*errF3ParticipationTooManyInstances))
RPCErrors.Register(EF3ParticipationTicketStartBeforeExisting, new(*errF3ParticipationTicketStartBeforeExisting))
}

func ErrorIsIn(err error, errorTypes []error) bool {
Expand Down Expand Up @@ -89,3 +94,9 @@ func (errF3ParticipationIssuerMismatch) Error() string { return "issuer does not
type errF3ParticipationTooManyInstances struct{}

func (errF3ParticipationTooManyInstances) Error() string { return "requested instance count too high" }

type errF3ParticipationTicketStartBeforeExisting struct{}

func (errF3ParticipationTicketStartBeforeExisting) Error() string {
return "ticket starts before existing lease"
}
5 changes: 5 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,11 @@ type FullNode interface {
// (ErrF3ParticipationTicketExpired), the provider must obtain a new ticket by
// calling F3GetOrRenewParticipationTicket.
//
// The start instance associated to the given ticket cannot be less than the
// start instance of any existing lease held by the miner. Otherwise,
// ErrF3ParticipationTicketStartBeforeExisting is returned. In this case, the
// miner should acquire a new ticket before attempting to participate again.
//
// For details on obtaining or renewing a ticket, see F3GetOrRenewParticipationTicket.
F3Participate(ctx context.Context, ticket F3ParticipationTicket) (F3ParticipationLease, error) //perm:sign
// F3GetCertificate returns a finality certificate at given instance.
Expand Down
2 changes: 1 addition & 1 deletion build/openrpc/full.json
Original file line number Diff line number Diff line change
Expand Up @@ -7843,7 +7843,7 @@
{
"name": "Filecoin.F3Participate",
"description": "```go\nfunc (s *FullNodeStruct) F3Participate(p0 context.Context, p1 F3ParticipationTicket) (F3ParticipationLease, error) {\n\tif s.Internal.F3Participate == nil {\n\t\treturn *new(F3ParticipationLease), ErrNotSupported\n\t}\n\treturn s.Internal.F3Participate(p0, p1)\n}\n```",
"summary": "F3Participate enrolls a storage provider in the F3 consensus process using a\nprovided participation ticket. This ticket grants a temporary lease that enables\nthe provider to sign transactions as part of the F3 consensus.\n\nThe function verifies the ticket's validity and checks if the ticket's issuer\naligns with the current node. If there is an issuer mismatch\n(ErrF3ParticipationIssuerMismatch), the provider should retry with the same\nticket, assuming the issue is due to transient network problems or operational\ndeployment conditions. If the ticket is invalid\n(ErrF3ParticipationTicketInvalid) or has expired\n(ErrF3ParticipationTicketExpired), the provider must obtain a new ticket by\ncalling F3GetOrRenewParticipationTicket.\n\nFor details on obtaining or renewing a ticket, see F3GetOrRenewParticipationTicket.\n",
"summary": "F3Participate enrolls a storage provider in the F3 consensus process using a\nprovided participation ticket. This ticket grants a temporary lease that enables\nthe provider to sign transactions as part of the F3 consensus.\n\nThe function verifies the ticket's validity and checks if the ticket's issuer\naligns with the current node. If there is an issuer mismatch\n(ErrF3ParticipationIssuerMismatch), the provider should retry with the same\nticket, assuming the issue is due to transient network problems or operational\ndeployment conditions. If the ticket is invalid\n(ErrF3ParticipationTicketInvalid) or has expired\n(ErrF3ParticipationTicketExpired), the provider must obtain a new ticket by\ncalling F3GetOrRenewParticipationTicket.\n\nThe start instance associated to the given ticket cannot be less than the\nstart instance of any existing lease held by the miner. Otherwise,\nErrF3ParticipationTicketStartBeforeExisting is returned. In this case, the\nminer should acquire a new ticket before attempting to participate again.\n\nFor details on obtaining or renewing a ticket, see F3GetOrRenewParticipationTicket.\n",
"paramStructure": "by-position",
"params": [
{
Expand Down
13 changes: 9 additions & 4 deletions chain/lf3/participation_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,19 @@ func (l *leaser) getOrRenewParticipationTicket(participant uint64, previous api.

func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) {
currentInstance, _, _ := l.progress()
lease, err := l.validate(currentInstance, ticket)
newLease, err := l.validate(currentInstance, ticket)
if err != nil {
return api.F3ParticipationLease{}, err
}
l.mutex.Lock()
l.leases[lease.MinerID] = lease
l.mutex.Unlock()
return lease, nil
defer l.mutex.Unlock()
currentLease, found := l.leases[newLease.MinerID]
if found && currentLease.FromInstance > newLease.FromInstance {
// For safety, strictly require lease start instance to never decrease.
return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting
}
l.leases[newLease.MinerID] = newLease
return newLease, nil
}

func (l *leaser) getParticipantsByInstance(instance uint64) []uint64 {
Expand Down
18 changes: 18 additions & 0 deletions chain/lf3/participation_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,24 @@ func TestLeaser(t *testing.T) {
require.ErrorIs(t, err, api.ErrF3ParticipationIssuerMismatch)
require.Zero(t, lease)
})
t.Run("never decreasing start", func(t *testing.T) {
progress.currentInstance++
earlierTicket, err := subject.getOrRenewParticipationTicket(123, nil, 5)
require.NoError(t, err)
progress.currentInstance++
laterTicket, err := subject.getOrRenewParticipationTicket(123, nil, 5)
require.NoError(t, err)

lease, err := subject.participate(laterTicket)
require.NoError(t, err)
require.Equal(t, uint64(123), lease.MinerID)
require.Equal(t, uint64(5), lease.ValidityTerm)
require.Equal(t, progress.currentInstance, lease.FromInstance)

lease, err = subject.participate(earlierTicket)
require.ErrorIs(t, err, api.ErrF3ParticipationTicketStartBeforeExisting)
require.Zero(t, lease)
})
t.Run("expired previous ticket", func(t *testing.T) {
previous, err := subject.getOrRenewParticipationTicket(123, nil, 5)
require.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions documentation/en/api-v1-unstable-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -2645,6 +2645,11 @@ deployment conditions. If the ticket is invalid
(ErrF3ParticipationTicketExpired), the provider must obtain a new ticket by
calling F3GetOrRenewParticipationTicket.

The start instance associated to the given ticket cannot be less than the
start instance of any existing lease held by the miner. Otherwise,
ErrF3ParticipationTicketStartBeforeExisting is returned. In this case, the
miner should acquire a new ticket before attempting to participate again.

For details on obtaining or renewing a ticket, see F3GetOrRenewParticipationTicket.


Expand Down
58 changes: 32 additions & 26 deletions node/modules/storageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ func (p *f3Participator) tryF3Participate(ctx context.Context, ticket api.F3Part
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
p.backOff(ctx)
Expand Down Expand Up @@ -485,6 +488,7 @@ func (p *f3Participator) backOffFor(ctx context.Context, d time.Duration) {
// mutex despite the fact that f3Participator is never (and should never) be
// called from multiple goroutines.
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return
Expand All @@ -510,35 +514,37 @@ func F3Participation(mctx helpers.MetricsCtx, lc fx.Lifecycle, node v1api.FullNo
// leaseTerm The number of instances the miner will attempt to lease from nodes.
leaseTerm = 5
)
ctx, cancel := context.WithCancel(mctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

participator := newF3Participator(
node,
participant,
&backoff.Backoff{
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 1.5,
},
checkProgressMaxAttempts,
checkProgressInterval,
leaseTerm,
)

switch err := participator.participate(ctx); {
case err == nil, errors.Is(err, context.Canceled):
log.Infof("Stopped participating in F3")
default:
log.Errorw("F3 participation stopped abruptly", "err", err)
}
}()
participator := newF3Participator(
node,
participant,
&backoff.Backoff{
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 1.5,
},
checkProgressMaxAttempts,
checkProgressInterval,
leaseTerm,
)

ctx, cancel := context.WithCancel(mctx)
var wg sync.WaitGroup
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
OnStart: func(context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
switch err := participator.participate(ctx); {
case err == nil, ctx.Err() != nil:
log.Infof("Stopped participating in F3")
default:
log.Errorw("F3 participation stopped abruptly", "err", err)
}
}()
return nil
},
OnStop: func(context.Context) error {
cancel()
wg.Wait()
return nil
Expand Down

0 comments on commit d2ea23a

Please sign in to comment.