From c6d0a201489ce86e015ad19aec624b6d8a57c251 Mon Sep 17 00:00:00 2001 From: billettc Date: Fri, 1 Sep 2023 14:26:19 -0400 Subject: [PATCH 1/2] processAddressLookupE handle cursor # Conflicts: # cmd/firesol/tablelookup.go --- accountresolver/processor.go | 7 +++++++ cmd/firesol/tablelookup.go | 27 +++++++++++++++++++++------ 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/accountresolver/processor.go b/accountresolver/processor.go index 1bc308f0..ff2b3f50 100644 --- a/accountresolver/processor.go +++ b/accountresolver/processor.go @@ -51,13 +51,19 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.S startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100 paddedBlockNum := fmt.Sprintf("%010d", startBlockNum) + p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", p.cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum)) + err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error { + p.logger.Debug("processing merge block file", zap.String("filename", filename)) return p.processMergeBlocksFile(ctx, filename, sourceStore, destinationStore) }) if err != nil { return fmt.Errorf("walking merge block sourceStore: %w", err) } + + p.logger.Info("Done processing merge blocks") + return nil } @@ -67,6 +73,7 @@ func (p *Processor) processMergeBlocksFile(ctx context.Context, filename string, if err != nil { return fmt.Errorf("converting filename to block number: %w", err) } + reader, err := sourceStore.OpenObject(ctx, filename) if err != nil { return fmt.Errorf("opening merge block file %s: %w", filename, err) diff --git a/cmd/firesol/tablelookup.go b/cmd/firesol/tablelookup.go index a7c6a4d4..8b4d07cb 100644 --- a/cmd/firesol/tablelookup.go +++ b/cmd/firesol/tablelookup.go @@ -2,7 +2,9 @@ package main import ( "fmt" + "github.com/spf13/cobra" + "github.com/spf13/pflag" "github.com/streamingfast/dstore" accountsresolver "github.com/streamingfast/firehose-solana/accountresolver" kvstore "github.com/streamingfast/kvdb/store" @@ -23,6 +25,11 @@ func newProcessAddressLookupCmd(logger *zap.Logger, tracer logging.Tracer) *cobr func processAddressLookupE(logger *zap.Logger, tracer logging.Tracer) func(cmd *cobra.Command, args []string) error { return func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() + + cmd.Flags().VisitAll(func(flag *pflag.Flag) { + logger.Info("flag", zap.String("flag", flag.Name), zap.Reflect("value", flag.Value)) + }) + sourceStore, err := dstore.NewDBinStore(args[0]) if err != nil { return fmt.Errorf("unable to create sourceStore: %w", err) @@ -38,17 +45,25 @@ func processAddressLookupE(logger *zap.Logger, tracer logging.Tracer) func(cmd * return fmt.Errorf("unable to create sourceStore: %w", err) } - //todo: discover cursor from kv - cursor := accountsresolver.NewCursor(154655004, nil) - fmt.Println("Default Cursor", cursor) - processor := accountsresolver.NewProcessor("reproc", cursor, accountsresolver.NewKVDBAccountsResolver(db), logger) + resolver := accountsresolver.NewKVDBAccountsResolver(db) + cursor, err := resolver.GetCursor(ctx, "reproc") + if err != nil { + return fmt.Errorf("unable to get cursor: %w", err) + } + + if cursor == nil { + logger.Info("No cursor found, starting from beginning") + cursor = accountsresolver.NewCursor(154655004, nil) + } + + fmt.Println("Cursor", cursor) + processor := accountsresolver.NewProcessor("reproc", cursor, resolver, logger) - //todo: needs a destination sourceStore to write the merge blocks with the address lookup resolved err = processor.ProcessMergeBlocks(ctx, sourceStore, destinationStore) if err != nil { return fmt.Errorf("unable to process merge blocks: %w", err) } - + logger.Info("All done. Goodbye!") return nil } } From 21aa7e9650adf41d67e94c210149538eb41eb8cb Mon Sep 17 00:00:00 2001 From: billettc Date: Fri, 1 Sep 2023 14:23:12 -0400 Subject: [PATCH 2/2] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cc1336b..70b4b620 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ for instructions to keep up to date. * Migrated to firehose-core * change block reader-node block encoding from hex to base64 - +* Added support for address lookup in the `reader-bt` app and tools `tablelookup` ## v0.2.2-rc1