Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimal implementation of Portal ping payload extensions spec #3010

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions fluffy/network/beacon/beacon_network.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# fluffy
# Copyright (c) 2022-2024 Status Research & Development GmbH
# Copyright (c) 2022-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand All @@ -14,14 +14,16 @@ import
eth/p2p/discoveryv5/[protocol, enr],
beacon_chain/spec/forks,
beacon_chain/gossip_processing/light_client_processor,
../wire/[portal_protocol, portal_stream, portal_protocol_config],
../wire/[portal_protocol, portal_stream, portal_protocol_config, ping_extensions],
"."/[beacon_content, beacon_db, beacon_validation, beacon_chain_historical_summaries]

export beacon_content, beacon_db

logScope:
topics = "portal_beacon"

const pingExtensionCapabilities = {CapabilitiesType, BasicRadiusType}

type BeaconNetwork* = ref object
portalProtocol*: PortalProtocol
beaconDb*: BeaconDb
Expand Down Expand Up @@ -213,6 +215,7 @@ proc new*(
stream,
bootstrapRecords,
config = portalConfig,
pingExtensionCapabilities = pingExtensionCapabilities,
)

let beaconBlockRoot =
Expand Down
7 changes: 5 additions & 2 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand All @@ -17,7 +17,7 @@ import
../../common/common_types,
../../database/content_db,
../../network_metadata,
../wire/[portal_protocol, portal_stream, portal_protocol_config],
../wire/[portal_protocol, portal_stream, portal_protocol_config, ping_extensions],
"."/[history_content, history_validation, history_type_conversions],
../beacon/beacon_chain_historical_roots,
./content/content_deprecated
Expand All @@ -30,6 +30,8 @@ logScope:

export blocks_rlp

const pingExtensionCapabilities = {CapabilitiesType, HistoryRadiusType}

type
HistoryNetwork* = ref object
portalProtocol*: PortalProtocol
Expand Down Expand Up @@ -350,6 +352,7 @@ proc new*(
stream,
bootstrapRecords,
config = portalConfig,
pingExtensionCapabilities = pingExtensionCapabilities,
)

