Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement SingleAttestation #7126

Draft
wants to merge 18 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions packages/api/src/beacon/routes/beacon/pool.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
/* eslint-disable @typescript-eslint/naming-convention */
import {ValueOf} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {isForkPostElectra} from "@lodestar/params";
import {phase0, capella, CommitteeIndex, Slot, ssz, electra, AttesterSlashing} from "@lodestar/types";
import {ForkPostElectra, ForkPreElectra, isForkPostElectra} from "@lodestar/params";
import {
phase0,
capella,
CommitteeIndex,
Slot,
ssz,
electra,
AttesterSlashing,
SingleAttestation,
} from "@lodestar/types";
import {Schema, Endpoint, RouteDefinitions} from "../../../utils/index.js";
import {
ArrayOf,
Expand All @@ -21,6 +30,8 @@ import {fromHeaders} from "../../../utils/headers.js";

// See /packages/api/src/routes/index.ts for reasoning and instructions to add new routes

const SingleAttestationListTypePhase0 = ArrayOf(ssz.phase0.Attestation);
const SingleAttestationListTypeElectra = ArrayOf(ssz.electra.SingleAttestation);
const AttestationListTypePhase0 = ArrayOf(ssz.phase0.Attestation);
const AttestationListTypeElectra = ArrayOf(ssz.electra.Attestation);
const AttesterSlashingListTypePhase0 = ArrayOf(ssz.phase0.AttesterSlashing);
Expand Down Expand Up @@ -143,7 +154,7 @@ export type Endpoints = {
*/
submitPoolAttestations: Endpoint<
"POST",
{signedAttestations: AttestationListPhase0},
{signedAttestations: SingleAttestation<ForkPreElectra>[]},
{body: unknown},
EmptyResponseData,
EmptyMeta
Expand All @@ -159,7 +170,7 @@ export type Endpoints = {
*/
submitPoolAttestationsV2: Endpoint<
"POST",
{signedAttestations: AttestationList},
{signedAttestations: SingleAttestation[]},
{body: unknown; headers: {[MetaHeader.Version]: string}},
EmptyResponseData,
EmptyMeta
Expand Down Expand Up @@ -317,10 +328,10 @@ export function getDefinitions(config: ChainForkConfig): RouteDefinitions<Endpoi
url: "/eth/v1/beacon/pool/attestations",
method: "POST",
req: {
writeReqJson: ({signedAttestations}) => ({body: AttestationListTypePhase0.toJson(signedAttestations)}),
parseReqJson: ({body}) => ({signedAttestations: AttestationListTypePhase0.fromJson(body)}),
writeReqSsz: ({signedAttestations}) => ({body: AttestationListTypePhase0.serialize(signedAttestations)}),
parseReqSsz: ({body}) => ({signedAttestations: AttestationListTypePhase0.deserialize(body)}),
writeReqJson: ({signedAttestations}) => ({body: SingleAttestationListTypePhase0.toJson(signedAttestations)}),
parseReqJson: ({body}) => ({signedAttestations: SingleAttestationListTypePhase0.fromJson(body)}),
writeReqSsz: ({signedAttestations}) => ({body: SingleAttestationListTypePhase0.serialize(signedAttestations)}),
parseReqSsz: ({body}) => ({signedAttestations: SingleAttestationListTypePhase0.deserialize(body)}),
schema: {
body: Schema.ObjectArray,
},
Expand All @@ -335,34 +346,34 @@ export function getDefinitions(config: ChainForkConfig): RouteDefinitions<Endpoi
const fork = config.getForkName(signedAttestations[0]?.data.slot ?? 0);
return {
body: isForkPostElectra(fork)
? AttestationListTypeElectra.toJson(signedAttestations as AttestationListElectra)
: AttestationListTypePhase0.toJson(signedAttestations as AttestationListPhase0),
? SingleAttestationListTypeElectra.toJson(signedAttestations as SingleAttestation<ForkPostElectra>[])
: SingleAttestationListTypePhase0.toJson(signedAttestations as SingleAttestation<ForkPreElectra>[]),
headers: {[MetaHeader.Version]: fork},
};
},
parseReqJson: ({body, headers}) => {
const fork = toForkName(fromHeaders(headers, MetaHeader.Version));
return {
signedAttestations: isForkPostElectra(fork)
? AttestationListTypeElectra.fromJson(body)
: AttestationListTypePhase0.fromJson(body),
? SingleAttestationListTypeElectra.fromJson(body)
: SingleAttestationListTypePhase0.fromJson(body),
};
},
writeReqSsz: ({signedAttestations}) => {
const fork = config.getForkName(signedAttestations[0]?.data.slot ?? 0);
return {
body: isForkPostElectra(fork)
? AttestationListTypeElectra.serialize(signedAttestations as AttestationListElectra)
: AttestationListTypePhase0.serialize(signedAttestations as AttestationListPhase0),
? SingleAttestationListTypeElectra.serialize(signedAttestations as SingleAttestation<ForkPostElectra>[])
: SingleAttestationListTypePhase0.serialize(signedAttestations as SingleAttestation<ForkPreElectra>[]),
headers: {[MetaHeader.Version]: fork},
};
},
parseReqSsz: ({body, headers}) => {
const fork = toForkName(fromHeaders(headers, MetaHeader.Version));
return {
signedAttestations: isForkPostElectra(fork)
? AttestationListTypeElectra.deserialize(body)
: AttestationListTypePhase0.deserialize(body),
? SingleAttestationListTypeElectra.deserialize(body)
: SingleAttestationListTypePhase0.deserialize(body),
};
},
schema: {
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/beacon/routes/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
Epoch,
phase0,
capella,
electra,
Slot,
ssz,
StringType,
Expand Down Expand Up @@ -51,6 +52,8 @@ export enum EventType {
block = "block",
/** The node has received a valid attestation (from P2P or API) */
attestation = "attestation",
/** The node has received a valid SingleAttestation (from P2P or API) */
singleAttestation = "single_attestation",
/** The node has received a valid voluntary exit (from P2P or API) */
voluntaryExit = "voluntary_exit",
/** The node has received a valid proposer slashing (from P2P or API) */
Expand Down Expand Up @@ -79,6 +82,7 @@ export const eventTypes: {[K in EventType]: K} = {
[EventType.head]: EventType.head,
[EventType.block]: EventType.block,
[EventType.attestation]: EventType.attestation,
[EventType.singleAttestation]: EventType.singleAttestation,
[EventType.voluntaryExit]: EventType.voluntaryExit,
[EventType.proposerSlashing]: EventType.proposerSlashing,
[EventType.attesterSlashing]: EventType.attesterSlashing,
Expand Down Expand Up @@ -108,6 +112,7 @@ export type EventData = {
executionOptimistic: boolean;
};
[EventType.attestation]: Attestation;
[EventType.singleAttestation]: electra.SingleAttestation;
[EventType.voluntaryExit]: phase0.SignedVoluntaryExit;
[EventType.proposerSlashing]: phase0.ProposerSlashing;
[EventType.attesterSlashing]: AttesterSlashing;
Expand Down Expand Up @@ -238,6 +243,7 @@ export function getTypeByEvent(config: ChainForkConfig): {[K in EventType]: Type
return sszTypesFor(fork).Attestation.fromJson(attestation);
},
},
[EventType.singleAttestation]: ssz.electra.SingleAttestation,
[EventType.voluntaryExit]: ssz.phase0.SignedVoluntaryExit,
[EventType.proposerSlashing]: ssz.phase0.ProposerSlashing,
[EventType.attesterSlashing]: {
Expand Down
13 changes: 13 additions & 0 deletions packages/api/test/unit/beacon/testData/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ export const eventTestData: EventData = {
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
[EventType.singleAttestation]: ssz.electra.SingleAttestation.fromJson({
committee_index: "1",
attester_index: "1",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
}),
[EventType.voluntaryExit]: ssz.phase0.SignedVoluntaryExit.fromJson({
message: {epoch: "1", validator_index: "1"},
signature:
Expand Down
19 changes: 16 additions & 3 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {Attestation, Epoch, isElectraAttestation, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE, isForkPostElectra} from "@lodestar/params";
import {Attestation, Epoch, isElectraAttestation, phase0, SingleAttestation, ssz} from "@lodestar/types";
import {
ForkName,
ForkPostElectra,
ForkPreElectra,
SYNC_COMMITTEE_SUBNET_SIZE,
isForkPostElectra,
} from "@lodestar/params";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
Expand Down Expand Up @@ -113,7 +119,14 @@ export function getBeaconPoolApi({
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
if (isForkPostElectra(fork)) {
nflaig marked this conversation as resolved.
Show resolved Hide resolved
chain.emitter.emit(
routes.events.EventType.singleAttestation,
attestation as SingleAttestation<ForkPostElectra>
);
} else {
chain.emitter.emit(routes.events.EventType.attestation, attestation as SingleAttestation<ForkPreElectra>);
}

const sentPeers = await network.publishBeaconAttestation(attestation, subnet);
metrics?.onPoolSubmitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers);
Expand Down
93 changes: 80 additions & 13 deletions packages/beacon-node/src/chain/seenCache/seenAttestationData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ export type AttestationDataCacheEntry = {
// part of shuffling data, so this does not take memory
committeeValidatorIndices: Uint32Array;
// undefined for phase0 Attestation
// TODO: remove this as it's not in SingleAttestation
committeeBits?: BitArray;
// TODO: remove this? this is available in SingleAttestation
committeeIndex: CommitteeIndex;
// IndexedAttestationData signing root, 32 bytes
signingRoot: Uint8Array;
Expand Down Expand Up @@ -53,8 +55,14 @@ const DEFAULT_CACHE_SLOT_DISTANCE = 2;
* Having this cache help saves a lot of cpu time since most of the gossip attestations are on the same slot.
*/
export class SeenAttestationDatas {
private cacheEntryByAttDataBase64BySlot = new MapDef<Slot, Map<SeenAttDataKey, AttestationDataCacheEntry>>(
() => new Map<SeenAttDataKey, AttestationDataCacheEntry>()
private cacheEntryByAttDataByIndexBySlot = new MapDef<
Slot,
MapDef<CommitteeIndex, Map<AttDataBase64, AttestationDataCacheEntry>>
>(
() =>
new MapDef<CommitteeIndex, Map<AttDataBase64, AttestationDataCacheEntry>>(
() => new Map<AttDataBase64, AttestationDataCacheEntry>()
)
);
private lowestPermissibleSlot = 0;

Expand All @@ -68,29 +76,70 @@ export class SeenAttestationDatas {
}

// TODO: Move InsertOutcome type definition to a common place
/**
* @deprecated this will be removed soon, rename addItem() to add()
*/
add(slot: Slot, attDataKey: SeenAttDataKey, cacheEntry: AttestationDataCacheEntry): InsertOutcome {
if (slot < this.lowestPermissibleSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.too_old});
return InsertOutcome.Old;
}

const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.getOrDefault(slot);
if (cacheEntryByAttDataBase64.has(attDataKey)) {
const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.getOrDefault(slot);
// just for compilation, we will remove this whole function anyway
const committeeIndex = cacheEntry.committeeIndex;
const cacheEntryByAttData = cacheEntryByAttDataByIndex.getOrDefault(committeeIndex);
if (cacheEntryByAttData.has(attDataKey)) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.already_known});
return InsertOutcome.AlreadyKnown;
}

if (cacheEntryByAttDataBase64.size >= this.maxCacheSizePerSlot) {
if (cacheEntryByAttData.size >= this.maxCacheSizePerSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.reached_limit});
return InsertOutcome.ReachLimit;
}

cacheEntryByAttDataBase64.set(attDataKey, cacheEntry);
cacheEntryByAttData.set(attDataKey, cacheEntry);
return InsertOutcome.NewData;
}

// TODO: rename to add()
// preElectra: add(slot, 0, attDataBase64, cacheEntry) since committeeIndex stay in AttestationData
// electra: add(slot, committeeIndex, attDataBase64, cacheEntry)
addItem(
slot: Slot,
committeeIndex: CommitteeIndex,
attDataBase64: AttDataBase64,
cacheEntry: AttestationDataCacheEntry
): InsertOutcome {
if (slot < this.lowestPermissibleSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.too_old});
return InsertOutcome.Old;
}

const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.getOrDefault(slot);
const cacheEntryByAttData = cacheEntryByAttDataByIndex.getOrDefault(committeeIndex);
if (cacheEntryByAttData.has(attDataBase64)) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.already_known});
return InsertOutcome.AlreadyKnown;
}

if (cacheEntryByAttData.size >= this.maxCacheSizePerSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.reached_limit});
return InsertOutcome.ReachLimit;
}

cacheEntryByAttData.set(attDataBase64, cacheEntry);
return InsertOutcome.NewData;
}

/**
* @deprecated this will be removed soon, rename getItem() to get()
*/
get(slot: Slot, attDataBase64: SeenAttDataKey): AttestationDataCacheEntry | null {
const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.get(slot);
const committeeIndex = 0;
// hard code just for compilation, we will remove this whole function anyway
const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataByIndexBySlot.get(slot)?.get(committeeIndex);
const cacheEntry = cacheEntryByAttDataBase64?.get(attDataBase64);
if (cacheEntry) {
this.metrics?.seenCache.attestationData.hit.inc();
Expand All @@ -100,22 +149,40 @@ export class SeenAttestationDatas {
return cacheEntry ?? null;
}

// TODO: rename to get()
// preElectra: getItem(slot, 0, attDataBase64) since committeeIndex stay in AttestationData
// electra: getItem(slot, committeeIndex, attDataBase64)
getItem(slot: Slot, committeeIndex: CommitteeIndex, attDataBase64: SeenAttDataKey): AttestationDataCacheEntry | null {
const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.get(slot);
const cacheEntryByAttData = cacheEntryByAttDataByIndex?.get(committeeIndex);
const cacheEntry = cacheEntryByAttData?.get(attDataBase64);
if (cacheEntry) {
this.metrics?.seenCache.attestationData.hit.inc();
} else {
this.metrics?.seenCache.attestationData.miss.inc();
}
return cacheEntry ?? null;
}

onSlot(clockSlot: Slot): void {
this.lowestPermissibleSlot = Math.max(clockSlot - this.cacheSlotDistance, 0);
for (const slot of this.cacheEntryByAttDataBase64BySlot.keys()) {
for (const slot of this.cacheEntryByAttDataByIndexBySlot.keys()) {
if (slot < this.lowestPermissibleSlot) {
this.cacheEntryByAttDataBase64BySlot.delete(slot);
this.cacheEntryByAttDataByIndexBySlot.delete(slot);
}
}
}

private onScrapeLodestarMetrics(metrics: Metrics): void {
metrics?.seenCache.attestationData.totalSlot.set(this.cacheEntryByAttDataBase64BySlot.size);
metrics?.seenCache.attestationData.totalSlot.set(this.cacheEntryByAttDataByIndexBySlot.size);
// tracking number of attestation data at current slot may not be correct if scrape time is not at the end of slot
// so we track it at the previous slot
const previousSlot = this.lowestPermissibleSlot + this.cacheSlotDistance - 1;
metrics?.seenCache.attestationData.countPerSlot.set(
this.cacheEntryByAttDataBase64BySlot.get(previousSlot)?.size ?? 0
);
const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.get(previousSlot);
let count = 0;
for (const cacheEntryByAttDataBase64 of cacheEntryByAttDataByIndex?.values() ?? []) {
count += cacheEntryByAttDataBase64.size;
}
metrics?.seenCache.attestationData.countPerSlot.set(count);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import {SlotOptionalRoot, SlotRootHex} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {
getBlockRootFromAttestationSerialized,
getBlockRootFromSignedAggregateAndProofSerialized,
getSlotFromAttestationSerialized,
getSlotFromSignedAggregateAndProofSerialized,
getSlotFromBlobSidecarSerialized,
getSlotFromSignedBeaconBlockSerialized,
getSlotFromBeaconAttestationSerialized,
getBlockRootFromBeaconAttestationSerialized,
} from "../../util/sszBytes.js";
import {GossipType} from "../gossip/index.js";
import {ExtractSlotRootFns} from "./types.js";
Expand All @@ -16,9 +17,9 @@ import {ExtractSlotRootFns} from "./types.js";
*/
export function createExtractBlockSlotRootFns(): ExtractSlotRootFns {
return {
[GossipType.beacon_attestation]: (data: Uint8Array): SlotRootHex | null => {
const slot = getSlotFromAttestationSerialized(data);
const root = getBlockRootFromAttestationSerialized(data);
[GossipType.beacon_attestation]: (data: Uint8Array, fork: ForkName): SlotRootHex | null => {
const slot = getSlotFromBeaconAttestationSerialized(fork, data);
const root = getBlockRootFromBeaconAttestationSerialized(fork, data);

if (slot === null || root === null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOp
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
// TODO: modify after we change attestationPool due to SingleAttestation
const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
Expand Down
Loading
Loading