From d155a32bf07ff8c9c82e606e8edac189d3654ae5 Mon Sep 17 00:00:00 2001 From: billettc Date: Tue, 3 Oct 2023 11:31:43 -0400 Subject: [PATCH] Table lookup reader integration --- accountresolver/processor.go | 57 ++++++++++++++++------------ accountresolver/processor_test.go | 12 ++---- bt/core.go | 3 ++ cmd/firesol/tablelookup.go | 4 +- cmd/firesol/tools_bigtable_blocks.go | 21 ++++++++++ 5 files changed, 63 insertions(+), 34 deletions(-) diff --git a/accountresolver/processor.go b/accountresolver/processor.go index e1479c21..ba72f774 100644 --- a/accountresolver/processor.go +++ b/accountresolver/processor.go @@ -90,40 +90,46 @@ func NewCursor(blockNum uint64) *Cursor { type Processor struct { accountsResolver AccountsResolver - cursor *Cursor readerName string logger *zap.Logger stats *stats } -func NewProcessor(readerName string, cursor *Cursor, accountsResolver AccountsResolver, logger *zap.Logger) *Processor { +func NewProcessor(readerName string, accountsResolver AccountsResolver, logger *zap.Logger) *Processor { return &Processor{ readerName: readerName, accountsResolver: accountsResolver, - cursor: cursor, logger: logger, stats: &stats{}, } } -func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { - startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100 //This is the first block slot of the last merge block file - startBlockNum += 100 //This is the first block slot of the next merge block file +func (p *Processor) ResetStats() { + p.stats = &stats{} +} + +func (p *Processor) LogStats() { + p.stats.log(p.logger) +} + +func (p *Processor) ProcessMergeBlocks(ctx context.Context, cursor *Cursor, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { + startBlockNum := cursor.slotNum - cursor.slotNum%100 //This is the first block slot of the last merge block file + startBlockNum += 100 //This is the first block slot of the next merge block file paddedBlockNum := fmt.Sprintf("%010d", startBlockNum) - p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", p.cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum)) + p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum)) mergeBlocksFileChan := make(chan *mergeBlocksFile, 10) go func() { - err := p.processMergeBlocksFiles(ctx, mergeBlocksFileChan, destinationStore, encoder) + err := p.processMergeBlocksFiles(ctx, cursor, mergeBlocksFileChan, destinationStore, encoder) panic(fmt.Errorf("processing merge blocks files: %w", err)) }() err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error { mbf := newMergeBlocksFile(filename, p.logger) go func() { - err := mbf.process(ctx, sourceStore, p.cursor) + err := mbf.process(ctx, sourceStore, cursor) if err != nil { panic(fmt.Errorf("processing merge block file %s: %w", mbf.filename, err)) } @@ -185,8 +191,8 @@ func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, } blk := block.ToProtocol().(*pbsol.Block) - if blk.Slot < uint64(firstBlockOfFile) || blk.Slot <= cursor.slotNum { - f.logger.Info("skip block", zap.Uint64("slot", blk.Slot)) + if blk.Slot < uint64(firstBlockOfFile) { + f.logger.Info("skip block process in previous file", zap.Uint64("slot", blk.Slot)) continue } @@ -194,7 +200,7 @@ func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, } } -func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { +func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { for mbf := range mergeBlocksFileChan { p.stats = &stats{ @@ -225,16 +231,26 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile return } + if blk.Slot <= cursor.slotNum { + p.logger.Info("skip block", zap.Uint64("slot", blk.Slot)) + continue + } + + if cursor.slotNum != blk.ParentSlot { + bundleReader.PushError(fmt.Errorf("cursor block num %d is not the same as parent slot num %d of block %d", cursor.slotNum, blk.ParentSlot, blk.Slot)) + return + } + start := time.Now() err := p.ProcessBlock(context.Background(), blk) if err != nil { bundleReader.PushError(fmt.Errorf("processing block: %w", err)) return } + p.stats.totalBlockProcessingDuration += time.Since(start) nailer.Push(ctx, blk) - p.stats.totalBlockCount += 1 p.stats.totalBlockHandlingDuration += time.Since(start) } } @@ -248,6 +264,7 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err)) return } + cursor.slotNum = bb.Num() p.stats.totalBlockPushDuration += time.Since(pushStart) } bundleReader.Close() @@ -258,9 +275,9 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile } p.stats.writeDurationAfterLastPush = time.Since(lastPushTime) //p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start))) - err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor) + err = p.accountsResolver.StoreCursor(ctx, p.readerName, cursor) if err != nil { - return fmt.Errorf("storing cursor at block %d: %w", p.cursor.slotNum, err) + return fmt.Errorf("storing cursor at block %d: %w", cursor.slotNum, err) } p.stats.log(p.logger) @@ -270,13 +287,6 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile } func (p *Processor) ProcessBlock(ctx context.Context, block *pbsol.Block) error { - if p.cursor == nil { - return fmt.Errorf("cursor is nil") - } - - if p.cursor.slotNum != block.ParentSlot { - return fmt.Errorf("cursor block num %d is not the same as parent slot num %d of block %d", p.cursor.slotNum, block.ParentSlot, block.Slot) - } p.stats.transactionCount += len(block.Transactions) for _, trx := range block.Transactions { if trx.Meta.Err != nil { @@ -293,8 +303,7 @@ func (p *Processor) ProcessBlock(ctx context.Context, block *pbsol.Block) error return fmt.Errorf("managing address lookup at block %d: %w", block.Slot, err) } } - - p.cursor.slotNum = block.Slot + p.stats.totalBlockCount += 1 return nil } diff --git a/accountresolver/processor_test.go b/accountresolver/processor_test.go index 591f6fd7..c9370713 100644 --- a/accountresolver/processor_test.go +++ b/accountresolver/processor_test.go @@ -65,9 +65,8 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) { db, err := kvstore.New("badger3:///tmp/my-badger.db") require.NoError(t, err) - cursor := NewCursor(185_914_861) resolver := NewKVDBAccountsResolver(db, zap.NewNop()) - p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) + p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) err = p.ProcessBlock(context.Background(), solBlock) require.NoError(t, err) @@ -144,9 +143,8 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) { db, err := kvstore.New("badger3:///tmp/my-badger.db") require.NoError(t, err) - cursor := NewCursor(157_564_919) resolver := NewKVDBAccountsResolver(db, zap.NewNop()) - p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) + p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) err = p.ProcessBlock(context.Background(), solBlock) require.NoError(t, err) @@ -219,9 +217,8 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku db, err := kvstore.New("badger3:///tmp/my-badger.db") require.NoError(t, err) - cursor := NewCursor(185_914_861) resolver := NewKVDBAccountsResolver(db, zap.NewNop()) - p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) + p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) err = p.accountsResolver.Extend(context.Background(), 185_914_860, []byte{0x00}, tableLookupAddressToResolve, Accounts{AddressTableLookupAccountProgram}) require.NoError(t, err) @@ -311,9 +308,8 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa db, err := kvstore.New("badger3:///tmp/my-badger.db") require.NoError(t, err) - cursor := NewCursor(185_914_861) resolver := NewKVDBAccountsResolver(db, zap.NewNop()) - p := NewProcessor("test", cursor, NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) + p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) // Pre populate the table lookup account with the address table lookup program err = p.accountsResolver.Extend(context.Background(), 185_914_860, []byte{0x00}, tableLookupAccountInTransaction, Accounts{tableAccountToExtend}) diff --git a/bt/core.go b/bt/core.go index 7ce9af52..3eaf405e 100644 --- a/bt/core.go +++ b/bt/core.go @@ -97,6 +97,9 @@ func (r *Client) ReadBlocks( r.progressLog(blk, zlogger) lastSeenBlock = blk + + //todo: resolve address lookup + if err := writer(blk); err != nil { fatalError = fmt.Errorf("failed to write blokc: %w", err) return false diff --git a/cmd/firesol/tablelookup.go b/cmd/firesol/tablelookup.go index fa4ab946..408f4cfe 100644 --- a/cmd/firesol/tablelookup.go +++ b/cmd/firesol/tablelookup.go @@ -60,9 +60,9 @@ func processAddressLookupE(chain *firecore.Chain[*pbsolv1.Block], logger *zap.Lo } fmt.Println("Cursor", cursor) - processor := accountsresolver.NewProcessor("reproc", cursor, resolver, logger) + processor := accountsresolver.NewProcessor("reproc", resolver, logger) - err = processor.ProcessMergeBlocks(ctx, sourceStore, destinationStore, chain.BlockEncoder) + err = processor.ProcessMergeBlocks(ctx, cursor, sourceStore, destinationStore, chain.BlockEncoder) if err != nil { return fmt.Errorf("unable to process merge blocks: %w", err) } diff --git a/cmd/firesol/tools_bigtable_blocks.go b/cmd/firesol/tools_bigtable_blocks.go index cd836470..0088f6b3 100644 --- a/cmd/firesol/tools_bigtable_blocks.go +++ b/cmd/firesol/tools_bigtable_blocks.go @@ -6,6 +6,9 @@ import ( "fmt" "strconv" + accountsresolver "github.com/streamingfast/firehose-solana/accountresolver" + kvstore "github.com/streamingfast/kvdb/store" + "cloud.google.com/go/bigtable" "github.com/spf13/cobra" "github.com/streamingfast/cli/sflags" @@ -28,6 +31,7 @@ func newToolsBigTableBlocksCmd(logger *zap.Logger, tracer logging.Tracer) *cobra cmd.Flags().Bool("firehose-enabled", false, "When enable the blocks read will output Firehose formatted logs 'FIRE '") cmd.Flags().Bool("compact", false, "When printing in JSON it will print compact instead of pretty-printed output") cmd.Flags().Bool("linkable", false, "Ensure that no block is skipped they are linkeable") + cmd.Flags().String("table-lookup-dsn", "", "DSN to the table lookup kv db") return cmd } @@ -43,6 +47,7 @@ func bigtableBlocksRunE(logger *zap.Logger, tracer logging.Tracer) firecore.Comm linkable := sflags.MustGetBool(cmd, "linkable") btProject := sflags.MustGetString(cmd, "bt-project") btInstance := sflags.MustGetString(cmd, "bt-instance") + tableLookupDSN := sflags.MustGetString(cmd, "table-lookup-dsn") logger.Info("retrieving from bigtable", zap.Bool("firehose_enabled", firehoseEnabled), @@ -52,6 +57,7 @@ func bigtableBlocksRunE(logger *zap.Logger, tracer logging.Tracer) firecore.Comm zap.String("stop_block_num", stopBlockNumStr), zap.String("bt_project", btProject), zap.String("bt_instance", btInstance), + zap.String("table_lookup_dsn", tableLookupDSN), ) client, err := bigtable.NewClient(ctx, btProject, btInstance) if err != nil { @@ -69,8 +75,23 @@ func bigtableBlocksRunE(logger *zap.Logger, tracer logging.Tracer) firecore.Comm btClient := bt.New(client, 10, logger, tracer) + db, err := kvstore.New(tableLookupDSN) + if err != nil { + return fmt.Errorf("unable to create kv store: %w", err) + } + + resolver := accountsresolver.NewKVDBAccountsResolver(db, logger) + processor := accountsresolver.NewProcessor("reader", resolver, logger) + return btClient.ReadBlocks(ctx, startBlockNum, stopBlockNum, linkable, func(block *pbsolv1.Block) error { if firehoseEnabled { + processor.ResetStats() + err := processor.ProcessBlock(ctx, block) + if err != nil { + return fmt.Errorf("unable to process table lookup for block: %w", err) + } + processor.LogStats() + cnt, err := proto.Marshal(block) if err != nil { return fmt.Errorf("failed to proto marshal pb sol block: %w", err)