HistoryNetwork(
Expand Down
7 changes: 5 additions & 2 deletions fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand All @@ -16,7 +16,7 @@ import
eth/p2p/discoveryv5/[protocol, enr],
../../database/content_db,
../history/history_network,
../wire/[portal_protocol, portal_stream, portal_protocol_config],
../wire/[portal_protocol, portal_stream, portal_protocol_config, ping_extensions],
./state_content,
./state_validation,
./state_gossip
Expand All @@ -31,6 +31,8 @@ declareCounter state_network_offers_success,
declareCounter state_network_offers_failed,
"Portal state network offers which failed validation", labels = ["protocol_id"]

const pingExtensionCapabilities = {CapabilitiesType, BasicRadiusType}

type StateNetwork* = ref object
portalProtocol*: PortalProtocol
contentQueue*: AsyncQueue[(Opt[NodeId], ContentKeysList, seq[seq[byte]])]
Expand Down Expand Up @@ -69,6 +71,7 @@ proc new*(
s,
bootstrapRecords,
config = portalConfig,
pingExtensionCapabilities = pingExtensionCapabilities,
)

StateNetwork(
Expand Down
12 changes: 5 additions & 7 deletions fluffy/network/wire/messages.nim
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Nimbus - Portal Network- Message types
# Copyright (c) 2021-2024 Status Research & Development GmbH
# Copyright (c) 2021-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
Expand Down Expand Up @@ -29,10 +29,6 @@ type
ContentKeysList* = List[ContentKeyByteList, contentKeysLimit]
ContentKeysBitList* = BitList[contentKeysLimit]

# TODO: should become part of the specific networks, considering it is custom.
CustomPayload* = object
dataRadius*: UInt256

MessageKind* = enum
ping = 0x00
pong = 0x01
Expand All @@ -50,11 +46,13 @@ type

PingMessage* = object
enrSeq*: uint64
customPayload*: ByteList[2048]
payload_type*: uint16
payload*: ByteList[1100]

PongMessage* = object
enrSeq*: uint64
customPayload*: ByteList[2048]
payload_type*: uint16
payload*: ByteList[1100]

FindNodesMessage* = object
distances*: List[uint16, 256]
Expand Down
63 changes: 63 additions & 0 deletions fluffy/network/wire/ping_extensions.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Nimbus
# Copyright (c) 2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import ssz_serialization

const
# Extension types
CapabilitiesType* = 0'u16
BasicRadiusType* = 1'u16
HistoryRadiusType* = 2'u16
ErrorType* = 65535'u16

# Limits
MAX_CLIENT_INFO_BYTE_LENGTH* = 200
MAX_CAPABILITIES_LENGTH* = 400
MAX_ERROR_BYTE_LENGTH* = 300

# Different ping extension payloads, TODO: could be moved to each their own file?
type
CapabilitiesPayload* = object
client_info*: ByteList[MAX_CLIENT_INFO_BYTE_LENGTH]
data_radius*: UInt256
capabilities*: List[uint16, MAX_CAPABILITIES_LENGTH]

BasicRadiusPayload* = object
data_radius*: UInt256

HistoryRadiusPayload* = object
data_radius*: UInt256
ephemeral_header_count*: uint16

ErrorPayload* = object
error_code*: uint16
message*: ByteList[MAX_ERROR_BYTE_LENGTH]

CustomPayload* =
CapabilitiesPayload | BasicRadiusPayload | HistoryRadiusPayload | ErrorPayload

ErrorCode* = enum
ExtensionNotSupported = 0
RequestedDataNotFound = 1
FailedToDecodePayload = 2
SystemError = 3

func encodePayload*(payload: CustomPayload): ByteList[1100] =
ByteList[1100].init(SSZ.encode(payload))

func encodeErrorPayload*(code: ErrorCode): (uint16, ByteList[1100]) =
(
ErrorType,
encodePayload(
ErrorPayload(
error_code: uint16(ord(code)),
message: ByteList[MAX_ERROR_BYTE_LENGTH].init(@[]),
)
),
)
117 changes: 88 additions & 29 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import
minilru,
eth/rlp,
eth/p2p/discoveryv5/[protocol, node, enr, routing_table, random2, nodes_verification],
"."/[portal_stream, portal_protocol_config],
"."/[portal_stream, portal_protocol_config, ping_extensions],
./messages

from std/times import epochTime # For system timestamp in traceContentLookup
Expand Down Expand Up @@ -186,6 +186,7 @@ type
offerWorkers: seq[Future[void]]
pingTimings: Table[NodeId, chronos.Moment]
config*: PortalProtocolConfig
pingExtensionCapabilities*: set[uint16]

PortalResult*[T] = Result[T, string]

Expand Down Expand Up @@ -334,26 +335,68 @@ func truncateEnrs(

enrs

proc handlePingExtension(
p: PortalProtocol,
payloadType: uint16,
encodedPayload: ByteList[1100],
srcId: NodeId,
): (uint16, ByteList[1100]) =
if payloadType notin p.pingExtensionCapabilities:
return encodeErrorPayload(ErrorCode.ExtensionNotSupported)

case payloadType
of CapabilitiesType:
let payload = decodeSsz(encodedPayload.asSeq(), CapabilitiesPayload).valueOr:
return encodeErrorPayload(ErrorCode.FailedToDecodePayload)

p.radiusCache.put(srcId, payload.data_radius)

(
payloadType,
encodePayload(
CapabilitiesPayload(
client_info: ByteList[MAX_CLIENT_INFO_BYTE_LENGTH].init(@[]),
data_radius: p.dataRadius(),
capabilities: List[uint16, MAX_CAPABILITIES_LENGTH].init(
p.pingExtensionCapabilities.toSeq()
),
)
),
)
of BasicRadiusType:
let payload = decodeSsz(encodedPayload.asSeq(), BasicRadiusPayload).valueOr:
return encodeErrorPayload(ErrorCode.FailedToDecodePayload)

p.radiusCache.put(srcId, payload.data_radius)

(payloadType, encodePayload(HistoryRadiusPayload(data_radius: p.dataRadius())))
of HistoryRadiusType:
let payload = decodeSsz(encodedPayload.asSeq(), HistoryRadiusPayload).valueOr:
return encodeErrorPayload(ErrorCode.FailedToDecodePayload)

p.radiusCache.put(srcId, payload.data_radius)

(
payloadType,
encodePayload(
HistoryRadiusPayload(data_radius: p.dataRadius(), ephemeral_header_count: 0)
),
)
else:
encodeErrorPayload(ErrorCode.ExtensionNotSupported)

proc handlePing(p: PortalProtocol, ping: PingMessage, srcId: NodeId): seq[byte] =
# TODO: This should become custom per Portal Network
# TODO: Need to think about the effect of malicious actor sending lots of
# pings from different nodes to clear the LRU.
let customPayloadDecoded =
try:
SSZ.decode(ping.customPayload.asSeq(), CustomPayload)
except SerializationError:
# invalid custom payload, send empty back
return @[]
p.radiusCache.put(srcId, customPayloadDecoded.dataRadius)
let (payloadType, payload) =
handlePingExtension(p, ping.payload_type, ping.payload, srcId)

let customPayload = CustomPayload(dataRadius: p.dataRadius())
let p = PongMessage(
enrSeq: p.localNode.record.seqNum,
customPayload: ByteList[2048](SSZ.encode(customPayload)),
encodeMessage(
PongMessage(
enrSeq: p.localNode.record.seqNum, payload_type: payloadType, payload: payload
)
)

encodeMessage(p)

proc handleFindNodes(p: PortalProtocol, fn: FindNodesMessage): seq[byte] =
if fn.distances.len == 0:
let enrs = List[ByteList[2048], 32](@[])
Expand Down Expand Up @@ -573,6 +616,7 @@ proc new*(
bootstrapRecords: openArray[Record] = [],
distanceCalculator: DistanceCalculator = XorDistanceCalculator,
config: PortalProtocolConfig = defaultPortalProtocolConfig,
pingExtensionCapabilities: set[uint16] = {CapabilitiesType},
): T =
let proto = PortalProtocol(
protocolHandler: messageHandler,
Expand All @@ -595,6 +639,7 @@ proc new*(
offerQueue: newAsyncQueue[OfferRequest](config.maxConcurrentOffers),
pingTimings: Table[NodeId, chronos.Moment](),
config: config,
pingExtensionCapabilities: pingExtensionCapabilities,
)

proto.baseProtocol.registerTalkProtocol(@(proto.protocolId), proto).expect(
Expand Down Expand Up @@ -657,10 +702,19 @@ proc reqResponse[Request: SomeMessage, Response: SomeMessage](
proc pingImpl*(
p: PortalProtocol, dst: Node
): Future[PortalResult[PongMessage]] {.async: (raises: [CancelledError]).} =
let customPayload = CustomPayload(dataRadius: p.dataRadius())
let pingPayload = encodePayload(
CapabilitiesPayload(
client_info: ByteList[MAX_CLIENT_INFO_BYTE_LENGTH].init(@[]),
data_radius: p.dataRadius(),
capabilities:
List[uint16, MAX_CAPABILITIES_LENGTH].init(p.pingExtensionCapabilities.toSeq()),
)
)

let ping = PingMessage(
enrSeq: p.localNode.record.seqNum,
customPayload: ByteList[2048](SSZ.encode(customPayload)),
payload_type: CapabilitiesType,
payload: pingPayload,
)

return await reqResponse[PingMessage, PongMessage](p, dst, ping)
Expand Down Expand Up @@ -703,25 +757,30 @@ proc recordsFromBytes*(

proc ping*(
p: PortalProtocol, dst: Node
): Future[PortalResult[PongMessage]] {.async: (raises: [CancelledError]).} =
): Future[PortalResult[(uint64, CapabilitiesPayload)]] {.
async: (raises: [CancelledError])
.} =
let pongResponse = await p.pingImpl(dst)

if pongResponse.isOk():
# Update last time we pinged this node
p.pingTimings[dst.id] = now(chronos.Moment)

let pong = pongResponse.get()
# TODO: This should become custom per Portal Network
let customPayloadDecoded =
try:
SSZ.decode(pong.customPayload.asSeq(), CustomPayload)
except SerializationError:
# invalid custom payload
return err("Pong message contains invalid custom payload")

p.radiusCache.put(dst.id, customPayloadDecoded.dataRadius)
# Note: currently only decoding as capabilities payload as this is the only
# one that we support sending.
if pong.payload_type != CapabilitiesType:
return err("Pong message contains invalid or error payload")

return pongResponse
let payload = decodeSsz(pong.payload.asSeq(), CapabilitiesPayload).valueOr:
return err("Pong message contains invalid CapabilitiesPayload")

p.radiusCache.put(dst.id, payload.data_radius)

ok((pong.enrSeq, payload))
else:
err(pongResponse.error)

proc findNodes*(
p: PortalProtocol, dst: Node, distances: seq[uint16]
Expand Down Expand Up @@ -1706,8 +1765,8 @@ proc revalidateNode*(p: PortalProtocol, n: Node) {.async: (raises: [CancelledErr
let pong = await p.ping(n)

if pong.isOk():
let res = pong.get()
if res.enrSeq > n.record.seqNum:
let (enrSeq, _) = pong.get()
if enrSeq > n.record.seqNum:
# Request new ENR
let nodesMessage = await p.findNodes(n, @[0'u16])
if nodesMessage.isOk():
Expand Down
Loading
Loading