multi-threaded merge block write
billettc committed Oct 4, 2023
1 parent 2368cd5 commit fd3f70f
Showing 3 changed files with 90 additions and 74 deletions.
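
The serial write path in processMergeBlocksFiles is replaced by two dhammer nailers: decoderNailer (concurrency 100) encodes pbsol blocks into bstream blocks, while writerNailer (concurrency 50) writes finished bundles to the destination store and, as each write completes, stores the cursor and logs that bundle's stats. Below is a minimal sketch of the nailer pattern in isolation, assuming the dhammer API exactly as exercised in this diff (NewNailer, OnTerminating, Start, Push, Out, Close); the bundle type and handler body are illustrative.

    package main

    import (
        "context"
        "fmt"

        "github.com/streamingfast/dhammer"
    )

    // bundle stands in for the diff's *bundleJob; the handler body is illustrative.
    type bundle struct{ name string }

    func main() {
        ctx := context.Background()

        // Up to 50 handlers in flight, mirroring writerNailer in the diff.
        writer := dhammer.NewNailer(50, func(ctx context.Context, b *bundle) (*bundle, error) {
            // The real handler calls destinationStore.WriteObject(ctx, b.filename, b.bundleReader).
            return b, nil
        })
        writer.OnTerminating(func(err error) {
            panic(fmt.Errorf("writing bundle: %w", err)) // the diff also panics on terminal errors
        })
        writer.Start(ctx)

        done := make(chan struct{})
        go func() {
            // Out yields completed jobs; the diff stores the cursor and logs stats here.
            for out := range writer.Out {
                fmt.Println("written:", out.name)
            }
            close(done)
        }()

        for i := 0; i < 3; i++ {
            writer.Push(ctx, &bundle{name: fmt.Sprintf("bundle-%04d", i)})
        }
        writer.Close() // no more input; Out drains, then closes
        <-done
    }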
150 changes: 83 additions & 67 deletions accountresolver/processor.go
@@ -20,7 +20,7 @@ import (
"go.uber.org/zap"
)

-type stats struct {
+type Stats struct {
startProcessing time.Time
transactionCount int
lookupCount int
@@ -35,9 +35,10 @@ type stats struct {
cacheHit int
totalBlockPushDuration time.Duration
writeDurationAfterLastPush time.Duration
+lastBlockPushedAt time.Time
}

-func (s *stats) log(logger *zap.Logger) {
+func (s *Stats) Log(logger *zap.Logger) {
lookupAvg := time.Duration(0)
if s.lookupCount > 0 {
lookupAvg = s.totalLookupDuration / time.Duration(s.lookupCount)
@@ -71,7 +72,7 @@ func (s *stats) log(logger *zap.Logger) {
zap.String("average_transaction_processing_duration", durafmt.Parse(s.totalTransactionProcessingDuration/time.Duration(s.transactionCount)).String()),
zap.String("average_lookup_duration", durafmt.Parse(lookupAvg).String()),
zap.String("average_extend_duration", durafmt.Parse(extendAvg).String()),
zap.String("write_duration_after_last_push", durafmt.Parse(s.writeDurationAfterLastPush).String()),
zap.String("write_duration_after_last_push", durafmt.Parse(time.Since(s.lastBlockPushedAt)).String()),
)
}

@@ -92,34 +93,24 @@ type Processor struct {
accountsResolver AccountsResolver
readerName string
logger *zap.Logger
-stats *stats
}

func NewProcessor(readerName string, accountsResolver AccountsResolver, logger *zap.Logger) *Processor {
return &Processor{
readerName: readerName,
accountsResolver: accountsResolver,
logger: logger,
-stats: &stats{},
}
}

-func (p *Processor) ResetStats() {
-p.stats = &stats{}
-}
-
-func (p *Processor) LogStats() {
-p.stats.log(p.logger)
-}

func (p *Processor) ProcessMergeBlocks(ctx context.Context, cursor *Cursor, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
startBlockNum := cursor.slotNum - cursor.slotNum%100 //This is the first block slot of the last merge block file
startBlockNum += 100 //This is the first block slot of the next merge block file
paddedBlockNum := fmt.Sprintf("%010d", startBlockNum)

p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum))

-mergeBlocksFileChan := make(chan *mergeBlocksFile, 10)
+mergeBlocksFileChan := make(chan *mergeBlocksFile, 20)

go func() {
err := p.processMergeBlocksFiles(ctx, cursor, mergeBlocksFileChan, destinationStore, encoder)
@@ -129,11 +120,10 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, cursor *Cursor, sour
err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error {
mbf := newMergeBlocksFile(filename, p.logger)
go func() {
-err := mbf.process(ctx, sourceStore, cursor)
+err := mbf.process(ctx, sourceStore)
if err != nil {
panic(fmt.Errorf("processing merge block file %s: %w", mbf.filename, err))
}

}()
mergeBlocksFileChan <- mbf
return nil
@@ -162,7 +152,7 @@ func newMergeBlocksFile(fileName string, logger *zap.Logger) *mergeBlocksFile {
}
}

-func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, cursor *Cursor) error {
+func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store) error {
f.logger.Info("Processing merge block file", zap.String("filename", f.filename))
firstBlockOfFile, err := strconv.Atoi(strings.TrimLeft(f.filename, "0"))
if err != nil {
@@ -200,24 +190,66 @@ func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store,
}
}

+type bundleJob struct {
+filename string
+cursor *Cursor
+stats *Stats
+bundleReader *BundleReader
+}

func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {

+writerNailer := dhammer.NewNailer(50, func(ctx context.Context, br *bundleJob) (*bundleJob, error) {
+err := destinationStore.WriteObject(ctx, br.filename, br.bundleReader)
+if err != nil {
+return br, fmt.Errorf("writing bundle file: %w", err)
+}
+
+return br, nil
+})
+writerNailer.OnTerminating(func(err error) {
+panic(fmt.Errorf("writing bundle file: %w", err))
+})
+writerNailer.Start(ctx)
+
+go func() {
+for out := range writerNailer.Out {
+p.logger.Info("new merge blocks file written:", zap.String("filename", out.filename))
+err := p.accountsResolver.StoreCursor(ctx, p.readerName, out.cursor)
+if err != nil {
+panic(fmt.Errorf("storing cursor at block %d: %w", out.cursor.slotNum, err))
+}
+out.stats.Log(p.logger)
+out.bundleReader.Close()
+}
+}()

for mbf := range mergeBlocksFileChan {
-p.stats = &stats{
+stats := &Stats{
startProcessing: time.Now(),
}
p.logger.Info("Receive merge block file", zap.String("filename", mbf.filename))
bundleReader := NewBundleReader(ctx, p.logger)

-nailer := dhammer.NewNailer(50, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) {
+decoderNailer := dhammer.NewNailer(100, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) {
b, err := encoder.Encode(blk)
if err != nil {
return nil, fmt.Errorf("encoding block: %w", err)
}

return b, nil
})
-nailer.Start(ctx)
+decoderNailer.OnTerminating(func(err error) {
+panic(fmt.Errorf("encoding block: %w", err))
+})
+decoderNailer.Start(ctx)

+writerNailer.Push(ctx, &bundleJob{
+mbf.filename,
+cursor,
+stats,
+bundleReader,
+})

mbf := mbf
go func() {
@@ -227,7 +259,7 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor,
return
case blk, ok := <-mbf.blockChan:
if !ok {
-nailer.Close()
+decoderNailer.Close()
return
}

@@ -242,91 +274,75 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, cursor *Cursor,
}

start := time.Now()
-err := p.ProcessBlock(context.Background(), blk)
+err := p.ProcessBlock(context.Background(), stats, blk)
if err != nil {
bundleReader.PushError(fmt.Errorf("processing block: %w", err))
return
}

-p.stats.totalBlockProcessingDuration += time.Since(start)
+stats.totalBlockProcessingDuration += time.Since(start)

cursor.slotNum = blk.Slot
+decoderNailer.Push(ctx, blk)

-nailer.Push(ctx, blk)
-p.stats.totalBlockHandlingDuration += time.Since(start)
+stats.totalBlockHandlingDuration += time.Since(start)
}
}
}()
-lastPushTime := time.Now()
-go func() {
-for bb := range nailer.Out {
-err := bundleReader.PushBlock(bb)
-pushStart := time.Now()
-if err != nil {
-bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err))
-return
-}
-
-p.stats.totalBlockPushDuration += time.Since(pushStart)
+for bb := range decoderNailer.Out {
+err := bundleReader.PushBlock(bb)
+pushStart := time.Now()
+if err != nil {
+bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err))
+return fmt.Errorf("pushing block to bundle reader: %w", err)
+}
-bundleReader.Close()
-}()
-err := destinationStore.WriteObject(ctx, mbf.filename, bundleReader)
-if err != nil {
-return fmt.Errorf("writing bundle file: %w", err)
-}
-p.stats.writeDurationAfterLastPush = time.Since(lastPushTime)
-//p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start)))
-err = p.accountsResolver.StoreCursor(ctx, p.readerName, cursor)
-if err != nil {
-return fmt.Errorf("storing cursor at block %d: %w", cursor.slotNum, err)
+stats.totalBlockPushDuration += time.Since(pushStart)
}

-p.stats.log(p.logger)
+stats.lastBlockPushedAt = time.Now()
}

return nil
}

-func (p *Processor) ProcessBlock(ctx context.Context, block *pbsol.Block) error {
-p.stats.transactionCount += len(block.Transactions)
+func (p *Processor) ProcessBlock(ctx context.Context, stats *Stats, block *pbsol.Block) error {
+stats.transactionCount += len(block.Transactions)
for _, trx := range block.Transactions {
if trx.Meta.Err != nil {
continue
}
//p.logger.Debug("processing transaction", zap.Uint64("block_num", block.Slot), zap.String("trx_id", base58.Encode(trx.Transaction.Signatures[0])))
-err := p.applyTableLookup(ctx, block.Slot, trx)
+err := p.applyTableLookup(ctx, stats, block.Slot, trx)
if err != nil {
return fmt.Errorf("applying table lookup at block %d: %w", block.Slot, err)
}

-err = p.manageAddressLookup(ctx, block.Slot, err, trx)
+err = p.manageAddressLookup(ctx, stats, block.Slot, err, trx)
if err != nil {
return fmt.Errorf("managing address lookup at block %d: %w", block.Slot, err)
}
}
-p.stats.totalBlockCount += 1
+stats.totalBlockCount += 1

return nil
}

-func (p *Processor) manageAddressLookup(ctx context.Context, blockNum uint64, err error, trx *pbsol.ConfirmedTransaction) error {
-err = p.ProcessTransaction(ctx, blockNum, trx)
+func (p *Processor) manageAddressLookup(ctx context.Context, stats *Stats, blockNum uint64, err error, trx *pbsol.ConfirmedTransaction) error {
+err = p.ProcessTransaction(ctx, stats, blockNum, trx)
if err != nil {
return fmt.Errorf("processing transactions: %w", err)
}
return nil
}

-func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx *pbsol.ConfirmedTransaction) error {
+func (p *Processor) applyTableLookup(ctx context.Context, stats *Stats, blockNum uint64, trx *pbsol.ConfirmedTransaction) error {
start := time.Now()
for _, addressTableLookup := range trx.Transaction.Message.AddressTableLookups {
accs, cached, err := p.accountsResolver.Resolve(ctx, blockNum, addressTableLookup.AccountKey)
if err != nil {
return fmt.Errorf("resolving address table %s at block %d: %w", base58.Encode(addressTableLookup.AccountKey), blockNum, err)
}
if cached {
-p.stats.cacheHit += 1
+stats.cacheHit += 1
}
//p.logger.Debug("Resolve address table lookup", zap.String("trx", getTransactionHash(trx.Transaction.Signatures)), zap.String("account", base58.Encode(addressTableLookup.AccountKey)), zap.Int("count", len(accs)), zap.Bool("cached", cached))
trx.Transaction.Message.AccountKeys = append(trx.Transaction.Message.AccountKeys, accs.ToBytesArray()...)
@@ -335,8 +351,8 @@ func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx *
lookupCount := len(trx.Transaction.Message.AddressTableLookups)

if lookupCount > 0 {
-p.stats.lookupCount += lookupCount
-p.stats.totalLookupDuration += totalDuration
+stats.lookupCount += lookupCount
+stats.totalLookupDuration += totalDuration
p.logger.Debug(
"applyTableLookup",
zap.Duration("duration", totalDuration),
@@ -348,12 +364,12 @@ func (p *Processor) applyTableLookup(ctx context.Context, blockNum uint64, trx *
return nil
}

-func (p *Processor) ProcessTransaction(ctx context.Context, blockNum uint64, confirmedTransaction *pbsol.ConfirmedTransaction) error {
+func (p *Processor) ProcessTransaction(ctx context.Context, stats *Stats, blockNum uint64, confirmedTransaction *pbsol.ConfirmedTransaction) error {
start := time.Now()
accountKeys := confirmedTransaction.Transaction.Message.AccountKeys
for compileIndex, compiledInstruction := range confirmedTransaction.Transaction.Message.Instructions {
idx := compiledInstruction.ProgramIdIndex
-err := p.ProcessInstruction(ctx, blockNum, confirmedTransaction.Transaction.Signatures[0], confirmedTransaction.Transaction.Message.AccountKeys[idx], accountKeys, compiledInstruction)
+err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], confirmedTransaction.Transaction.Message.AccountKeys[idx], accountKeys, compiledInstruction)
if err != nil {
return fmt.Errorf("confirmedTransaction %s processing compiled instruction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err)
}
@@ -367,17 +383,17 @@ func (p *Processor) ProcessTransaction(ctx context.Context, blockNum uint64, con
return fmt.Errorf("missing account key at index %d for transaction %s with account keys count of %d", instruction.ProgramIdIndex, getTransactionHash(confirmedTransaction.Transaction.Signatures), len(accountKeys))
}

-err := p.ProcessInstruction(ctx, blockNum, confirmedTransaction.Transaction.Signatures[0], accountKeys[instruction.ProgramIdIndex], accountKeys, instruction)
+err := p.ProcessInstruction(ctx, stats, blockNum, confirmedTransaction.Transaction.Signatures[0], accountKeys[instruction.ProgramIdIndex], accountKeys, instruction)
if err != nil {
return fmt.Errorf("confirmedTransaction %s processing instruxction: %w", getTransactionHash(confirmedTransaction.Transaction.Signatures), err)
}
}
}
-p.stats.totalTransactionProcessingDuration += time.Since(start)
+stats.totalTransactionProcessingDuration += time.Since(start)
return nil
}

-func (p *Processor) ProcessInstruction(ctx context.Context, blockNum uint64, trxHash []byte, programAccount Account, accountKeys [][]byte, instructionable pbsol.Instructionable) error {
+func (p *Processor) ProcessInstruction(ctx context.Context, stats *Stats, blockNum uint64, trxHash []byte, programAccount Account, accountKeys [][]byte, instructionable pbsol.Instructionable) error {
if !bytes.Equal(programAccount, AddressTableLookupAccountProgram) {
return nil
}
@@ -395,8 +411,8 @@ func (p *Processor) ProcessInstruction(ctx context.Context, blockNum uint64, trx
return fmt.Errorf("extending address table %s at block %d: %w", tableLookupAccount, blockNum, err)
}

-p.stats.totalExtendDuration += time.Since(start)
-p.stats.extendCount += 1
+stats.totalExtendDuration += time.Since(start)
+stats.extendCount += 1
}

return nil
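Note the ordering in the new processMergeBlocksFiles: the bundleJob is pushed to writerNailer before any of the file's blocks have been decoded. This works because the job carries a BundleReader, a streaming reader that destinationStore.WriteObject consumes while PushBlock keeps feeding it. BundleReader's implementation is not part of this diff; the following is a hypothetical minimal version of such a channel-backed io.Reader (all names and the buffering are illustrative), showing why the writer can start before the producer finishes.

    package main

    import (
        "fmt"
        "io"
        "strings"
    )

    // blockStream is a hypothetical stand-in for the diff's BundleReader: an
    // io.Reader fed block by block from another goroutine, ended by Close.
    type blockStream struct {
        blocks chan []byte
        buf    []byte
    }

    func newBlockStream() *blockStream {
        return &blockStream{blocks: make(chan []byte, 64)}
    }

    func (s *blockStream) PushBlock(b []byte) { s.blocks <- b }
    func (s *blockStream) Close()             { close(s.blocks) }

    func (s *blockStream) Read(p []byte) (int, error) {
        if len(s.buf) == 0 {
            b, ok := <-s.blocks
            if !ok {
                return 0, io.EOF // Close ends the stream
            }
            s.buf = b
        }
        n := copy(p, s.buf)
        s.buf = s.buf[n:]
        return n, nil
    }

    func main() {
        s := newBlockStream()
        go func() { // producer, like the per-file goroutine in the diff
            for i := 0; i < 3; i++ {
                s.PushBlock([]byte(fmt.Sprintf("block-%d\n", i)))
            }
            s.Close()
        }()
        var sb strings.Builder
        io.Copy(&sb, s) // consumer, like destinationStore.WriteObject
        fmt.Print(sb.String())
    }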
8 changes: 4 additions & 4 deletions accountresolver/processor_test.go
@@ -67,7 +67,7 @@ func Test_ExtendTableLookupInCompiledInstruction(t *testing.T) {

resolver := NewKVDBAccountsResolver(db, zap.NewNop())
p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
-err = p.ProcessBlock(context.Background(), solBlock)
+err = p.ProcessBlock(context.Background(), &Stats{}, solBlock)
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableLookupAccount)
@@ -145,7 +145,7 @@ func Test_ExtendTableLookup_In_InnerInstructions(t *testing.T) {

resolver := NewKVDBAccountsResolver(db, zap.NewNop())
p := NewProcessor("test", NewKVDBAccountsResolver(db, zap.NewNop()), zap.NewNop())
-err = p.ProcessBlock(context.Background(), solBlock)
+err = p.ProcessBlock(context.Background(), &Stats{}, solBlock)
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 157_564_921, tableLookupAccount)
@@ -228,7 +228,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_AddressLooku
accounts := NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys)
require.Equal(t, 2, len(accounts))

-err = p.ProcessBlock(context.Background(), solBlock)
+err = p.ProcessBlock(context.Background(), &Stats{}, solBlock)
require.NoError(t, err)

accounts = NewAccounts(solBlock.Transactions[0].Transaction.Message.AccountKeys)
@@ -317,7 +317,7 @@ func Test_ExtendTableLookup_By_AnotherAddressTableLookup_Containing_ExtendableTa
err = resolver.store.FlushPuts(context.Background())
require.NoError(t, err)

-err = p.ProcessBlock(context.Background(), solBlock)
+err = p.ProcessBlock(context.Background(), &Stats{}, solBlock)
require.NoError(t, err)

accounts, _, err := resolver.Resolve(context.Background(), 185_914_862, tableAccountToExtend)
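The tests now construct a fresh Stats per ProcessBlock call. Since they live in the same package, a test could also assert on the collected counters directly; a hypothetical addition to any of the tests above (the field name comes from this diff, the assertion itself is illustrative):

    stats := &Stats{}
    err = p.ProcessBlock(context.Background(), stats, solBlock)
    require.NoError(t, err)
    require.Equal(t, 1, stats.totalBlockCount) // exactly one block was processed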
6 changes: 3 additions & 3 deletions cmd/firesol/tools_bigtable_blocks.go
@@ -85,12 +85,12 @@ func bigtableBlocksRunE(logger *zap.Logger, tracer logging.Tracer) firecore.Comm

return btClient.ReadBlocks(ctx, startBlockNum, stopBlockNum, linkable, func(block *pbsolv1.Block) error {
if firehoseEnabled {
-processor.ResetStats()
-err := processor.ProcessBlock(ctx, block)
+stats := &accountsresolver.Stats{}
+err := processor.ProcessBlock(ctx, stats, block)
if err != nil {
return fmt.Errorf("unable to process table lookup for block: %w", err)
}
-processor.LogStats()
+stats.Log(logger)

cnt, err := proto.Marshal(block)
if err != nil {
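This call-site change shows the intent of the Stats refactor: with ResetStats and LogStats removed from Processor, each unit of work owns its own Stats value, so bundles processed concurrently never share mutable counters and the plain int/time.Duration fields need no locking. The resulting pattern, sketched with names from this diff (the loop body is illustrative):

    for mbf := range mergeBlocksFileChan {
        stats := &accountsresolver.Stats{} // fresh per bundle, never shared across goroutines
        // ... decode and process the bundle's blocks, passing stats down the call chain ...
        stats.Log(logger) // or logged by the writerNailer consumer once the bundle is written
    }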
