Skip to content

Commit

Permalink
fix: improve forkchoice (#7142)
Browse files Browse the repository at this point in the history
* fix: reuse deltas in computeDeltas

* fix: remodel queuedAttestations

* fix: call forkchoice.updateTime() once per clock slot

* fix: recomputeForkChoiceHead() at slot boundary

* fix: improve computeDeltas() - handle newBalances = oldBalances

* fix: do not compute forkchoice head at slot boundary

* fix: prepareNextSlot unit test
  • Loading branch information
twoeths authored Oct 11, 2024
1 parent 0d87c28 commit a570048
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 58 deletions.
12 changes: 0 additions & 12 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,6 @@ export function getValidatorApi(
notWhileSyncing();
await waitForSlot(slot); // Must never request for a future slot > currentSlot

// Process the queued attestations in the forkchoice for correct head estimation
// forkChoice.updateTime() might have already been called by the onSlot clock
// handler, in which case this should just return.
chain.forkChoice.updateTime(slot);
parentBlockRoot = fromHex(chain.getProposerHead(slot).blockRoot);
} else {
parentBlockRoot = inParentBlockRoot;
Expand Down Expand Up @@ -459,10 +455,6 @@ export function getValidatorApi(
notWhileSyncing();
await waitForSlot(slot); // Must never request for a future slot > currentSlot

// Process the queued attestations in the forkchoice for correct head estimation
// forkChoice.updateTime() might have already been called by the onSlot clock
// handler, in which case this should just return.
chain.forkChoice.updateTime(slot);
parentBlockRoot = fromHex(chain.getProposerHead(slot).blockRoot);
} else {
parentBlockRoot = inParentBlockRoot;
Expand Down Expand Up @@ -535,10 +527,6 @@ export function getValidatorApi(
notWhileSyncing();
await waitForSlot(slot); // Must never request for a future slot > currentSlot

// Process the queued attestations in the forkchoice for correct head estimation
// forkChoice.updateTime() might have already been called by the onSlot clock
// handler, in which case this should just return.
chain.forkChoice.updateTime(slot);
const parentBlockRoot = fromHex(chain.getProposerHead(slot).blockRoot);
notOnOutOfRangeData(parentBlockRoot);

Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import type {BeaconChain} from "../chain.js";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {ForkchoiceCaller} from "../forkChoice/index.js";
import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt, BlockInputType} from "./types.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";
Expand Down Expand Up @@ -208,7 +209,7 @@ export async function importBlock(
// 5. Compute head. If new head, immediately stateCache.setHeadState()

const oldHead = this.forkChoice.getHead();
const newHead = this.recomputeForkChoiceHead();
const newHead = this.recomputeForkChoiceHead(ForkchoiceCaller.importBlock);
const currFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;

if (newHead.blockRoot !== oldHead.blockRoot) {
Expand Down
12 changes: 6 additions & 6 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ import {
} from "./interface.js";
import {IChainOptions} from "./options.js";
import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {initializeForkChoice} from "./forkChoice/index.js";
import {ForkchoiceCaller, initializeForkChoice} from "./forkChoice/index.js";
import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js";
import {
SeenAttesters,
Expand Down Expand Up @@ -784,9 +784,9 @@ export class BeaconChain implements IBeaconChain {
};
}

recomputeForkChoiceHead(): ProtoBlock {
recomputeForkChoiceHead(caller: ForkchoiceCaller): ProtoBlock {
this.metrics?.forkChoice.requests.inc();
const timer = this.metrics?.forkChoice.findHead.startTimer({entrypoint: FindHeadFnName.recomputeForkChoiceHead});
const timer = this.metrics?.forkChoice.findHead.startTimer({caller});

try {
return this.forkChoice.updateAndGetHead({mode: UpdateHeadOpt.GetCanonicialHead}).head;
Expand All @@ -800,7 +800,7 @@ export class BeaconChain implements IBeaconChain {

predictProposerHead(slot: Slot): ProtoBlock {
this.metrics?.forkChoice.requests.inc();
const timer = this.metrics?.forkChoice.findHead.startTimer({entrypoint: FindHeadFnName.predictProposerHead});
const timer = this.metrics?.forkChoice.findHead.startTimer({caller: FindHeadFnName.predictProposerHead});

try {
return this.forkChoice.updateAndGetHead({mode: UpdateHeadOpt.GetPredictedProposerHead, slot}).head;
Expand All @@ -814,7 +814,7 @@ export class BeaconChain implements IBeaconChain {

getProposerHead(slot: Slot): ProtoBlock {
this.metrics?.forkChoice.requests.inc();
const timer = this.metrics?.forkChoice.findHead.startTimer({entrypoint: FindHeadFnName.getProposerHead});
const timer = this.metrics?.forkChoice.findHead.startTimer({caller: FindHeadFnName.getProposerHead});
const secFromSlot = this.clock.secFromSlot(slot);

try {
Expand Down Expand Up @@ -1060,8 +1060,8 @@ export class BeaconChain implements IBeaconChain {
if (this.forkChoice.irrecoverableError) {
this.processShutdownCallback(this.forkChoice.irrecoverableError);
}
this.forkChoice.updateTime(slot);

this.forkChoice.updateTime(slot);
this.metrics?.clockSlot.set(slot);

this.attestationPool.prune(slot);
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/forkChoice/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ export type ForkChoiceOpts = RawForkChoiceOpts & {
forkchoiceConstructor?: typeof ForkChoice;
};

export enum ForkchoiceCaller {
prepareNextSlot = "prepare_next_slot",
importBlock = "import_block",
}

/**
* Fork Choice extended with a ChainEventEmitter
*/
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {ShufflingCache} from "./shufflingCache.js";
import {BlockRewards} from "./rewards/blockRewards.js";
import {AttestationsRewards} from "./rewards/attestationsRewards.js";
import {SyncCommitteeRewards} from "./rewards/syncCommitteeRewards.js";
import {ForkchoiceCaller} from "./forkChoice/index.js";

export {BlockType, type AssembledBlockType};
export {type ProposerPreparationData};
Expand Down Expand Up @@ -204,7 +205,7 @@ export interface IBeaconChain {

getStatus(): phase0.Status;

recomputeForkChoiceHead(): ProtoBlock;
recomputeForkChoiceHead(caller: ForkchoiceCaller): ProtoBlock;

/** When proposerBoostReorg is enabled, this is called at slot n-1 to predict the head block to build on if we are proposing at slot n */
predictProposerHead(slot: Slot): ProtoBlock;
Expand Down
5 changes: 4 additions & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {isQueueErrorAborted} from "../util/queue/index.js";
import {prepareExecutionPayload, getPayloadAttributesForSSE} from "./produceBlock/produceBlockBody.js";
import {IBeaconChain} from "./interface.js";
import {RegenCaller} from "./regen/index.js";
import {ForkchoiceCaller} from "./forkChoice/index.js";

/* With 12s slot times, this scheduler will run 4s before the start of each slot (`12 / 3 = 4`). */
export const SCHEDULER_LOOKAHEAD_FACTOR = 3;
Expand Down Expand Up @@ -77,7 +78,9 @@ export class PrepareNextSlotScheduler {
await sleep(slotMs - slotMs / SCHEDULER_LOOKAHEAD_FACTOR, this.signal);

// calling updateHead() here before we produce a block to reduce reorg possibility
const {slot: headSlot, blockRoot: headRoot} = this.chain.recomputeForkChoiceHead();
const {slot: headSlot, blockRoot: headRoot} = this.chain.recomputeForkChoiceHead(
ForkchoiceCaller.prepareNextSlot
);

// PS: previously this was comparing slots, but that gave no leway on the skipped
// slots on epoch bounday. Making it more fluid.
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/metrics/metrics/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ export function createBeaconMetrics(register: RegistryMetricCreator) {
// Non-spec'ed

forkChoice: {
findHead: register.histogram<{entrypoint: string}>({
findHead: register.histogram<{caller: string}>({
name: "beacon_fork_choice_find_head_seconds",
help: "Time taken to find head in seconds",
buckets: [0.1, 1, 10],
labelNames: ["entrypoint"],
labelNames: ["caller"],
}),
requests: register.gauge({
name: "beacon_fork_choice_requests_total",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ describe("PrepareNextSlot scheduler", () => {
scheduler.prepareForNextSlot(2 * SLOTS_PER_EPOCH - 1),
vi.advanceTimersByTimeAsync((config.SECONDS_PER_SLOT * 1000 * 2) / 3),
]);
expect(chainStub.recomputeForkChoiceHead).toHaveBeenCalledWith();
expect(chainStub.recomputeForkChoiceHead).toHaveBeenCalledOnce();
expect(regenStub.getBlockSlotState).not.toHaveBeenCalled();
});

Expand Down
51 changes: 33 additions & 18 deletions packages/fork-choice/src/forkChoice/forkChoice.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Logger, fromHex, toRootHex} from "@lodestar/utils";
import {Logger, MapDef, fromHex, toRootHex} from "@lodestar/utils";
import {SLOTS_PER_HISTORICAL_ROOT, SLOTS_PER_EPOCH, INTERVALS_PER_SLOT} from "@lodestar/params";
import {bellatrix, Slot, ValidatorIndex, phase0, ssz, RootHex, Epoch, Root, BeaconBlock} from "@lodestar/types";
import {
Expand Down Expand Up @@ -34,7 +34,6 @@ import {ForkChoiceError, ForkChoiceErrorCode, InvalidBlockCode, InvalidAttestati
import {
IForkChoice,
LatestMessage,
QueuedAttestation,
PowBlockHex,
EpochDifference,
AncestorResult,
Expand Down Expand Up @@ -91,7 +90,9 @@ export class ForkChoice implements IForkChoice {
* Attestations that arrived at the current slot and must be queued for later processing.
* NOT currently tracked in the protoArray
*/
private readonly queuedAttestations = new Set<QueuedAttestation>();
private readonly queuedAttestations: MapDef<Slot, MapDef<RootHex, Set<ValidatorIndex>>> = new MapDef(
() => new MapDef(() => new Set())
);

// Note: as of Jun 2022 Lodestar metrics show that 100% of the times updateHead() is called, synced = false.
// Because we are processing attestations from gossip, recomputing scores is always necessary
Expand Down Expand Up @@ -128,9 +129,13 @@ export class ForkChoice implements IForkChoice {
}

getMetrics(): ForkChoiceMetrics {
let numAttestations = 0;
for (const indicesByRoot of this.queuedAttestations.values()) {
numAttestations += Array.from(indicesByRoot.values()).reduce((acc, indices) => acc + indices.size, 0);
}
return {
votes: this.votes.length,
queuedAttestations: this.queuedAttestations.size,
queuedAttestations: numAttestations,
validatedAttestationDatas: this.validatedAttestationDatas.size,
balancesLength: this.balances.length,
nodes: this.protoArray.nodes.length,
Expand Down Expand Up @@ -716,12 +721,13 @@ export class ForkChoice implements IForkChoice {
// Attestations can only affect the fork choice of subsequent slots.
// Delay consideration in the fork choice until their slot is in the past.
// ```
this.queuedAttestations.add({
slot: slot,
attestingIndices: attestation.attestingIndices,
blockRoot: blockRootHex,
targetEpoch,
});
const byRoot = this.queuedAttestations.getOrDefault(slot);
const validatorIndices = byRoot.getOrDefault(blockRootHex);
for (const validatorIndex of attestation.attestingIndices) {
if (!this.fcStore.equivocatingIndices.has(validatorIndex)) {
validatorIndices.add(validatorIndex);
}
}
}
}

Expand Down Expand Up @@ -751,6 +757,11 @@ export class ForkChoice implements IForkChoice {

/**
* Call `onTick` for all slots between `fcStore.getCurrentSlot()` and the provided `currentSlot`.
* This should only be called once per slot because:
* - calling this multiple times in the same slot does not update `votes`
* - new attestations in the current slot must stay in the queue
* - new attestations in the old slots are applied to the `votes` already
* - also side effect of this function is `validatedAttestationDatas` reseted
*/
updateTime(currentSlot: Slot): void {
if (this.fcStore.currentSlot >= currentSlot) return;
Expand Down Expand Up @@ -1352,15 +1363,19 @@ export class ForkChoice implements IForkChoice {
*/
private processAttestationQueue(): void {
const currentSlot = this.fcStore.currentSlot;
for (const attestation of this.queuedAttestations.values()) {
// Delay consideration in the fork choice until their slot is in the past.
if (attestation.slot < currentSlot) {
this.queuedAttestations.delete(attestation);
const {blockRoot, targetEpoch} = attestation;
const blockRootHex = blockRoot;
for (const validatorIndex of attestation.attestingIndices) {
this.addLatestMessage(validatorIndex, targetEpoch, blockRootHex);
for (const [slot, byRoot] of this.queuedAttestations.entries()) {
const targetEpoch = computeEpochAtSlot(slot);
if (slot < currentSlot) {
this.queuedAttestations.delete(slot);
for (const [blockRoot, validatorIndices] of byRoot.entries()) {
const blockRootHex = blockRoot;
for (const validatorIndex of validatorIndices) {
// equivocatingIndices was checked in onAttestation
this.addLatestMessage(validatorIndex, targetEpoch, blockRootHex);
}
}
} else {
break;
}
}
}
Expand Down
11 changes: 0 additions & 11 deletions packages/fork-choice/src/forkChoice/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,3 @@ export type LatestMessage = {
epoch: Epoch;
root: RootHex;
};

/**
* Used for queuing attestations from the current slot. Only contains the minimum necessary
* information about the attestation.
*/
export type QueuedAttestation = {
slot: Slot;
attestingIndices: ValidatorIndex[];
blockRoot: RootHex;
targetEpoch: Epoch;
};
11 changes: 6 additions & 5 deletions packages/fork-choice/src/protoArray/computeDeltas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ import {EffectiveBalanceIncrements} from "@lodestar/state-transition";
import {VoteTracker} from "./interface.js";
import {ProtoArrayError, ProtoArrayErrorCode} from "./errors.js";

// reuse arrays to avoid memory reallocation and gc
const deltas = new Array<number>();

/**
* Returns a list of `deltas`, where there is one delta for each of the indices in `indices`
*
Expand All @@ -19,10 +22,8 @@ export function computeDeltas(
newBalances: EffectiveBalanceIncrements,
equivocatingIndices: Set<ValidatorIndex>
): number[] {
const deltas = new Array<number>(numProtoNodes);
for (let i = 0; i < numProtoNodes; i++) {
deltas[i] = 0;
}
deltas.length = numProtoNodes;
deltas.fill(0);

// avoid creating new variables in the loop to potentially reduce GC pressure
let oldBalance, newBalance: number;
Expand All @@ -47,7 +48,7 @@ export function computeDeltas(
// It is possible that there was a vote for an unknown validator if we change our justified
// state to a new state with a higher epoch that is on a different fork because that fork may have
// on-boarded fewer validators than the prior fork.
newBalance = newBalances[vIndex] ?? 0;
newBalance = newBalances === oldBalances ? oldBalance : newBalances[vIndex] ?? 0;

if (equivocatingIndices.size > 0 && equivocatingIndices.has(vIndex)) {
// this function could be called multiple times but we only want to process slashing validator for 1 time
Expand Down

0 comments on commit a570048

Please sign in to comment.