diff --git a/accountresolver/processor.go b/accountresolver/processor.go index 3dc80443..454abd22 100644 --- a/accountresolver/processor.go +++ b/accountresolver/processor.go @@ -118,12 +118,15 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.S err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error { mbf := newMergeBlocksFile(filename, p.logger) - err := mbf.process(ctx, sourceStore, p.cursor) - if err != nil { - return fmt.Errorf("processing merge block file %s: %w", mbf.filename, err) - } + go func() { + err := mbf.process(ctx, sourceStore, p.cursor) + if err != nil { + panic(fmt.Errorf("processing merge block file %s: %w", mbf.filename, err)) + } + + }() mergeBlocksFileChan <- mbf - return err + return nil }) if err != nil { @@ -182,6 +185,7 @@ func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, f.logger.Info("skip block", zap.Uint64("slot", blk.Slot)) continue } + fmt.Println("mergeBlocksFile : pushing block", blk.Slot) f.blockChan <- blk } } @@ -193,6 +197,7 @@ func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFile } for mbf := range mergeBlocksFileChan { + p.logger.Info("Receive merge block file", zap.String("filename", mbf.filename)) bundleReader := NewBundleReader(ctx, p.logger) nailer := dhammer.NewNailer(50, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) {