Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Jan 9, 2024
1 parent d5ebed8 commit 96f92ea
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 14 deletions.
44 changes: 37 additions & 7 deletions block/fetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,58 @@ import (
"go.uber.org/zap"
)

// todo: implement firecore.BlockFetcher
type RPC struct {
type RPCFetcher struct {
rpcClient *rpc.Client
latest uint64
latestSlot uint64
latestBlockRetryInterval time.Duration
fetchInterval time.Duration
lastFetchAt time.Time
logger *zap.Logger
}

func (r *RPC) Fetch(ctx context.Context, blkNum uint64) (*pbbstream.Block, error) {
blockResult, err := r.rpcClient.GetBlock(ctx, blkNum)
func NewRPC(rpcClient *rpc.Client, fetchInterval time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *RPCFetcher {
return &RPCFetcher{
rpcClient: rpcClient,
fetchInterval: fetchInterval,
latestBlockRetryInterval: latestBlockRetryInterval,
logger: logger,
}
}

func (f *RPCFetcher) Fetch(ctx context.Context, blockNum uint64) (out *pbbstream.Block, err error) {
f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum))

for f.latestSlot < blockNum {
f.latestSlot, err = f.rpcClient.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return nil, fmt.Errorf("fetching latestSlot block num: %w", err)
}

f.logger.Info("got latestSlot block", zap.Uint64("latestSlot", f.latestSlot), zap.Uint64("block_num", blockNum))
//
if f.latestSlot < blockNum {
time.Sleep(f.latestBlockRetryInterval)
continue
}
break
}

blockResult, err := f.rpcClient.GetBlockWithOpts(ctx, blockNum, &rpc.GetBlockOpts{
Commitment: rpc.CommitmentConfirmed,
})
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blkNum, err)
return nil, fmt.Errorf("fetching block %d: %w", blockNum, err)
}
block := blockFromBlockResult(blockResult)
return block, nil
}

func blockFromBlockResult(b *rpc.GetBlockResult) *pbbstream.Block {

panic("implement me")
//todo: convert block result to pbsol.Block
//todo: return pbbstream.Block

//panic("implement me")
block := &pbbstream.Block{
//Number: b.BlockHeight,
//Id: "",
Expand Down
3 changes: 1 addition & 2 deletions cmd/firesol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"github.com/spf13/cobra"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-solana/cmd/firesol/bigtable"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand All @@ -28,7 +27,7 @@ func Chain() *firecore.Chain[*pbsol.Block] {

RegisterExtraCmd: func(chain *firecore.Chain[*pbsol.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
toolsCmd.AddCommand(newPollerCmd(zlog, tracer))
toolsCmd.AddCommand(bigtable.NewBigTableCmd(zlog, tracer))
//toolsCmd.AddCommand(bigtable.NewBigTableCmd(zlog, tracer))
return nil
},
},
Expand Down
30 changes: 25 additions & 5 deletions cmd/firesol/rpc/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@ import (
"fmt"
"path"
"strconv"
"time"

"github.com/gagliardetto/solana-go/rpc"
"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
"github.com/streamingfast/firehose-solana/block/fetcher"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)
Expand All @@ -27,6 +32,7 @@ func NewPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {

func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()
rpcEndpoint := args[0]

dataDir := sflags.MustGetString(cmd, "data-dir")
Expand All @@ -48,13 +54,27 @@ func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecu
zap.Duration("interval_between_fetch", fetchInterval),
)

//todo: init fetcher
//todo: init poller handler
//todo: init poller
rpcClient := rpc.New(rpcEndpoint)

//todo: fetch latest block from chain rpc
latestBlockRetryInterval := 250 * time.Millisecond
poller := blockpoller.New(
fetcher.NewRPC(rpcClient, fetchInterval, latestBlockRetryInterval, logger),
blockpoller.NewFireBlockHandler("type.googleapis.com/sf.solana.type.v1.Block"),
blockpoller.WithStoringState(stateDir),
blockpoller.WithLogger(logger),
)

latestSlot, err := rpcClient.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return fmt.Errorf("getting latest block: %w", err)
}

//todo: run the poller
requestedBlock, err := rpcClient.GetBlock(ctx, latestSlot)

err = poller.Run(ctx, firstStreamableBlock, bstream.NewBlockRef(requestedBlock.Blockhash.String(), latestSlot))
if err != nil {
return fmt.Errorf("running poller: %w", err)
}

return nil
}
Expand Down

0 comments on commit 96f92ea

Please sign in to comment.