forked from cometbft/cometbft
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsyncer.go
516 lines (461 loc) · 16.3 KB
/
syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
package statesync
import (
"bytes"
"context"
"errors"
"fmt"
"time"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/config"
"github.com/cometbft/cometbft/libs/log"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/light"
"github.com/cometbft/cometbft/p2p"
ssproto "github.com/cometbft/cometbft/proto/tendermint/statesync"
"github.com/cometbft/cometbft/proxy"
sm "github.com/cometbft/cometbft/state"
"github.com/cometbft/cometbft/types"
)
const (
// chunkTimeout is the timeout while waiting for the next chunk from the chunk queue.
chunkTimeout = 2 * time.Minute
// minimumDiscoveryTime is the lowest allowable time for a
// SyncAny discovery time.
minimumDiscoveryTime = 5 * time.Second
)
var (
// errAbort is returned by Sync() when snapshot restoration is aborted.
errAbort = errors.New("state sync aborted")
// errRetrySnapshot is returned by Sync() when the snapshot should be retried.
errRetrySnapshot = errors.New("retry snapshot")
// errRejectSnapshot is returned by Sync() when the snapshot is rejected.
errRejectSnapshot = errors.New("snapshot was rejected")
// errRejectFormat is returned by Sync() when the snapshot format is rejected.
errRejectFormat = errors.New("snapshot format was rejected")
// errRejectSender is returned by Sync() when the snapshot sender is rejected.
errRejectSender = errors.New("snapshot sender was rejected")
// errVerifyFailed is returned by Sync() when app hash or last height verification fails.
errVerifyFailed = errors.New("verification failed")
// errTimeout is returned by Sync() when we've waited too long to receive a chunk.
errTimeout = errors.New("timed out waiting for chunk")
// errNoSnapshots is returned by SyncAny() if no snapshots are found and discovery is disabled.
errNoSnapshots = errors.New("no suitable snapshots found")
)
// syncer runs a state sync against an ABCI app. Use either SyncAny() to automatically attempt to
// sync all snapshots in the pool (pausing to discover new ones), or Sync() to sync a specific
// snapshot. Snapshots and chunks are fed via AddSnapshot() and AddChunk() as appropriate.
type syncer struct {
logger log.Logger
stateProvider StateProvider
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
snapshots *snapshotPool
tempDir string
chunkFetchers int32
retryTimeout time.Duration
mtx cmtsync.RWMutex
chunks *chunkQueue
}
// newSyncer creates a new syncer.
func newSyncer(
cfg config.StateSyncConfig,
logger log.Logger,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
stateProvider StateProvider,
tempDir string,
) *syncer {
return &syncer{
logger: logger,
stateProvider: stateProvider,
conn: conn,
connQuery: connQuery,
snapshots: newSnapshotPool(),
tempDir: tempDir,
chunkFetchers: cfg.ChunkFetchers,
retryTimeout: cfg.ChunkRequestTimeout,
}
}
// AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
// been added to the queue, or an error if there's no sync in progress.
func (s *syncer) AddChunk(chunk *chunk) (bool, error) {
s.mtx.RLock()
defer s.mtx.RUnlock()
if s.chunks == nil {
return false, errors.New("no state sync in progress")
}
added, err := s.chunks.Add(chunk)
if err != nil {
return false, err
}
if added {
s.logger.Debug("Added chunk to queue", "height", chunk.Height, "format", chunk.Format,
"chunk", chunk.Index)
} else {
s.logger.Debug("Ignoring duplicate chunk in queue", "height", chunk.Height, "format", chunk.Format,
"chunk", chunk.Index)
}
return added, nil
}
// AddSnapshot adds a snapshot to the snapshot pool. It returns true if a new, previously unseen
// snapshot was accepted and added.
func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) {
added, err := s.snapshots.Add(peer, snapshot)
if err != nil {
return false, err
}
if added {
s.logger.Info("Discovered new snapshot", "height", snapshot.Height, "format", snapshot.Format,
"hash", log.NewLazySprintf("%X", snapshot.Hash))
}
return added, nil
}
// AddPeer adds a peer to the pool. For now we just keep it simple and send a single request
// to discover snapshots, later we may want to do retries and stuff.
func (s *syncer) AddPeer(peer p2p.Peer) {
s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID())
e := p2p.Envelope{
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
}
peer.Send(e)
}
// RemovePeer removes a peer from the pool.
func (s *syncer) RemovePeer(peer p2p.Peer) {
s.logger.Debug("Removing peer from sync", "peer", peer.ID())
s.snapshots.RemovePeer(peer.ID())
}
// SyncAny tries to sync any of the snapshots in the snapshot pool, waiting to discover further
// snapshots if none were found and discoveryTime > 0. It returns the latest state and block commit
// which the caller must use to bootstrap the node.
func (s *syncer) SyncAny(discoveryTime time.Duration, retryHook func()) (sm.State, *types.Commit, error) {
if discoveryTime != 0 && discoveryTime < minimumDiscoveryTime {
discoveryTime = 5 * minimumDiscoveryTime
}
if discoveryTime > 0 {
s.logger.Info("Discovering snapshots", "discoverTime", discoveryTime)
time.Sleep(discoveryTime)
}
// The app may ask us to retry a snapshot restoration, in which case we need to reuse
// the snapshot and chunk queue from the previous loop iteration.
var (
snapshot *snapshot
chunks *chunkQueue
err error
)
for {
// If not nil, we're going to retry restoration of the same snapshot.
if snapshot == nil {
snapshot = s.snapshots.Best()
chunks = nil
}
if snapshot == nil {
if discoveryTime == 0 {
return sm.State{}, nil, errNoSnapshots
}
retryHook()
s.logger.Info("sync any", "msg", log.NewLazySprintf("Discovering snapshots for %v", discoveryTime))
time.Sleep(discoveryTime)
continue
}
if chunks == nil {
chunks, err = newChunkQueue(snapshot, s.tempDir)
if err != nil {
return sm.State{}, nil, fmt.Errorf("failed to create chunk queue: %w", err)
}
defer chunks.Close() // in case we forget to close it elsewhere
}
newState, commit, err := s.Sync(snapshot, chunks)
switch {
case err == nil:
return newState, commit, nil
case errors.Is(err, errAbort):
return sm.State{}, nil, err
case errors.Is(err, errRetrySnapshot):
chunks.RetryAll()
s.logger.Info("Retrying snapshot", "height", snapshot.Height, "format", snapshot.Format,
"hash", log.NewLazySprintf("%X", snapshot.Hash))
continue
case errors.Is(err, errTimeout):
s.snapshots.Reject(snapshot)
s.logger.Error("Timed out waiting for snapshot chunks, rejected snapshot",
"height", snapshot.Height, "format", snapshot.Format, "hash", log.NewLazySprintf("%X", snapshot.Hash))
case errors.Is(err, errRejectSnapshot):
s.snapshots.Reject(snapshot)
s.logger.Info("Snapshot rejected", "height", snapshot.Height, "format", snapshot.Format,
"hash", log.NewLazySprintf("%X", snapshot.Hash))
case errors.Is(err, errRejectFormat):
s.snapshots.RejectFormat(snapshot.Format)
s.logger.Info("Snapshot format rejected", "format", snapshot.Format)
case errors.Is(err, errRejectSender):
s.logger.Info("Snapshot senders rejected", "height", snapshot.Height, "format", snapshot.Format,
"hash", log.NewLazySprintf("%X", snapshot.Hash))
for _, peer := range s.snapshots.GetPeers(snapshot) {
s.snapshots.RejectPeer(peer.ID())
s.logger.Info("Snapshot sender rejected", "peer", peer.ID())
}
case errors.Is(err, context.DeadlineExceeded):
s.logger.Info("Timed out validating snapshot, rejecting", "height", snapshot.Height, "err", err)
s.snapshots.Reject(snapshot)
default:
return sm.State{}, nil, fmt.Errorf("snapshot restoration failed: %w", err)
}
// Discard snapshot and chunks for next iteration
err = chunks.Close()
if err != nil {
s.logger.Error("Failed to clean up chunk queue", "err", err)
}
snapshot = nil
chunks = nil
}
}
// Sync executes a sync for a specific snapshot, returning the latest state and block commit which
// the caller must use to bootstrap the node.
func (s *syncer) Sync(snapshot *snapshot, chunks *chunkQueue) (sm.State, *types.Commit, error) {
s.mtx.Lock()
if s.chunks != nil {
s.mtx.Unlock()
return sm.State{}, nil, errors.New("a state sync is already in progress")
}
s.chunks = chunks
s.mtx.Unlock()
defer func() {
s.mtx.Lock()
s.chunks = nil
s.mtx.Unlock()
}()
hctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
appHash, err := s.stateProvider.AppHash(hctx, snapshot.Height)
if err != nil {
s.logger.Info("failed to fetch and verify app hash", "err", err)
if err == light.ErrNoWitnesses {
return sm.State{}, nil, err
}
return sm.State{}, nil, errRejectSnapshot
}
snapshot.trustedAppHash = appHash
// Offer snapshot to ABCI app.
err = s.offerSnapshot(snapshot)
if err != nil {
return sm.State{}, nil, err
}
// Spawn chunk fetchers. They will terminate when the chunk queue is closed or context canceled.
fetchCtx, cancel := context.WithCancel(context.TODO())
defer cancel()
for i := int32(0); i < s.chunkFetchers; i++ {
go s.fetchChunks(fetchCtx, snapshot, chunks)
}
pctx, pcancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer pcancel()
// Optimistically build new state, so we don't discover any light client failures at the end.
state, err := s.stateProvider.State(pctx, snapshot.Height)
if err != nil {
s.logger.Info("failed to fetch and verify CometBFT state", "err", err)
if err == light.ErrNoWitnesses {
return sm.State{}, nil, err
}
return sm.State{}, nil, errRejectSnapshot
}
commit, err := s.stateProvider.Commit(pctx, snapshot.Height)
if err != nil {
s.logger.Info("failed to fetch and verify commit", "err", err)
if err == light.ErrNoWitnesses {
return sm.State{}, nil, err
}
return sm.State{}, nil, errRejectSnapshot
}
// Restore snapshot
err = s.applyChunks(chunks)
if err != nil {
return sm.State{}, nil, err
}
// Verify app and app version
if err := s.verifyApp(snapshot, state.Version.Consensus.App); err != nil {
return sm.State{}, nil, err
}
// Done! 🎉
s.logger.Info("Snapshot restored", "height", snapshot.Height, "format", snapshot.Format,
"hash", log.NewLazySprintf("%X", snapshot.Hash))
return state, commit, nil
}
// offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
// response, or nil if the snapshot was accepted.
func (s *syncer) offerSnapshot(snapshot *snapshot) error {
s.logger.Info("Offering snapshot to ABCI app", "height", snapshot.Height,
"format", snapshot.Format, "hash", log.NewLazySprintf("%X", snapshot.Hash))
resp, err := s.conn.OfferSnapshot(context.TODO(), &abci.RequestOfferSnapshot{
Snapshot: &abci.Snapshot{
Height: snapshot.Height,
Format: snapshot.Format,
Chunks: snapshot.Chunks,
Hash: snapshot.Hash,
Metadata: snapshot.Metadata,
},
AppHash: snapshot.trustedAppHash,
})
if err != nil {
return fmt.Errorf("failed to offer snapshot: %w", err)
}
switch resp.Result {
case abci.ResponseOfferSnapshot_ACCEPT:
s.logger.Info("Snapshot accepted, restoring", "height", snapshot.Height,
"format", snapshot.Format, "hash", log.NewLazySprintf("%X", snapshot.Hash))
return nil
case abci.ResponseOfferSnapshot_ABORT:
return errAbort
case abci.ResponseOfferSnapshot_REJECT:
return errRejectSnapshot
case abci.ResponseOfferSnapshot_REJECT_FORMAT:
return errRejectFormat
case abci.ResponseOfferSnapshot_REJECT_SENDER:
return errRejectSender
default:
return fmt.Errorf("unknown ResponseOfferSnapshot result %v", resp.Result)
}
}
// applyChunks applies chunks to the app. It returns various errors depending on the app's
// response, or nil once the snapshot is fully restored.
func (s *syncer) applyChunks(chunks *chunkQueue) error {
for {
chunk, err := chunks.Next()
if err == errDone {
return nil
} else if err != nil {
return fmt.Errorf("failed to fetch chunk: %w", err)
}
resp, err := s.conn.ApplySnapshotChunk(context.TODO(), &abci.RequestApplySnapshotChunk{
Index: chunk.Index,
Chunk: chunk.Chunk,
Sender: string(chunk.Sender),
})
if err != nil {
return fmt.Errorf("failed to apply chunk %v: %w", chunk.Index, err)
}
s.logger.Info("Applied snapshot chunk to ABCI app", "height", chunk.Height,
"format", chunk.Format, "chunk", chunk.Index, "total", chunks.Size())
// Discard and refetch any chunks as requested by the app
for _, index := range resp.RefetchChunks {
err := chunks.Discard(index)
if err != nil {
return fmt.Errorf("failed to discard chunk %v: %w", index, err)
}
}
// Reject any senders as requested by the app
for _, sender := range resp.RejectSenders {
if sender != "" {
s.snapshots.RejectPeer(p2p.ID(sender))
err := chunks.DiscardSender(p2p.ID(sender))
if err != nil {
return fmt.Errorf("failed to reject sender: %w", err)
}
}
}
switch resp.Result {
case abci.ResponseApplySnapshotChunk_ACCEPT:
case abci.ResponseApplySnapshotChunk_ABORT:
return errAbort
case abci.ResponseApplySnapshotChunk_RETRY:
chunks.Retry(chunk.Index)
case abci.ResponseApplySnapshotChunk_RETRY_SNAPSHOT:
return errRetrySnapshot
case abci.ResponseApplySnapshotChunk_REJECT_SNAPSHOT:
return errRejectSnapshot
default:
return fmt.Errorf("unknown ResponseApplySnapshotChunk result %v", resp.Result)
}
}
}
// fetchChunks requests chunks from peers, receiving allocations from the chunk queue. Chunks
// will be received from the reactor via syncer.AddChunks() to chunkQueue.Add().
func (s *syncer) fetchChunks(ctx context.Context, snapshot *snapshot, chunks *chunkQueue) {
var (
next = true
index uint32
err error
)
for {
if next {
index, err = chunks.Allocate()
if errors.Is(err, errDone) {
// Keep checking until the context is canceled (restore is done), in case any
// chunks need to be refetched.
select {
case <-ctx.Done():
return
default:
}
time.Sleep(2 * time.Second)
continue
}
if err != nil {
s.logger.Error("Failed to allocate chunk from queue", "err", err)
return
}
}
s.logger.Info("Fetching snapshot chunk", "height", snapshot.Height,
"format", snapshot.Format, "chunk", index, "total", chunks.Size())
ticker := time.NewTicker(s.retryTimeout)
defer ticker.Stop()
s.requestChunk(snapshot, index)
select {
case <-chunks.WaitFor(index):
next = true
case <-ticker.C:
next = false
case <-ctx.Done():
return
}
ticker.Stop()
}
}
// requestChunk requests a chunk from a peer.
func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) {
peer := s.snapshots.GetPeer(snapshot)
if peer == nil {
s.logger.Error("No valid peers found for snapshot", "height", snapshot.Height,
"format", snapshot.Format, "hash", log.NewLazySprintf("%X", snapshot.Hash))
return
}
s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height,
"format", snapshot.Format, "chunk", chunk, "peer", peer.ID())
peer.Send(p2p.Envelope{
ChannelID: ChunkChannel,
Message: &ssproto.ChunkRequest{
Height: snapshot.Height,
Format: snapshot.Format,
Index: chunk,
},
})
}
// verifyApp verifies the sync, checking the app hash, last block height and app version
func (s *syncer) verifyApp(snapshot *snapshot, appVersion uint64) error {
resp, err := s.connQuery.Info(context.TODO(), proxy.RequestInfo)
if err != nil {
return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
}
// sanity check that the app version in the block matches the application's own record
// of its version
if resp.AppVersion != appVersion {
// An error here most likely means that the app hasn't inplemented state sync
// or the Info call correctly
return fmt.Errorf("app version mismatch. Expected: %d, got: %d",
appVersion, resp.AppVersion)
}
if !bytes.Equal(snapshot.trustedAppHash, resp.LastBlockAppHash) {
s.logger.Error("appHash verification failed",
"expected", fmt.Sprintf("%X", snapshot.trustedAppHash),
"actual", fmt.Sprintf("%X", resp.LastBlockAppHash))
return errVerifyFailed
}
if uint64(resp.LastBlockHeight) != snapshot.Height {
s.logger.Error(
"ABCI app reported unexpected last block height",
"expected", snapshot.Height,
"actual", resp.LastBlockHeight,
)
return errVerifyFailed
}
s.logger.Info("Verified ABCI app", "height", snapshot.Height, "appHash", log.NewLazySprintf("%X", snapshot.trustedAppHash))
return nil
}