From 44a8aa74d7a016d11b274d141a1f20845d35aff7 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 25 Apr 2022 03:16:46 +1000 Subject: [PATCH 1/4] blockValidationTimer measures event validation time --- gossip/c_block_callbacks.go | 8 +++----- gossip/c_event_callbacks.go | 29 ++++++++++++++++++++++++----- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index 103b66404..6597c0707 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -50,10 +50,9 @@ var ( snapshotStorageReadTimer = metrics.GetOrRegisterTimer("chain/snapshot/storage/reads", nil) snapshotCommitTimer = metrics.GetOrRegisterTimer("chain/snapshot/commits", nil) - blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil) - blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil) - blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil) - blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil) + blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil) + blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil) + blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil) _ = metrics.GetOrRegisterMeter("chain/reorg/executes", nil) _ = metrics.GetOrRegisterMeter("chain/reorg/add", nil) @@ -401,7 +400,6 @@ func consensusCallbackBeginBlockFn( // Update the metrics touched during block validation accountHashTimer.Update(statedb.AccountHashes) storageHashTimer.Update(statedb.StorageHashes) - blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash)) // Update the metrics touched by new block headBlockGauge.Update(int64(blockCtx.Idx)) headHeaderGauge.Update(int64(blockCtx.Idx)) diff --git a/gossip/c_event_callbacks.go b/gossip/c_event_callbacks.go index ba96b2058..d867b3d76 100644 --- a/gossip/c_event_callbacks.go +++ b/gossip/c_event_callbacks.go @@ -1,15 +1,18 @@ package gossip import ( + "context" "errors" "math/big" "sync/atomic" + "time" "github.com/Fantom-foundation/lachesis-base/gossip/dagprocessor" "github.com/Fantom-foundation/lachesis-base/hash" "github.com/Fantom-foundation/lachesis-base/inter/dag" "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/metrics" "github.com/Fantom-foundation/go-opera/eventcheck" "github.com/Fantom-foundation/go-opera/eventcheck/epochcheck" @@ -28,6 +31,10 @@ var ( errDirtyEvmSnap = errors.New("EVM snapshot is dirty") ) +var ( + blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil) +) + func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) error { // set some unique ID e.SetID(s.uniqueEventIDs.sample()) @@ -69,7 +76,7 @@ func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) err } // processSavedEvent performs processing which depends on event being saved in DB -func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochState) error { +func (s *Service) processSavedEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error { err := s.dagIndexer.Add(e) if err != nil { return err @@ -80,18 +87,28 @@ func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochS return errWrongMedianTime } + finish := time.Now() + // aBFT processing - return s.engine.Process(e) + err = s.engine.Process(e) + if err != nil { + return err + } + + begin := ctx.Value("begin").(time.Time) + blockValidationTimer.Update(finish.Sub(begin)) + + return nil } // saveAndProcessEvent deletes event in a case if it fails validation during event processing -func (s *Service) saveAndProcessEvent(e *inter.EventPayload, es *iblockproc.EpochState) error { +func (s *Service) saveAndProcessEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error { fixEventTxHashes(e) // indexing event s.store.SetEvent(e) defer s.dagIndexer.DropNotFlushed() - err := s.processSavedEvent(e, es) + err := s.processSavedEvent(ctx, e, es) if err != nil { s.store.DelEvent(e.ID()) return err @@ -182,6 +199,8 @@ func (s *Service) processEvent(e *inter.EventPayload) error { atomic.StoreUint32(&s.eventBusyFlag, 1) defer atomic.StoreUint32(&s.eventBusyFlag, 0) + ctx := context.WithValue(context.Background(), "begin", time.Now()) + // repeat the checks under the mutex which may depend on volatile data if s.store.HasEvent(e.ID()) { return eventcheck.ErrAlreadyConnectedEvent @@ -211,7 +230,7 @@ func (s *Service) processEvent(e *inter.EventPayload) error { return err } - err = s.saveAndProcessEvent(e, &es) + err = s.saveAndProcessEvent(ctx, e, &es) if err != nil { return err } From 77936434bad33a508feea50787520c1b1ebbca5b Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 25 Apr 2022 18:08:57 +1000 Subject: [PATCH 2/4] blockExecutionTimer and blockWriteTimer fix --- gossip/c_block_callbacks.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index 6597c0707..dc30c15f1 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -315,6 +315,19 @@ func consensusCallbackBeginBlockFn( block.Root = hash.Hash(evmBlock.Root) block.GasUsed = evmBlock.GasUsed + // Update the metrics touched during block processing + accountReadTimer.Update(statedb.AccountReads) + storageReadTimer.Update(statedb.StorageReads) + accountUpdateTimer.Update(statedb.AccountUpdates) + storageUpdateTimer.Update(statedb.StorageUpdates) + snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) + snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) + triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation + trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates + trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates + blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) + substart = time.Now() + // memorize event position of each tx txPositions := make(map[common.Hash]ExtendedTxPosition) for _, e := range blockEvents { @@ -386,17 +399,6 @@ func consensusCallbackBeginBlockFn( updateLowestBlockToFill(blockCtx.Idx, store) updateLowestEpochToFill(es.Epoch, store) - // Update the metrics touched during block processing - accountReadTimer.Update(statedb.AccountReads) - storageReadTimer.Update(statedb.StorageReads) - accountUpdateTimer.Update(statedb.AccountUpdates) - storageUpdateTimer.Update(statedb.StorageUpdates) - snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads) - snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads) - triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation - trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates - trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates - blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) // Update the metrics touched during block validation accountHashTimer.Update(statedb.AccountHashes) storageHashTimer.Update(statedb.StorageHashes) From c67ba89b7e981d1a1a14582d81fd725d580bbd3f Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 25 Apr 2022 19:54:33 +1000 Subject: [PATCH 3/4] tuning --- gossip/c_block_callbacks.go | 9 +++++---- gossip/c_event_callbacks.go | 8 +++----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index dc30c15f1..c691ba8b6 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -257,7 +257,7 @@ func consensusCallbackBeginBlockFn( } evmProcessor := blockProc.EVMModule.Start(blockCtx, statedb, evmStateReader, onNewLogAll, es.Rules) - substart := time.Now() + startOfBlockExecution := time.Now() // Execute pre-internal transactions preInternalTxs := blockProc.PreTxTransactor.PopInternalTxs(blockCtx, bs, es, sealing, statedb) @@ -310,6 +310,8 @@ func consensusCallbackBeginBlockFn( _ = evmProcessor.Execute(txs, false) + startOfBlockWriting := time.Now() + evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize() block.SkippedTxs = skippedTxs block.Root = hash.Hash(evmBlock.Root) @@ -325,8 +327,7 @@ func consensusCallbackBeginBlockFn( triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates - blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash) - substart = time.Now() + blockExecutionTimer.Update(time.Since(startOfBlockExecution) - trieproc - triehash) // memorize event position of each tx txPositions := make(map[common.Hash]ExtendedTxPosition) @@ -424,7 +425,7 @@ func consensusCallbackBeginBlockFn( accountCommitTimer.Update(statedb.AccountCommits) storageCommitTimer.Update(statedb.StorageCommits) snapshotCommitTimer.Update(statedb.SnapshotCommits) - blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits) + blockWriteTimer.Update(time.Since(startOfBlockWriting) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits) blockInsertTimer.UpdateSince(start) now := time.Now() diff --git a/gossip/c_event_callbacks.go b/gossip/c_event_callbacks.go index d867b3d76..76aaee4e1 100644 --- a/gossip/c_event_callbacks.go +++ b/gossip/c_event_callbacks.go @@ -87,7 +87,8 @@ func (s *Service) processSavedEvent(ctx context.Context, e *inter.EventPayload, return errWrongMedianTime } - finish := time.Now() + begin := ctx.Value("startOfValidation").(time.Time) + blockValidationTimer.Update(time.Since(begin)) // aBFT processing err = s.engine.Process(e) @@ -95,9 +96,6 @@ func (s *Service) processSavedEvent(ctx context.Context, e *inter.EventPayload, return err } - begin := ctx.Value("begin").(time.Time) - blockValidationTimer.Update(finish.Sub(begin)) - return nil } @@ -199,7 +197,7 @@ func (s *Service) processEvent(e *inter.EventPayload) error { atomic.StoreUint32(&s.eventBusyFlag, 1) defer atomic.StoreUint32(&s.eventBusyFlag, 0) - ctx := context.WithValue(context.Background(), "begin", time.Now()) + ctx := context.WithValue(context.Background(), "startOfValidation", time.Now()) // repeat the checks under the mutex which may depend on volatile data if s.store.HasEvent(e.ID()) { From bbf4551835471f7e03d3603f62ddfaaa990623bd Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 2 May 2022 12:21:47 +1000 Subject: [PATCH 4/4] no overlap between blockExecution and blockWriter timers --- gossip/c_block_callbacks.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/gossip/c_block_callbacks.go b/gossip/c_block_callbacks.go index c691ba8b6..638e4abc2 100644 --- a/gossip/c_block_callbacks.go +++ b/gossip/c_block_callbacks.go @@ -310,13 +310,6 @@ func consensusCallbackBeginBlockFn( _ = evmProcessor.Execute(txs, false) - startOfBlockWriting := time.Now() - - evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize() - block.SkippedTxs = skippedTxs - block.Root = hash.Hash(evmBlock.Root) - block.GasUsed = evmBlock.GasUsed - // Update the metrics touched during block processing accountReadTimer.Update(statedb.AccountReads) storageReadTimer.Update(statedb.StorageReads) @@ -329,6 +322,13 @@ func consensusCallbackBeginBlockFn( trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates blockExecutionTimer.Update(time.Since(startOfBlockExecution) - trieproc - triehash) + startOfBlockWriting := time.Now() + + evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize() + block.SkippedTxs = skippedTxs + block.Root = hash.Hash(evmBlock.Root) + block.GasUsed = evmBlock.GasUsed + // memorize event position of each tx txPositions := make(map[common.Hash]ExtendedTxPosition) for _, e := range blockEvents {