Skip to content

Commit

Permalink
bump to new fire_core and clean up and refactor tools and commands
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Nov 21, 2023
1 parent bf1fc15 commit e286a44
Show file tree
Hide file tree
Showing 30 changed files with 472 additions and 3,419 deletions.
42 changes: 0 additions & 42 deletions bin/test.sh

This file was deleted.

162 changes: 158 additions & 4 deletions bt/reader.go → blockreader/bigtable.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,178 @@
package bt
package blockreader

import (
"bytes"
"compress/bzip2"
"compress/gzip"
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"math/big"
"os/exec"
"strings"
"time"

"cloud.google.com/go/bigtable"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

// BigtableBlockReader reads Solana blocks stored in a Google Bigtable
// "blocks" table and replays them through a caller-supplied callback
// (see Read).
type BigtableBlockReader struct {
	bt             *bigtable.Client // Bigtable client used to open and scan the "blocks" table
	maxConnAttempt uint64           // max consecutive ReadRows failures tolerated before Read aborts

	logger *zap.Logger
	tracer logging.Tracer // gates per-block trace logging in progressLog
}

// NewBigtableReader builds a BigtableBlockReader backed by the given Bigtable
// client. maxConnectionAttempt caps how many consecutive read failures Read
// will tolerate before giving up.
func NewBigtableReader(bt *bigtable.Client, maxConnectionAttempt uint64, logger *zap.Logger, tracer logging.Tracer) *BigtableBlockReader {
	reader := &BigtableBlockReader{
		bt:             bt,
		maxConnAttempt: maxConnectionAttempt,
		logger:         logger,
		tracer:         tracer,
	}
	return reader
}

var PrintFreq = uint64(10)

// Read streams Solana blocks from the "blocks" Bigtable table, starting at
// startBlockNum, and feeds each decoded block to processBlock in ascending
// order.
//
// The loop reconnects on transient Bigtable errors (up to maxConnAttempt
// consecutive failures), restarting from the last block it successfully
// handled. When stopBlockNum is 0, Read never terminates on its own: once the
// scan drains, it sleeps 5 seconds and polls again for new rows.
//
// It returns nil after a block past stopBlockNum has been processed, or an
// error when a row cannot be decoded, processBlock fails, or the retry budget
// is exhausted.
func (r *BigtableBlockReader) Read(
	ctx context.Context,
	startBlockNum,
	stopBlockNum uint64,
	processBlock func(block *pbsolv1.Block) error,
) error {
	var seenStartBlock bool
	var lastSeenBlock *pbsolv1.Block
	var fatalError error

	r.logger.Info("launching firehose-solana reprocessing",
		zap.Uint64("start_block_num", startBlockNum),
		zap.Uint64("stop_block_num", stopBlockNum),
	)
	table := r.bt.Open("blocks")
	attempts := uint64(0)

	for {
		if lastSeenBlock != nil {
			// On reconnection, re-scan from the last handled block so the
			// linkability check below can resume against a known parent hash.
			resolvedStartBlock := lastSeenBlock.GetFirehoseBlockNumber()
			r.logger.Debug("restarting read rows will retry last boundary",
				zap.Uint64("last_seen_block", lastSeenBlock.GetFirehoseBlockNumber()),
				zap.Uint64("resolved_block", resolvedStartBlock),
			)
			startBlockNum = resolvedStartBlock
		}

		// Row keys are the block number in zero-padded hex, so an open-ended
		// range scan from startBlockNum walks blocks in ascending order.
		btRange := bigtable.NewRange(fmt.Sprintf("%016x", startBlockNum), "")
		err := table.ReadRows(ctx, btRange, func(row bigtable.Row) bool {

			blk, zlogger, err := r.processRow(row)
			if err != nil {
				fatalError = fmt.Errorf("failed to read row: %w", err)
				return false
			}

			if !seenStartBlock {
				if blk.Slot < startBlockNum {
					r.logger.Debug("skipping block below start block",
						zap.Uint64("expected_block", startBlockNum),
					)
					return true
				}
				seenStartBlock = true
			}

			// After a restart the boundary block is re-read; skip the duplicate.
			if lastSeenBlock != nil && lastSeenBlock.Blockhash == blk.Blockhash {
				r.logger.Debug("skipping block already seen",
					zap.Object("blk", blk),
				)
				return true
			}

			if lastSeenBlock != nil && (lastSeenBlock.Blockhash != blk.PreviousBlockhash) {
				// Weird cases where we do not receive the next linkeable block.
				// we should try to reconnect.
				// NOTE(review): returning false here ends ReadRows without an
				// error; with stopBlockNum set, the code below then returns nil
				// before reaching the stop block — confirm this is intended.
				r.logger.Warn("received unlinkable block",
					zap.Object("last_seen_blk", lastSeenBlock),
					zap.Object("blk", blk),
					zap.String("blk_previous_blockhash", blk.PreviousBlockhash),
				)
				return false
			}

			r.progressLog(blk, zlogger)
			lastSeenBlock = blk
			if err := processBlock(blk); err != nil {
				fatalError = fmt.Errorf("failed to write block: %w", err)
				return false
			}

			if stopBlockNum != 0 && blk.GetFirehoseBlockNumber() > stopBlockNum {
				return false
			}

			return true
		})

		if err != nil {
			attempts++
			if attempts >= r.maxConnAttempt {
				return fmt.Errorf("error while reading rows, reached max attempts %d: %w", attempts, err)
			}
			r.logger.Error("error while reading rows", zap.Error(err), zap.Reflect("last_seen_block", lastSeenBlock), zap.Uint64("attempts", attempts))
			continue
		}
		if fatalError != nil {
			msg := "no blocks seen"
			if lastSeenBlock != nil {
				msg = fmt.Sprintf("last seen block %d (%s)", lastSeenBlock.GetFirehoseBlockNumber(), lastSeenBlock.GetFirehoseBlockID())
			}
			return fmt.Errorf("read blocks finished with a fatal error, %s: %w", msg, fatalError)
		}
		var opt []zap.Field
		if lastSeenBlock != nil {
			opt = append(opt, zap.Object("last_seen_block", lastSeenBlock))
		}
		r.logger.Debug("read block finished", opt...)
		if stopBlockNum != 0 {
			return nil
		}
		r.logger.Debug("no stop block set, will sleep for 5 seconds and retry")
		time.Sleep(5 * time.Second)
	}
}

// progressLog emits per-block progress: a debug trace line for every block
// when the tracer is enabled, plus an info line once every PrintFreq slots.
func (r *BigtableBlockReader) progressLog(blk *pbsolv1.Block, zlogger *zap.Logger) {
	if r.tracer.Enabled() {
		zlogger.Debug("handling block",
			zap.Uint64("parent_slot", blk.ParentSlot),
			zap.String("hash", blk.Blockhash),
		)
	}

	if blk.Slot%PrintFreq == 0 {
		opts := []zap.Field{
			zap.String("hash", blk.Blockhash),
			zap.String("previous_hash", blk.GetFirehoseBlockParentID()),
			zap.Uint64("parent_slot", blk.ParentSlot),
		}

		// BlockTime may be nil on some rows; log 0 instead of dereferencing.
		if blk.BlockTime != nil {
			opts = append(opts, zap.Int64("timestamp", blk.BlockTime.Timestamp))
		} else {
			opts = append(opts, zap.Int64("timestamp", 0))
		}

		zlogger.Info(fmt.Sprintf("processing block 1 / %d", PrintFreq), opts...)
	}

}

type RowType string

const (
Expand All @@ -38,7 +192,7 @@ func explodeRow(row bigtable.Row) (*big.Int, RowType, []byte) {
return blockNum, rowType, el.Value
}

func (r *Client) processRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) {
func (r *BigtableBlockReader) processRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) {
blockNum, rowType, rowCnt := explodeRow(row)
zlogger := r.logger.With(
zap.Uint64("block_num", blockNum.Uint64()),
Expand Down Expand Up @@ -138,7 +292,7 @@ func externalBinToProto(in []byte, command string, args ...string) ([]byte, erro
return cnt, nil
}

func (r *Client) decompress(in []byte) (out []byte, err error) {
func (r *BigtableBlockReader) decompress(in []byte) (out []byte, err error) {
switch in[0] {
case 0:
r.logger.Debug("no compression found")
Expand Down
Loading

0 comments on commit e286a44

Please sign in to comment.