Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Jan 9, 2024
1 parent e286a44 commit d5ebed8
Show file tree
Hide file tree
Showing 49 changed files with 311 additions and 23,761 deletions.
2 changes: 1 addition & 1 deletion blockreader/bigtable.go → block/fetcher/bigtable.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blockreader
package fetcher

import (
"bytes"
Expand Down
51 changes: 51 additions & 0 deletions block/fetcher/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package fetcher

import (
"context"
"fmt"
"time"

"github.com/gagliardetto/solana-go/rpc"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"go.uber.org/zap"
)

// todo: implement firecore.BlockFetcher
type RPC struct {
rpcClient *rpc.Client
latest uint64
latestBlockRetryInterval time.Duration
fetchInterval time.Duration
lastFetchAt time.Time
logger *zap.Logger
}

func (r *RPC) Fetch(ctx context.Context, blkNum uint64) (*pbbstream.Block, error) {
blockResult, err := r.rpcClient.GetBlock(ctx, blkNum)
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blkNum, err)
}
block := blockFromBlockResult(blockResult)
return block, nil
}

func blockFromBlockResult(b *rpc.GetBlockResult) *pbbstream.Block {

panic("implement me")
block := &pbbstream.Block{
//Number: b.BlockHeight,
//Id: "",
//ParentId: "",
//Timestamp: nil,
//LibNum: 0,
//PayloadKind: 0,
//PayloadVersion: 0,
//PayloadBuffer: nil,
//HeadNum: 0,
//ParentNum: 0,
//Payload: nil,
}

return block

}
4 changes: 2 additions & 2 deletions cmd/firesol/bigtable/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
)

func NewBigTableCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{Use: "bigtable", Short: "bigtable"}
cmd := &cobra.Command{Use: "poller", Short: "poller"}
cmd.PersistentFlags().String("bt-project", "mainnet-beta", "Bigtable project")
cmd.PersistentFlags().String("bt-instance", "solana-ledger", "Bigtable instance")

cmd.AddCommand(newPrintCmd(logger, tracer))
cmd.AddCommand(NewPollerCmd(logger, tracer))
return cmd
}
4 changes: 2 additions & 2 deletions cmd/firesol/bigtable/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-solana/blockreader"
"github.com/streamingfast/firehose-solana/block/fetcher"
pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand Down Expand Up @@ -61,7 +61,7 @@ func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecu
return fmt.Errorf("unable to parse stop block number %s: %w", stopBlockNumStr, err)
}

blockReader := blockreader.NewBigtableReader(client, 10, logger, tracer)
blockReader := fetcher.NewBigtableReader(client, 10, logger, tracer)

return blockReader.Read(ctx, startBlockNum, stopBlockNum, func(block *pbsolv1.Block) error {
cnt, err := proto.Marshal(block)
Expand Down
100 changes: 0 additions & 100 deletions cmd/firesol/bigtable/print.go

This file was deleted.

63 changes: 3 additions & 60 deletions cmd/firesol/main.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,17 @@
package main

import (
"context"
"fmt"
"time"

"github.com/spf13/cobra"
pbbstream "github.com/streamingfast/bstream/types/pb/sf/bstream/v1"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/firehose-solana/cmd/firesol/bigtable"
"github.com/streamingfast/firehose-solana/codec"
pbsol "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"google.golang.org/protobuf/reflect/protoreflect"
)

func init() {
firecore.UnsafePayloadKind = pbbstream.Protocol_SOLANA
firecore.UnsafeResolveReaderNodeStartBlock = readerNodeStartBlockResolver

}

func main() {
firecore.Main(&firecore.Chain[*pbsol.Block]{
func Chain() *firecore.Chain[*pbsol.Block] {
return &firecore.Chain[*pbsol.Block]{
ShortName: "sol",
LongName: "Solana",
ExecutableName: "firesol",
Expand All @@ -40,10 +24,6 @@ func main() {

BlockTransformerFactories: map[protoreflect.FullName]firecore.BlockTransformerFactory{},

ConsoleReaderFactory: func(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) {
return codec.NewBigtableConsoleReader(lines, blockEncoder, logger)
},

Tools: &firecore.ToolsConfig[*pbsol.Block]{

RegisterExtraCmd: func(chain *firecore.Chain[*pbsol.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
Expand All @@ -52,45 +32,8 @@ func main() {
return nil
},
},
})
}
}

// Version value, injected via go build `ldflags` at build time, **must** not be removed or inlined
var version = "dev"

func readerNodeStartBlockResolver(ctx context.Context, command *cobra.Command, runtime *launcher.Runtime, rootLog *zap.Logger) (uint64, error) {
startBlockNum, userDefined := sflags.MustGetUint64Provided(command, "reader-node-start-block-num")
if userDefined {
return startBlockNum, nil
}

mergedBlocksStoreURL, _, _, err := firecore.GetCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return 0, err
}

mergedBlocksStore, err := dstore.NewDBinStore(mergedBlocksStoreURL)
if err != nil {
return 0, fmt.Errorf("unable to create merged blocks store at path %q: %w", mergedBlocksStoreURL, err)
}

firstStreamableBlock := sflags.MustGetUint64(command, "common-first-streamable-block")

t0 := time.Now()
rootLog.Info("resolving reader node start block",
zap.Uint64("first_streamable_block", firstStreamableBlock),
zap.String("merged_block_store_url", mergedBlocksStoreURL),
)

lastMergedBlockNum := firecore.LastMergedBlockNum(ctx, firstStreamableBlock, mergedBlocksStore, rootLog)
if firstStreamableBlock != lastMergedBlockNum {
startBlockNum = lastMergedBlockNum + 100
}

rootLog.Info("start block resolved",
zap.Duration("elapsed", time.Since(t0)),
zap.Uint64("start_block", startBlockNum),
zap.Uint64("last_merged_block_num", lastMergedBlockNum),
)
return startBlockNum, nil
}
4 changes: 2 additions & 2 deletions cmd/firesol/rpc/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

func NewBigTableCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{Use: "rpc", Short: "rpc"}
cmd := &cobra.Command{Use: "poller", Short: "poller"}

cmd.AddCommand(newPrintCmd(logger, tracer))
cmd.AddCommand(NewPollerCmd(logger, tracer))
return cmd
}
39 changes: 36 additions & 3 deletions cmd/firesol/rpc/poller.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,61 @@
package rpc

import (
"fmt"
"path"
"strconv"

"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func NewPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "rpc <start_block_num> <stop_block_num>",
Use: "rpc <rpc-endpoint> <first-streamable-block>",
Short: "poll blocks from rpc endpoint",
Args: cobra.ExactArgs(2),
RunE: pollerRunE(logger, tracer),
}

cmd.Flags().String("rpc-endpoint", "", "RPC endpoint")
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")

return cmd
}

func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
//ctx := cmd.Context()
rpcEndpoint := args[0]

dataDir := sflags.MustGetString(cmd, "data-dir")
stateDir := path.Join(dataDir, "poller-state")

firstStreamableBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("unable to parse first streamable block %d: %w", firstStreamableBlock, err)
}

fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")

logger.Info(
"launching firehose-solana poller",
zap.String("rpc_endpoint", rpcEndpoint),
zap.String("data_dir", dataDir),
zap.String("state_dir", stateDir),
zap.Uint64("first_streamable_block", firstStreamableBlock),
zap.Duration("interval_between_fetch", fetchInterval),
)

//todo: init fetcher
//todo: init poller handler
//todo: init poller

//todo: fetch latest block from chain rpc

//todo: run the poller

return nil
}
}
Loading

0 comments on commit d5ebed8

Please sign in to comment.