Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Jan 12, 2024
1 parent 441219f commit 52335f0
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 25 deletions.
4 changes: 2 additions & 2 deletions block/fetcher/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (r *BigtableBlockReader) Read(
btRange := bigtable.NewRange(fmt.Sprintf("%016x", startBlockNum), "")
err := table.ReadRows(ctx, btRange, func(row bigtable.Row) bool {

blk, zlogger, err := r.processRow(row)
blk, zlogger, err := r.ProcessRow(row)
if err != nil {
fatalError = fmt.Errorf("failed to read row: %w", err)
return false
Expand Down Expand Up @@ -192,7 +192,7 @@ func explodeRow(row bigtable.Row) (*big.Int, RowType, []byte) {
return blockNum, rowType, el.Value
}

func (r *BigtableBlockReader) processRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) {
func (r *BigtableBlockReader) ProcessRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) {
blockNum, rowType, rowCnt := explodeRow(row)
zlogger := r.logger.With(
zap.Uint64("block_num", blockNum.Uint64()),
Expand Down
97 changes: 76 additions & 21 deletions block/fetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fetcher
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"math"
"time"
Expand Down Expand Up @@ -44,43 +45,46 @@ func NewRPC(rpcClient *rpc.Client, fetchInterval time.Duration, latestBlockRetry
}
}

func (f *RPCFetcher) Fetch(ctx context.Context, blockNum uint64) (out *pbbstream.Block, err error) {
f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum))
func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbstream.Block, err error) {
f.logger.Info("fetching block", zap.Uint64("block_num", requestedSlot))

for f.latestConfirmedSlot < blockNum {
for f.latestConfirmedSlot < requestedSlot {
f.latestConfirmedSlot, err = f.rpcClient.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return nil, fmt.Errorf("fetching latestConfirmedSlot block num: %w", err)
}

f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("block_num", blockNum))
f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("block_num", requestedSlot))
//
if f.latestConfirmedSlot < blockNum {
if f.latestConfirmedSlot < requestedSlot {
time.Sleep(f.latestBlockRetryInterval)
continue
}
break
}
for f.latestFinalizedSlot < blockNum {
for f.latestFinalizedSlot < requestedSlot {
f.latestFinalizedSlot, err = f.rpcClient.GetSlot(ctx, rpc.CommitmentFinalized)
if err != nil {
return nil, fmt.Errorf("fetching latest finalized Slot block num: %w", err)
}

f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("block_num", blockNum))
f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("block_num", requestedSlot))
//
if f.latestFinalizedSlot < blockNum {
if f.latestFinalizedSlot < requestedSlot {
time.Sleep(f.latestBlockRetryInterval)
continue
}
break
}

blockResult, err := f.rpcClient.GetBlockWithOpts(ctx, blockNum, GetBlockOpts)
blockResult, err := f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts)
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockNum, err)
return nil, fmt.Errorf("fetching block %d: %w", requestedSlot, err)
}
block, err := blockFromBlockResult(requestedSlot, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult)
if err != nil {
return nil, fmt.Errorf("decoding block %d: %w", requestedSlot, err)
}
block, _ := blockFromBlockResult(blockNum, f.latestConfirmedSlot, f.latestFinalizedSlot, blockResult)
return block, nil
}

Expand Down Expand Up @@ -146,7 +150,7 @@ func blockFromBlockResult(requestedSlot uint64, confirmedSlot uint64, finalizedS
}

