From c8a1eee92c2ec6d76bbef6a9d49bfa4a75e7f84b Mon Sep 17 00:00:00 2001 From: wei-wang-cb <96202950+wei-wang-cb@users.noreply.github.com> Date: Wed, 23 Aug 2023 09:45:31 -0700 Subject: [PATCH] Guarantee the order of transactions in the block endpoint (#489) * feat: Guarantee the order of transactions in the block endpoint This commit guarantees the order of transactions in the block endpoint. It does so by creating a map of fetched transactions and using that map to return the transactions in the original order they had in the block. - Added a new struct `FetchedTransaction` to represent fetched transactions - Updated `fetchChannelTransactions` to use the new `FetchedTransaction` struct - Updated `UnsafeTransactions` to use the new `FetchedTransaction` struct and create a map of fetched transactions - Modified the return statement in `UnsafeTransactions` to use the fetched transactions from the map * chore: add timeout to mock server This commit adds a timeout to the mock server to prevent it from hanging indefinitely. This is mainly to make salus security scans happy. * chore: avoid memory leak in client This commit fixes a memory leak in the client. The client was not reading the response body of the http request when closing the body, which could lead to a memory leak. This commit fixes this by reading the response body and discarding it. This commit makes salus check pass. * chore: make check-gen happy * trigger github workflow --- client/api_account.go | 11 ++++++++-- client/api_block.go | 11 ++++++++-- client/api_call.go | 6 ++++- client/api_construction.go | 41 ++++++++++++++++++++++++++++------- client/api_events.go | 6 ++++- client/api_mempool.go | 11 ++++++++-- client/api_network.go | 16 +++++++++++--- client/api_search.go | 6 ++++- constructor/worker/worker.go | 6 ++++- examples/server/main.go | 12 +++++++++- fetcher/block.go | 28 +++++++++++++++++++----- templates/client/api.mustache | 6 ++++- 12 files changed, 131 insertions(+), 29 deletions(-) diff --git a/client/api_account.go b/client/api_account.go index 0f92cae7..66b88254 100644 --- a/client/api_account.go +++ b/client/api_account.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -87,7 +88,10 @@ func (a *AccountAPIService) AccountBalance( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -190,7 +194,10 @@ func (a *AccountAPIService) AccountCoins( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/client/api_block.go b/client/api_block.go index df227105..707e7ec1 100644 --- a/client/api_block.go +++ b/client/api_block.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -84,7 +85,10 @@ func (a *BlockAPIService) Block( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -190,7 +194,10 @@ func (a *BlockAPIService) BlockTransaction( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/client/api_call.go b/client/api_call.go index 55dd756c..9d3ac8c1 100644 --- a/client/api_call.go +++ b/client/api_call.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -87,7 +88,10 @@ func (a *CallAPIService) Call( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/client/api_construction.go b/client/api_construction.go index 38de2b61..fe9a4fc4 100644 --- a/client/api_construction.go +++ b/client/api_construction.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -79,7 +80,10 @@ func (a *ConstructionAPIService) ConstructionCombine( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -173,7 +177,10 @@ func (a *ConstructionAPIService) ConstructionDerive( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -267,7 +274,10 @@ func (a *ConstructionAPIService) ConstructionHash( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -368,7 +378,10 @@ func (a *ConstructionAPIService) ConstructionMetadata( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -463,7 +476,10 @@ func (a *ConstructionAPIService) ConstructionParse( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -563,7 +579,10 @@ func (a *ConstructionAPIService) ConstructionPayloads( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -661,7 +680,10 @@ func (a *ConstructionAPIService) ConstructionPreprocess( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -758,7 +780,10 @@ func (a *ConstructionAPIService) ConstructionSubmit( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/client/api_events.go b/client/api_events.go index 92bc94a2..5d2cf05a 100644 --- a/client/api_events.go +++ b/client/api_events.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -82,7 +83,10 @@ func (a *EventsAPIService) EventsBlocks( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/client/api_mempool.go b/client/api_mempool.go index 2d7f1236..8402fae7 100644 --- a/client/api_mempool.go +++ b/client/api_mempool.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -77,7 +78,10 @@ func (a *MempoolAPIService) Mempool( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -176,7 +180,10 @@ func (a *MempoolAPIService) MempoolTransaction( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/client/api_network.go b/client/api_network.go index 2db0a84c..0ff33515 100644 --- a/client/api_network.go +++ b/client/api_network.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -77,7 +78,10 @@ func (a *NetworkAPIService) NetworkList( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -173,7 +177,10 @@ func (a *NetworkAPIService) NetworkOptions( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } @@ -267,7 +274,10 @@ func (a *NetworkAPIService) NetworkStatus( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/client/api_search.go b/client/api_search.go index 7d873143..04859beb 100644 --- a/client/api_search.go +++ b/client/api_search.go @@ -19,6 +19,7 @@ package client import ( _context "context" "fmt" + "io" _ioutil "io/ioutil" _nethttp "net/http" @@ -82,7 +83,10 @@ func (a *SearchAPIService) SearchTransactions( } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) } diff --git a/constructor/worker/worker.go b/constructor/worker/worker.go index 9e25107f..109cf6e3 100644 --- a/constructor/worker/worker.go +++ b/constructor/worker/worker.go @@ -19,6 +19,7 @@ import ( "context" "errors" "fmt" + "io" "io/ioutil" "log" "net/http" @@ -871,7 +872,10 @@ func HTTPRequestWorker(rawInput string) (string, error) { if err != nil { return "", fmt.Errorf("failed to send request: %w", err) } - defer resp.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() body, err := ioutil.ReadAll(resp.Body) if err != nil { diff --git a/examples/server/main.go b/examples/server/main.go index cef1103e..30ee022b 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -18,6 +18,7 @@ import ( "fmt" "log" "net/http" + "time" "github.com/coinbase/rosetta-sdk-go/asserter" "github.com/coinbase/rosetta-sdk-go/examples/server/services" @@ -76,5 +77,14 @@ func main() { loggedRouter := server.LoggerMiddleware(router) corsRouter := server.CorsMiddleware(loggedRouter) log.Printf("Listening on port %d\n", serverPort) - log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", serverPort), corsRouter)) + + srv := &http.Server{ + Addr: fmt.Sprintf(":%d", serverPort), + Handler: corsRouter, + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + IdleTimeout: 15 * time.Second, + } + + log.Fatal(srv.ListenAndServe()) } diff --git a/fetcher/block.go b/fetcher/block.go index 53c04e1e..3e85dfe9 100644 --- a/fetcher/block.go +++ b/fetcher/block.go @@ -39,6 +39,14 @@ const ( minRoutines = 1 ) +// FetchedTransaction is a structure that represents a transaction fetched +// from the network. It includes the TransactionIdentifier which uniquely +// identifies the transaction on the blockchain, and the Transaction itself. +type FetchedTransaction struct { + TxIdentifier *types.TransactionIdentifier + Tx *types.Transaction +} + // addTransactionIdentifiers appends a slice of // types.TransactionIdentifiers to a channel. // When all types.TransactionIdentifiers are added, @@ -69,7 +77,7 @@ func (f *Fetcher) fetchChannelTransactions( network *types.NetworkIdentifier, block *types.BlockIdentifier, txsToFetch chan *types.TransactionIdentifier, - fetchedTxs chan *types.Transaction, + fetchedTxs chan *FetchedTransaction, ) *Error { // We keep the lock for all transactions we fetch in this goroutine. if err := f.connectionSemaphore.Acquire(ctx, semaphoreRequestWeight); err != nil { @@ -121,7 +129,10 @@ func (f *Fetcher) fetchChannelTransactions( } select { - case fetchedTxs <- tx.Transaction: + case fetchedTxs <- &FetchedTransaction{ + TxIdentifier: transactionIdentifier, + Tx: tx.Transaction, + }: case <-ctx.Done(): return &Error{ Err: ctx.Err(), @@ -149,7 +160,7 @@ func (f *Fetcher) UnsafeTransactions( } txsToFetch := make(chan *types.TransactionIdentifier) - fetchedTxs := make(chan *types.Transaction) + fetchedTxs := make(chan *FetchedTransaction) var fetchErr *Error g, ctx := errgroup.WithContext(ctx) g.Go(func() error { @@ -188,9 +199,14 @@ func (f *Fetcher) UnsafeTransactions( close(fetchedTxs) }() - txs := make([]*types.Transaction, 0) - for tx := range fetchedTxs { - txs = append(txs, tx) + fetchedTxMap := make(map[string]*types.Transaction) + for fetchedTx := range fetchedTxs { + fetchedTxMap[fetchedTx.TxIdentifier.Hash] = fetchedTx.Tx + } + + txs := make([]*types.Transaction, len(transactionIdentifiers)) + for i, txID := range transactionIdentifiers { + txs[i] = fetchedTxMap[txID.Hash] } if err := g.Wait(); err != nil { diff --git a/templates/client/api.mustache b/templates/client/api.mustache index 1532edf2..a2e8243f 100644 --- a/templates/client/api.mustache +++ b/templates/client/api.mustache @@ -7,6 +7,7 @@ import ( _ioutil "io/ioutil" _nethttp "net/http" "fmt" + "io" "github.com/coinbase/rosetta-sdk-go/types" {{#imports}} "{{import}}" @@ -83,7 +84,10 @@ func (a *{{{classname}}}Service) {{{nickname}}}(ctx _context.Context{{#hasParams } localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body) - defer localVarHTTPResponse.Body.Close() + defer func() { + _, _ = io.Copy(io.Discard, localVarHTTPResponse.Body) + _ = localVarHTTPResponse.Body.Close() + }() if err != nil { return nil, nil, fmt.Errorf("failed to read response: %w", err) }