Skip to content

Commit

Permalink
add ignore-advertise-validation flag
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Aug 26, 2024
1 parent 7cebed0 commit f8c80ef
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s
- `advertise-chain-aliases` Alternate names for that chain (optional)
- `advertise-block-features` List of features describing the blocks (optional)
- `advertise-block-id-encoding` Encoding format of the block ID [BLOCK_ID_ENCODING_BASE58, BLOCK_ID_ENCODING_BASE64, BLOCK_ID_ENCODING_BASE64URL, BLOCK_ID_ENCODING_HEX, BLOCK_ID_ENCODING_0X_HEX] (required, unless the block type is in the "well-known" list)
- `ignore-advertise-validation` Runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.

* Add a well-known list of chains (hard-coded in `wellknown/chains.go` to help automatically determine the 'advertise' flag values). Users are encouraged to propose Pull Requests to add more chains to the list.
* The new info endpoint adds a mandatory fetching of the first streamable block on startup, with a failure if no block can be fetched after 3 minutes and you are running `firehose` or `substreams-tier1` service.
Expand Down
2 changes: 1 addition & 1 deletion chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ type Chain[B Block] struct {

// InfoResponseFiller is a function that fills the `pbfirehose.InfoResponse` from the first streamable block of the chain.
// It can validate that we are on the right chain by checking against a known hash, or populate missing fields.
InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error
InfoResponseFiller func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error
}

type ToolsConfig[B Block] struct {
Expand Down
1 change: 1 addition & 0 deletions cmd/apps/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func start[B firecore.Block](cmd *cobra.Command, dataDir string, args []string,
blockIDEncoding,
sflags.MustGetStringSlice(cmd, "advertise-block-features"),
bstream.GetProtocolFirstStreamableBlock,
!sflags.MustGetBool(cmd, "ignore-advertise-validation"),
chain.InfoResponseFiller,
rootLog,
)
Expand Down
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func registerCommonFlags[B firecore.Block](chain *firecore.Chain[B]) {
cmd.Flags().String("advertise-chain-name", "", "[firehose,substreams-tier1] Chain name to advertise in the Info Endpoint. Required but it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-chain-aliases", nil, "[firehose,substreams-tier1] List of chain name aliases to advertise in the Info Endpoint. If unset, it may be inferred from the genesis blocks.")
cmd.Flags().StringSlice("advertise-block-features", nil, "[firehose,substreams-tier1] List of block features to advertise in the Info Endpoint. If unset, it may be inferred from the genesis block.")
cmd.Flags().Bool("ignore-advertise-validation", false, "[firehose,substreams-tier1] When true, runtime checks of chain name/features/encoding against the genesis block will no longer cause server to wait or fail.")

acceptedEncodings := make([]string, len(pbfirehose.InfoResponse_BlockIdEncoding_value)-1)
i := 0
Expand Down
49 changes: 40 additions & 9 deletions firehose/info/endpoint_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"github.com/streamingfast/dstore"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type InfoServer struct {
sync.Mutex

responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error
validate bool
responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error
response *pbfirehose.InfoResponse
ready chan struct{}
initDone bool
Expand All @@ -40,7 +42,8 @@ func NewInfoServer(
blockIDEncoding pbfirehose.InfoResponse_BlockIdEncoding,
blockFeatures []string,
firstStreamableBlock uint64,
responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse) error,
validate bool,
responseFiller func(block *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error,
logger *zap.Logger,
) *InfoServer {

Expand All @@ -55,6 +58,7 @@ func NewInfoServer(
return &InfoServer{
responseFiller: responseFiller,
response: resp,
validate: validate,
ready: make(chan struct{}),
logger: logger,
}
Expand Down Expand Up @@ -90,7 +94,6 @@ func (s *InfoServer) Init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc
return err
}

close(s.ready)
return nil
}

Expand Down Expand Up @@ -142,9 +145,12 @@ func (s *InfoServer) getBlockFromOneBlockStore(ctx context.Context, blockNum uin

// init tries to fetch the first streamable block from the different sources and fills the response with it
// returns an error if it is incomplete
// it can be called only once
func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBlocksStore dstore.Store, oneBlockStore dstore.Store, logger *zap.Logger) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if s.validate {
defer cancel()
}

ch := make(chan *pbbstream.Block)

Expand Down Expand Up @@ -177,19 +183,43 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
logger.Warn("waiting to read the first_streamable_block before starting firehose/substreams endpoints",
loglevel := zapcore.WarnLevel
if !s.validate {
loglevel = zapcore.DebugLevel
}
logger.Log(loglevel, "waiting to read the first_streamable_block before starting firehose/substreams endpoints",
zap.Uint64("first_streamable_block", s.response.FirstStreamableBlockNum),
zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String())
zap.Stringer("one_block_store", oneBlockStore.BaseURL()), // , zap.String("one_block_store", oneBlockStore.String())
zap.Stringer("merged_blocks_store", mergedBlocksStore.BaseURL()),
zap.Stringer("one_block_store", oneBlockStore.BaseURL()),
)
}
}
}()