pbBlock := &pbbstream.Block{
Number: *result.BlockHeight,
Number: requestedSlot,
Id: result.Blockhash.String(),
ParentId: result.PreviousBlockhash.String(),
Timestamp: timestamppb.New(result.BlockTime.Time()),
Expand All @@ -165,8 +169,14 @@ func toPbTransactions(transactions []rpc.TransactionWithMeta) (out []*pbsol.Conf
if err != nil {
return nil, fmt.Errorf(`decoding transaction meta: %w`, err)
}
solanaTrx := &solana.Transaction{}
transaction.Transaction.GetRawJSON()
err = json.Unmarshal(transaction.Transaction.GetRawJSON(), solanaTrx)
if err != nil {
return nil, fmt.Errorf(`decoding transaction: %w`, err)
}
out = append(out, &pbsol.ConfirmedTransaction{
Transaction: toPbTransaction(transaction.MustGetTransaction()),
Transaction: toPbTransaction(solanaTrx),
Meta: meta,
})
}
Expand All @@ -178,27 +188,31 @@ func toPbTransactionMeta(meta *rpc.TransactionMeta) (*pbsol.TransactionStatusMet
if err != nil {
return nil, fmt.Errorf("decoding return data: %w", err)
}
trxErr, err := toPbTransactionError(meta.Err)
return &pbsol.TransactionStatusMeta{
Err: toPbTransactionError(meta.Err),
Err: trxErr,
Fee: meta.Fee,
PreBalances: meta.PreBalances,
PostBalances: meta.PostBalances,
InnerInstructions: toPbInnerInstructions(meta.InnerInstructions),
InnerInstructionsNone: false,
InnerInstructionsNone: false, //todo: should we remove?
LogMessages: meta.LogMessages,
LogMessagesNone: false,
LogMessagesNone: false, //todo: should we remove?
PreTokenBalances: toPbTokenBalances(meta.PreTokenBalances),
PostTokenBalances: toPbTokenBalances(meta.PostTokenBalances),
Rewards: toPBReward(meta.Rewards),
LoadedWritableAddresses: toPbWritableAddresses(meta.LoadedAddresses.Writable),
LoadedReadonlyAddresses: toPbReadonlyAddresses(meta.LoadedAddresses.ReadOnly),
ReturnData: returnData,
ReturnDataNone: false,
ReturnDataNone: false, //todo: should we remove?
ComputeUnitsConsumed: meta.ComputeUnitsConsumed,
}, nil
}

func toPbReturnData(data rpc.ReturnData) (*pbsol.ReturnData, error) {
if len(data.Data) == 0 {
return nil, nil
}
d, err := base64.StdEncoding.DecodeString(data.Data[0])
if err != nil {
return nil, fmt.Errorf("decoding return data: %w", err)
Expand Down Expand Up @@ -245,8 +259,15 @@ func toPbTokenBalances(balances []rpc.TokenBalance) []*pbsol.TokenBalance {
}

func toPbUiTokenAmount(amount *rpc.UiTokenAmount) *pbsol.UiTokenAmount {
if amount == nil {
return nil
}
uiAmount := float64(0)
if amount.UiAmount != nil {
uiAmount = *amount.UiAmount
}
return &pbsol.UiTokenAmount{
UiAmount: *amount.UiAmount,
UiAmount: uiAmount,
Decimals: uint32(amount.Decimals),
Amount: amount.Amount,
UiAmountString: amount.UiAmountString,
Expand Down Expand Up @@ -285,11 +306,45 @@ func compileInstructionsToPbInnerInstructionArray(instructions []solana.Compiled
return
}

func toPbTransactionError(err interface{}) *pbsol.TransactionError {
type TransactionError struct {
Type string `json:"err"`
Details string
}

func toPbTransactionError(err interface{}) (*pbsol.TransactionError, error) {
if err == nil {
return nil
return nil, nil
}
panic("not implemented") //todo : implement

if mapErr, ok := err.(map[string]interface{}); ok {
for key, value := range mapErr {
detail, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("decoding transaction error: %w", err)
}
trxErr := &TransactionError{
Type: key,
Details: string(detail),
}
fmt.Println(trxErr)
return nil, nil
}
}

//8 0 0 0 3 25 0 0 0 113 23 0 0

// "InstructionError": [
// 3,
// {
// "Custom": 6001
// }
// ]

//8 0 0 0 -> TransactionError.InstructionError
//3 -> instruction index
//25 0 0 0 -> InstructionError.Custom
//113 23 0 0 -> u32 error code
panic("not implemented") //todo : implement when test with a failed transaction
}

func toPbTransaction(transaction *solana.Transaction) *pbsol.Transaction {
Expand Down
30 changes: 30 additions & 0 deletions block/fetcher/rpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package fetcher

import (
"encoding/json"
"os"
"testing"

"github.com/gagliardetto/solana-go/rpc"
"github.com/test-go/testify/require"
)

func Test_ToPBTransaction(t *testing.T) {

b, err := os.ReadFile("/Users/cbillett/devel/sf/firehose-solana/block/fetcher/testdata/result_block_241179689.json")
require.NoError(t, err)

getBlockResult := &rpc.GetBlockResult{}
err = json.Unmarshal(b, getBlockResult)
require.NoError(t, err)

_, err = toPbTransactions(getBlockResult.Transactions)
require.NoError(t, err)

//trxHash, err := base58.Decode("66gBszm6ybWVVykE4Svm2CvmiSmFbQi2J3Ua2FxHrYL9B1EPsTCGgjfWNVoJHSqd86iKmS8niywSZqDmqkk7uZLM")
//require.NoError(t, err)
//for _, tx := range confirmTransactions {
// if bytes.Equal(tx.Transaction.Signatures[0], trxHash) {
// }
//}
}
51 changes: 51 additions & 0 deletions cmd/bt/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"fmt"

"github.com/mr-tron/base58"

"cloud.google.com/go/bigtable"
googleBigtable "cloud.google.com/go/bigtable"
"github.com/streamingfast/firehose-solana/block/fetcher"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func main() {
ctx := context.Background()
client, err := googleBigtable.NewClient(ctx, "mainnet-beta", "solana-ledger")
if err != nil {
panic(err)
}

var logger, tracer = logging.PackageLogger("foo", "main")
logging.InstantiateLoggers(logging.WithDefaultLevel(zap.DebugLevel))

blockReader := fetcher.NewBigtableReader(client, 10, logger, tracer)

table := client.Open("blocks")
btRange := bigtable.NewRange(fmt.Sprintf("%016x", 241179689), "")

err = table.ReadRows(ctx, btRange, func(row bigtable.Row) bool {
block, _, err := blockReader.ProcessRow(row)
if err != nil {
panic(err)
}

for _, transaction := range block.Transactions {
if transaction.Meta.Err != nil {
err := transaction.Meta.Err
sign := base58.Encode(transaction.Transaction.Signatures[0])
fmt.Println("err: ", sign, err.Err)
}
}

return false
})

if err != nil {
panic(err)
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/streamingfast/firehose-core v1.0.1-0.20240109054458-3f1edeff522c
github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091
github.com/streamingfast/solana-go v0.5.1-0.20230622180848-8faf68a7cb1d
github.com/test-go/testify v1.1.4
go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0
)
Expand Down Expand Up @@ -159,6 +160,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.15.0 // indirect
github.com/streamingfast/binary v0.0.0-20210928223119-44fc44e4a0b5 // indirect
github.com/streamingfast/dauth v0.0.0-20231120142446-843f4e045cc2 // indirect
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c // indirect
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 // indirect
Expand Down
9 changes: 7 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,6 @@ github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbS
github.com/gagliardetto/binary v0.7.7 h1:QZpT38+sgoPg+TIQjH94sLbl/vX+nlIRA37pEyOsjfY=
github.com/gagliardetto/binary v0.7.7/go.mod h1:mUuay5LL8wFVnIlecHakSZMvcdqfs+CsotR5n77kyjM=
github.com/gagliardetto/gofuzz v1.2.2/go.mod h1:bkH/3hYLZrMLbfYWA0pWzXmi5TTRZnu4pMGZBkqMKvY=
github.com/gagliardetto/solana-go v1.8.4 h1:vmD/JmTlonyXGy39bAo0inMhmbdAwV7rXZtLDMZeodE=
github.com/gagliardetto/solana-go v1.8.4/go.mod h1:i+7aAyNDTHG0jK8GZIBSI4OVvDqkt2Qx+LklYclRNG8=
github.com/gagliardetto/treeout v0.1.4 h1:ozeYerrLCmCubo1TcIjFiOWTTGteOOHND1twdFpgwaw=
github.com/gagliardetto/treeout v0.1.4/go.mod h1:loUefvXTrlRG5rYmJmExNryyBRh8f89VZhmMOyCyqok=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
Expand Down Expand Up @@ -438,7 +436,9 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk=
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
Expand Down Expand Up @@ -494,6 +494,7 @@ github.com/josephburnett/jd v1.7.1 h1:oXBPMS+SNnILTMGj1fWLK9pexpeJUXtbVFfRku/PjB
github.com/josephburnett/jd v1.7.1/go.mod h1:R8ZnZnLt2D4rhW4NvBc/USTo6mzyNT6fYNIIWOJA9GY=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down Expand Up @@ -737,6 +738,7 @@ github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streamingfast/binary v0.0.0-20210928223119-44fc44e4a0b5 h1:xCVaIP9q+nqRxHrb1wCLs3AABUcCgLGK0IakN4CMQbk=
github.com/streamingfast/binary v0.0.0-20210928223119-44fc44e4a0b5/go.mod h1:LEQhe6qUvAQSYBJu7MZvDU1kx4JrZzxru3Ga1rRCCuo=
github.com/streamingfast/bstream v0.0.2-0.20231211192436-01f6a005b0e4 h1:WFXOC6fg2k9CuxUhbgtH7IYOZvu0CEPXuGRitS74YwQ=
github.com/streamingfast/bstream v0.0.2-0.20231211192436-01f6a005b0e4/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
Expand All @@ -762,6 +764,7 @@ github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839 h1:K6mJPvh1
github.com/streamingfast/dtracing v0.0.0-20220305214756-b5c0e8699839/go.mod h1:huOJyjMYS6K8upTuxDxaNd+emD65RrXoVBvh8f1/7Ns=
github.com/streamingfast/firehose-core v1.0.1-0.20240109054458-3f1edeff522c h1:PpAiQiKbJq91jrW739fAPCwyOMjO23HHvf+0HqeJq2Y=
github.com/streamingfast/firehose-core v1.0.1-0.20240109054458-3f1edeff522c/go.mod h1:Y/Sza/3iOeabRhVC/hPoRx21jZDcyQFBYRNR8wgIk7I=
github.com/streamingfast/gagliardetto-solana-go v0.0.0-20240111145418-2be68b59fe4c h1:GWlMGvspp2Mb5iCZBV31FeOHwliPI/LoIowvlx253qA=
github.com/streamingfast/gagliardetto-solana-go v0.0.0-20240111145418-2be68b59fe4c/go.mod h1:i+7aAyNDTHG0jK8GZIBSI4OVvDqkt2Qx+LklYclRNG8=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0 h1:g8eEYbFSykyzIyuxNMmHEUGGUvJE0ivmqZagLDK42gw=
github.com/streamingfast/jsonpb v0.0.0-20210811021341-3670f0aa02d0/go.mod h1:cTNObq2Uofb330y05JbbZZ6RwE6QUXw5iVcHk1Fx3fk=
Expand All @@ -783,6 +786,7 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt
github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0 h1:Y15G1Z4fpEdm2b+/70owI7TLuXadlqBtGM7rk4Hxrzk=
github.com/streamingfast/snapshotter v0.0.0-20230316190750-5bcadfde44d0/go.mod h1:/Rnz2TJvaShjUct0scZ9kKV2Jr9/+KBAoWy4UMYxgv4=
github.com/streamingfast/solana-go v0.5.1-0.20230622180848-8faf68a7cb1d h1:6wV8O4hUxYZ6IiJ/odsDBzeU8y/zAWMttUm0reG+r3Y=
github.com/streamingfast/solana-go v0.5.1-0.20230622180848-8faf68a7cb1d/go.mod h1:9NfZWSK0zqA+M1YU2pTI8sr1BfijCpqBFceLQARQiNw=
github.com/streamingfast/substreams v1.2.1-0.20231221200849-a355c5063d0c h1:tpwdanGNJjYCBRbqxGzoXRrz0VWa+Mjeb0OJt/aMOx8=
github.com/streamingfast/substreams v1.2.1-0.20231221200849-a355c5063d0c/go.mod h1:fCC3pGTYMi0N4VhJjdJPQydefJpY+tsY9BzWxDi152k=
Expand Down Expand Up @@ -835,6 +839,7 @@ github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT
github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g=
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/ybbus/jsonrpc v2.1.2+incompatible h1:V4mkE9qhbDQ92/MLMIhlhMSbz8jNXdagC3xBR5NDwaQ=
github.com/ybbus/jsonrpc v2.1.2+incompatible/go.mod h1:XJrh1eMSzdIYFbM08flv0wp5G35eRniyeGut1z+LSiE=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yourbasic/graph v0.0.0-20210606180040-8ecfec1c2869 h1:7v7L5lsfw4w8iqBBXETukHo4IPltmD+mWoLRYUmeGN8=
Expand Down

0 comments on commit 52335f0

Please sign in to comment.