From 52335f0e4aad1adcfb50c8540cc02190dd0607ea Mon Sep 17 00:00:00 2001 From: billettc Date: Fri, 12 Jan 2024 11:27:01 -0500 Subject: [PATCH] wip --- block/fetcher/bigtable.go | 4 +- block/fetcher/rpc.go | 97 ++++++++++++++++++++++++++++++--------- block/fetcher/rpc_test.go | 30 ++++++++++++ cmd/bt/main.go | 51 ++++++++++++++++++++ go.mod | 2 + go.sum | 9 +++- 6 files changed, 168 insertions(+), 25 deletions(-) create mode 100644 block/fetcher/rpc_test.go create mode 100644 cmd/bt/main.go diff --git a/block/fetcher/bigtable.go b/block/fetcher/bigtable.go index 448b240c..8076486a 100644 --- a/block/fetcher/bigtable.go +++ b/block/fetcher/bigtable.go @@ -71,7 +71,7 @@ func (r *BigtableBlockReader) Read( btRange := bigtable.NewRange(fmt.Sprintf("%016x", startBlockNum), "") err := table.ReadRows(ctx, btRange, func(row bigtable.Row) bool { - blk, zlogger, err := r.processRow(row) + blk, zlogger, err := r.ProcessRow(row) if err != nil { fatalError = fmt.Errorf("failed to read row: %w", err) return false @@ -192,7 +192,7 @@ func explodeRow(row bigtable.Row) (*big.Int, RowType, []byte) { return blockNum, rowType, el.Value } -func (r *BigtableBlockReader) processRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) { +func (r *BigtableBlockReader) ProcessRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) { blockNum, rowType, rowCnt := explodeRow(row) zlogger := r.logger.With( zap.Uint64("block_num", blockNum.Uint64()), diff --git a/block/fetcher/rpc.go b/block/fetcher/rpc.go index f436041e..69b32bdc 100644 --- a/block/fetcher/rpc.go +++ b/block/fetcher/rpc.go @@ -3,6 +3,7 @@ package fetcher import ( "context" "encoding/base64" + "encoding/json" "fmt" "math" "time" @@ -44,43 +45,46 @@ func NewRPC(rpcClient *rpc.Client, fetchInterval time.Duration, latestBlockRetry } } -func (f *RPCFetcher) Fetch(ctx context.Context, blockNum uint64) (out *pbbstream.Block, err error) { - f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum)) +func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbstream.Block, err error) { + f.logger.Info("fetching block", zap.Uint64("block_num", requestedSlot)) - for f.latestConfirmedSlot < blockNum { + for f.latestConfirmedSlot < requestedSlot { f.latestConfirmedSlot, err = f.rpcClient.GetSlot(ctx, rpc.CommitmentConfirmed) if err != nil { return nil, fmt.Errorf("fetching latestConfirmedSlot block num: %w", err) } - f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("block_num", blockNum)) + f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("block_num", requestedSlot)) // - if f.latestConfirmedSlot < blockNum { + if f.latestConfirmedSlot < requestedSlot { time.Sleep(f.latestBlockRetryInterval) continue } break } - for f.latestFinalizedSlot < blockNum { + for f.latestFinalizedSlot < requestedSlot { f.latestFinalizedSlot, err = f.rpcClient.GetSlot(ctx, rpc.CommitmentFinalized) if err != nil { return nil, fmt.Errorf("fetching latest finalized Slot block num: %w", err) } - f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("block_num", blockNum)) + f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("block_num", requestedSlot)) // - if f.latestFinalizedSlot < blockNum { + if f.latestFinalizedSlot < requestedSlot { time.Sleep(f.latestBlockRetryInterval) continue } break } - blockResult, err := f.rpcClient.GetBlockWithOpts(ctx, blockNum, GetBlockOpts) + blockResult, err := f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts) if err != nil { - return nil, fmt.Errorf("fetching block %d: %w", blockNum, err) + return nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err) + } + block, err := blockFromBlockResult(requestedSlot, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult) + if err != nil { + return nil, fmt.Errorf("decoding block %d: %w", requestedSlot, err) } - block, _ := blockFromBlockResult(blockNum, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult) return block, nil } @@ -146,7 +150,7 @@ func blockFromBlockResult(requestedSlot uint64, confirmedSlot uint64, finalizedS } pbBlock := &pbbstream.Block{ - Number: *result.BlockHeight, + Number: requestedSlot, Id: result.Blockhash.String(), ParentId: result.PreviousBlockhash.String(), Timestamp: timestamppb.New(result.BlockTime.Time()), @@ -165,8 +169,14 @@ func toPbTransactions(transactions []rpc.TransactionWithMeta) (out []*pbsol.Conf if err != nil { return nil, fmt.Errorf(`decoding transaction meta: %w`, err) } + solanaTrx := &solana.Transaction{} + transaction.Transaction.GetRawJSON() + err = json.Unmarshal(transaction.Transaction.GetRawJSON(), solanaTrx) + if err != nil { + return nil, fmt.Errorf(`decoding transaction: %w`, err) + } out = append(out, &pbsol.ConfirmedTransaction{ - Transaction: toPbTransaction(transaction.MustGetTransaction()), + Transaction: toPbTransaction(solanaTrx), Meta: meta, }) } @@ -178,27 +188,31 @@ func toPbTransactionMeta(meta *rpc.TransactionMeta) (*pbsol.TransactionStatusMet if err != nil { return nil, fmt.Errorf("decoding return data: %w", err) } + trxErr, err := toPbTransactionError(meta.Err) return &pbsol.TransactionStatusMeta{ - Err: toPbTransactionError(meta.Err), + Err: trxErr, Fee: meta.Fee, PreBalances: meta.PreBalances, PostBalances: meta.PostBalances, InnerInstructions: toPbInnerInstructions(meta.InnerInstructions), - InnerInstructionsNone: false, + InnerInstructionsNone: false, //todo: should we remove? LogMessages: meta.LogMessages, - LogMessagesNone: false, + LogMessagesNone: false, //todo: should we remove? PreTokenBalances: toPbTokenBalances(meta.PreTokenBalances), PostTokenBalances: toPbTokenBalances(meta.PostTokenBalances), Rewards: toPBReward(meta.Rewards), LoadedWritableAddresses: toPbWritableAddresses(meta.LoadedAddresses.Writable), LoadedReadonlyAddresses: toPbReadonlyAddresses(meta.LoadedAddresses.ReadOnly), ReturnData: returnData, - ReturnDataNone: false, + ReturnDataNone: false, //todo: should we remove? ComputeUnitsConsumed: meta.ComputeUnitsConsumed, }, nil } func toPbReturnData(data rpc.ReturnData) (*pbsol.ReturnData, error) { + if len(data.Data) == 0 { + return nil, nil + } d, err := base64.StdEncoding.DecodeString(data.Data[0]) if err != nil { return nil, fmt.Errorf("decoding return data: %w", err) @@ -245,8 +259,15 @@ func toPbTokenBalances(balances []rpc.TokenBalance) []*pbsol.TokenBalance { } func toPbUiTokenAmount(amount *rpc.UiTokenAmount) *pbsol.UiTokenAmount { + if amount == nil { + return nil + } + uiAmount := float64(0) + if amount.UiAmount != nil { + uiAmount = *amount.UiAmount + } return &pbsol.UiTokenAmount{ - UiAmount: *amount.UiAmount, + UiAmount: uiAmount, Decimals: uint32(amount.Decimals), Amount: amount.Amount, UiAmountString: amount.UiAmountString, @@ -285,11 +306,45 @@ func compileInstructionsToPbInnerInstructionArray(instructions []solana.Compiled return } -func toPbTransactionError(err interface{}) *pbsol.TransactionError { +type TransactionError struct { + Type string `json:"err"` + Details string +} + +func toPbTransactionError(err interface{}) (*pbsol.TransactionError, error) { if err == nil { - return nil + return nil, nil } - panic("not implemented") //todo : implement + + if mapErr, ok := err.(map[string]interface{}); ok { + for key, value := range mapErr { + detail, err := json.Marshal(value) + if err != nil { + return nil, fmt.Errorf("decoding transaction error: %w", err) + } + trxErr := &TransactionError{ + Type: key, + Details: string(detail), + } + fmt.Println(trxErr) + return nil, nil + } + } + + //8 0 0 0 3 25 0 0 0 113 23 0 0 + + // "InstructionError": [ + // 3, + // { + // "Custom": 6001 + // } + // ] + + //8 0 0 0 -> TransactionError.InstructionError + //3 -> instruction index + //25 0 0 0 -> InstructionError.Custom + //113 23 0 0 -> u32 error code + panic("not implemented") //todo : implement when test with a failed transaction } func toPbTransaction(transaction *solana.Transaction) *pbsol.Transaction { diff --git a/block/fetcher/rpc_test.go b/block/fetcher/rpc_test.go new file mode 100644 index 00000000..53e034b5 --- /dev/null +++ b/block/fetcher/rpc_test.go @@ -0,0 +1,30 @@ +package fetcher + +import ( + "encoding/json" + "os" + "testing" + + "github.com/gagliardetto/solana-go/rpc" + "github.com/test-go/testify/require" +) + +func Test_ToPBTransaction(t *testing.T) { + + b, err := os.ReadFile("/Users/cbillett/devel/sf/firehose-solana/block/fetcher/testdata/result_block_241179689.json") + require.NoError(t, err) + + getBlockResult := &rpc.GetBlockResult{} + err = json.Unmarshal(b, getBlockResult) + require.NoError(t, err) + + _, err = toPbTransactions(getBlockResult.Transactions) + require.NoError(t, err) + + //trxHash, err := base58.Decode("66gBszm6ybWVVykE4Svm2CvmiSmFbQi2J3Ua2FxHrYL9B1EPsTCGgjfWNVoJHSqd86iKmS8niywSZqDmqkk7uZLM") + //require.NoError(t, err) + //for _, tx := range confirmTransactions { + // if bytes.Equal(tx.Transaction.Signatures[0], trxHash) { + // } + //} +} diff --git a/cmd/bt/main.go b/cmd/bt/main.go new file mode 100644 index 00000000..064165d8 --- /dev/null +++ b/cmd/bt/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "fmt" + + "github.com/mr-tron/base58" + + "cloud.google.com/go/bigtable" + googleBigtable "cloud.google.com/go/bigtable" + "github.com/streamingfast/firehose-solana/block/fetcher" + "github.com/streamingfast/logging" + "go.uber.org/zap" +) + +func main() { + ctx := context.Background() + client, err := googleBigtable.NewClient(ctx, "mainnet-beta", "solana-ledger") + if err != nil { + panic(err) + } + + var logger, tracer = logging.PackageLogger("foo", "main") + logging.InstantiateLoggers(logging.WithDefaultLevel(zap.DebugLevel)) + + blockReader := fetcher.NewBigtableReader(client, 10, logger, tracer) + + table := client.Open("blocks") + btRange := bigtable.NewRange(fmt.Sprintf("%016x", 241179689), "") + + err = table.ReadRows(ctx, btRange, func(row bigtable.Row) bool { + block, _, err := blockReader.ProcessRow(row) + if err != nil { + panic(err) + } + + for _, transaction := range block.Transactions { + if transaction.Meta.Err != nil { + err := transaction.Meta.Err + sign := base58.Encode(transaction.Transaction.Signatures[0]) + fmt.Println("err: ", sign, err.Err) + } + } + + return false + }) + + if err != nil { + panic(err) + } +} diff --git a/go.mod b/go.mod index e64a0c48..3995ea2f 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/streamingfast/firehose-core v1.0.1-0.20240109054458-3f1edeff522c github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 github.com/streamingfast/solana-go v0.5.1-0.20230622180848-8faf68a7cb1d + github.com/test-go/testify v1.1.4 go.uber.org/zap v1.26.0 google.golang.org/protobuf v1.31.0 ) @@ -159,6 +160,7 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/spf13/viper v1.15.0 // indirect + github.com/streamingfast/binary v0.0.0-20210928223119-44fc44e4a0b5 // indirect github.com/streamingfast/dauth v0.0.0-20231120142446-843f4e045cc2 // indirect github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 // indirect diff --git a/go.sum b/go.sum index 9c0b2bc6..64db45eb 100644 --- a/go.sum +++ b/go.sum @@ -283,8 +283,6 @@ github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbS github.com/gagliardetto/binary v0.7.7 h1:QZpT38+sgoPg+TIQjH94sLbl/vX+nlIRA37pEyOsjfY= github.com/gagliardetto/binary v0.7.7/go.mod h1:mUuay5LL8wFVnIlecHakSZMvcdqfs+CsotR5n77kyjM= github.com/gagliardetto/gofuzz v1.2.2/go.mod h1:bkH/3hYLZrMLbfYWA0pWzXmi5TTRZnu4pMGZBkqMKvY= -github.com/gagliardetto/solana-go v1.8.4 h1:vmD/JmTlonyXGy39bAo0inMhmbdAwV7rXZtLDMZeodE= -github.com/gagliardetto/solana-go v1.8.4/go.mod h1:i+7aAyNDTHG0jK8GZIBSI4OVvDqkt2Qx+LklYclRNG8= github.com/gagliardetto/treeout v0.1.4 h1:ozeYerrLCmCubo1TcIjFiOWTTGteOOHND1twdFpgwaw= github.com/gagliardetto/treeout v0.1.4/go.mod h1:loUefvXTrlRG5rYmJmExNryyBRh8f89VZhmMOyCyqok= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -438,7 +436,9 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51 github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk= github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw= @@ -494,6 +494,7 @@ github.com/josephburnett/jd v1.7.1 h1:oXBPMS+SNnILTMGj1fWLK9pexpeJUXtbVFfRku/PjB github.com/josephburnett/jd v1.7.1/go.mod h1:R8ZnZnLt2D4rhW4NvBc/USTo6mzyNT6fYNIIWOJA9GY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -737,6 +738,7 @@ github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= +github.com/streamingfast/binary v0.0.0-20210928223119-44fc44e4a0b5 h1:xCVaIP9q+nqRxHrb1wCLs3AABUcCgLGK0IakN4CMQbk= github.com/streamingfast/binary v0.0.0-20210928223119-44fc44e4a0b5/go.mod h1:LEQhe6qUvAQSYBJu7MZvDU1kx4JrZzxru3Ga1rRCCuo= github.com/streamingfast/bstream v0.0.2-0.20231211192436-01f6a005b0e4 h1:WFXOC6fg2k9CuxUhbgtH7IYOZvu0CEPXuGRitS74YwQ= github.com/streamingfast/bstream v0.0.2-0.20231211192436-01f6a005b0e4/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y= @@ -762,6 +764,7 @@ github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1 github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns= github.com/streamingfast/firehose-core v1.0.1-0.20240109054458-3f1edeff522c h1:PpAiQiKbJq91jrW739fAPCwyOMjO23HHvf+0HqeJq2Y= github.com/streamingfast/firehose-core v1.0.1-0.20240109054458-3f1edeff522c/go.mod h1:Y/Sza/3iOeabRhVC/hPoRx21jZDcyQFBYRNR8wgIk7I= +github.com/streamingfast/gagliardetto-solana-go v0.0.0-20240111145418-2be68b59fe4c h1:GWlMGvspp2Mb5iCZBV31FeOHwliPI/LoIowvlx253qA= github.com/streamingfast/gagliardetto-solana-go v0.0.0-20240111145418-2be68b59fe4c/go.mod h1:i+7aAyNDTHG0jK8GZIBSI4OVvDqkt2Qx+LklYclRNG8= github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw= github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0/go.mod h1:cTNObq2Uofb330y05JbbZZ6RwE6QUXw5iVcHk1Fx3fk= @@ -783,6 +786,7 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk= github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4= +github.com/streamingfast/solana-go v0.5.1-0.20230622180848-8faf68a7cb1d h1:6wV8O4hUxYZ6IiJ/odsDBzeU8y/zAWMttUm0reG+r3Y= github.com/streamingfast/solana-go v0.5.1-0.20230622180848-8faf68a7cb1d/go.mod h1:9NfZWSK0zqA+M1YU2pTI8sr1BfijCpqBFceLQARQiNw= github.com/streamingfast/substreams v1.2.1-0.20231221200849-a355c5063d0c h1:tpwdanGNJjYCBRbqxGzoXRrz0VWa+Mjeb0OJt/aMOx8= github.com/streamingfast/substreams v1.2.1-0.20231221200849-a355c5063d0c/go.mod h1:fCC3pGTYMi0N4VhJjdJPQydefJpY+tsY9BzWxDi152k= @@ -835,6 +839,7 @@ github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/ybbus/jsonrpc v2.1.2+incompatible h1:V4mkE9qhbDQ92/MLMIhlhMSbz8jNXdagC3xBR5NDwaQ= github.com/ybbus/jsonrpc v2.1.2+incompatible/go.mod h1:XJrh1eMSzdIYFbM08flv0wp5G35eRniyeGut1z+LSiE= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 h1:7v7L5lsfw4w8iqBBXETukHo4IPltmD+mWoLRYUmeGN8=