Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fix a high memory consumption that also is part of the issue pokt-network#1457.
Under high load of requests (1000/rps or more) the RAM got crazy and scale up to 40GB or close to that.
Now after the fix of pokt-network#1457 with the worker pool, the node remains under 14gb of ram in my local tests.
  • Loading branch information
jorgecuesta committed Jan 22, 2024
1 parent 16f6f31 commit 5cabe5c
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 23 deletions.
2 changes: 2 additions & 0 deletions app/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func NewInMemoryTendermintNodeAminoWithValidators(t *testing.T, genesisState []b
panic(err)
}
pocketTypes.CleanPocketNodes()
pocketTypes.StopEvidenceWorker()
PCA = nil
inMemKB = nil
err := inMemDB.Close()
Expand Down Expand Up @@ -170,6 +171,7 @@ func NewInMemoryTendermintNodeProtoWithValidators(t *testing.T, genesisState []b
}

pocketTypes.CleanPocketNodes()
pocketTypes.StopEvidenceWorker()

PCA = nil
inMemKB = nil
Expand Down
1 change: 1 addition & 0 deletions app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ func InitPocketCoreConfig(chains *types.HostedBlockchains, logger log.Logger) {
}

func ShutdownPocketCore() {
types.StopEvidenceWorker()
types.FlushSessionCache()
types.StopServiceMetrics()
}
Expand Down
44 changes: 28 additions & 16 deletions app/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ func TestQueryRelay(t *testing.T) {
_, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock)
select {
case <-evtChan:
assert.Equal(t, uint64(1), types.GlobalEvidenceWorker.SuccessfulTasks())
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: relay.Proof.Blockchain,
Expand Down Expand Up @@ -858,16 +859,18 @@ func TestQueryRelayMultipleNodes(t *testing.T) {
}
_, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock)
<-evtChan // Wait for block
chain := sdk.PlaceholderHash
sessionBlockHeight := int64(1)
// setup relay
for _, v := range validators {
relay := types.Relay{
Payload: payload,
Meta: types.RelayMeta{BlockHeight: 5}, // todo race condition here
Proof: types.RelayProof{
Entropy: 32598345349034509,
SessionBlockHeight: 1,
SessionBlockHeight: sessionBlockHeight,
ServicerPubKey: v.PublicKey().RawString(),
Blockchain: sdk.PlaceholderHash,
Blockchain: chain,
Token: aat,
Signature: "",
},
Expand All @@ -886,23 +889,32 @@ func TestQueryRelayMultipleNodes(t *testing.T) {
BodyString(expectedRequest).
Reply(200).
BodyString(expectedResponse)
}

validatorAddress := sdk.GetAddress(v.PublicKey())
node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress)
select {
case <-evtChan:
// verify that each store task was successful
assert.Equal(t, uint64(len(validators)), types.GlobalEvidenceWorker.SuccessfulTasks())
// check the evidence store of each node.
for _, v := range validators {
validatorAddress := sdk.GetAddress(v.PublicKey())
_node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress)

assert.Nil(t, nodeErr)
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: chain,
SessionBlockHeight: sessionBlockHeight,
}, types.RelayEvidence, sdk.NewInt(10000), _node.EvidenceStore)
assert.Nil(t, err)
assert.NotNil(t, inv)
assert.Equal(t, int64(1), inv.NumOfProofs)
}

