Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/fetcher: enhance ValidatorSet method with improved error handlin… #4038

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 72 additions & 27 deletions core/fetcher.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package core provides fundamental blockchain interaction functionality,
// including block fetching, subscription handling, and validator set management.
package core

import (
Expand All @@ -12,13 +14,18 @@ import (
libhead "github.com/celestiaorg/go-header"
)

// Constants for event subscription
const newBlockSubscriber = "NewBlock/Events"

var (
log = logging.Logger("core")
// log is the package-level logger
log = logging.Logger("core")
// newDataSignedBlockQuery defines the query string for subscribing to signed block events
newDataSignedBlockQuery = types.QueryForEvent(types.EventSignedBlock).String()
)

// BlockFetcher provides functionality to fetch blocks, commits, and validator sets
// from a Tendermint Core node. It also supports subscribing to new block events.
type BlockFetcher struct {
client Client

Expand All @@ -33,48 +40,49 @@ func NewBlockFetcher(client Client) *BlockFetcher {
}
}

// GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet.
// GetBlockInfo queries Core for additional block information, including Commit and ValidatorSet.
// If height is nil, it fetches information for the latest block.
// Returns an error if either the commit or validator set cannot be retrieved.
func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) {
commit, err := f.Commit(ctx, height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err)
return nil, nil, fmt.Errorf("failed to get commit at height %v: %w", height, err)
}

// If a nil `height` is given as a parameter, there is a chance
// that a new block could be produced between getting the latest
// commit and getting the latest validator set. Therefore, it is
// best to get the validator set at the latest commit's height to
// prevent this potential inconsistency.
// If a nil height is given, we use the commit's height to ensure consistency
// between the commit and validator set
valSet, err := f.ValidatorSet(ctx, &commit.Height)
if err != nil {
return nil, nil, fmt.Errorf("core/fetcher: getting validator set at height %d: %w", height, err)
return nil, nil, fmt.Errorf("failed to get validator set at height %v: %w", height, err)
}

return commit, valSet, nil
}

// GetBlock queries Core for a `Block` at the given height.
// GetBlock retrieves a block at the specified height from Core.
// If height is nil, it fetches the latest block.
func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) {
res, err := f.client.Block(ctx, height)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get block: %w", err)
}

if res != nil && res.Block == nil {
return nil, fmt.Errorf("core/fetcher: block not found, height: %d", height)
return nil, fmt.Errorf("block not found at height %v", height)
}

return res.Block, nil
}

// GetBlockByHash retrieves a block with the specified hash from Core.
func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) {
res, err := f.client.BlockByHash(ctx, hash)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get block by hash: %w", err)
}

if res != nil && res.Block == nil {
return nil, fmt.Errorf("core/fetcher: block not found, hash: %s", hash.String())
return nil, fmt.Errorf("block not found with hash %s", hash.String())
}

return res.Block, nil
Expand Down Expand Up @@ -103,32 +111,69 @@ func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit
// ValidatorSet queries Core for the ValidatorSet from the
// block at the given height.
func (f *BlockFetcher) ValidatorSet(ctx context.Context, height *int64) (*types.ValidatorSet, error) {
perPage := 100
// Validate height if provided
if height != nil && *height < 0 {
return nil, fmt.Errorf("invalid height: %d, height must be non-negative", *height)
}

const (
perPage = 100 // number of validators per page
maxPages = 100 // protection against too many iterations
)

vals := make([]*types.Validator, 0)
total := -1
page := 1

for len(vals) != total {
// Protection against too many iterations
if page > maxPages {
return nil, fmt.Errorf("exceeded maximum number of pages (%d) while fetching validator set", maxPages)
}

vals, total := make([]*types.Validator, 0), -1
for page := 1; len(vals) != total; page++ {
res, err := f.client.Validators(ctx, height, &page, &perPage)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to fetch validators at height %v, page %d: %w", height, page, err)
}

if res != nil && len(res.Validators) == 0 {
return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height)
if res == nil {
return nil, fmt.Errorf("received nil response while fetching validators at height %v, page %d", height, page)
}

if len(res.Validators) == 0 {
if page == 1 {
return nil, fmt.Errorf("validator set not found at height %v", height)
}
// This shouldn't happen if total was correct
return nil, fmt.Errorf("unexpected empty validator page %d when total is %d", page, total)
}

// Initialize total on first page
if total == -1 {
total = res.Total
// Pre-allocate the slice with the total size
vals = make([]*types.Validator, 0, total)
}

total = res.Total
vals = append(vals, res.Validators...)
page++

select {
case <-ctx.Done():
return nil, fmt.Errorf("context cancelled while fetching validators: %w", ctx.Err())
default:
}
}

return types.NewValidatorSet(vals), nil
}

// SubscribeNewBlockEvent subscribes to new block events from Core, returning
// a new block event channel on success.
// SubscribeNewBlockEvent subscribes to new block events from Core.
// Returns a channel that receives new block events and an error if the subscription fails.
// The subscription can be cancelled using UnsubscribeNewBlockEvent or when the context is cancelled.
func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) {
// start the client if not started yet
if !f.client.IsRunning() {
return nil, errors.New("client not running")
return nil, errors.New("client is not running")
}

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -137,7 +182,7 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types

eventChan, err := f.client.Subscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to subscribe to new blocks: %w", err)
}

signedBlockCh := make(chan types.EventDataSignedBlock)
Expand All @@ -150,7 +195,7 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types
return
case newEvent, ok := <-eventChan:
if !ok {
log.Errorw("fetcher: new blocks subscription channel closed unexpectedly")
log.Errorw("new blocks subscription channel closed unexpectedly")
return
}
signedBlock := newEvent.Data.(types.EventDataSignedBlock)
Expand Down
Loading