From a8a1bc23726e874a0952372150ebb7f17da188d3 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Tue, 19 Dec 2023 21:11:35 +0000 Subject: [PATCH 01/11] internal/ethapi, eth, params, cmd/geth, cmd/utils: public blockchain api performance enhancement - added connection check during GetBlockByXXX and GetReceiptsByHash call to cancel requests for timed-out connections added context argument to ethapi.RPCMarshalBlock for this - put a limit on the concurrent requests for GetBlockByXXX and GetReceiptsByHash: params.MaxPublicRequests - added a cache for marshaled block and receipts for GetBlockByXXX and getReceiptsByHash: params.PublicRequestsCacheLocation --- cmd/geth/main.go | 2 + cmd/geth/usage.go | 2 + cmd/utils/flags.go | 16 ++++++ eth/api.go | 2 +- internal/ethapi/api.go | 91 +++++++++++++++++++++++++++++++++- internal/ethapi/api_cache.go | 94 ++++++++++++++++++++++++++++++++++++ params/protocol_params.go | 3 ++ 7 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 internal/ethapi/api_cache.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 9d61c4b359c7..86003be2f410 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -225,6 +225,8 @@ var ( utils.BlockMinBuildTime, utils.BlockMinBuildTxs, utils.BlockTrailTime, + utils.PublicRequestsCacheLocation, + utils.MaxPublicRequests, } ) diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index c1d2abc09be6..3da01589cba9 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -235,6 +235,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.BlockMinBuildTime, utils.BlockMinBuildTxs, utils.BlockTrailTime, + utils.PublicRequestsCacheLocation, + utils.MaxPublicRequests, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c1ec692a8efc..4e1ccf4dbcbd 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -916,6 +916,16 @@ var ( Usage: "Time to leave for block data transfer in ms", Value: params.BlockTrailTime, } + PublicRequestsCacheLocation = cli.StringFlag{ + Name: "wemix.publicrequests.cache", + Usage: "Public requests cache location", + Value: params.PublicRequestsCacheLocation, + } + MaxPublicRequests = cli.Int64Flag{ + Name: "wemix.publicrequests.max", + Usage: "Max # of concurrent public requests", + Value: params.MaxPublicRequests, + } ) var ( @@ -1982,6 +1992,12 @@ func SetWemixConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(BlockTrailTime.Name) { params.BlockTrailTime = ctx.GlobalInt64(BlockTrailTime.Name) } + if ctx.GlobalIsSet(PublicRequestsCacheLocation.Name) { + params.PublicRequestsCacheLocation = ctx.GlobalString(PublicRequestsCacheLocation.Name) + } + if ctx.GlobalIsSet(MaxPublicRequests.Name) { + params.MaxPublicRequests = ctx.GlobalInt64(MaxPublicRequests.Name) + } if params.ConsensusMethod == params.ConsensusInvalid { params.ConsensusMethod = params.ConsensusPoW diff --git a/eth/api.go b/eth/api.go index 7ec802b15852..4ebd7231e162 100644 --- a/eth/api.go +++ b/eth/api.go @@ -412,7 +412,7 @@ func (api *PrivateDebugAPI) GetBadBlocks(ctx context.Context) ([]*BadBlockArgs, } else { blockRlp = fmt.Sprintf("0x%x", rlpBytes) } - if blockJSON, err = ethapi.RPCMarshalBlock(block, true, true, api.eth.APIBackend.ChainConfig()); err != nil { + if blockJSON, err = ethapi.RPCMarshalBlock(ctx, block, true, true, api.eth.APIBackend.ChainConfig()); err != nil { blockJSON = map[string]interface{}{"error": err.Error()} } results = append(results, &BadBlockArgs{ diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index c351f4fdfb46..3cb3f87a5428 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -20,8 +20,10 @@ import ( "context" "errors" "fmt" + "io" "math/big" "os" + "runtime" "strings" "time" @@ -42,6 +44,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/vrf" "github.com/ethereum/go-ethereum/eth/tracers/logger" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/params" @@ -50,6 +53,28 @@ import ( "github.com/tyler-smith/go-bip39" ) +var apiRequestsThrottle chan struct{} +var apiRequestsCache ethdb.Database + +func apiRequestsEnter() { + if len(apiRequestsThrottle) >= int(params.MaxPublicRequests) { + pc, _, _, _ := runtime.Caller(1) + var name string + parts := strings.Split(runtime.FuncForPC(pc).Name(), ".") + if len(parts) > 0 { + name = parts[len(parts)-1] + } else { + name = runtime.FuncForPC(pc).Name() + } + log.Warn("Too many API requests", "func", name, "count", len(apiRequestsThrottle)) + } + apiRequestsThrottle <- struct{}{} +} + +func apiRequestsLeave() { + <-apiRequestsThrottle +} + // PublicEthereumAPI provides an API to access Ethereum related information. // It offers only methods that operate on public data that is freely available to anyone. type PublicEthereumAPI struct { @@ -681,6 +706,15 @@ type PublicBlockChainAPI struct { // NewPublicBlockChainAPI creates a new Ethereum blockchain API. func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI { + if len(params.PublicRequestsCacheLocation) > 0 { + var err error + apiRequestsCache, err = apiCacheOpen(params.PublicRequestsCacheLocation) + if err != nil { + panic(err) + } + } + apiRequestsThrottle = make(chan struct{}, params.MaxPublicRequests) + return &PublicBlockChainAPI{b} } @@ -701,6 +735,8 @@ func (s *PublicBlockChainAPI) BlockNumber() hexutil.Uint64 { // GetBlockReceipts returns all the transaction receipts for the given block hash. func (s *PublicBlockChainAPI) GetReceiptsByHash(ctx context.Context, blockHash common.Hash) ([]map[string]interface{}, error) { + apiRequestsEnter() + defer apiRequestsLeave() block, err1 := s.b.BlockByHash(ctx, blockHash) if block == nil && err1 == nil { @@ -709,6 +745,12 @@ func (s *PublicBlockChainAPI) GetReceiptsByHash(ctx context.Context, blockHash c return nil, err1 } + select { + case <-ctx.Done(): + return nil, io.EOF + default: + } + receipts, err2 := s.b.GetReceipts(ctx, blockHash) if receipts == nil && err2 == nil { return make([]map[string]interface{}, 0), nil @@ -723,6 +765,11 @@ func (s *PublicBlockChainAPI) GetReceiptsByHash(ctx context.Context, blockHash c fieldsList := make([]map[string]interface{}, 0, len(receipts)) for index, receipt := range receipts { + select { + case <-ctx.Done(): + return nil, io.EOF + default: + } bigblock := new(big.Int).SetUint64(block.NumberU64()) signer := types.MakeSigner(s.b.ChainConfig(), bigblock) @@ -884,6 +931,9 @@ func (s *PublicBlockChainAPI) GetHeaderByHash(ctx context.Context, hash common.H // - When fullTx is true all transactions in the block are returned, otherwise // only the transaction hash is returned. func (s *PublicBlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) { + apiRequestsEnter() + defer apiRequestsLeave() + block, err := s.b.BlockByNumber(ctx, number) if block != nil && err == nil { response, err := s.rpcMarshalBlock(ctx, block, true, fullTx) @@ -901,6 +951,9 @@ func (s *PublicBlockChainAPI) GetBlockByNumber(ctx context.Context, number rpc.B // GetBlockByHash returns the requested block. When fullTx is true all transactions in the block are returned in full // detail, otherwise only the transaction hash is returned. func (s *PublicBlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) { + apiRequestsEnter() + defer apiRequestsLeave() + block, err := s.b.BlockByHash(ctx, hash) if block != nil { return s.rpcMarshalBlock(ctx, block, true, fullTx) @@ -910,6 +963,9 @@ func (s *PublicBlockChainAPI) GetBlockByHash(ctx context.Context, hash common.Ha // GetUncleByBlockNumberAndIndex returns the uncle block for the given block hash and index. func (s *PublicBlockChainAPI) GetUncleByBlockNumberAndIndex(ctx context.Context, blockNr rpc.BlockNumber, index hexutil.Uint) (map[string]interface{}, error) { + apiRequestsEnter() + defer apiRequestsLeave() + block, err := s.b.BlockByNumber(ctx, blockNr) if block != nil { uncles := block.Uncles() @@ -925,6 +981,9 @@ func (s *PublicBlockChainAPI) GetUncleByBlockNumberAndIndex(ctx context.Context, // GetUncleByBlockHashAndIndex returns the uncle block for the given block hash and index. func (s *PublicBlockChainAPI) GetUncleByBlockHashAndIndex(ctx context.Context, blockHash common.Hash, index hexutil.Uint) (map[string]interface{}, error) { + apiRequestsEnter() + defer apiRequestsLeave() + block, err := s.b.BlockByHash(ctx, blockHash) if block != nil { uncles := block.Uncles() @@ -940,6 +999,9 @@ func (s *PublicBlockChainAPI) GetUncleByBlockHashAndIndex(ctx context.Context, b // GetUncleCountByBlockNumber returns number of uncles in the block for the given block number func (s *PublicBlockChainAPI) GetUncleCountByBlockNumber(ctx context.Context, blockNr rpc.BlockNumber) *hexutil.Uint { + apiRequestsEnter() + defer apiRequestsLeave() + if block, _ := s.b.BlockByNumber(ctx, blockNr); block != nil { n := hexutil.Uint(len(block.Uncles())) return &n @@ -949,6 +1011,9 @@ func (s *PublicBlockChainAPI) GetUncleCountByBlockNumber(ctx context.Context, bl // GetUncleCountByBlockHash returns number of uncles in the block for the given block hash func (s *PublicBlockChainAPI) GetUncleCountByBlockHash(ctx context.Context, blockHash common.Hash) *hexutil.Uint { + apiRequestsEnter() + defer apiRequestsLeave() + if block, _ := s.b.BlockByHash(ctx, blockHash); block != nil { n := hexutil.Uint(len(block.Uncles())) return &n @@ -1299,7 +1364,20 @@ func RPCMarshalHeader(head *types.Header) map[string]interface{} { // RPCMarshalBlock converts the given block to the RPC output which depends on fullTx. If inclTx is true transactions are // returned. When fullTx is true the returned block contains full transaction details, otherwise it will only contain // transaction hashes. -func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool, config *params.ChainConfig) (map[string]interface{}, error) { +func RPCMarshalBlock(ctx context.Context, block *types.Block, inclTx bool, fullTx bool, config *params.ChainConfig) (map[string]interface{}, error) { + select { + case <-ctx.Done(): + return nil, io.EOF + default: + } + + if fullTx && apiRequestsCache != nil { + if fields, err := apiCacheGetMarshaledBlock(apiRequestsCache, block.Hash().Bytes()); err == nil { + log.Debug("API Cache", "found", block.Number()) + return fields, nil + } + } + fields := RPCMarshalHeader(block.Header()) fields["size"] = hexutil.Uint64(block.Size()) @@ -1317,6 +1395,11 @@ func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool, config *param var err error for i, tx := range txs { if transactions[i], err = formatTx(tx); err != nil { + select { + case <-ctx.Done(): + return nil, io.EOF + default: + } return nil, err } } @@ -1329,6 +1412,10 @@ func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool, config *param } fields["uncles"] = uncleHashes + if fullTx && apiRequestsCache != nil { + apiCachePutMarshaledBlock(apiRequestsCache, block.Hash().Bytes(), fields) + } + return fields, nil } @@ -1343,7 +1430,7 @@ func (s *PublicBlockChainAPI) rpcMarshalHeader(ctx context.Context, header *type // rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field, which requires // a `PublicBlockchainAPI`. func (s *PublicBlockChainAPI) rpcMarshalBlock(ctx context.Context, b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) { - fields, err := RPCMarshalBlock(b, inclTx, fullTx, s.b.ChainConfig()) + fields, err := RPCMarshalBlock(ctx, b, inclTx, fullTx, s.b.ChainConfig()) if err != nil { return nil, err } diff --git a/internal/ethapi/api_cache.go b/internal/ethapi/api_cache.go new file mode 100644 index 000000000000..c97f72afe75a --- /dev/null +++ b/internal/ethapi/api_cache.go @@ -0,0 +1,94 @@ +// Copyright 2023 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package ethapi + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "io" + + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" +) + +var marshaledBlockPrefix = []byte(".block.") +var marshaledReceiptsPrefix = []byte(".receipts.") + +func apiCacheOpen(fn string) (ethdb.Database, error) { + return rawdb.NewDB(fn, 0, 0, "", false) +} + +func apiCacheClose(db ethdb.Database) { + db.Close() +} + +func apiCacheGetMarshaledBlock(db ethdb.Database, hash []byte) (map[string]interface{}, error) { + key := append(marshaledBlockPrefix, hash...) + data, err := db.Get(key) + if err != nil { + return nil, err + } + + gzReader, err := gzip.NewReader(bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + defer gzReader.Close() + + decompressedData, err := io.ReadAll(gzReader) + if err != nil { + return nil, err + } + + var fields map[string]interface{} + if err = json.Unmarshal(decompressedData, &fields); err != nil { + return nil, err + } + return fields, nil +} + +func apiCacheHasMarshaledBlock(db ethdb.Database, hash []byte) (bool, error) { + key := append(marshaledBlockPrefix, hash...) + return db.Has(key) +} + +func apiCachePutMarshaledBlock(db ethdb.Database, hash []byte, fields map[string]interface{}) error { + data, err := json.Marshal(fields) + if err != nil { + return err + } + + var buf bytes.Buffer + gzWriter := gzip.NewWriter(&buf) + if _, err = gzWriter.Write(data); err != nil { + return err + } + if err = gzWriter.Close(); err != nil { + return err + } + + key := append(marshaledBlockPrefix, hash...) + return db.Put(key, buf.Bytes()) +} + +func apiCacheDeleteMarshaledBlock(db ethdb.Database, hash []byte) error { + key := append(marshaledBlockPrefix, hash...) + return db.Delete(key) +} + +// EoF diff --git a/params/protocol_params.go b/params/protocol_params.go index b64e82ba54d8..1a9764f16cf9 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -202,4 +202,7 @@ var ( BlockMinBuildTime int64 = 300 // Minimum block generation time in ms BlockMinBuildTxs int64 = 2500 // Minimum txs in a block with pending txs BlockTrailTime int64 = 300 // Time to leave for block data transfer transfer in ms + + PublicRequestsCacheLocation string = "" // Cache DB location + MaxPublicRequests int64 = 100 // Max # of public requests per second ) From 4a49d8720400946b78cff8ef5b3f9f1396aea5e6 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Tue, 19 Dec 2023 22:09:48 +0000 Subject: [PATCH 02/11] params: update comment for params.MaxPublicRequests --- params/protocol_params.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/protocol_params.go b/params/protocol_params.go index 1a9764f16cf9..e4b4c9e125d4 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -204,5 +204,5 @@ var ( BlockTrailTime int64 = 300 // Time to leave for block data transfer transfer in ms PublicRequestsCacheLocation string = "" // Cache DB location - MaxPublicRequests int64 = 100 // Max # of public requests per second + MaxPublicRequests int64 = 100 // Max # of concurrent public requests ) From 6685f224ea6c6e08fe4a9cf7eeb5c9c72faaf325 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Tue, 19 Dec 2023 22:10:30 +0000 Subject: [PATCH 03/11] internal/ethapi: added cache support for GetReceiptsByHash --- internal/ethapi/api.go | 28 ++++++++----- internal/ethapi/api_cache.go | 78 +++++++++++++++++++++++++----------- 2 files changed, 74 insertions(+), 32 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 3cb3f87a5428..a6d3552c2d26 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -738,6 +738,19 @@ func (s *PublicBlockChainAPI) GetReceiptsByHash(ctx context.Context, blockHash c apiRequestsEnter() defer apiRequestsLeave() + select { + case <-ctx.Done(): + return nil, io.EOF + default: + } + + if apiRequestsCache != nil { + if fields, err := apiCacheGetReceipts(apiRequestsCache, blockHash.Bytes()); err == nil { + log.Debug("API Cache", "found receipts", blockHash) + return fields, nil + } + } + block, err1 := s.b.BlockByHash(ctx, blockHash) if block == nil && err1 == nil { return nil, nil @@ -745,12 +758,6 @@ func (s *PublicBlockChainAPI) GetReceiptsByHash(ctx context.Context, blockHash c return nil, err1 } - select { - case <-ctx.Done(): - return nil, io.EOF - default: - } - receipts, err2 := s.b.GetReceipts(ctx, blockHash) if receipts == nil && err2 == nil { return make([]map[string]interface{}, 0), nil @@ -817,6 +824,9 @@ func (s *PublicBlockChainAPI) GetReceiptsByHash(ctx context.Context, blockHash c fieldsList = append(fieldsList, fields) } + if apiRequestsCache != nil { + apiCachePutReceipts(apiRequestsCache, blockHash.Bytes(), fieldsList) + } return fieldsList, nil } @@ -1372,8 +1382,8 @@ func RPCMarshalBlock(ctx context.Context, block *types.Block, inclTx bool, fullT } if fullTx && apiRequestsCache != nil { - if fields, err := apiCacheGetMarshaledBlock(apiRequestsCache, block.Hash().Bytes()); err == nil { - log.Debug("API Cache", "found", block.Number()) + if fields, err := apiCacheGetBlock(apiRequestsCache, block.Hash().Bytes()); err == nil { + log.Debug("API Cache", "found block", block.Number()) return fields, nil } } @@ -1413,7 +1423,7 @@ func RPCMarshalBlock(ctx context.Context, block *types.Block, inclTx bool, fullT fields["uncles"] = uncleHashes if fullTx && apiRequestsCache != nil { - apiCachePutMarshaledBlock(apiRequestsCache, block.Hash().Bytes(), fields) + apiCachePutBlock(apiRequestsCache, block.Hash().Bytes(), fields) } return fields, nil diff --git a/internal/ethapi/api_cache.go b/internal/ethapi/api_cache.go index c97f72afe75a..fd950a999923 100644 --- a/internal/ethapi/api_cache.go +++ b/internal/ethapi/api_cache.go @@ -26,8 +26,8 @@ import ( "github.com/ethereum/go-ethereum/ethdb" ) -var marshaledBlockPrefix = []byte(".block.") -var marshaledReceiptsPrefix = []byte(".receipts.") +var marshaledBlockPrefix = []byte(".blck.") +var marshaledReceiptsPrefix = []byte(".rcpt.") func apiCacheOpen(fn string) (ethdb.Database, error) { return rawdb.NewDB(fn, 0, 0, "", false) @@ -37,8 +37,8 @@ func apiCacheClose(db ethdb.Database) { db.Close() } -func apiCacheGetMarshaledBlock(db ethdb.Database, hash []byte) (map[string]interface{}, error) { - key := append(marshaledBlockPrefix, hash...) +func apiCacheGet(db ethdb.Database, prefix, hash []byte) ([]byte, error) { + key := append(prefix, hash...) data, err := db.Get(key) if err != nil { return nil, err @@ -54,41 +54,73 @@ func apiCacheGetMarshaledBlock(db ethdb.Database, hash []byte) (map[string]inter if err != nil { return nil, err } - - var fields map[string]interface{} - if err = json.Unmarshal(decompressedData, &fields); err != nil { - return nil, err - } - return fields, nil + return decompressedData, nil } -func apiCacheHasMarshaledBlock(db ethdb.Database, hash []byte) (bool, error) { - key := append(marshaledBlockPrefix, hash...) +func apiCacheHas(db ethdb.Database, prefix, hash []byte) (bool, error) { + key := append(prefix, hash...) return db.Has(key) } -func apiCachePutMarshaledBlock(db ethdb.Database, hash []byte, fields map[string]interface{}) error { - data, err := json.Marshal(fields) - if err != nil { - return err - } - +func apiCachePut(db ethdb.Database, prefix, hash, data []byte) error { var buf bytes.Buffer gzWriter := gzip.NewWriter(&buf) - if _, err = gzWriter.Write(data); err != nil { + if _, err := gzWriter.Write(data); err != nil { return err } - if err = gzWriter.Close(); err != nil { + if err := gzWriter.Close(); err != nil { return err } - key := append(marshaledBlockPrefix, hash...) + key := append(prefix, hash...) return db.Put(key, buf.Bytes()) } -func apiCacheDeleteMarshaledBlock(db ethdb.Database, hash []byte) error { - key := append(marshaledBlockPrefix, hash...) +func apiCacheDelete(db ethdb.Database, prefix, hash []byte) error { + key := append(prefix, hash...) return db.Delete(key) } +func apiCacheGetBlock(db ethdb.Database, hash []byte) (map[string]interface{}, error) { + data, err := apiCacheGet(db, marshaledBlockPrefix, hash) + if err != nil { + return nil, err + } + + var fields map[string]interface{} + if err = json.Unmarshal(data, &fields); err != nil { + return nil, err + } + return fields, nil +} + +func apiCachePutBlock(db ethdb.Database, hash []byte, fields map[string]interface{}) error { + data, err := json.Marshal(fields) + if err != nil { + return err + } + return apiCachePut(db, marshaledBlockPrefix, hash, data) +} + +func apiCacheGetReceipts(db ethdb.Database, hash []byte) ([]map[string]interface{}, error) { + data, err := apiCacheGet(db, marshaledReceiptsPrefix, hash) + if err != nil { + return nil, err + } + + var fields []map[string]interface{} + if err = json.Unmarshal(data, &fields); err != nil { + return nil, err + } + return fields, nil +} + +func apiCachePutReceipts(db ethdb.Database, hash []byte, fields []map[string]interface{}) error { + data, err := json.Marshal(fields) + if err != nil { + return err + } + return apiCachePut(db, marshaledReceiptsPrefix, hash, data) +} + // EoF From 602a9b55694ba2f38c309d3b0ad3d39381d52222 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 20 Dec 2023 04:59:34 +0000 Subject: [PATCH 04/11] internal/ethapi: made ecrecover run parallel in GetBlockByXXX --- internal/ethapi/api.go | 34 +++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index a6d3552c2d26..08d85442cc50 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -25,6 +25,7 @@ import ( "os" "runtime" "strings" + "sync" "time" "github.com/davecgh/go-spew/spew" @@ -53,8 +54,9 @@ import ( "github.com/tyler-smith/go-bip39" ) -var apiRequestsThrottle chan struct{} var apiRequestsCache ethdb.Database +var apiRequestsThrottle chan struct{} +var apiRequestsTokens chan struct{} func apiRequestsEnter() { if len(apiRequestsThrottle) >= int(params.MaxPublicRequests) { @@ -714,6 +716,11 @@ func NewPublicBlockChainAPI(b Backend) *PublicBlockChainAPI { } } apiRequestsThrottle = make(chan struct{}, params.MaxPublicRequests) + tokens := runtime.NumCPU() * 8 / 10 + if tokens < 4 { + tokens = 4 + } + apiRequestsTokens = make(chan struct{}, tokens) return &PublicBlockChainAPI{b} } @@ -1402,16 +1409,33 @@ func RPCMarshalBlock(ctx context.Context, block *types.Block, inclTx bool, fullT } txs := block.Transactions() transactions := make([]interface{}, len(txs)) + var wg sync.WaitGroup var err error for i, tx := range txs { - if transactions[i], err = formatTx(tx); err != nil { + wg.Add(1) + go func(ii int, itx *types.Transaction) { + apiRequestsTokens <- struct{}{} + defer func() { + wg.Done() + <-apiRequestsTokens + }() + select { case <-ctx.Done(): - return nil, io.EOF + err = io.EOF + return default: } - return nil, err - } + var err2 error + transactions[ii], err2 = formatTx(itx) + if err2 != nil { + err = err2 + } + }(i, tx) + } + wg.Wait() + if err != nil { + return nil, err } fields["transactions"] = transactions } From da3cbaa91b1bfa00a29af57a95dd3b597d968512 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Thu, 21 Dec 2023 00:56:04 +0000 Subject: [PATCH 05/11] eth/protocols/eth: reduced maxKnownTxs to 100000, to ease memory pressure --- eth/protocols/eth/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 481ceaf5323d..8263fef4b60f 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -34,7 +34,7 @@ import ( const ( // maxKnownTxs is the maximum transactions hashes to keep in the known list // before starting to randomly evict them. - maxKnownTxs = 2000000 + maxKnownTxs = 100000 // maxKnownBlocks is the maximum block hashes to keep in the known list // before starting to randomly evict them. From 5a47b5b08549d61ea71e06cb183e4af34296b10d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=84=92=E1=85=A1=E1=86=AB=E1=84=8B=E1=85=AF=E1=86=AB?= =?UTF-8?q?=E1=84=8C=E1=85=AE=E1=86=AB?= Date: Thu, 21 Dec 2023 16:22:29 +0900 Subject: [PATCH 06/11] cmd/geth,cmd/utils,params: Add random list setting for Bootnodes. --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 35 +++++++++++++++++++++++++++++++++-- params/protocol_params.go | 1 + 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 86003be2f410..1cdbfe866863 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -227,6 +227,7 @@ var ( utils.BlockTrailTime, utils.PublicRequestsCacheLocation, utils.MaxPublicRequests, + utils.BootnodeCount, } ) diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 3da01589cba9..1aa1f77cce9d 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -237,6 +237,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.BlockTrailTime, utils.PublicRequestsCacheLocation, utils.MaxPublicRequests, + utils.BootnodeCount, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 4e1ccf4dbcbd..96e8cd654c53 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -23,6 +23,7 @@ import ( "io" "math" "math/big" + "math/rand" "os" "path/filepath" godebug "runtime/debug" @@ -926,6 +927,11 @@ var ( Usage: "Max # of concurrent public requests", Value: params.MaxPublicRequests, } + BootnodeCount = cli.IntFlag{ + Name: "wemix.bootnodecount", + Usage: "Default bootnode peer count", + Value: params.BootnodeCount, + } ) var ( @@ -1021,15 +1027,40 @@ func setNodeUserIdent(ctx *cli.Context, cfg *node.Config) { } } +// setRandomBootstrapNodes setting a random list of bootstrap nodes using the command line +func setRandomBootstrapNodes(ctx *cli.Context, bootnodes []string) []string { + rand.Seed(time.Now().UnixNano()) + bootnodeslen := len(bootnodes) + + // check command line + if ctx.GlobalIsSet(BootnodeCount.Name) { + setcount := ctx.GlobalInt(BootnodeCount.Name) + if setcount > 0 && setcount <= bootnodeslen { + params.BootnodeCount = setcount + } + } + // select random bootnodes + selectcount := params.BootnodeCount + urls := make([]string, selectcount) + tempnode := make([]string, bootnodeslen) + copy(tempnode, bootnodes) + for i := 0; i < selectcount; i++ { + index := rand.Intn(len(tempnode)) + urls = append(urls, tempnode[index]) + tempnode = append(tempnode[:index], tempnode[index+1:]...) + } + return urls +} + // setBootstrapNodes creates a list of bootstrap nodes from the command line // flags, reverting to pre-configured ones if none have been specified. func setBootstrapNodes(ctx *cli.Context, cfg *p2p.Config) { - urls := params.WemixMainnetBootnodes + urls := setRandomBootstrapNodes(ctx, params.WemixMainnetBootnodes) switch { case ctx.GlobalIsSet(BootnodesFlag.Name): urls = SplitAndTrim(ctx.GlobalString(BootnodesFlag.Name)) case ctx.GlobalBool(WemixTestnetFlag.Name): - urls = params.WemixTestnetBootnodes + urls = setRandomBootstrapNodes(ctx, params.WemixTestnetBootnodes) case ctx.GlobalBool(RopstenFlag.Name): urls = params.RopstenBootnodes case ctx.GlobalBool(SepoliaFlag.Name): diff --git a/params/protocol_params.go b/params/protocol_params.go index e4b4c9e125d4..86623d79d990 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -205,4 +205,5 @@ var ( PublicRequestsCacheLocation string = "" // Cache DB location MaxPublicRequests int64 = 100 // Max # of concurrent public requests + BootnodeCount int = 3 // Default bootnode peer count. ) From 35167c8fb2d3319e9784972e612ffbbf26827eb1 Mon Sep 17 00:00:00 2001 From: cp-jhjin <111549266+cp-jhjin@users.noreply.github.com> Date: Tue, 26 Dec 2023 17:23:08 +0900 Subject: [PATCH 07/11] params: Gwemix v0.10.7 --- params/version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/params/version.go b/params/version.go index fe5e96fcb700..65bce5efb03b 100644 --- a/params/version.go +++ b/params/version.go @@ -23,7 +23,7 @@ import ( const ( VersionMajor = 0 // Major version component of the current release VersionMinor = 10 // Minor version component of the current release - VersionPatch = 6 // Patch version component of the current release + VersionPatch = 7 // Patch version component of the current release VersionMeta = "stable" // Version metadata to append to the version string ) From 2fcc34a3d96add87c3dbb493880d4f1802e6131c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=84=92=E1=85=A1=E1=86=AB=E1=84=8B=E1=85=AF=E1=86=AB?= =?UTF-8?q?=E1=84=8C=E1=85=AE=E1=86=AB?= Date: Fri, 26 Jan 2024 10:03:10 +0900 Subject: [PATCH 08/11] internal/ethapi: made ecrecover run parallel in GetReceiptsByHash --- internal/ethapi/api.go | 112 +++++++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 50 deletions(-) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 5d931ff6590f..38d14091ba11 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -770,61 +770,73 @@ func (s *BlockChainAPI) GetReceiptsByHash(ctx context.Context, blockHash common. if receipts.Len() != txs.Len() { return nil, fmt.Errorf("the size of transactions and receipts is different in the block (%s)", blockHash.String()) } - fieldsList := make([]map[string]interface{}, 0, len(receipts)) + + isLondon := s.b.ChainConfig().IsLondon(new(big.Int).SetUint64(block.NumberU64())) + baseFee := new(big.Int).Set(common.Big0) + if isLondon { + baseFee = block.BaseFee() + } + + fieldsList := make([]map[string]interface{}, len(receipts)) + + var wg sync.WaitGroup for index, receipt := range receipts { - select { - case <-ctx.Done(): - return nil, io.EOF - default: - } - - bigblock := new(big.Int).SetUint64(block.NumberU64()) - signer := types.MakeSigner(s.b.ChainConfig(), bigblock) - from, _ := types.Sender(signer, txs[index]) - - fields := map[string]interface{}{ - "blockHash": blockHash, - "blockNumber": hexutil.Uint64(block.NumberU64()), - "transactionHash": receipt.TxHash, - "transactionIndex": hexutil.Uint64(index), - "from": from, - "to": txs[index].To(), - "gasUsed": hexutil.Uint64(receipt.GasUsed), - "cumulativeGasUsed": hexutil.Uint64(receipt.CumulativeGasUsed), - "contractAddress": nil, - "logs": receipt.Logs, - "logsBloom": receipt.Bloom, - "type": hexutil.Uint(txs[index].Type()), - } - - // Assign the effective gas price paid - if !s.b.ChainConfig().IsLondon(bigblock) { - fields["effectiveGasPrice"] = (*hexutil.Big)(txs[index].GasPrice()) - } else { - header, err := s.b.HeaderByHash(ctx, blockHash) - if err != nil { - return nil, err + wg.Add(1) + go func(i int, txReceipt *types.Receipt, isLondon bool, baseFee *big.Int) { + apiRequestsTokens <- struct{}{} + defer func() { + wg.Done() + <-apiRequestsTokens + }() + select { + case <-ctx.Done(): + return + default: } - gasPrice := new(big.Int).Add(header.BaseFee, txs[index].EffectiveGasTipValue(header.BaseFee)) - fields["effectiveGasPrice"] = (*hexutil.Big)(gasPrice) - } - // Assign receipt status or post state. - if len(receipt.PostState) > 0 { - fields["root"] = hexutil.Bytes(receipt.PostState) - } else { - fields["status"] = hexutil.Uint(receipt.Status) - } - if receipt.Logs == nil { - fields["logs"] = []*types.Log{} - } - // If the ContractAddress is 20 0x0 bytes, assume it is not a contract creation - if receipt.ContractAddress != (common.Address{}) { - fields["contractAddress"] = receipt.ContractAddress - } - fieldsList = append(fieldsList, fields) + bigblock := new(big.Int).SetUint64(block.NumberU64()) + signer := types.MakeSigner(s.b.ChainConfig(), bigblock) + from, _ := types.Sender(signer, txs[i]) + fields := map[string]interface{}{ + "blockHash": blockHash, + "blockNumber": hexutil.Uint64(block.NumberU64()), + "transactionHash": txReceipt.TxHash, + "transactionIndex": hexutil.Uint64(i), + "from": from, + "to": txs[i].To(), + "gasUsed": hexutil.Uint64(txReceipt.GasUsed), + "cumulativeGasUsed": hexutil.Uint64(txReceipt.CumulativeGasUsed), + "contractAddress": nil, + "logs": txReceipt.Logs, + "logsBloom": txReceipt.Bloom, + "type": hexutil.Uint(txs[i].Type()), + } + // Assign the effective gas price paid + if !isLondon { + fields["effectiveGasPrice"] = (*hexutil.Big)(txs[i].GasPrice()) + } else { + gasPrice := new(big.Int).Add(baseFee, txs[i].EffectiveGasTipValue(baseFee)) + fields["effectiveGasPrice"] = (*hexutil.Big)(gasPrice) + } + // Assign receipt status or post state. + if len(txReceipt.PostState) > 0 { + fields["root"] = hexutil.Bytes(txReceipt.PostState) + } else { + fields["status"] = hexutil.Uint(txReceipt.Status) + } + if txReceipt.Logs == nil { + fields["logs"] = []*types.Log{} + } + // If the ContractAddress is 20 0x0 bytes, assume it is not a contract creation + if txReceipt.ContractAddress != (common.Address{}) { + fields["contractAddress"] = txReceipt.ContractAddress + } + fieldsList[i] = fields + }(index, receipt, isLondon, baseFee) } + wg.Wait() + if apiRequestsCache != nil { apiCachePutReceipts(apiRequestsCache, blockHash.Bytes(), fieldsList) } From 60b3fb7719bc7b449c83464366b5cdda05c2152b Mon Sep 17 00:00:00 2001 From: Nicolas Gotchac Date: Wed, 14 Sep 2022 18:37:53 +0200 Subject: [PATCH 09/11] node: fix HTTP server always force closing (#25755) Co-authored-by: Felix Lange --- node/rpcstack.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/node/rpcstack.go b/node/rpcstack.go index 5d411fa61e81..f45435bab22d 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -267,13 +267,15 @@ func (h *httpServer) doStop() { h.wsHandler.Store((*rpcHandler)(nil)) wsHandler.server.Stop() } + ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() err := h.server.Shutdown(ctx) - if err == ctx.Err() { + if err != nil && err == ctx.Err() { h.log.Warn("HTTP server graceful shutdown timed out") h.server.Close() } + h.listener.Close() h.log.Info("HTTP server stopped", "endpoint", h.listener.Addr()) From c25ef9b01a82f1527fd38cdd4195b7f509104a65 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Wed, 9 Aug 2023 16:00:31 +0200 Subject: [PATCH 10/11] p2p: move ping handling into pingLoop goroutine (#27887) Moving the response sending there allows tracking all peer goroutines in the peer WaitGroup. --- p2p/peer.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 469a1b797416..f3d5a0f24c2f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -112,6 +112,7 @@ type Peer struct { wg sync.WaitGroup protoErr chan error closed chan struct{} + pingRecv chan struct{} disc chan DiscReason // events receives message send / receive events if set @@ -233,6 +234,7 @@ func newPeer(log log.Logger, conn *conn, protocols []Protocol) *Peer { disc: make(chan DiscReason), protoErr: make(chan error, len(protomap)+1), // protocols + pingLoop closed: make(chan struct{}), + pingRecv: make(chan struct{}, 16), log: log.New("id", conn.node.ID(), "conn", conn.flags), } return p @@ -293,9 +295,11 @@ loop: } func (p *Peer) pingLoop() { - ping := time.NewTimer(pingInterval) defer p.wg.Done() + + ping := time.NewTimer(pingInterval) defer ping.Stop() + for { select { case <-ping.C: @@ -304,6 +308,10 @@ func (p *Peer) pingLoop() { return } ping.Reset(pingInterval) + + case <-p.pingRecv: + SendItems(p.rw, pongMsg) + case <-p.closed: return } @@ -330,7 +338,10 @@ func (p *Peer) handle(msg Msg) error { switch { case msg.Code == pingMsg: msg.Discard() - go SendItems(p.rw, pongMsg) + select { + case p.pingRecv <- struct{}{}: + case <-p.closed: + } case msg.Code == discMsg: // This is the last message. We don't need to discard or // check errors because, the connection will be closed after it. From 187aceb0751680c857840e99a07946bd42d47be5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=84=92=E1=85=A1=E1=86=AB=E1=84=8B=E1=85=AF=E1=86=AB?= =?UTF-8?q?=E1=84=8C=E1=85=AE=E1=86=AB?= Date: Tue, 30 Jan 2024 09:35:37 +0900 Subject: [PATCH 11/11] node: fixed make lint error --- node/rpcstack.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/rpcstack.go b/node/rpcstack.go index f45435bab22d..8244c892ff50 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -267,7 +267,7 @@ func (h *httpServer) doStop() { h.wsHandler.Store((*rpcHandler)(nil)) wsHandler.server.Stop() } - + ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout) defer cancel() err := h.server.Shutdown(ctx) @@ -275,7 +275,7 @@ func (h *httpServer) doStop() { h.log.Warn("HTTP server graceful shutdown timed out") h.server.Close() } - + h.listener.Close() h.log.Info("HTTP server stopped", "endpoint", h.listener.Addr())