assert.Nil(t, nodeErr)
inv, err := types.GetEvidence(types.SessionHeader{
ApplicationPubKey: aat.ApplicationPublicKey,
Chain: relay.Proof.Blockchain,
SessionBlockHeight: relay.Proof.SessionBlockHeight,
}, types.RelayEvidence, sdk.NewInt(10000), node.EvidenceStore)
assert.Nil(t, err)
assert.NotNil(t, inv)
assert.Equal(t, inv.NumOfProofs, int64(1))
cleanup()
stopCli()
gock.Off()
}
cleanup()
stopCli()
gock.Off()
return
})
}
Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ replace github.com/tendermint/tm-db => github.com/pokt-network/tm-db v0.5.2-0.20
require (
github.com/cosmos/gogoproto v1.4.10
github.com/cucumber/godog v0.12.5
github.com/alitto/pond v1.8.1
github.com/cucumber/godog v0.12.6
github.com/go-kit/kit v0.12.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
Expand All @@ -19,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/regen-network/cosmos-proto v0.3.0
github.com/spf13/cobra v1.4.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/tendermint/go-amino v0.15.1
github.com/tendermint/tendermint v0.33.7
Expand All @@ -43,7 +45,7 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.5.9 // indirect
Expand All @@ -52,7 +54,7 @@ require (
github.com/gtank/ristretto255 v0.1.2 // indirect
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-memdb v1.3.0 // indirect
github.com/hashicorp/go-memdb v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmhodges/levigo v1.0.0 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type PocketConfig struct {
ValidatorCacheSize int64 `json:"validator_cache_size"`
ApplicationCacheSize int64 `json:"application_cache_size"`
RPCTimeout int64 `json:"rpc_timeout"`
RPCMaxIdleConns int `json:"rpc_max_idle_conns"`
RPCMaxConnsPerHost int `json:"rpc_max_conns_per_host"`
RPCMaxIdleConnsPerHost int `json:"rpc_max_idle_conns_per_host"`
PrometheusAddr string `json:"pocket_prometheus_port"`
PrometheusMaxOpenfiles int `json:"prometheus_max_open_files"`
MaxClaimAgeForProofRetry int `json:"max_claim_age_for_proof_retry"`
Expand Down Expand Up @@ -104,6 +107,9 @@ const (
DefaultPocketPrometheusListenAddr = "8083"
DefaultPrometheusMaxOpenFile = 3
DefaultRPCTimeout = 30000
DefaultRPCMaxIdleConns = 1000
DefaultRPCMaxConnsPerHost = 1000
DefaultRPCMaxIdleConnsPerHost = 1000
DefaultMaxClaimProofRetryAge = 32
DefaultProofPrevalidation = false
DefaultCtxCacheSize = 20
Expand Down Expand Up @@ -138,6 +144,9 @@ func DefaultConfig(dataDir string) Config {
ValidatorCacheSize: DefaultValidatorCacheSize,
ApplicationCacheSize: DefaultApplicationCacheSize,
RPCTimeout: DefaultRPCTimeout,
RPCMaxIdleConns: DefaultRPCMaxIdleConns,
RPCMaxConnsPerHost: DefaultRPCMaxConnsPerHost,
RPCMaxIdleConnsPerHost: DefaultRPCMaxIdleConnsPerHost,
PrometheusAddr: DefaultPocketPrometheusListenAddr,
PrometheusMaxOpenfiles: DefaultPrometheusMaxOpenFile,
MaxClaimAgeForProofRetry: DefaultMaxClaimProofRetryAge,
Expand Down
8 changes: 6 additions & 2 deletions x/pocketcore/keeper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,12 @@ func (k Keeper) HandleRelay(ctx sdk.Ctx, relay pc.Relay) (*pc.RelayResponse, sdk
}
return nil, err
}
// store the proof before execution, because the proof corresponds to the previous relay
relay.Proof.Store(maxPossibleRelays, node.EvidenceStore)
// move this to a worker that will insert this proof in a series style to avoid memory consumption and relay proof race conditions
// https://github.com/pokt-network/pocket-core/issues/1457
pc.GlobalEvidenceWorker.Submit(func() {
// store the proof before execution, because the proof corresponds to the previous relay
relay.Proof.Store(maxPossibleRelays, node.EvidenceStore)
})
// attempt to execute
respPayload, err := relay.Execute(hostedBlockchains, &nodeAddress)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions x/pocketcore/keeper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ func testRelayAt(
assert.Nil(t, er)
validRelay.Proof.Signature = hex.EncodeToString(clientSig)

httpClient := types.GetChainsClient()
defer gock.Off() // Flush pending mocks after test execution
defer gock.RestoreClient(httpClient)
gock.InterceptClient(httpClient)
gock.New("https://www.google.com:443").
Post("/").
Reply(200).
Expand Down
23 changes: 23 additions & 0 deletions x/pocketcore/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"encoding/hex"
"fmt"
"github.com/alitto/pond"
"github.com/pokt-network/pocket-core/crypto"
"github.com/pokt-network/pocket-core/types"
"github.com/tendermint/tendermint/config"
Expand All @@ -20,13 +21,16 @@ var (
globalRPCTimeout time.Duration
GlobalPocketConfig types.PocketConfig
GlobalTenderMintConfig config.Config
GlobalEvidenceWorker *pond.WorkerPool
)

func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) {
ConfigOnce.Do(func() {
InitGlobalServiceMetric(chains, logger, c.PocketConfig.PrometheusAddr, c.PocketConfig.PrometheusMaxOpenfiles)
})
InitHttpClient(c.PocketConfig.RPCMaxIdleConns, c.PocketConfig.RPCMaxConnsPerHost, c.PocketConfig.RPCMaxIdleConnsPerHost)
InitPocketNodeCaches(c, logger)
InitEvidenceWorker(c, logger)
GlobalPocketConfig = c.PocketConfig
GlobalTenderMintConfig = c.TendermintConfig
if GlobalPocketConfig.LeanPocket {
Expand All @@ -37,6 +41,18 @@ func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) {
SetRPCTimeout(c.PocketConfig.RPCTimeout)
}

func InitEvidenceWorker(_ types.Config, logger log.Logger) {
panicHandler := func(p interface{}) {
logger.Error(fmt.Sprintf("evidence storage task panicked: %v", p))
}
GlobalEvidenceWorker = pond.New(
1, 0,
pond.IdleTimeout(100),
pond.PanicHandler(panicHandler),
pond.Strategy(pond.Balanced()),
)
}

func ConvertEvidenceToProto(config types.Config) error {
// we have to add a random pocket node so that way lean pokt can still support getting the global evidence cache
node := AddPocketNode(crypto.GenerateEd25519PrivKey().GenPrivateKey(), log.NewNopLogger())
Expand Down Expand Up @@ -67,6 +83,13 @@ func ConvertEvidenceToProto(config types.Config) error {
return nil
}

func StopEvidenceWorker() {
if !GlobalEvidenceWorker.Stopped() {
GlobalEvidenceWorker.StopAndWait()
}
GlobalEvidenceWorker = nil
}

func FlushSessionCache() {
for _, k := range GlobalPocketNodes {
if k.SessionStore != nil {
Expand Down
36 changes: 35 additions & 1 deletion x/pocketcore/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,47 @@ import (

const DEFAULTHTTPMETHOD = "POST"

var (
chainHttpClient *http.Client
)

// "Relay" - A read / write API request from a hosted (non native) external blockchain
type Relay struct {
Payload Payload `json:"payload"` // the data payload of the request
Meta RelayMeta `json:"meta"` // metadata for the relay request
Proof RelayProof `json:"proof"` // the authentication scheme needed for work
}

func GetChainsClient() *http.Client {
if chainHttpClient == nil {
InitHttpClient(1000, 1000, 1000)
}

return chainHttpClient
}

func InitHttpClient(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int) {
var chainsTransport *http.Transport

t, ok := http.DefaultTransport.(*http.Transport)
if ok {
chainsTransport = t.Clone()
// this params may could be handled by config.json, but how much people know about this?
// tbd: figure out the right values to this, rn the priority is stop recreating new http client and connections.
chainsTransport.MaxIdleConns = maxIdleConns
chainsTransport.MaxConnsPerHost = maxConnsPerHost
chainsTransport.MaxIdleConnsPerHost = maxIdleConnsPerHost
} // if not ok, probably is because is *gock.Transport - test only

chainHttpClient = &http.Client{
Timeout: globalRPCTimeout * time.Second,
}

if chainsTransport != nil {
chainHttpClient.Transport = chainsTransport
}
}

// "Validate" - Checks the validity of a relay request using store data
func (r *Relay) Validate(
ctx sdk.Ctx,
Expand Down Expand Up @@ -346,7 +380,7 @@ func executeHTTPRequest(payload, url, userAgent string, basicAuth BasicAuth, met
}
}
// execute the request
resp, err := (&http.Client{Timeout: globalRPCTimeout * time.Millisecond}).Do(req)
resp, err := GetChainsClient().Do(req)
if err != nil {
return "", err
}
Expand Down
5 changes: 4 additions & 1 deletion x/pocketcore/types/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ func TestRelay_Execute(t *testing.T) {
},
}
validRelay.Proof.RequestHash = validRelay.RequestHashString()
httpClient := GetChainsClient()
defer gock.Off() // Flush pending mocks after test execution

defer gock.RestoreClient(httpClient)
gock.InterceptClient(httpClient)
gock.New("https://server.com").
Post("/relay").
Reply(200).
Expand All @@ -182,6 +184,7 @@ func TestRelay_Execute(t *testing.T) {
URL: "https://server.com/relay/",
}},
}

response, err := validRelay.Execute(&hb, &nodeAddr)
assert.True(t, err == nil)
assert.Equal(t, response, "bar")
Expand Down

0 comments on commit 5cabe5c

Please sign in to comment.