diff --git a/nimbus/beacon/api_handler/api_forkchoice.nim b/nimbus/beacon/api_handler/api_forkchoice.nim index 4bcec9dd30..f574df2018 100644 --- a/nimbus/beacon/api_handler/api_forkchoice.nim +++ b/nimbus/beacon/api_handler/api_forkchoice.nim @@ -110,7 +110,7 @@ proc forkchoiceUpdated*(ben: BeaconEngineRef, # Update sync header (if any) com.syncReqNewHead(header) - com.reqBeaconSyncTargetCB(header) + com.reqBeaconSyncTargetCB(header, update.finalizedBlockHash) return simpleFCU(PayloadExecutionStatus.syncing) diff --git a/nimbus/common/common.nim b/nimbus/common/common.nim index 05804fcd55..738b93e365 100644 --- a/nimbus/common/common.nim +++ b/nimbus/common/common.nim @@ -42,7 +42,7 @@ type SyncReqNewHeadCB* = proc(header: Header) {.gcsafe, raises: [].} ## Update head for syncing - ReqBeaconSyncTargetCB* = proc(header: Header) {.gcsafe, raises: [].} + ReqBeaconSyncTargetCB* = proc(header: Header; finHash: Hash32) {.gcsafe, raises: [].} ## Ditto (for beacon sync) NotifyBadBlockCB* = proc(invalid, origin: Header) {.gcsafe, raises: [].} @@ -337,10 +337,10 @@ proc syncReqNewHead*(com: CommonRef; header: Header) if not com.syncReqNewHead.isNil: com.syncReqNewHead(header) -proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header) = +proc reqBeaconSyncTargetCB*(com: CommonRef; header: Header; finHash: Hash32) = ## Used by RPC updater if not com.reqBeaconSyncTargetCB.isNil: - com.reqBeaconSyncTargetCB(header) + com.reqBeaconSyncTargetCB(header, finHash) proc notifyBadBlock*(com: CommonRef; invalid, origin: Header) {.gcsafe, raises: [].} = diff --git a/nimbus/sync/beacon.nim b/nimbus/sync/beacon.nim index aa1fef6cc0..5cd3a52d9b 100644 --- a/nimbus/sync/beacon.nim +++ b/nimbus/sync/beacon.nim @@ -63,7 +63,7 @@ proc init*( desc.initSync(ethNode, maxPeers) desc.ctx.pool.nBodiesBatch = chunkSize # Initalise for `persistBlocks()` - desc.ctx.pool.chain = chain.com.newChain() + desc.ctx.pool.chain = chain desc proc start*(ctx: BeaconSyncRef) = diff --git a/nimbus/sync/beacon/README.md b/nimbus/sync/beacon/README.md index 0c4f1bf781..1ad9f777b8 100644 --- a/nimbus/sync/beacon/README.md +++ b/nimbus/sync/beacon/README.md @@ -47,26 +47,26 @@ A sequence *@[h(1),h(2),..]* of block headers is called a *linked chain* if General header linked chains layout diagram - 0 C D E (1) + 0 C D H (1) o----------------o---------------------o----------------o---> | <-- linked --> | <-- unprocessed --> | <-- linked --> | -Here, the single upper letter symbols *0*, *C*, *D*, *E* denote block numbers. +Here, the single upper letter symbols *0*, *C*, *D*, *H* denote block numbers. For convenience, these letters are also identified with its associated block header or the full blocks. Saying *"the header 0"* is short for *"the header with block number 0"*. -Meaning of *0*, *C*, *D*, *E*: +Meaning of *0*, *C*, *D*, *H*: * *0* -- Genesis, block number number *0* * *C* -- coupler, maximal block number of linked chain starting at *0* -* *D* -- dangling, minimal block number of linked chain ending at *E* +* *D* -- dangling, minimal block number of linked chain ending at *H* with *C <= D* -* *E* -- end, block number of some finalised block (not necessarily the latest - one) +* *H* -- head, end block number of **consensus head** (not necessarily the + latest one as this is moving while processing) -This definition implies *0 <= C <= D <= E* and the state of the header linked -chains can uniquely be described by the triple of block numbers *(C,D,E)*. +This definition implies *0 <= C <= D <= H* and the state of the header linked +chains can uniquely be described by the triple of block numbers *(C,D,H)*. ### Storage of header chains: @@ -78,7 +78,7 @@ correspond to finalised blocks, e.g. the sub-interval *[0,**base**]* where half open interval *(**base**,C]* are always stored on the *beaconHeader* column of the *KVT* database. -The block numbers from the interval *[D,E]* also reside on the *beaconHeader* +The block numbers from the interval *[D,H]* also reside on the *beaconHeader* column of the *KVT* database table. @@ -89,7 +89,7 @@ Minimal layout on a pristine system 0 (2) C D - E + H o---> When first initialised, the header linked chains are set to *(0,0,0)*. @@ -101,32 +101,32 @@ A header chain with an non empty open interval *(C,D)* can be updated only by increasing *C* or decreasing *D* by adding/prepending headers so that the linked chain condition is not violated. -Only when the gap open interval *(C,D)* vanishes, the right end *E* can be +Only when the gap open interval *(C,D)* vanishes, the right end *H* can be increased to a larger target block number *T*, say. This block number will typically be the **consensus head**. Then * *C==D* beacuse the open interval *(C,D)* is empty -* *C==E* because *C* is maximal (see definition of `C` above) +* *C==H* because *C* is maximal (see definition of `C` above) -and the header chains *(E,E,E)* (depicted in *(3)* below) can be set to +and the header chains *(H,H,H)* (depicted in *(3)* below) can be set to *(C,T,T)* as depicted in *(4)* below. -Layout before updating of *E* +Layout before updating of *H* C (3) D - 0 E T + 0 H T o----------------o---------------------o----> | <-- linked --> | -New layout with moving *D* and *E* to *T* +New layout with moving *D* and *H* to *T* D' (4) - 0 C E' + 0 C H' o----------------o---------------------o----> | <-- linked --> | <-- unprocessed --> | -with *D'=T* and *E'=T*. +with *D'=T* and *H'=T*. Note that diagram *(3)* is a generalisation of *(2)*. @@ -134,7 +134,7 @@ Note that diagram *(3)* is a generalisation of *(2)*. ### Complete a header linked chain: The header chain is *relatively complete* if it satisfies clause *(3)* above -for *0 < C*. It is *fully complete* if *E==T*. It should be obvious that the +for *0 < C*. It is *fully complete* if *H==T*. It should be obvious that the latter condition is temporary only on a live system (as *T* is contiuously updated.) @@ -146,20 +146,22 @@ database state will be updated incrementally. Block chain import/execution ----------------------------- -The following diagram with a parially imported/executed block chain amends the +The following diagram with a partially imported/executed block chain amends the layout *(1)*: - 0 B C D E (5) - o------------------o-------o---------------------o----------------o--> + 0 B L C D H (5) + o------------o-----o-------o---------------------o----------------o--> | <-- imported --> | | | | | <------- linked ------> | <-- unprocessed --> | <-- linked --> | +where -where *B* is the **base**, i.e. the **base state** block number of the last -imported/executed block. It also refers to the global state block number of -the ledger database. +* *B* is the **base state** stored on the persistent state database. *B* is + not addressed directly except upon start up or resuming sync when *B == L*. +* *L* is the last imported/executed block, typically up to the **canonical + consensus head**. -The headers corresponding to the half open interval `(B,C]` will be completed +The headers corresponding to the half open interval `(L,C]` will be completed by fetching block bodies and then import/execute them together with the already cached headers. @@ -260,9 +262,11 @@ be available if *nimbus* is compiled with the additional make flags |:-------------------|:------------:|:--------------------| | | | | | beacon_base | block height | **B**, *increasing* | +| beacon_latest | block height | **L**, *increasing* | | beacon_coupler | block height | **C**, *increasing* | | beacon_dangling | block height | **D** | -| beacon_end | block height | **E**, *increasing* | +| beacon_final | block height | **F**, *increasing* | +| beacon_head | block height | **H**, *increasing* | | beacon_target | block height | **T**, *increasing* | | | | | | beacon_header_lists_staged | size | # of staged header list records | diff --git a/nimbus/sync/beacon/worker.nim b/nimbus/sync/beacon/worker.nim index 10ca2f854e..a45882efc8 100644 --- a/nimbus/sync/beacon/worker.nim +++ b/nimbus/sync/beacon/worker.nim @@ -8,7 +8,6 @@ # at your option. This file may not be copied, modified, or distributed # except according to those terms. - {.push raises:[].} import @@ -112,8 +111,8 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} = debug info # Check for a possible header layout and body request changes - discard ctx.updateLinkedHChainsLayout info - discard ctx.updateBlockRequests info + ctx.updateSyncStateLayout info + ctx.updateBlockRequests info # Execute staged block records. if ctx.blocksStagedCanImportOk(): @@ -127,13 +126,10 @@ proc runDaemon*(ctx: BeaconCtxRef) {.async.} = defer: ctx.pool.importRunningOk = false # Import from staged queue. - while ctx.blocksStagedImport info: + while await ctx.blocksStagedImport(info): ctx.updateMetrics() - # Allow pseudo/async thread switch - await sleepAsync asyncThreadSwitchTimeSlot - - # At the end of the cycle, leave time to refill headers/blocks + # At the end of the cycle, leave time to trigger refill headers/blocks await sleepAsync daemonWaitInterval ctx.updateMetrics() @@ -174,12 +170,16 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} = trace info, peer, nInvocations=buddy.only.nMultiLoop, lastIdleGap=buddy.only.multiRunIdle.toStr + # Update consensus header target when needed. It comes with a finalised + # header hash where we need to complete the block number. + await buddy.headerStagedUpdateTarget info + if not await buddy.napUnlessSomethingToFetch info: # # Layout of a triple of linked header chains (see `README.md`) # :: - # 0 C D E - # | <--- [0,C] --> | <----- (C,D) -----> | <-- [D,E] ---> | + # 0 C D H + # | <--- [0,C] --> | <----- (C,D) -----> | <-- [D,H] ---> | # o----------------o---------------------o----------------o---> # | <-- linked --> | <-- unprocessed --> | <-- linked --> | # @@ -194,7 +194,7 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} = # # The block numbers range concurrently taken from `(C,D)` are chosen # from the upper range. So exactly one of the actors has a range - # `[whatever,D-1]` adjacent to `[D,E]`. Call this actor the lead actor. + # `[whatever,D-1]` adjacent to `[D,H]`. Call this actor the lead actor. # # For the lead actor, headers can be downloaded all by the hashes as # the parent hash for the header with block number `D` is known. All @@ -217,7 +217,7 @@ proc runPeer*(buddy: BeaconBuddyRef) {.async.} = if await buddy.headersStagedCollect info: # * Save updated state and headers - # * Decrease the dangling left boundary `D` of the trusted range `[D,E]` + # * Decrease the dangling left boundary `D` of the trusted range `[D,H]` discard buddy.ctx.headersStagedProcess info # Fetch bodies and combine them with headers to blocks to be staged. These diff --git a/nimbus/sync/beacon/worker/blocks_staged.nim b/nimbus/sync/beacon/worker/blocks_staged.nim index 90921664f4..c9528fcfaa 100644 --- a/nimbus/sync/beacon/worker/blocks_staged.nim +++ b/nimbus/sync/beacon/worker/blocks_staged.nim @@ -26,6 +26,13 @@ logScope: # Private functions # ------------------------------------------------------------------------------ +func getNthHash(blk: BlocksForImport; n: int): Hash32 = + if n + 1 < blk.blocks.len: + blk.blocks[n + 1].header.parentHash + else: + rlp.encode(blk.blocks[n].header).keccak256 + + proc fetchAndCheck( buddy: BeaconBuddyRef; ivReq: BnRange; @@ -218,38 +225,73 @@ proc blocksStagedCollect*( return true -proc blocksStagedImport*(ctx: BeaconCtxRef; info: static[string]): bool = +proc blocksStagedImport*( + ctx: BeaconCtxRef; + info: static[string]; + ): Future[bool] + {.async.} = ## Import/execute blocks record from staged queue ## let qItem = ctx.blk.staged.ge(0).valueOr: return false # Fetch least record, accept only if it matches the global ledger state - let base = ctx.dbStateBlockNumber() - if qItem.key != base + 1: - trace info & ": there is a gap", B=base.bnStr, stagedBottom=qItem.key.bnStr - return false + block: + let imported = ctx.chain.latestNumber() + if qItem.key != imported + 1: + trace info & ": there is a gap L vs. staged", + B=ctx.chain.baseNumber.bnStr, L=imported.bnStr, staged=qItem.key.bnStr + return false # Remove from queue discard ctx.blk.staged.delete qItem.key - # Execute blocks - let stats = ctx.pool.chain.persistBlocks(qItem.data.blocks).valueOr: - # FIXME: should that be rather an `raiseAssert` here? - warn info & ": block exec error", B=base.bnStr, - iv=BnRange.new(qItem.key,qItem.key+qItem.data.blocks.len.uint64-1), - error=error - doAssert base == ctx.dbStateBlockNumber() - return false - - trace info & ": imported staged blocks", B=ctx.dbStateBlockNumber.bnStr, - first=qItem.key.bnStr, stats + let + nBlocks = qItem.data.blocks.len + iv = BnRange.new(qItem.key, qItem.key + nBlocks.uint64 - 1) + + var maxImport = iv.maxPt + for n in 0 ..< nBlocks: + let nBn = qItem.data.blocks[n].header.number + ctx.pool.chain.importBlock(qItem.data.blocks[n]).isOkOr: + warn info & ": import block error", iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + nBn=nBn.bnStr, txLevel=ctx.chain.db.level, `error`=error + # Restore what is left over below + maxImport = ctx.chain.latestNumber() + break - # Remove stashed headers - for bn in qItem.key ..< qItem.key + qItem.data.blocks.len.uint64: + # Occasionally mark the chain finalized + if (n + 1) mod finaliserChainLengthMax == 0 or (n + 1) == nBlocks: + let + nHash = qItem.data.getNthHash(n) + finHash = if nBn < ctx.layout.final: nHash else: ctx.layout.finalHash + + doAssert nBn == ctx.chain.latestNumber() + ctx.pool.chain.forkChoice(headHash=nHash, finalizedHash=finHash).isOkOr: + warn info & ": fork choice error", iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level, nHash, + finHash=(if finHash == nHash: "nHash" else: "F"), `error`=error + # Restore what is left over below + maxImport = ctx.chain.latestNumber() + break + + # Allow pseudo/async thread switch. + await sleepAsync asyncThreadSwitchTimeSlot + + # Import probably incomplete, so a partial roll back may be needed + if maxImport < iv.maxPt: + ctx.blocksUnprocCommit(0, maxImport+1, qItem.data.blocks[^1].header.number) + + # Remove stashed headers for imported blocks + for bn in iv.minPt .. maxImport: ctx.dbUnstashHeader bn - true + trace info & ": import done", iv, + B=ctx.chain.baseNumber.bnStr, L=ctx.chain.latestNumber.bnStr, + F=ctx.layout.final.bnStr, txLevel=ctx.chain.db.level + return true func blocksStagedBottomKey*(ctx: BeaconCtxRef): BlockNumber = diff --git a/nimbus/sync/beacon/worker/blocks_unproc.nim b/nimbus/sync/beacon/worker/blocks_unproc.nim index 17b7c686d3..b837637f63 100644 --- a/nimbus/sync/beacon/worker/blocks_unproc.nim +++ b/nimbus/sync/beacon/worker/blocks_unproc.nim @@ -112,6 +112,18 @@ proc blocksUnprocInit*(ctx: BeaconCtxRef) = ## Constructor ctx.blk.unprocessed = BnRangeSet.init() +proc blocksUnprocSet*(ctx: BeaconCtxRef) = + ## Clear + ctx.blk.unprocessed.clear() + ctx.blk.borrowed = 0u + +proc blocksUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) = + ## Set up new unprocessed range + ctx.blocksUnprocSet() + # Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)` + if minPt <= maxPt: + discard ctx.blk.unprocessed.merge(minPt, maxPt) + # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/db.nim b/nimbus/sync/beacon/worker/db.nim index 1ec0bacbf7..cb8741ffbc 100644 --- a/nimbus/sync/beacon/worker/db.nim +++ b/nimbus/sync/beacon/worker/db.nim @@ -15,10 +15,9 @@ import pkg/eth/[common, rlp], pkg/stew/[byteutils, interval_set, sorted_set], pkg/results, - ../../../db/storage_types, - ../../../common, + "../../.."/[common, core/chain, db/storage_types], ../worker_desc, - ./headers_unproc + "."/[blocks_unproc, headers_unproc] logScope: topics = "beacon db" @@ -43,37 +42,24 @@ formatIt(Hash32): # Private helpers # ------------------------------------------------------------------------------ -proc fetchLinkedHChainsLayout(ctx: BeaconCtxRef): Opt[LinkedHChainsLayout] = +proc fetchSyncStateLayout(ctx: BeaconCtxRef): Opt[SyncStateLayout] = let data = ctx.db.ctx.getKvt().get(LhcStateKey.toOpenArray).valueOr: return err() try: - result = ok(rlp.decode(data, LinkedHChainsLayout)) + return ok(rlp.decode(data, SyncStateLayout)) except RlpError: - return err() - - -proc fetchSavedState(ctx: BeaconCtxRef): Opt[SavedDbStateSpecs] = - let db = ctx.db - var val: SavedDbStateSpecs - val.number = db.getSavedStateBlockNumber() - - if db.getBlockHash(val.number, val.hash): - var header: Header - if db.getBlockHeader(val.hash, header): - val.parent = header.parentHash - return ok(val) - + discard err() # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc dbStoreLinkedHChainsLayout*(ctx: BeaconCtxRef): bool = +proc dbStoreSyncStateLayout*(ctx: BeaconCtxRef) = ## Save chain layout to persistent db - const info = "dbStoreLinkedHChainsLayout" - if ctx.layout == ctx.lhc.lastLayout: - return false + const info = "dbStoreSyncStateLayout" + if ctx.layout == ctx.sst.lastLayout: + return let data = rlp.encode(ctx.layout) ctx.db.ctx.getKvt().put(LhcStateKey.toOpenArray, data).isOkOr: @@ -86,42 +72,54 @@ proc dbStoreLinkedHChainsLayout*(ctx: BeaconCtxRef): bool = let number = ctx.db.getSavedStateBlockNumber() ctx.db.persistent(number).isOkOr: debug info & ": failed to save persistently", error=($$error) - return false + return else: trace info & ": not saved, tx pending", txLevel - return false + return trace info & ": saved pesistently on DB" - true -proc dbLoadLinkedHChainsLayout*(ctx: BeaconCtxRef) = +proc dbLoadSyncStateLayout*(ctx: BeaconCtxRef) = ## Restore chain layout from persistent db const info = "dbLoadLinkedHChainsLayout" - let rc = ctx.fetchLinkedHChainsLayout() + let + rc = ctx.fetchSyncStateLayout() + latest = ctx.chain.latestNumber() + if rc.isOk: - ctx.lhc.layout = rc.value - let (uMin,uMax) = (rc.value.coupler+1, rc.value.dangling-1) - if uMin <= uMax: - # Add interval of unprocessed block range `(C,D)` from `README.md` - ctx.headersUnprocSet(uMin, uMax) - trace info & ": restored layout", C=rc.value.coupler.bnStr, - D=rc.value.dangling.bnStr, E=rc.value.endBn.bnStr + ctx.sst.layout = rc.value + + # Add interval of unprocessed block range `(L,C]` from `README.md` + ctx.blocksUnprocSet(latest+1, ctx.layout.coupler) + ctx.blk.topRequest = ctx.layout.coupler + + # Add interval of unprocessed header range `(C,D)` from `README.md` + ctx.headersUnprocSet(ctx.layout.coupler+1, ctx.layout.dangling-1) + + trace info & ": restored layout", L=latest.bnStr, + C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr, + F=ctx.layout.final.bnStr, H=ctx.layout.head.bnStr + else: - let val = ctx.fetchSavedState().expect "saved states" - ctx.lhc.layout = LinkedHChainsLayout( - coupler: val.number, - couplerHash: val.hash, - dangling: val.number, - danglingParent: val.parent, - endBn: val.number, - endHash: val.hash) - trace info & ": new layout", B=val.number, C=rc.value.coupler.bnStr, - D=rc.value.dangling.bnStr, E=rc.value.endBn.bnStr + let + latestHash = ctx.chain.latestHash() + latestParent = ctx.chain.latestHeader.parentHash + + ctx.sst.layout = SyncStateLayout( + coupler: latest, + couplerHash: latestHash, + dangling: latest, + danglingParent: latestParent, + final: latest, + finalHash: latestHash, + head: latest, + headHash: latestHash) - ctx.lhc.lastLayout = ctx.layout + trace info & ": new layout", L="C", C="D", D="F", F="H", H=latest.bnStr + ctx.sst.lastLayout = ctx.layout # ------------------ @@ -170,13 +168,6 @@ proc dbUnstashHeader*(ctx: BeaconCtxRef; bn: BlockNumber) = ## Remove header from temporary DB list discard ctx.db.ctx.getKvt().del(beaconHeaderKey(bn).toOpenArray) -# ------------------ - -proc dbStateBlockNumber*(ctx: BeaconCtxRef): BlockNumber = - ## Currently only a wrapper around the function returning the current - ## database state block number - ctx.db.getSavedStateBlockNumber() - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/beacon/worker/headers_staged.nim b/nimbus/sync/beacon/worker/headers_staged.nim index 3a70dc4739..f4f2434d67 100644 --- a/nimbus/sync/beacon/worker/headers_staged.nim +++ b/nimbus/sync/beacon/worker/headers_staged.nim @@ -54,6 +54,33 @@ proc fetchAndCheck( # Public functions # ------------------------------------------------------------------------------ +proc headerStagedUpdateTarget*( + buddy: BeaconBuddyRef; + info: static[string]; + ) {.async.} = + ## Fetch finalised beacon header if there is an update available + let ctx = buddy.ctx + if not ctx.layout.headLocked and + ctx.target.final == 0 and + ctx.target.finalHash != zeroHash32 and + not ctx.target.locked: + const iv = BnRange.new(1u,1u) # dummy interval + + ctx.target.locked = true + let rc = await buddy.headersFetchReversed(iv, ctx.target.finalHash, info) + ctx.target.locked = false + + if rc.isOK: + let hash = rlp.encode(rc.value[0]).keccak256 + if hash != ctx.target.finalHash: + # Oops + buddy.ctrl.zombie = true + trace info & ": finalised header hash mismatch", peer=buddy.peer, hash, + expected=ctx.target.finalHash + else: + ctx.target.final = rc.value[0].number + + proc headersStagedCollect*( buddy: BeaconBuddyRef; info: static[string]; @@ -86,7 +113,7 @@ proc headersStagedCollect*( iv = ctx.headersUnprocFetch(nFetchHeadersBatch).expect "valid interval" # Check for top header hash. If the range to fetch directly joins below - # the top level linked chain `[D,E]`, then there is the hash available for + # the top level linked chain `[D,H]`, then there is the hash available for # the top level header to fetch. Otherwise -- with multi-peer mode -- the # range of headers is fetched opportunistically using block numbers only. isOpportunistic = uTop + 1 < ctx.layout.dangling @@ -158,13 +185,13 @@ proc headersStagedCollect*( break # Store `lhc` chain on the `staged` queue - let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr: + let qItem = ctx.hdr.staged.insert(iv.maxPt).valueOr: raiseAssert info & ": duplicate key on staged queue iv=" & $iv qItem.data = lhc[] trace info & ": staged headers", peer, topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len, - nStaged=ctx.lhc.staged.len, isOpportunistic, ctrl=buddy.ctrl.state + nStaged=ctx.hdr.staged.len, isOpportunistic, ctrl=buddy.ctrl.state return true @@ -175,7 +202,7 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = ## of records processed and saved. while true: # Fetch largest block - let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr: + let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr: trace info & ": no staged headers", error=error break # all done @@ -192,7 +219,7 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = # Process item from `staged` queue. So it is not needed in the list, # anymore. - discard ctx.lhc.staged.delete(iv.maxPt) + discard ctx.hdr.staged.delete(iv.maxPt) if qItem.data.hash != ctx.layout.danglingParent: # Discard wrong chain and merge back the range into the `unproc` list. @@ -205,14 +232,14 @@ proc headersStagedProcess*(ctx: BeaconCtxRef; info: static[string]): int = ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs) ctx.layout.dangling = iv.minPt ctx.layout.danglingParent = qItem.data.parentHash - discard ctx.dbStoreLinkedHChainsLayout() + ctx.dbStoreSyncStateLayout() result.inc # count records trace info & ": staged records saved", - nStaged=ctx.lhc.staged.len, nSaved=result + nStaged=ctx.hdr.staged.len, nSaved=result - if headersStagedQueueLengthLwm < ctx.lhc.staged.len: + if headersStagedQueueLengthLwm < ctx.hdr.staged.len: ctx.poolMode = true @@ -227,14 +254,14 @@ func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) = ## (downloading deterministically by hashes) and many fast opportunistic ## actors filling the staged queue. ## - if ctx.lhc.staged.len == 0: + if ctx.hdr.staged.len == 0: # nothing to do return # Update counter ctx.pool.nReorg.inc - let nStaged = ctx.lhc.staged.len + let nStaged = ctx.hdr.staged.len if headersStagedQueueLengthHwm < nStaged: trace info & ": hwm reached, flushing staged queue", nStaged, max=headersStagedQueueLengthLwm @@ -244,32 +271,32 @@ func headersStagedReorg*(ctx: BeaconCtxRef; info: static[string]) = # remain. for _ in 0 .. nStaged - headersStagedQueueLengthLwm: let - qItem = ctx.lhc.staged.ge(BlockNumber 0).expect "valid record" + qItem = ctx.hdr.staged.ge(BlockNumber 0).expect "valid record" key = qItem.key nHeaders = qItem.data.revHdrs.len.uint64 ctx.headersUnprocCommit(0, key - nHeaders + 1, key) - discard ctx.lhc.staged.delete key + discard ctx.hdr.staged.delete key func headersStagedTopKey*(ctx: BeaconCtxRef): BlockNumber = ## Retrieve to staged block number - let qItem = ctx.lhc.staged.le(high BlockNumber).valueOr: + let qItem = ctx.hdr.staged.le(high BlockNumber).valueOr: return BlockNumber(0) qItem.key func headersStagedQueueLen*(ctx: BeaconCtxRef): int = ## Number of staged records - ctx.lhc.staged.len + ctx.hdr.staged.len func headersStagedQueueIsEmpty*(ctx: BeaconCtxRef): bool = ## `true` iff no data are on the queue. - ctx.lhc.staged.len == 0 + ctx.hdr.staged.len == 0 # ---------------- func headersStagedInit*(ctx: BeaconCtxRef) = ## Constructor - ctx.lhc.staged = LinkedHChainQueue.init() + ctx.hdr.staged = LinkedHChainQueue.init() # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/beacon/worker/headers_unproc.nim b/nimbus/sync/beacon/worker/headers_unproc.nim index 0a2730781f..e4bfb078e9 100644 --- a/nimbus/sync/beacon/worker/headers_unproc.nim +++ b/nimbus/sync/beacon/worker/headers_unproc.nim @@ -28,7 +28,7 @@ proc headersUnprocFetch*( ## `0` is interpreted as `2^64`. ## let - q = ctx.lhc.unprocessed + q = ctx.hdr.unprocessed # Fetch top/right interval with largest block numbers jv = q.le().valueOr: @@ -49,18 +49,18 @@ proc headersUnprocFetch*( BnRange.new(jv.maxPt - maxLen + 1, jv.maxPt) discard q.reduce(iv) - ctx.lhc.borrowed += iv.len + ctx.hdr.borrowed += iv.len ok(iv) proc headersUnprocCommit*(ctx: BeaconCtxRef; borrowed: uint) = ## Commit back all processed range - ctx.lhc.borrowed -= borrowed + ctx.hdr.borrowed -= borrowed proc headersUnprocCommit*(ctx: BeaconCtxRef; borrowed: uint; retuor: BnRange) = ## Merge back unprocessed range `retour` ctx.headersUnprocCommit borrowed - doAssert ctx.lhc.unprocessed.merge(retuor) == retuor.len + doAssert ctx.hdr.unprocessed.merge(retuor) == retuor.len proc headersUnprocCommit*( ctx: BeaconCtxRef; @@ -69,7 +69,7 @@ proc headersUnprocCommit*( rMaxPt: BlockNumber) = ## Variant of `headersUnprocCommit()` ctx.headersUnprocCommit borrowed - doAssert ctx.lhc.unprocessed.merge(rMinPt, rMaxPt) == rMaxPt - rMinPt + 1 + doAssert ctx.hdr.unprocessed.merge(rMinPt, rMaxPt) == rMaxPt - rMinPt + 1 @@ -81,53 +81,48 @@ proc headersUnprocCovered*( ## Check whether range is fully contained # Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)` if minPt <= maxPt: - return ctx.lhc.unprocessed.covered(minPt, maxPt) + return ctx.hdr.unprocessed.covered(minPt, maxPt) proc headersUnprocCovered*(ctx: BeaconCtxRef; pt: BlockNumber): bool = ## Check whether point is contained - ctx.lhc.unprocessed.covered(pt, pt) == 1 + ctx.hdr.unprocessed.covered(pt, pt) == 1 proc headersUnprocTop*(ctx: BeaconCtxRef): BlockNumber = - let iv = ctx.lhc.unprocessed.le().valueOr: + let iv = ctx.hdr.unprocessed.le().valueOr: return BlockNumber(0) iv.maxPt proc headersUnprocTotal*(ctx: BeaconCtxRef): uint64 = - ctx.lhc.unprocessed.total() + ctx.hdr.unprocessed.total() proc headersUnprocBorrowed*(ctx: BeaconCtxRef): uint64 = - ctx.lhc.borrowed + ctx.hdr.borrowed proc headersUnprocChunks*(ctx: BeaconCtxRef): int = - ctx.lhc.unprocessed.chunks() + ctx.hdr.unprocessed.chunks() proc headersUnprocIsEmpty*(ctx: BeaconCtxRef): bool = - ctx.lhc.unprocessed.chunks() == 0 + ctx.hdr.unprocessed.chunks() == 0 # ------------ proc headersUnprocInit*(ctx: BeaconCtxRef) = ## Constructor - ctx.lhc.unprocessed = BnRangeSet.init() + ctx.hdr.unprocessed = BnRangeSet.init() proc headersUnprocSet*(ctx: BeaconCtxRef) = ## Clear - ctx.lhc.unprocessed.clear() - ctx.lhc.borrowed = 0u - -proc headersUnprocSet*(ctx: BeaconCtxRef; iv: BnRange) = - ## Set up new unprocessed range - ctx.headersUnprocSet() - discard ctx.lhc.unprocessed.merge(iv) + ctx.hdr.unprocessed.clear() + ctx.hdr.borrowed = 0u proc headersUnprocSet*(ctx: BeaconCtxRef; minPt, maxPt: BlockNumber) = ## Set up new unprocessed range ctx.headersUnprocSet() # Argument `maxPt` would be internally adjusted to `max(minPt,maxPt)` if minPt <= maxPt: - discard ctx.lhc.unprocessed.merge(minPt, maxPt) + discard ctx.hdr.unprocessed.merge(minPt, maxPt) # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/beacon/worker/helpers.nim b/nimbus/sync/beacon/worker/helpers.nim index cc41031244..b55c25a716 100644 --- a/nimbus/sync/beacon/worker/helpers.nim +++ b/nimbus/sync/beacon/worker/helpers.nim @@ -14,12 +14,26 @@ import pkg/chronos, - pkg/eth/common + pkg/eth/common, + pkg/stew/interval_set -proc bnStr*(w: BlockNumber): string = +func bnStr*(w: BlockNumber): string = "#" & $w +func bnStr*(h: Header): string = + h.number.bnStr + +func bnStr*(b: EthBlock): string = + b.header.bnStr + +func bnStr*(w: Interval[BlockNumber,uint64]): string = + if w.len == 1: w.minPt.bnStr else: w.minPt.bnStr & ".." & w.maxPt.bnStr + func toStr*(a: chronos.Duration): string = a.toString 2 + +proc `$`*(w: Interval[BlockNumber,uint64]): string = + w.bnStr + # End diff --git a/nimbus/sync/beacon/worker/start_stop.nim b/nimbus/sync/beacon/worker/start_stop.nim index 6f7a34fb3e..359e7baa88 100644 --- a/nimbus/sync/beacon/worker/start_stop.nim +++ b/nimbus/sync/beacon/worker/start_stop.nim @@ -29,12 +29,15 @@ when enableTicker: ## Legacy stuff, will be probably be superseded by `metrics` return proc: auto = TickerStats( - base: ctx.dbStateBlockNumber(), + stored: ctx.db.getSavedStateBlockNumber(), + base: ctx.chain.baseNumber(), + latest: ctx.chain.latestNumber(), coupler: ctx.layout.coupler, dangling: ctx.layout.dangling, - endBn: ctx.layout.endBn, - target: ctx.lhc.target.header.number, - newTargetOk: ctx.lhc.target.changed, + final: ctx.layout.final, + head: ctx.layout.head, + target: ctx.target.consHead.number, + targetOk: ctx.target.final != 0, nHdrStaged: ctx.headersStagedQueueLen(), hdrStagedTop: ctx.headersStagedTopKey(), @@ -54,11 +57,14 @@ when enableTicker: proc updateBeaconHeaderCB(ctx: BeaconCtxRef): ReqBeaconSyncTargetCB = ## Update beacon header. This function is intended as a call back function ## for the RPC module. - return proc(h: Header) {.gcsafe, raises: [].} = + return proc(h: Header; f: Hash32) {.gcsafe, raises: [].} = # Rpc checks empty header against a zero hash rather than `emptyRoot` - if ctx.lhc.target.header.number < h.number: - ctx.lhc.target.header = h - ctx.lhc.target.changed = true + if not ctx.target.locked: + if f != zeroHash32 and ctx.target.consHead.number < h.number: + ctx.target.consHead = h + ctx.target.final = BlockNumber(0) + ctx.target.finalHash = f + ctx.target.changed = true # ------------------------------------------------------------------------------ # Public functions @@ -90,7 +96,7 @@ proc setupDatabase*(ctx: BeaconCtxRef) = ctx.blocksUnprocInit() # Load initial state from database if there is any - ctx.dbLoadLinkedHChainsLayout() + ctx.dbLoadSyncStateLayout() # Set blocks batch import value for `persistBlocks()` if ctx.pool.nBodiesBatch < nFetchBodiesRequest: diff --git a/nimbus/sync/beacon/worker/start_stop/ticker.nim b/nimbus/sync/beacon/worker/start_stop/ticker.nim index b4afe3406a..84938ff29c 100644 --- a/nimbus/sync/beacon/worker/start_stop/ticker.nim +++ b/nimbus/sync/beacon/worker/start_stop/ticker.nim @@ -26,12 +26,15 @@ type TickerStats* = object ## Full sync state (see `TickerFullStatsUpdater`) + stored*: BlockNumber base*: BlockNumber + latest*: BlockNumber coupler*: BlockNumber dangling*: BlockNumber - endBn*: BlockNumber + final*: BlockNumber + head*: BlockNumber target*: BlockNumber - newTargetOk*: bool + targetOk*: bool hdrUnprocTop*: BlockNumber nHdrUnprocessed*: uint64 @@ -73,11 +76,14 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} = if data != t.lastStats or tickerLogSuppressMax < (now - t.visited): let - B = if data.base == data.coupler: "C" else: data.base.bnStr + S = data.stored.bnStr + B = if data.base == data.latest: "L" else: data.base.bnStr + L = if data.latest == data.coupler: "C" else: data.latest.bnStr C = if data.coupler == data.dangling: "D" else: data.coupler.bnStr - D = if data.dangling == data.endBn: "E" else: data.dangling.bnStr - E = if data.endBn == data.target: "T" else: data.endBn.bnStr - T = if data.newTargetOk: "?" & $data.target else: data.target.bnStr + D = if data.dangling == data.final: "F" else: data.dangling.bnStr + F = if data.final == data.head: "H" else: data.final.bnStr + H = if data.head == data.target: "T" else: data.head.bnStr + T = if data.targetOk: data.target.bnStr else: "?" & $data.target hS = if data.nHdrStaged == 0: "n/a" else: data.hdrStagedTop.bnStr & "(" & $data.nHdrStaged & ")" @@ -91,7 +97,7 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} = else: data.blkUnprocTop.bnStr & "(" & data.nBlkUnprocessed.toSI & "," & $data.nBlkUnprocFragm & ")" - reorg = data.reorg + rrg = data.reorg peers = data.nBuddies # With `int64`, there are more than 29*10^10 years range for seconds @@ -101,7 +107,13 @@ proc tickerLogger(t: TickerRef) {.gcsafe.} = t.lastStats = data t.visited = now - info "Sync state", up, peers, B, C, D, E, T, hS, hU, bS, bU, reorg, mem + if data.stored == data.base: + info "Sync state", up, peers, + B, L, C, D, F, H, T, hS, hU, bS, bU, rrg, mem + else: + info "Sync state", up, peers, + S=data.stored.bnStr, + B, L, C, D, F, H, T, hS, hU, bS, bU, rrg, mem # ------------------------------------------------------------------------------ # Private functions: ticking log messages diff --git a/nimbus/sync/beacon/worker/update.nim b/nimbus/sync/beacon/worker/update.nim index 129e1c3d75..aab532a6f2 100644 --- a/nimbus/sync/beacon/worker/update.nim +++ b/nimbus/sync/beacon/worker/update.nim @@ -14,6 +14,7 @@ import pkg/[chronicles, chronos], pkg/eth/[common, rlp], pkg/stew/sorted_set, + ../../../core/chain, ../worker_desc, ./update/metrics, "."/[blocks_unproc, db, headers_staged, headers_unproc] @@ -25,59 +26,62 @@ logScope: # Private functions # ------------------------------------------------------------------------------ -proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]): bool = +proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]) = ## ## Layout (see (3) in README): ## :: - ## 0 C==D==E T + ## 0 C==D==H T ## o----------------o---------------------o----> ## | <-- linked --> | ## ## or ## :: - ## 0==T C==D==E + ## 0==T C==D==H ## o----------------o--------------------------> ## | <-- linked --> | ## - ## with `T == target.header.number` or `T == 0` + ## with `T == target.consHead.number` or `T == 0` ## ## to be updated to ## :: - ## 0 C==D D'==E' + ## 0 C==D D'==H' ## o----------------o---------------------o----> ## | <-- linked --> | <-- unprocessed --> | ## - var target = ctx.lhc.target.header.number + var target = ctx.target.consHead.number - # Need: `E < T` and `C == D` - if target != 0 and target <= ctx.layout.endBn: # violates `E < T` - trace info & ": not applicable", E=ctx.layout.endBn.bnStr, T=target.bnStr - return false + # Need: `H < T` and `C == D` + if target != 0 and target <= ctx.layout.head: # violates `H < T` + trace info & ": not applicable", H=ctx.layout.head.bnStr, T=target.bnStr + return if ctx.layout.coupler != ctx.layout.dangling: # violates `C == D` trace info & ": not applicable", C=ctx.layout.coupler.bnStr, D=ctx.layout.dangling.bnStr - return false + return - # Check consistency: `C == D <= E` for maximal `C` => `D == E` - doAssert ctx.layout.dangling == ctx.layout.endBn + # Check consistency: `C == D <= H` for maximal `C` => `D == H` + doAssert ctx.layout.dangling == ctx.layout.head - let rlpHeader = rlp.encode(ctx.lhc.target.header) + let rlpHeader = rlp.encode(ctx.target.consHead) - ctx.lhc.layout = LinkedHChainsLayout( + ctx.sst.layout = SyncStateLayout( coupler: ctx.layout.coupler, couplerHash: ctx.layout.couplerHash, dangling: target, - danglingParent: ctx.lhc.target.header.parentHash, - endBn: target, - endHash: rlpHeader.keccak256) + danglingParent: ctx.target.consHead.parentHash, + final: ctx.target.final, + finalHash: ctx.target.finalHash, + head: target, + headHash: rlpHeader.keccak256, + headLocked: true) # Save this header on the database so it needs not be fetched again from # somewhere else. ctx.dbStashHeaders(target, @[rlpHeader]) # Save state - discard ctx.dbStoreLinkedHChainsLayout() + ctx.dbStoreSyncStateLayout() # Update range doAssert ctx.headersUnprocTotal() == 0 @@ -87,78 +91,83 @@ proc updateTargetChange(ctx: BeaconCtxRef; info: static[string]): bool = trace info & ": updated", C=ctx.layout.coupler.bnStr, uTop=ctx.headersUnprocTop(), - D=ctx.layout.dangling.bnStr, E=ctx.layout.endBn.bnStr, T=target.bnStr - true + D=ctx.layout.dangling.bnStr, H=ctx.layout.head.bnStr, T=target.bnStr -proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]): bool = +proc mergeAdjacentChains(ctx: BeaconCtxRef; info: static[string]) = ## Merge if `C+1` == `D` ## - if ctx.lhc.layout.coupler+1 < ctx.lhc.layout.dangling or # gap btw. `C` & `D` - ctx.lhc.layout.coupler == ctx.lhc.layout.dangling: # merged already - return false + if ctx.layout.coupler+1 < ctx.layout.dangling or # gap btw. `C` & `D` + ctx.layout.coupler == ctx.layout.dangling: # merged already + return # No overlap allowed! - doAssert ctx.lhc.layout.coupler+1 == ctx.lhc.layout.dangling + doAssert ctx.layout.coupler+1 == ctx.layout.dangling # Verify adjacent chains - if ctx.lhc.layout.couplerHash != ctx.lhc.layout.danglingParent: + if ctx.layout.couplerHash != ctx.layout.danglingParent: # FIXME: Oops -- any better idea than to defect? raiseAssert info & ": hashes do not match" & - " C=" & ctx.lhc.layout.coupler.bnStr & - " D=" & $ctx.lhc.layout.dangling.bnStr + " C=" & ctx.layout.coupler.bnStr & " D=" & $ctx.layout.dangling.bnStr - trace info & ": merging", C=ctx.lhc.layout.coupler.bnStr, - D=ctx.lhc.layout.dangling.bnStr + trace info & ": merging", C=ctx.layout.coupler.bnStr, + D=ctx.layout.dangling.bnStr # Merge adjacent linked chains - ctx.lhc.layout = LinkedHChainsLayout( - coupler: ctx.layout.endBn, # `C` - couplerHash: ctx.layout.endHash, - dangling: ctx.layout.endBn, # `D` - danglingParent: ctx.dbPeekParentHash(ctx.layout.endBn).expect "Hash32", - endBn: ctx.layout.endBn, # `E` - endHash: ctx.layout.endHash) + ctx.sst.layout = SyncStateLayout( + coupler: ctx.layout.head, # `C` + couplerHash: ctx.layout.headHash, + dangling: ctx.layout.head, # `D` + danglingParent: ctx.dbPeekParentHash(ctx.layout.head).expect "Hash32", + final: ctx.layout.final, # `F` + finalHash: ctx.layout.finalHash, + head: ctx.layout.head, # `H` + headHash: ctx.layout.headHash, + headLocked: ctx.layout.headLocked) # Save state - discard ctx.dbStoreLinkedHChainsLayout() - - true + ctx.dbStoreSyncStateLayout() # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc updateLinkedHChainsLayout*(ctx: BeaconCtxRef; info: static[string]): bool = +proc updateSyncStateLayout*(ctx: BeaconCtxRef; info: static[string]) = ## Update layout + # Check whether the target has been reached. In that case, unlock the + # consensus head `H` from the current layout so that it can be updated + # in time. + if ctx.layout.headLocked: + # So we have a session + let latest= ctx.chain.latestNumber() + if ctx.layout.head <= latest: + doAssert ctx.layout.head == latest + ctx.layout.headLocked = false + # Check whether there is something to do regarding beacon node change - if ctx.lhc.target.changed: - ctx.lhc.target.changed = false - result = ctx.updateTargetChange info + if not ctx.layout.headLocked and ctx.target.changed and ctx.target.final != 0: + ctx.target.changed = false + ctx.updateTargetChange info # Check whether header downloading is done - if ctx.mergeAdjacentChains info: - result = true + ctx.mergeAdjacentChains info -proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]): bool = +proc updateBlockRequests*(ctx: BeaconCtxRef; info: static[string]) = ## Update block requests if there staged block queue is empty - let base = ctx.dbStateBlockNumber() - if base < ctx.layout.coupler: # so half open interval `(B,C]` is not empty + let latest = ctx.chain.latestNumber() + if latest < ctx.layout.coupler: # so half open interval `(L,C]` is not empty - # One can fill/import/execute blocks by number from `(B,C]` + # One can fill/import/execute blocks by number from `(L,C]` if ctx.blk.topRequest < ctx.layout.coupler: # So there is some space - trace info & ": updating", B=base.bnStr, topReq=ctx.blk.topRequest.bnStr, - C=ctx.layout.coupler.bnStr + trace info & ": updating", L=latest.bnStr, + topReq=ctx.blk.topRequest.bnStr, C=ctx.layout.coupler.bnStr ctx.blocksUnprocCommit( - 0, max(base, ctx.blk.topRequest) + 1, ctx.layout.coupler) + 0, max(latest, ctx.blk.topRequest) + 1, ctx.layout.coupler) ctx.blk.topRequest = ctx.layout.coupler - return true - - false proc updateMetrics*(ctx: BeaconCtxRef) = diff --git a/nimbus/sync/beacon/worker/update/metrics.nim b/nimbus/sync/beacon/worker/update/metrics.nim index c2f87216c4..5012defadd 100644 --- a/nimbus/sync/beacon/worker/update/metrics.nim +++ b/nimbus/sync/beacon/worker/update/metrics.nim @@ -12,12 +12,15 @@ import pkg/metrics, + ../../../../core/chain, ../../worker_desc, - ".."/[db, blocks_staged, headers_staged] - + ".."/[blocks_staged, headers_staged] declareGauge beacon_base, "" & - "Max block number of imported/executed blocks" + "Max block number of imported finalised blocks" + +declareGauge beacon_latest, "" & + "Block number of latest imported blocks" declareGauge beacon_coupler, "" & "Max block number for header chain starting at genesis" @@ -25,7 +28,10 @@ declareGauge beacon_coupler, "" & declareGauge beacon_dangling, "" & "Starting/min block number for higher up headers chain" -declareGauge beacon_end, "" & +declareGauge beacon_final, "" & + "Max number of finalised block in higher up headers chain" + +declareGauge beacon_head, "" & "Ending/max block number of higher up headers chain" declareGauge beacon_target, "" & @@ -50,11 +56,13 @@ declareGauge beacon_buddies, "" & template updateMetricsImpl*(ctx: BeaconCtxRef) = - metrics.set(beacon_base, ctx.dbStateBlockNumber().int64) + metrics.set(beacon_base, ctx.chain.baseNumber().int64) + metrics.set(beacon_latest, ctx.chain.latestNumber().int64) metrics.set(beacon_coupler, ctx.layout.coupler.int64) metrics.set(beacon_dangling, ctx.layout.dangling.int64) - metrics.set(beacon_end, ctx.layout.endBn.int64) - metrics.set(beacon_target, ctx.lhc.target.header.number.int64) + metrics.set(beacon_final, ctx.layout.final.int64) + metrics.set(beacon_head, ctx.layout.head.int64) + metrics.set(beacon_target, ctx.target.consHead.number.int64) metrics.set(beacon_header_lists_staged, ctx.headersStagedQueueLen()) metrics.set(beacon_headers_unprocessed, diff --git a/nimbus/sync/beacon/worker_config.nim b/nimbus/sync/beacon/worker_config.nim index 23d6ca5eaf..6a616332cd 100644 --- a/nimbus/sync/beacon/worker_config.nim +++ b/nimbus/sync/beacon/worker_config.nim @@ -121,6 +121,10 @@ const ## entry block number is too high and so leaves a gap to the ledger state ## block number.) + finaliserChainLengthMax* = 32 + ## When importing with `importBlock()`, finalise after at most this many + ## invocations of `importBlock()`. + # ---------------------- static: diff --git a/nimbus/sync/beacon/worker_desc.nim b/nimbus/sync/beacon/worker_desc.nim index 3c65abc988..b71046d201 100644 --- a/nimbus/sync/beacon/worker_desc.nim +++ b/nimbus/sync/beacon/worker_desc.nim @@ -54,39 +54,57 @@ type # ------------------- - LinkedHChainsLayout* = object - ## Layout of a linked header chains defined by the triple `(C,D,E)` as + SyncStateTarget* = object + ## Beacon state to be implicitely updated by RPC method + locked*: bool ## Don't update while fetching header + changed*: bool ## Tell that something has changed + consHead*: Header ## Consensus head + final*: BlockNumber ## Finalised block number + finalHash*: Hash32 ## Finalised hash + + SyncStateLayout* = object + ## Layout of a linked header chains defined by the triple `(C,D,H)` as ## described in the `README.md` text. ## :: - ## 0 C D E - ## o----------------o---------------------o----------------o---> - ## | <-- linked --> | <-- unprocessed --> | <-- linked --> | + ## 0 B L C D F H + ## o----------o-----o-------o---------------------o------------o---o---> + ## | <- imported -> | | | | + ## | <------ linked ------> | <-- unprocessed --> | <-- linked --> | + ## + ## Additional positions known but not declared in this descriptor: + ## * `B`: base state (from `forked_chain` importer) + ## * `L`: last imported block, canonical consensus head + ## * `F`: finalised head (from CL) ## coupler*: BlockNumber ## Right end `C` of linked chain `[0,C]` couplerHash*: Hash32 ## Hash of `C` - dangling*: BlockNumber ## Left end `D` of linked chain `[D,E]` + dangling*: BlockNumber ## Left end `D` of linked chain `[D,H]` danglingParent*: Hash32 ## Parent hash of `D` - endBn*: BlockNumber ## `E`, block num of some finalised block - endHash*: Hash32 ## Hash of `E` + final*: BlockNumber ## Finalised block number `F` + finalHash*: Hash32 ## Hash of `F` - TargetReqHeader* = object - ## Beacon state to be implicitely updated by RPC method - changed*: bool ## Set a marker if something has changed - header*: Header ## Beacon chain, finalised header + head*: BlockNumber ## `H`, block num of some finalised block + headHash*: Hash32 ## Hash of `H` + headLocked*: bool ## No need to update `H` yet + + SyncState* = object + ## Sync state for header and block chains + target*: SyncStateTarget ## Consensus head, see `T` in `README.md` + layout*: SyncStateLayout ## Current header chains layout + lastLayout*: SyncStateLayout ## Previous layout (for delta update) - LinkedHChainsSync* = object - ## Sync state for linked header chains - target*: TargetReqHeader ## Consensus head, see `T` in `README.md` + # ------------------- + + HeaderImportSync* = object + ## Header sync staging area unprocessed*: BnRangeSet ## Block or header ranges to fetch borrowed*: uint64 ## Total of temp. fetched ranges staged*: LinkedHChainQueue ## Blocks fetched but not stored yet - layout*: LinkedHChainsLayout ## Current header chains layout - lastLayout*: LinkedHChainsLayout ## Previous layout (for delta update) BlocksImportSync* = object - ## Sync state for blocks to import/execute + ## Block sync staging area unprocessed*: BnRangeSet ## Blocks download requested borrowed*: uint64 ## Total of temp. fetched ranges topRequest*: BlockNumber ## Max requested block number @@ -107,14 +125,15 @@ type BeaconCtxData* = object ## Globally shared data extension nBuddies*: int ## Number of active workers - lhcSyncState*: LinkedHChainsSync ## Syncing by linked header chains - blkSyncState*: BlocksImportSync ## For importing/executing blocks + syncState*: SyncState ## Save/resume state descriptor + hdrSync*: HeaderImportSync ## Syncing by linked header chains + blkSync*: BlocksImportSync ## For importing/executing blocks nextUpdate*: Moment ## For updating metrics # Blocks import/execution settings for running `persistBlocks()` with # `nBodiesBatch` blocks in each round (minimum value is # `nFetchBodiesRequest`.) - chain*: ChainRef + chain*: ForkedChainRef ## Database importRunningOk*: bool ## Advisory lock, fetch vs. import nBodiesBatch*: int ## Default `nFetchBodiesBatchDefault` blocksStagedQuLenMax*: int ## Default `blocksStagedQueueLenMaxDefault` @@ -136,29 +155,34 @@ type # Public helpers # ------------------------------------------------------------------------------ -func lhc*(ctx: BeaconCtxRef): var LinkedHChainsSync = +func sst*(ctx: BeaconCtxRef): var SyncState = + ## Shortcut + ctx.pool.syncState + +func hdr*(ctx: BeaconCtxRef): var HeaderImportSync = ## Shortcut - ctx.pool.lhcSyncState + ctx.pool.hdrSync func blk*(ctx: BeaconCtxRef): var BlocksImportSync = ## Shortcut - ctx.pool.blkSyncState + ctx.pool.blkSync -func layout*(ctx: BeaconCtxRef): var LinkedHChainsLayout = +func layout*(ctx: BeaconCtxRef): var SyncStateLayout = ## Shortcut - ctx.pool.lhcSyncState.layout + ctx.sst.layout + +func target*(ctx: BeaconCtxRef): var SyncStateTarget = + ## Shortcut + ctx.sst.target + +func chain*(ctx: BeaconCtxRef): ForkedChainRef = + ## Getter + ctx.pool.chain func db*(ctx: BeaconCtxRef): CoreDbRef = ## Getter ctx.pool.chain.db -# ------------------------------------------------------------------------------ -# Public logging/debugging helpers -# ------------------------------------------------------------------------------ - -proc `$`*(w: BnRange): string = - if w.len == 1: $w.minPt else: $w.minPt & ".." & $w.maxPt - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------