From fd3f70f552e6ad6215d1abaec6fddcbea9c5c7be Mon Sep 17 00:00:00 2001 From: billettc Date: Wed, 4 Oct 2023 08:25:56 -0400 Subject: [PATCH] multi threaded merge block write --- accountresolver/processor.go | 150 +++++++++++++++------------ accountresolver/processor_test.go | 8 +- cmd/firesol/tools_bigtable_blocks.go | 6 +- 3 files changed, 90 insertions(+), 74 deletions(-) diff --git a/accountresolver/processor.go b/accountresolver/processor.go index 59359ad7..cc99b075 100644 --- a/accountresolver/processor.go +++ b/accountresolver/processor.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" ) -type stats struct { +type Stats struct { startProcessing time.Time transactionCount int lookupCount int @@ -35,9 +35,10 @@ type stats struct { cacheHit int totalBlockPushDuration time.Duration writeDurationAfterLastPush time.Duration + lastBlockPushedAt time.Time } -func (s *stats) log(logger *zap.Logger) { +func (s *Stats) Log(logger *zap.Logger) { lookupAvg := time.Duration(0) if s.lookupCount > 0 { lookupAvg = s.totalLookupDuration / time.Duration(s.lookupCount) @@ -71,7 +72,7 @@ func (s *stats) log(logger *zap.Logger) { zap.String("average_transaction_processing_duration", durafmt.Parse(s.totalTransactionProcessingDuration/time.Duration(s.transactionCount)).String()), zap.String("average_lookup_duration", durafmt.Parse(lookupAvg).String()), zap.String("average_extend_duration", durafmt.Parse(extendAvg).String()), - zap.String("write_duration_after_last_push", durafmt.Parse(s.writeDurationAfterLastPush).String()), + zap.String("write_duration_after_last_push", durafmt.Parse(time.Since(s.lastBlockPushedAt)).String()), ) } @@ -92,7 +93,6 @@ type Processor struct { accountsResolver AccountsResolver readerName string logger *zap.Logger - stats *stats } func NewProcessor(readerName string, accountsResolver AccountsResolver, logger *zap.Logger) *Processor { @@ -100,18 +100,9 @@ func NewProcessor(readerName string, accountsResolver AccountsResolver, logger * readerName: readerName, accountsResolver: accountsResolver, logger: logger, - stats: &stats{}, } } -func (p *Processor) ResetStats() { - p.stats = &stats{} -} - -func (p *Processor) LogStats() { - p.stats.log(p.logger) -} - func (p *Processor) ProcessMergeBlocks(ctx context.Context, cursor *Cursor, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { startBlockNum := cursor.slotNum - cursor.slotNum%100 //This is the first block slot of the last merge block file startBlockNum += 100 //This is the first block slot of the next merge block file @@ -119,7 +110,7 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, cursor *Cursor, sour p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum)) - mergeBlocksFileChan := make(chan *mergeBlocksFile, 10) + mergeBlocksFileChan := make(chan *mergeBlocksFile, 20) go func() { err := p.processMergeBlocksFiles(ctx, cursor, mergeBlocksFileChan, destinationStore, encoder) @@ -129,11 +120,10 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, cursor *Cursor, sour err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error { mbf := newMergeBlocksFile(filename, p.logger) go func() { - err := mbf.process(ctx, sourceStore, cursor) + err := mbf.process(ctx, sourceStore) if err != nil { panic(fmt.Errorf("processing merge block file %s: %w", mbf.filename, err)) } - }() mergeBlocksFileChan <- mbf return nil @@ -162,7 +152,7 @@ func newMergeBlocksFile(fileName string, logger *zap.Logger) *mergeBlocksFile { } } -func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, cursor *Cursor) error { +func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store) error { f.logger.Info("Processing merge block file", zap.String("filename", f.filename)) firstBlockOfFile, err := strconv.Atoi(strings.TrimLeft(f.filename, "0")) if err != nil { @@ -200,16 +190,48 @@ func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, } } +type bundleJob struct { + filename string + cursor *Cursor + stats *Stats + bundleReader *BundleReader +} + func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { + writerNailer := dhammer.NewNailer(50, func(ctx context.Context, br *bundleJob) (*bundleJob, error) { + err := destinationStore.WriteObject(ctx, br.filename, br.bundleReader) + if err != nil { + return br, fmt.Errorf("writing bundle file: %w", err) + } + + return br, nil + }) + writerNailer.OnTerminating(func(err error) { + panic(fmt.Errorf("writing bundle file: %w", err)) + }) + writerNailer.Start(ctx) + + go func() { + for out := range writerNailer.Out { + p.logger.Info("new merge blocks file written:", zap.String("filename", out.filename)) + err := p.accountsResolver.StoreCursor(ctx, p.readerName, out.cursor) + if err != nil { + panic(fmt.Errorf("storing cursor at block %d: %w", out.cursor.slotNum, err)) + } + out.stats.Log(p.logger) + out.bundleReader.Close() + } + }() + for mbf := range mergeBlocksFileChan { - p.stats = &stats{ + stats := &Stats{ startProcessing: time.Now(), } p.logger.Info("Receive merge block file", zap.String("filename", mbf.filename)) bundleReader := NewBundleReader(ctx, p.logger) - nailer := dhammer.NewNailer(50, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) { + decoderNailer := dhammer.NewNailer(100, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) { b, err := encoder.Encode(blk) if err != nil { return nil, fmt.Errorf("encoding block: %w", err) @@ -217,7 +239,17 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor, return b, nil }) - nailer.Start(ctx) + decoderNailer.OnTerminating(func(err error) { + panic(fmt.Errorf("encoding block: %w", err)) + }) + decoderNailer.Start(ctx) + + writerNailer.Push(ctx, &bundleJob{ + mbf.filename, + cursor, + stats, + bundleReader, + }) mbf := mbf go func() { @@ -227,7 +259,7 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor, return case blk, ok := <-mbf.blockChan: if !ok { - nailer.Close() + decoderNailer.Close() return } @@ -242,83 +274,67 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor, } start := time.Now() - err := p.ProcessBlock(context.Background(), blk) + err := p.ProcessBlock(context.Background(), stats, blk) if err != nil { bundleReader.PushError(fmt.Errorf("processing block: %w", err)) return } - p.stats.totalBlockProcessingDuration += time.Since(start) + stats.totalBlockProcessingDuration += time.Since(start) cursor.slotNum = blk.Slot + decoderNailer.Push(ctx, blk) - nailer.Push(ctx, blk) - p.stats.totalBlockHandlingDuration += time.Since(start) + stats.totalBlockHandlingDuration += time.Since(start) } } }() - lastPushTime := time.Now() - go func() { - for bb := range nailer.Out { - err := bundleReader.PushBlock(bb) - pushStart := time.Now() - if err != nil { - bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err)) - return - } - - p.stats.totalBlockPushDuration += time.Since(pushStart) + for bb := range decoderNailer.Out { + err := bundleReader.PushBlock(bb) + pushStart := time.Now() + if err != nil { + bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err)) + return fmt.Errorf("pushing block to bundle reader: %w", err) } - bundleReader.Close() - }() - err := destinationStore.WriteObject(ctx, mbf.filename, bundleReader) - if err != nil { - return fmt.Errorf("writing bundle file: %w", err) - } - p.stats.writeDurationAfterLastPush = time.Since(lastPushTime) - //p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start))) - err = p.accountsResolver.StoreCursor(ctx, p.readerName, cursor) - if err != nil { - return fmt.Errorf("storing cursor at block %d: %w", cursor.slotNum, err) + stats.totalBlockPushDuration += time.Since(pushStart) } - - p.stats.log(p.logger) + stats.lastBlockPushedAt = time.Now() } return nil } -func (p *Processor) ProcessBlock(ctx context.Context, block *pbsol.Block) error { - p.stats.transactionCount += len(block.Transactions) +func (p *Processor) ProcessBlock(ctx context.Context, stats *Stats, block *pbsol.Block) error { + stats.transactionCount += len(block.Transactions) for _, trx := range block.Transactions { if trx.Meta.Err != nil { continue } //p.logger.Debug("processing transaction", zap.Uint64("block_num", block.Slot), zap.String("trx_id", base58.Encode(trx.Transaction.Signatures[0]))) - err := p.applyTableLookup(ctx, block.Slot, trx) + err := p.applyTableLookup(ctx, stats, block.Slot, trx) if err != nil { return fmt.Errorf("applying table lookup at block %d: %w", block.Slot, err) } - err = p.manageAddressLookup(ctx, block.Slot, err, trx) + err = p.manageAddressLookup(ctx, stats, block.Slot, err, trx) if err != nil { return fmt.Errorf("managing address lookup at block %d: %w", block.Slot, err) } } - p.stats.totalBlockCount += 1 + stats.totalBlockCount += 1 return nil } -func (p *Processor) manageAddressLookup(ctx context.Context, blockNum uint64, err error, trx *pbsol.ConfirmedTransaction) error { - err = p.ProcessTransaction(ctx, blockNum, trx) +func (p *Processor) manageAddressLookup(ctx context.Context, stats *Stats, blockNum uint64, err error, trx *pbsol.ConfirmedTransaction) error { + err = p.ProcessTransaction(ctx, stats, blockNum, trx) if err != nil { return fmt.Errorf("processing transactions: %w", err) } return nil } -func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx *pbsol.ConfirmedTransaction) error { +func (p *Processor) applyTableLookup(ctx context.Context, stats *Stats, blockNum uint64, trx *pbsol.ConfirmedTransaction) error { start := time.Now() for _, addressTableLookup := range trx.Transaction.Message.AddressTableLookups { accs, cached, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey) @@ -326,7 +342,7 @@ func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx * return fmt.Errorf("resolving address table %s at block %d: %w", base58.Encode(addressTableLookup.AccountKey), blockNum, err) } if cached { - p.stats.cacheHit += 1 + stats.cacheHit += 1 } //p.logger.Debug("Resolve address table lookup", zap.String("trx", getTransactionHash(trx.Transaction.Signatures)), zap.String("account", base58.Encode(addressTableLookup.AccountKey)), zap.Int("count", len(accs)), zap.Bool("cached", cached)) trx.Transaction.Message.AccountKeys = append(trx.Transaction.Message.AccountKeys, accs.ToBytesArray()...) @@ -335,8 +351,8 @@ func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx * lookupCount := len(trx.Transaction.Message.AddressTableLookups) if lookupCount > 0 { - p.stats.lookupCount += lookupCount - p.stats.totalLookupDuration += totalDuration + stats.lookupCount += lookupCount + stats.totalLookupDuration += totalDuration p.logger.Debug( "applyTableLookup", zap.Duration("duration", totalDuration), @@ -348,12 +364,12 @@ func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx * return nil } -func (p *Processor) ProcessTransaction(ctx context.Context, blockNum uint64, confirmedTransaction *pbsol.ConfirmedTransaction) error { +func (p *Processor) ProcessTransaction(ctx context.Context, stats *Stats, blockNum uint64, confirmedTransaction *pbsol.ConfirmedTransaction) error { start := time.Now() accountKeys := confirmedTransaction.Transaction.Message.AccountKeys for compileIndex, compiledInstruction := range confirmedTransaction.Transaction.Message.Instructions { idx := compiledInstruction.ProgramIdIndex - err := p.ProcessInstruction(ctx, blockNum, confirmedTransaction.Transaction.Signatures[0], confirmedTransaction.Transaction.Message.AccountKeys[idx], accountKeys, compiledInstruction) + err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], confirmedTransaction.Transaction.Message.AccountKeys[idx], accountKeys, compiledInstruction) if err != nil { return fmt.Errorf("confirmedTransaction %s processing compiled instruction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err) } @@ -367,17 +383,17 @@ func (p *Processor) ProcessTransaction(ctx context.Context, blockNum uint64, con return fmt.Errorf("missing account key at index %d for transaction %s with account keys count of %d", instruction.ProgramIdIndex, getTransactionHash(confirmedTransaction.Transaction.Signatures), len(accountKeys)) } - err := p.ProcessInstruction(ctx, blockNum, confirmedTransaction.Transaction.Signatures[0], accountKeys[instruction.ProgramIdIndex], accountKeys, instruction) + err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], accountKeys[instruction.ProgramIdIndex], accountKeys, instruction) if err != nil { return fmt.Errorf("confirmedTransaction %s processing instruxction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err) } } } - p.stats.totalTransactionProcessingDuration += time.Since(start) + stats.totalTransactionProcessingDuration += time.Since(start) return nil } -func (p *Processor) ProcessInstruction(ctx context.Context, blockNum uint64, trxHash []byte, programAccount Account, accountKeys [][]byte, instructionable pbsol.Instructionable) error { +func (p *Processor) ProcessInstruction(ctx context.Context, stats *Stats, blockNum uint64, trxHash []byte, programAccount Account, accountKeys [][]byte, instructionable pbsol.Instructionable) error { if !bytes.Equal(programAccount, AddressTableLookupAccountProgram) { return nil } @@ -395,8 +411,8 @@ func (p *Processor) ProcessInstruction(ctx context.Context, blockNum uint64, trx return fmt.Errorf("extending address table %s at block %d: %w", tableLookupAccount, blockNum, err) } - p.stats.totalExtendDuration += time.Since(start) - p.stats.extendCount += 1 + stats.totalExtendDuration += time.Since(start) + stats.extendCount += 1 } return nil diff --git a/accountresolver/processor_test.go b/accountresolver/processor_test.go index c9370713..3add5b60 100644 --- a/accountresolver/processor_test.go +++ b/accountresolver/processor_test.go @@ -67,7 +67,7 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) { resolver := NewKVDBAccountsResolver(db, zap.NewNop()) p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) - err = p.ProcessBlock(context.Background(), solBlock) + err = p.ProcessBlock(context.Background(), &Stats{}, solBlock) require.NoError(t, err) accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount) @@ -145,7 +145,7 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) { resolver := NewKVDBAccountsResolver(db, zap.NewNop()) p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop()) - err = p.ProcessBlock(context.Background(), solBlock) + err = p.ProcessBlock(context.Background(), &Stats{}, solBlock) require.NoError(t, err) accounts, _, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount) @@ -228,7 +228,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku accounts := NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys) require.Equal(t, 2, len(accounts)) - err = p.ProcessBlock(context.Background(), solBlock) + err = p.ProcessBlock(context.Background(), &Stats{}, solBlock) require.NoError(t, err) accounts = NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys) @@ -317,7 +317,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa err = resolver.store.FlushPuts(context.Background()) require.NoError(t, err) - err = p.ProcessBlock(context.Background(), solBlock) + err = p.ProcessBlock(context.Background(), &Stats{}, solBlock) require.NoError(t, err) accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend) diff --git a/cmd/firesol/tools_bigtable_blocks.go b/cmd/firesol/tools_bigtable_blocks.go index 0088f6b3..8afe6977 100644 --- a/cmd/firesol/tools_bigtable_blocks.go +++ b/cmd/firesol/tools_bigtable_blocks.go @@ -85,12 +85,12 @@ func bigtableBlocksRunE(logger *zap.Logger, tracer logging.Tracer) firecore.Comm return btClient.ReadBlocks(ctx, startBlockNum, stopBlockNum, linkable, func(block *pbsolv1.Block) error { if firehoseEnabled { - processor.ResetStats() - err := processor.ProcessBlock(ctx, block) + stats := &accountsresolver.Stats{} + err := processor.ProcessBlock(ctx, stats, block) if err != nil { return fmt.Errorf("unable to process table lookup for block: %w", err) } - processor.LogStats() + stats.Log(logger) cnt, err := proto.Marshal(block) if err != nil {