Table lookup reader integration
billettc committed Oct 3, 2023
1 parent fafd236 commit d155a32
Showing 5 changed files with 63 additions and 34 deletions.
57 changes: 33 additions & 24 deletions accountresolver/processor.go
@@ -90,40 +90,46 @@ func NewCursor(blockNum uint64) *Cursor {

type Processor struct {
accountsResolver AccountsResolver
cursor *Cursor
readerName string
logger *zap.Logger
stats *stats
}

func NewProcessor(readerName string, cursor *Cursor, accountsResolver AccountsResolver, logger *zap.Logger) *Processor {
func NewProcessor(readerName string, accountsResolver AccountsResolver, logger *zap.Logger) *Processor {
return &Processor{
readerName: readerName,
accountsResolver: accountsResolver,
cursor: cursor,
logger: logger,
stats: &stats{},
}
}

func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100 //This is the first block slot of the last merge block file
startBlockNum += 100 //This is the first block slot of the next merge block file
func (p *Processor) ResetStats() {
p.stats = &stats{}
}

func (p *Processor) LogStats() {
p.stats.log(p.logger)
}

func (p *Processor) ProcessMergeBlocks(ctx context.Context, cursor *Cursor, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
startBlockNum := cursor.slotNum - cursor.slotNum%100 //This is the first block slot of the last merge block file
startBlockNum += 100 //This is the first block slot of the next merge block file
paddedBlockNum := fmt.Sprintf("%010d", startBlockNum)

p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", p.cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum))
p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum))

mergeBlocksFileChan := make(chan *mergeBlocksFile, 10)

go func() {
err := p.processMergeBlocksFiles(ctx, mergeBlocksFileChan, destinationStore, encoder)
err := p.processMergeBlocksFiles(ctx, cursor, mergeBlocksFileChan, destinationStore, encoder)
panic(fmt.Errorf("processing merge blocks files: %w", err))
}()

err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error {
mbf := newMergeBlocksFile(filename, p.logger)
go func() {
err := mbf.process(ctx, sourceStore, p.cursor)
err := mbf.process(ctx, sourceStore, cursor)
if err != nil {
panic(fmt.Errorf("processing merge block file %s: %w", mbf.filename, err))
}
@@ -185,16 +191,16 @@ func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store,
}

blk := block.ToProtocol().(*pbsol.Block)
if blk.Slot < uint64(firstBlockOfFile) || blk.Slot <= cursor.slotNum {
f.logger.Info("skip block", zap.Uint64("slot", blk.Slot))
if blk.Slot < uint64(firstBlockOfFile) {
f.logger.Info("skip block process in previous file", zap.Uint64("slot", blk.Slot))
continue
}

f.blockChan <- blk
}
}

func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {

for mbf := range mergeBlocksFileChan {
p.stats = &stats{
@@ -225,16 +231,26 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile
return
}

if blk.Slot <= cursor.slotNum {
p.logger.Info("skip block", zap.Uint64("slot", blk.Slot))
continue
}

if cursor.slotNum != blk.ParentSlot {
bundleReader.PushError(fmt.Errorf("cursor block num %d is not the same as parent slot num %d of block %d", cursor.slotNum, blk.ParentSlot, blk.Slot))
return
}

start := time.Now()
err := p.ProcessBlock(context.Background(), blk)
if err != nil {
bundleReader.PushError(fmt.Errorf("processing block: %w", err))
return
}

p.stats.totalBlockProcessingDuration += time.Since(start)

nailer.Push(ctx, blk)
p.stats.totalBlockCount += 1
p.stats.totalBlockHandlingDuration += time.Since(start)
}
}
@@ -248,6 +264,7 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile
bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err))
return
}
cursor.slotNum = bb.Num()
p.stats.totalBlockPushDuration += time.Since(pushStart)
}
bundleReader.Close()
@@ -258,9 +275,9 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile
}
p.stats.writeDurationAfterLastPush = time.Since(lastPushTime)
//p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start)))
err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor)
err = p.accountsResolver.StoreCursor(ctx, p.readerName, cursor)
if err != nil {
return fmt.Errorf("storing cursor at block %d: %w", p.cursor.slotNum, err)
return fmt.Errorf("storing cursor at block %d: %w", cursor.slotNum, err)
}

p.stats.log(p.logger)
@@ -270,13 +287,6 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile
}

func (p *Processor) ProcessBlock(ctx context.Context, block *pbsol.Block) error {
if p.cursor == nil {
return fmt.Errorf("cursor is nil")
}

if p.cursor.slotNum != block.ParentSlot {
return fmt.Errorf("cursor block num %d is not the same as parent slot num %d of block %d", p.cursor.slotNum, block.ParentSlot, block.Slot)
}
p.stats.transactionCount += len(block.Transactions)
for _, trx := range block.Transactions {
if trx.Meta.Err != nil {
@@ -293,8 +303,7 @@ func (p *Processor) ProcessBlock(ctx context.Context, block *pbsol.Block) error
return fmt.Errorf("managing address lookup at block %d: %w", block.Slot, err)
}
}

p.cursor.slotNum = block.Slot
p.stats.totalBlockCount += 1

return nil
}
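For orientation, a minimal sketch of the new calling pattern after this refactor (assuming db, logger, ctx, the two stores and the block encoder are already set up; error handling shortened). The cursor now lives with the caller instead of inside the Processor:

resolver := accountsresolver.NewKVDBAccountsResolver(db, logger)
processor := accountsresolver.NewProcessor("reproc", resolver, logger)

// The caller creates (or restores) the cursor and hands it to ProcessMergeBlocks;
// the processor only advances it and persists it via the resolver's StoreCursor.
cursor := accountsresolver.NewCursor(185_914_861)
if err := processor.ProcessMergeBlocks(ctx, cursor, sourceStore, destinationStore, encoder); err != nil {
	return fmt.Errorf("processing merge blocks: %w", err)
}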
12 changes: 4 additions & 8 deletions accountresolver/processor_test.go
@@ -65,9 +65,8 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) {
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(185_914_861)
resolver := NewKVDBAccountsResolver(db, zap.NewNop())
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

@@ -144,9 +143,8 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) {
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(157_564_919)
resolver := NewKVDBAccountsResolver(db, zap.NewNop())
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
err = p.ProcessBlock(context.Background(), solBlock)
require.NoError(t, err)

@@ -219,9 +217,8 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(185_914_861)
resolver := NewKVDBAccountsResolver(db, zap.NewNop())
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())

err = p.accountsResolver.Extend(context.Background(), 185_914_860, []byte{0x00}, tableLookupAddressToResolve, Accounts{AddressTableLookupAccountProgram})
require.NoError(t, err)
@@ -311,9 +308,8 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
db, err := kvstore.New("badger3:///tmp/my-badger.db")
require.NoError(t, err)

cursor := NewCursor(185_914_861)
resolver := NewKVDBAccountsResolver(db, zap.NewNop())
p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())

// Pre populate the table lookup account with the address table lookup program
err = p.accountsResolver.Extend(context.Background(), 185_914_860, []byte{0x00}, tableLookupAccountInTransaction, Accounts{tableAccountToExtend})
3 changes: 3 additions & 0 deletions bt/core.go
@@ -97,6 +97,9 @@ func (r *Client) ReadBlocks(

r.progressLog(blk, zlogger)
lastSeenBlock = blk

//todo: resolve address lookup

if err := writer(blk); err != nil {
fatalError = fmt.Errorf("failed to write block: %w", err)
return false
4 changes: 2 additions & 2 deletions cmd/firesol/tablelookup.go
@@ -60,9 +60,9 @@ func processAddressLookupE(chain *firecore.Chain[*pbsolv1.Block], logger *zap.Lo
}

fmt.Println("Cursor", cursor)
processor := accountsresolver.NewProcessor("reproc", cursor, resolver, logger)
processor := accountsresolver.NewProcessor("reproc", resolver, logger)

err = processor.ProcessMergeBlocks(ctx, sourceStore, destinationStore, chain.BlockEncoder)
err = processor.ProcessMergeBlocks(ctx, cursor, sourceStore, destinationStore, chain.BlockEncoder)
if err != nil {
return fmt.Errorf("unable to process merge blocks: %w", err)
}
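The hunk above does not show how cursor is obtained before being printed and passed in; presumably it is restored from the state persisted by StoreCursor in processor.go. A hedged sketch of that step, where GetCursor is a hypothetical counterpart to StoreCursor (the actual method name is not shown in this diff):

// Hypothetical restore of the last persisted cursor for this reader name.
cursor, err := resolver.GetCursor(ctx, "reproc")
if err != nil {
	return fmt.Errorf("loading cursor: %w", err)
}
if cursor == nil {
	// First run: start from a configured slot (firstSlotToProcess is illustrative).
	cursor = accountsresolver.NewCursor(firstSlotToProcess)
}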
21 changes: 21 additions & 0 deletions cmd/firesol/tools_bigtable_blocks.go
@@ -6,6 +6,9 @@ import (
"fmt"
"strconv"

accountsresolver "github.com/streamingfast/firehose-solana/accountresolver"
kvstore "github.com/streamingfast/kvdb/store"

"cloud.google.com/go/bigtable"
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
@@ -28,6 +31,7 @@ func newToolsBigTableBlocksCmd(logger *zap.Logger, tracer logging.Tracer) *cobra
cmd.Flags().Bool("firehose-enabled", false, "When enable the blocks read will output Firehose formatted logs 'FIRE <block_num> <block_payload_in_hex>'")
cmd.Flags().Bool("compact", false, "When printing in JSON it will print compact instead of pretty-printed output")
cmd.Flags().Bool("linkable", false, "Ensure that no block is skipped they are linkeable")
cmd.Flags().String("table-lookup-dsn", "", "DSN to the table lookup kv db")
return cmd
}

@@ -43,6 +47,7 @@ func bigtableBlocksRunE(logger *zap.Logger, tracer logging.Tracer) firecore.Comm
linkable := sflags.MustGetBool(cmd, "linkable")
btProject := sflags.MustGetString(cmd, "bt-project")
btInstance := sflags.MustGetString(cmd, "bt-instance")
tableLookupDSN := sflags.MustGetString(cmd, "table-lookup-dsn")

logger.Info("retrieving from bigtable",
zap.Bool("firehose_enabled", firehoseEnabled),
@@ -52,6 +57,7 @@ zap.String("stop_block_num", stopBlockNumStr),
zap.String("stop_block_num", stopBlockNumStr),
zap.String("bt_project", btProject),
zap.String("bt_instance", btInstance),
zap.String("table_lookup_dsn", tableLookupDSN),
)
client, err := bigtable.NewClient(ctx, btProject, btInstance)
if err != nil {
@@ -69,8 +75,23 @@ func bigtableBlocksRunE(logger *zap.Logger, tracer logging.Tracer) firecore.Comm

btClient := bt.New(client, 10, logger, tracer)

db, err := kvstore.New(tableLookupDSN)
if err != nil {
return fmt.Errorf("unable to create kv store: %w", err)
}

resolver := accountsresolver.NewKVDBAccountsResolver(db, logger)
processor := accountsresolver.NewProcessor("reader", resolver, logger)

return btClient.ReadBlocks(ctx, startBlockNum, stopBlockNum, linkable, func(block *pbsolv1.Block) error {
if firehoseEnabled {
processor.ResetStats()
err := processor.ProcessBlock(ctx, block)
if err != nil {
return fmt.Errorf("unable to process table lookup for block: %w", err)
}
processor.LogStats()

cnt, err := proto.Marshal(block)
if err != nil {
return fmt.Errorf("failed to proto marshal pb sol block: %w", err)
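Because ProcessBlock no longer checks the cursor/parent-slot linkage (those checks moved into processMergeBlocksFiles), the bigtable reader can call it on individual streamed blocks. A minimal sketch of how the pieces of this last hunk fit together, with identifiers as in the hunk; the DSN value is illustrative and the output step after marshalling is elided:

db, err := kvstore.New(tableLookupDSN) // e.g. "badger3:///tmp/table-lookup.db", from --table-lookup-dsn
if err != nil {
	return fmt.Errorf("unable to create kv store: %w", err)
}
resolver := accountsresolver.NewKVDBAccountsResolver(db, logger)
processor := accountsresolver.NewProcessor("reader", resolver, logger)

// Inside the ReadBlocks callback, each block has its address table lookups
// registered before the block is marshalled and written out.
processor.ResetStats()
if err := processor.ProcessBlock(ctx, block); err != nil {
	return fmt.Errorf("unable to process table lookup for block: %w", err)
}
processor.LogStats()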
