Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix of block proc metrics #290

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions gossip/c_block_callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ var (
snapshotStorageReadTimer = metrics.GetOrRegisterTimer("chain/snapshot/storage/reads", nil)
snapshotCommitTimer = metrics.GetOrRegisterTimer("chain/snapshot/commits", nil)

blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil)
blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil)
blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil)
blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil)
blockInsertTimer = metrics.GetOrRegisterTimer("chain/inserts", nil)
blockExecutionTimer = metrics.GetOrRegisterTimer("chain/execution", nil)
blockWriteTimer = metrics.GetOrRegisterTimer("chain/write", nil)

_ = metrics.GetOrRegisterMeter("chain/reorg/executes", nil)
_ = metrics.GetOrRegisterMeter("chain/reorg/add", nil)
Expand Down Expand Up @@ -258,7 +257,7 @@ func consensusCallbackBeginBlockFn(
}

evmProcessor := blockProc.EVMModule.Start(blockCtx, statedb, evmStateReader, onNewLogAll, es.Rules)
substart := time.Now()
startOfBlockExecution := time.Now()

// Execute pre-internal transactions
preInternalTxs := blockProc.PreTxTransactor.PopInternalTxs(blockCtx, bs, es, sealing, statedb)
Expand Down Expand Up @@ -311,11 +310,25 @@ func consensusCallbackBeginBlockFn(

_ = evmProcessor.Execute(txs, false)

startOfBlockWriting := time.Now()

evmBlock, skippedTxs, allReceipts := evmProcessor.Finalize()
Copy link
Contributor

@thaarok thaarok Apr 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rus-alex There is an overlap between the blockExecutionTimer and blockWriterTimer intervals, which complicates using them for the Opera profiling, as evmProcessor.Finalize() is included in both of them.

As evmProcessor.Finalize() includes mostly statedb.Commit and trieproc time is measured for the blockWriterTimer too, I suggest to include it only into the blockWriterTimer interval - move the blockExecutionTimer end before the startOfBlockWriting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

block.SkippedTxs = skippedTxs
block.Root = hash.Hash(evmBlock.Root)
block.GasUsed = evmBlock.GasUsed

// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads)
storageReadTimer.Update(statedb.StorageReads)
accountUpdateTimer.Update(statedb.AccountUpdates)
storageUpdateTimer.Update(statedb.StorageUpdates)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads)
triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(startOfBlockExecution) - trieproc - triehash)

// memorize event position of each tx
txPositions := make(map[common.Hash]ExtendedTxPosition)
for _, e := range blockEvents {
Expand Down Expand Up @@ -387,21 +400,9 @@ func consensusCallbackBeginBlockFn(
updateLowestBlockToFill(blockCtx.Idx, store)
updateLowestEpochToFill(es.Epoch, store)

// Update the metrics touched during block processing
accountReadTimer.Update(statedb.AccountReads)
storageReadTimer.Update(statedb.StorageReads)
accountUpdateTimer.Update(statedb.AccountUpdates)
storageUpdateTimer.Update(statedb.StorageUpdates)
snapshotAccountReadTimer.Update(statedb.SnapshotAccountReads)
snapshotStorageReadTimer.Update(statedb.SnapshotStorageReads)
triehash := statedb.AccountHashes + statedb.StorageHashes // save to not double count in validation
trieproc := statedb.SnapshotAccountReads + statedb.AccountReads + statedb.AccountUpdates
trieproc += statedb.SnapshotStorageReads + statedb.StorageReads + statedb.StorageUpdates
blockExecutionTimer.Update(time.Since(substart) - trieproc - triehash)
// Update the metrics touched during block validation
accountHashTimer.Update(statedb.AccountHashes)
storageHashTimer.Update(statedb.StorageHashes)
blockValidationTimer.Update(time.Since(substart) - (statedb.AccountHashes + statedb.StorageHashes - triehash))
// Update the metrics touched by new block
headBlockGauge.Update(int64(blockCtx.Idx))
headHeaderGauge.Update(int64(blockCtx.Idx))
Expand All @@ -424,7 +425,7 @@ func consensusCallbackBeginBlockFn(
accountCommitTimer.Update(statedb.AccountCommits)
storageCommitTimer.Update(statedb.StorageCommits)
snapshotCommitTimer.Update(statedb.SnapshotCommits)
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockWriteTimer.Update(time.Since(startOfBlockWriting) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockInsertTimer.UpdateSince(start)

now := time.Now()
Expand Down
27 changes: 22 additions & 5 deletions gossip/c_event_callbacks.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package gossip

import (
"context"
"errors"
"math/big"
"sync/atomic"
"time"

"github.com/Fantom-foundation/lachesis-base/gossip/dagprocessor"
"github.com/Fantom-foundation/lachesis-base/hash"
"github.com/Fantom-foundation/lachesis-base/inter/dag"
"github.com/Fantom-foundation/lachesis-base/inter/idx"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/metrics"

"github.com/Fantom-foundation/go-opera/eventcheck"
"github.com/Fantom-foundation/go-opera/eventcheck/epochcheck"
Expand All @@ -28,6 +31,10 @@ var (
errDirtyEvmSnap = errors.New("EVM snapshot is dirty")
)

var (
blockValidationTimer = metrics.GetOrRegisterTimer("chain/validation", nil)
)

func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) error {
// set some unique ID
e.SetID(s.uniqueEventIDs.sample())
Expand Down Expand Up @@ -69,7 +76,7 @@ func (s *Service) buildEvent(e *inter.MutableEventPayload, onIndexed func()) err
}

// processSavedEvent performs processing which depends on event being saved in DB
func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochState) error {
func (s *Service) processSavedEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error {
err := s.dagIndexer.Add(e)
if err != nil {
return err
Expand All @@ -80,18 +87,26 @@ func (s *Service) processSavedEvent(e *inter.EventPayload, es *iblockproc.EpochS
return errWrongMedianTime
}

begin := ctx.Value("startOfValidation").(time.Time)
blockValidationTimer.Update(time.Since(begin))

// aBFT processing
return s.engine.Process(e)
err = s.engine.Process(e)
if err != nil {
return err
}

return nil
}

// saveAndProcessEvent deletes event in a case if it fails validation during event processing
func (s *Service) saveAndProcessEvent(e *inter.EventPayload, es *iblockproc.EpochState) error {
func (s *Service) saveAndProcessEvent(ctx context.Context, e *inter.EventPayload, es *iblockproc.EpochState) error {
fixEventTxHashes(e)
// indexing event
s.store.SetEvent(e)
defer s.dagIndexer.DropNotFlushed()

err := s.processSavedEvent(e, es)
err := s.processSavedEvent(ctx, e, es)
if err != nil {
s.store.DelEvent(e.ID())
return err
Expand Down Expand Up @@ -182,6 +197,8 @@ func (s *Service) processEvent(e *inter.EventPayload) error {
atomic.StoreUint32(&s.eventBusyFlag, 1)
defer atomic.StoreUint32(&s.eventBusyFlag, 0)

ctx := context.WithValue(context.Background(), "startOfValidation", time.Now())

// repeat the checks under the mutex which may depend on volatile data
if s.store.HasEvent(e.ID()) {
return eventcheck.ErrAlreadyConnectedEvent
Expand Down Expand Up @@ -211,7 +228,7 @@ func (s *Service) processEvent(e *inter.EventPayload) error {
return err
}

err = s.saveAndProcessEvent(e, &es)
err = s.saveAndProcessEvent(ctx, e, &es)
if err != nil {
return err
}
Expand Down