if !s.validate {
// in this case we don't wait for an answer, but we still try to fill the response
go func() {
select {
case blk := <-ch:
if err := s.responseFiller(blk, s.response, s.validate); err != nil {
logger.Warn("unable to fill and validate info response", zap.Error(err))
}
case <-ctx.Done():
}
if err := validateInfoResponse(s.response); err != nil {
logger.Warn("info response", zap.Error(err))
}
close(s.ready)
}()

cancel()
return nil
}

select {
case blk := <-ch:
if err := s.responseFiller(blk, s.response); err != nil {
return err
if err := s.responseFiller(blk, s.response, s.validate); err != nil {
return fmt.Errorf("%w -- use --ignore-advertise-validation to skip these checks", err)
}
case <-ctx.Done():
}
Expand All @@ -198,5 +228,6 @@ func (s *InfoServer) init(ctx context.Context, fhub *hub.ForkableHub, mergedBloc
return err
}

close(s.ready)
return nil
}
29 changes: 20 additions & 9 deletions firehose/info/info_filler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,33 @@ import (
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
)

var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse) error {
var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp *pbfirehose.InfoResponse, validate bool) error {
resp.FirstStreamableBlockId = firstStreamableBlock.Id

for _, protocol := range wellknown.WellKnownProtocols {
if protocol.BlockType == firstStreamableBlock.Payload.TypeUrl {
resp.BlockIdEncoding = protocol.BytesEncoding
break
}
}

if !validate {
if resp.ChainName == "" {
// still try to fill the chain name if it is not given
if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil {
resp.ChainName = chain.Name
resp.ChainNameAliases = chain.Aliases
}
}
return nil
}

if resp.ChainName != "" {
if chain := wellknown.WellKnownProtocols.ChainByName(resp.ChainName); chain != nil {
if firstStreamableBlock.Number == chain.GenesisBlockNumber && chain.GenesisBlockID != firstStreamableBlock.Id { // we don't check if the firstStreamableBlock is something other than our well-known genesis block
return fmt.Errorf("chain name defined in flag: %q inconsistent with the genesis block ID %q (expected: %q)", resp.ChainName, ox(firstStreamableBlock.Id), ox(chain.GenesisBlockID))
}
resp.ChainName = chain.Name
resp.ChainName = chain.Name // ensure we use the canonical name if the user provided one of the aliases
resp.ChainNameAliases = chain.Aliases
} else if chain := wellknown.WellKnownProtocols.ChainByGenesisBlock(firstStreamableBlock.Number, firstStreamableBlock.Id); chain != nil {
return fmt.Errorf("chain name defined in flag: %q inconsistent with the one discovered from genesis block %q", resp.ChainName, chain.Name)
Expand All @@ -28,13 +46,6 @@ var DefaultInfoResponseFiller = func(firstStreamableBlock *pbbstream.Block, resp
}
}

for _, protocol := range wellknown.WellKnownProtocols {
if protocol.BlockType == firstStreamableBlock.Payload.TypeUrl {
resp.BlockIdEncoding = protocol.BytesEncoding
break
}
}

// Extra validation for ethereum blocks
if firstStreamableBlock.Payload.TypeUrl == "type.googleapis.com/sf.ethereum.type.v2.Block" {
var seenDetailLevel bool
Expand Down

0 comments on commit f8c80ef

Please sign in to comment.