diff --git a/README-DAS.md b/README-DAS.md
new file mode 100644
index 00000000..8bf8239f
--- /dev/null
+++ b/README-DAS.md
@@ -0,0 +1,26 @@
+# DAS emulator
+
+Emulate DAS DHT behavior, with a few simple assumptions:
+- the block is populated in the DHT by the builder (node 0)
+- all nodes start sampling at the same time
+- 1-way latency is 50ms (configurable)
+- no losses in transmission (configurable)
+- scaled-down numbers (nodes, blocksize, etc., all configurable)
+
+## Compilation
+
+```
+# install Nim 1.6
+
+# install Nimble 0.14+
+nimble install nimble
+
+# make sure the newly installed nimble is used
+export PATH=~/.nimble/bin:$PATH
+
+# install dependencies
+nimble install
+
+# compile and run, passing various flags
+nimble run "-d:chronicles_sinks=textlines[stdout,nocolors]" -d:chronicles_log_level=INFO -d:release -d:asyncTimer=virtual das
+```
\ No newline at end of file
diff --git a/das.nim b/das.nim
new file mode 100644
index 00000000..946d7426
--- /dev/null
+++ b/das.nim
@@ -0,0 +1,185 @@
+import
+  std/[random, math],
+  chronicles,
+  chronos,
+  libp2pdht/dht,
+  libp2pdht/discv5/crypto as dhtcrypto,
+  libp2pdht/discv5/protocol as discv5_protocol,
+  test_helper
+
+logScope:
+  topics = "DAS emulator"
+
+proc bootstrapNodes(
+    nodecount: int,
+    bootnodes: seq[SignedPeerRecord],
+    rng = newRng(),
+    delay: int = 0
+  ) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} =
+
+  debug "---- STARTING BOOTSTRAPS ---"
+  for i in 0..<nodecount:
+    try:
+      let privKey = PrivateKey.example(rng)
+      let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes)
+      await node.start()
+      result.add((node, privKey))
+      if delay > 0:
+        await sleepAsync(chronos.milliseconds(delay))
+    except TransportOsError as e:
+      echo "skipping node ",i ,":", e.msg
+
+  #await allFutures(result.mapIt(it.bootstrap())) # this waits for bootstrap based on bootENode, which includes bonding with all its ping pongs
+
+proc bootstrapNetwork(
+    nodecount: int,
+    rng = newRng(),
+    delay: int = 0
+  ) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} =
+
+  let
+    bootNodeKey = PrivateKey.fromHex(
+      "a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")
+      .expect("Valid private key hex")
+    bootNodeAddr = localAddress(20301)
+    bootNode = initDiscoveryNode(rng, bootNodeKey, bootNodeAddr, @[]) # just a shortcut for new and open
+
+  #waitFor bootNode.bootstrap() # immediate, since no bootnodes are defined above
+
+  var res = await bootstrapNodes(nodecount - 1,
+                                 @[bootnode.localNode.record],
+                                 rng,
+                                 delay)
+  res.insert((bootNode, bootNodeKey), 0)
+  return res
+
+proc toNodeId(data: openArray[byte]): NodeId =
+  readUintBE[256](keccak256.digest(data).data)
+
+proc segmentData(s: int, segmentsize: int) : seq[byte] =
+  result = newSeq[byte](segmentsize)
+  var
+    r = s
+    i = 0
+  while r > 0:
+    assert(i < segmentsize)
+    result[i] = byte(r mod 256)
+    r = r div 256
+    i += 1
+
+proc sample(s: Slice[int], count: int): seq[int] =
+  ## Select `count` distinct values from `s` at random.
+  var count = count
+  let all = s.b - s.a + 1
+  if count >= all div 10: # add better algo selector
+    var generated = newSeq[bool](all) # Initialized to false.
+ while count != 0: + let n = rand(s) + if not generated[n - s.a]: + generated[n - s.a] = true + result.add n + dec count + else: + while count != 0: + let n = rand(s) + if not (n in result): + result.add n + dec count + + +when isMainModule: + proc main() {.async.} = + let + nodecount = 100 + delay_pernode = 10 # in millisec + delay_init = 15*1000 # in millisec + blocksize = 256 + segmentsize = 2 + samplesize = 3 + upload_timeout = 5.seconds + sampling_timeout = 5.seconds + assert(log2(blocksize.float).ceil.int <= segmentsize * 8 ) + assert(samplesize <= blocksize) + + var + segmentIDs = newSeq[NodeId](blocksize) + + # start network + let + rng = newRng() + nodes = await bootstrapNetwork(nodecount=nodecount, delay=delay_pernode) + + # wait for network to settle + await sleepAsync(chronos.milliseconds(delay_init)) + + # generate block and push data + info "starting upload to DHT" + let startTime = Moment.now() + var futs = newSeq[Future[seq[Node]]]() + for s in 0 ..< blocksize: + let + segment = segmentData(s, segmentsize) + key = toNodeId(segment) + + segmentIDs[s] = key + + futs.add(nodes[0][0].addValue(key, segment)) + + let pass = await allFutures(futs).withTimeout(upload_timeout) + info "uploaded to DHT", by = 0, pass, time = Moment.now() - startTime + + # sample + proc startSamplingDA(n: discv5_protocol.Protocol): seq[Future[DiscResult[seq[byte]]]] = + ## Generate random sample and start the sampling process + var futs = newSeq[Future[DiscResult[seq[byte]]]]() + + let sample = sample(0 ..< blocksize, samplesize) + debug "starting sampling", by = n, sample + for s in sample: + let fut = n.getValue(segmentIDs[s]) + futs.add(fut) + return futs + + proc sampleDA(n: discv5_protocol.Protocol): Future[(bool, int, Duration)] {.async.} = + ## Sample and return detailed results of sampling + let startTime = Moment.now() + var futs = startSamplingDA(n) + + # test is passed if all segments are retrieved in time + let pass = await allFutures(futs).withTimeout(sampling_timeout) + var passcount: int + for f in futs: + if f.finished(): + passcount += 1 + + let time = Moment.now() - startTime + info "sample", by = n.localNode, pass, cnt = passcount, time + return (pass, passcount, time) + + # all nodes start sampling in parallel + var samplings = newSeq[Future[(bool, int, Duration)]]() + for n in 1 ..< nodecount: + samplings.add(sampleDA(nodes[n][0])) + await allFutures(samplings) + + # print statistics + var + passed = 0 + for f in samplings: + if f.finished(): + let (pass, passcount, time) = await f + passed += pass.int + debug "sampleStats", pass, cnt = passcount, time + else: + error "This should not happen!" 
+ info "sampleStats", passed, total = samplings.len, ratio = passed/samplings.len + + waitfor main() + +# proc teardownAll() = +# for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here +# await n.closeWait() + + diff --git a/libp2pdht.nimble b/libp2pdht.nimble index 147c78d2..9d403a5d 100644 --- a/libp2pdht.nimble +++ b/libp2pdht.nimble @@ -6,11 +6,12 @@ description = "DHT based on the libp2p Kademlia spec" license = "MIT" skipDirs = @["tests"] +bin = @["das"] # Dependencies requires "nim >= 1.2.0", "nimcrypto >= 0.5.4 & < 0.6.0", - "bearssl#head", + "bearssl#f4c4233de453cb7eac0ce3f3ffad6496295f83ab", "chronicles >= 0.10.2 & < 0.11.0", "chronos >= 3.0.11 & < 3.1.0", "libp2p#unstable", @@ -21,7 +22,8 @@ requires "nim >= 1.2.0", "stint", "asynctest >= 0.3.1 & < 0.4.0", "https://github.com/status-im/nim-datastore#head", - "questionable" + "questionable", + "datastore" task coverage, "generates code coverage report": var (output, exitCode) = gorgeEx("which lcov") diff --git a/libp2pdht/dht.nim b/libp2pdht/dht.nim index fb62ef87..40e36c92 100644 --- a/libp2pdht/dht.nim +++ b/libp2pdht/dht.nim @@ -1,4 +1,6 @@ import - ./dht/[providers_encoding, providers_messages] + ./dht/[providers_encoding, providers_messages], + ./dht/[value_encoding, value_messages] -export providers_encoding, providers_messages \ No newline at end of file +export providers_encoding, providers_messages +export value_encoding, value_messages \ No newline at end of file diff --git a/libp2pdht/dht/value_encoding.nim b/libp2pdht/dht/value_encoding.nim new file mode 100644 index 00000000..c21d25d3 --- /dev/null +++ b/libp2pdht/dht/value_encoding.nim @@ -0,0 +1,77 @@ +import + ../discv5/[node], + libp2p/protobuf/minprotobuf, + ./value_messages + +func getField(pb: ProtoBuffer, field: int, + nid: var NodeId): ProtoResult[bool] {.inline.} = + ## Read ``NodeId`` from ProtoBuf's message and validate it + var buffer: seq[byte] + let res = ? pb.getField(field, buffer) + if not(res): + ok(false) + else: + nid = readUintBE[256](buffer) + ok(true) + +func write(pb: var ProtoBuffer, field: int, nid: NodeId) = + ## Write NodeId value ``nodeid`` to object ``pb`` using ProtoBuf's encoding. + write(pb, field, nid.toBytesBE()) + +proc decode*( + T: typedesc[AddValueMessage], + buffer: openArray[byte]): Result[AddValueMessage, ProtoError] = + + let pb = initProtoBuffer(buffer) + var msg = AddValueMessage() + + ? pb.getRequiredField(1, msg.cId) + ? pb.getRequiredField(2, msg.value) + + ok(msg) + +proc encode*(msg: AddValueMessage): seq[byte] = + var pb = initProtoBuffer() + + pb.write(1, msg.cId) + pb.write(2, msg.value) + + pb.finish() + pb.buffer + +proc decode*( + T: typedesc[GetValueMessage], + buffer: openArray[byte]): Result[GetValueMessage, ProtoError] = + + let pb = initProtoBuffer(buffer) + var msg = GetValueMessage() + + ? pb.getRequiredField(1, msg.cId) + + ok(msg) + +proc encode*(msg: GetValueMessage): seq[byte] = + var pb = initProtoBuffer() + + pb.write(1, msg.cId) + + pb.finish() + pb.buffer + +proc decode*( + T: typedesc[ValueMessage], + buffer: openArray[byte]): Result[ValueMessage, ProtoError] = + + let pb = initProtoBuffer(buffer) + var msg = ValueMessage() + ? 
pb.getRequiredField(1, msg.value) + + ok(msg) + +proc encode*(msg: ValueMessage): seq[byte] = + var pb = initProtoBuffer() + + pb.write(1, msg.value) + + pb.finish() + pb.buffer diff --git a/libp2pdht/dht/value_messages.nim b/libp2pdht/dht/value_messages.nim new file mode 100644 index 00000000..57e63622 --- /dev/null +++ b/libp2pdht/dht/value_messages.nim @@ -0,0 +1,14 @@ +import + ../discv5/[node] + +type + AddValueMessage* = object + cId*: NodeId + value*: seq[byte] + + GetValueMessage* = object + cId*: NodeId + + ValueMessage* = object + #total*: uint32 + value*: seq[byte] diff --git a/libp2pdht/private/eth/p2p/discoveryv5/chronosim.nim b/libp2pdht/private/eth/p2p/discoveryv5/chronosim.nim new file mode 100644 index 00000000..01b85300 --- /dev/null +++ b/libp2pdht/private/eth/p2p/discoveryv5/chronosim.nim @@ -0,0 +1,114 @@ +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +# ChronoSim: simulation/emulation wrapper around Chronos + +import + std/[tables, deques, random], + chronos, + chronicles + +logScope: + topics = "ChronoSim" + +const + timeWarp = 1 + emulateDatagram = true + +# chronos uses SomeIntegerI64. We shoudl be more specific here to override +proc milliseconds*(v: int): Duration {.inline.} = + chronos.milliseconds(v * timeWarp) + +proc seconds*(v: int): Duration {.inline.} = + chronos.seconds(v * timeWarp) + +when(emulateDatagram): #enable network emulator + type + DatagramCallback* = proc(transp: DatagramTransport, + remote: TransportAddress): Future[void] {. + gcsafe, raises: [Defect].} + + DatagramTransport* = ref object + udata*: pointer # User-driven pointer + local: TransportAddress # Local address + callback: DatagramCallback # Receive data callback + ingress: Deque[seq[byte]] + egress: Deque[(TransportAddress, seq[byte])] # simple FIFO for now + + var network = initTable[Port, DatagramTransport]() + + proc `$`*(transp: DatagramTransport): string = + $transp.local + + proc recvFrom[T](transp: DatagramTransport, remote: TransportAddress, + msg: sink seq[T], msglen = -1) = + trace "recv:", src = remote, dst = transp.local + {.gcsafe.}: + transp.ingress.addLast(msg) + # call the callback on remote + asyncCheck transp.callback(transp, remote) + + proc getLatency(src: TransportAddress, dst: TransportAddress) : Duration = + 50.milliseconds + + proc getLoss(src: TransportAddress, dst: TransportAddress) : float = + 0.0 + proc getLineTime(transp: DatagramTransport, msg: seq[byte]) : Duration = + # let bandwith = transp.bandwidth + let bandwidth = 100 # Bytes/ms = KB/sec + (msg.len div bandwidth).milliseconds + + proc sendTo*[T](transp: DatagramTransport, remote: TransportAddress, + msg: sink seq[T], msglen = -1) {.async.} = + trace "send:", src = transp.local, dst = remote + + #transp.egress.addLast(remote, msg) + #await sleepAsync(getLineTime(transp, msg)) + + if rand(1.0) < getLoss(transp.local, remote): + return + + await sleepAsync(getLatency(transp.local, remote)) + {.gcsafe.}: + network[remote.port].recvFrom(transp.local, msg) + + proc getMessage*(t: DatagramTransport,): seq[byte] {. 
+ raises: [Defect, CatchableError].} = + #echo "getMessage " + t.ingress.popFirst() + + proc close*(transp: DatagramTransport) = + debug "close" + + proc closed*(transp: DatagramTransport): bool {.inline.} = + result = false + + proc closeWait*(transp: DatagramTransport) {.async.} = + debug "closeWait " + + proc getUserData*[T](transp: DatagramTransport): T {.inline.} = + ## Obtain user data stored in ``transp`` object. + result = cast[T](transp.udata) + + proc newDatagramTransport*[T](cbproc: DatagramCallback, + udata: ref T, + local: TransportAddress = AnyAddress, + ): DatagramTransport {. + raises: [Defect, CatchableError].} = + debug "new" + result = DatagramTransport() + GC_ref(udata) + result.udata = cast[pointer](udata) + result.local = local + result.callback = cbproc + {.gcsafe.}: + network[local.port] = result + +export seconds, milliseconds +export TransportAddress, initTAddress +export async, sleepAsync, complete, await +export Future, FutureBase, newFuture, futureContinue +export TransportOsError \ No newline at end of file diff --git a/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim b/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim index 3c6f6e66..df12406f 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim @@ -24,6 +24,7 @@ import libp2p/signed_envelope, metrics, nimcrypto, + nop, "."/[messages, messages_encoding, node, spr, hkdf, sessions], "."/crypto @@ -39,6 +40,9 @@ declareCounter discovery_session_decrypt_failures, "Session decrypt failures" logScope: topics = "discv5" +type + cipher = nop128 + const version: uint16 = 1 idSignatureText = "discovery v5 identity proof" @@ -161,7 +165,7 @@ proc deriveKeys*(n1, n2: NodeId, priv: PrivateKey, pub: PublicKey, ok secrets proc encryptGCM*(key: AesKey, nonce, pt, authData: openArray[byte]): seq[byte] = - var ectx: GCM[aes128] + var ectx: GCM[cipher] ectx.init(key, nonce, authData) result = newSeq[byte](pt.len + gcmTagSize) ectx.encrypt(pt, result) @@ -174,7 +178,7 @@ proc decryptGCM*(key: AesKey, nonce, ct, authData: openArray[byte]): debug "cipher is missing tag", len = ct.len return - var dctx: GCM[aes128] + var dctx: GCM[cipher] dctx.init(key, nonce, authData) var res = newSeq[byte](ct.len - gcmTagSize) var tag: array[gcmTagSize, byte] @@ -188,7 +192,7 @@ proc decryptGCM*(key: AesKey, nonce, ct, authData: openArray[byte]): return some(res) proc encryptHeader*(id: NodeId, iv, header: openArray[byte]): seq[byte] = - var ectx: CTR[aes128] + var ectx: CTR[cipher] ectx.init(id.toByteArrayBE().toOpenArray(0, 15), iv) result = newSeq[byte](header.len) ectx.encrypt(header, result) @@ -370,7 +374,7 @@ proc decodeHeader*(id: NodeId, iv, maskedHeader: openArray[byte]): DecodeResult[(StaticHeader, seq[byte])] = # No need to check staticHeader size as that is included in minimum packet # size check in decodePacket - var ectx: CTR[aes128] + var ectx: CTR[cipher] ectx.init(id.toByteArrayBE().toOpenArray(0, aesKeySize - 1), iv) # Decrypt static-header part of the header var staticHeader = newSeq[byte](staticHeaderSize) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim index da3cdb04..1bbfa9f4 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim @@ -17,9 +17,11 @@ import bearssl/rand, ./spr, ./node, - ../../../../dht/providers_messages + ../../../../dht/providers_messages, + ../../../../dht/value_messages export providers_messages 
+export value_messages type MessageKind* {.pure.} = enum @@ -41,6 +43,9 @@ type addProvider = 0x0B getProviders = 0x0C providers = 0x0D + addValue = 0x0E + getValue = 0x0F + respValue = 0x10 findNodeFast = 0x83 RequestId* = object @@ -79,7 +84,8 @@ type SomeMessage* = PingMessage or PongMessage or FindNodeMessage or NodesMessage or TalkReqMessage or TalkRespMessage or AddProviderMessage or GetProvidersMessage or - ProvidersMessage or FindNodeFastMessage + ProvidersMessage or FindNodeFastMessage or + AddValueMessage or GetValueMessage or ValueMessage Message* = object reqId*: RequestId @@ -112,6 +118,12 @@ type getProviders*: GetProvidersMessage of providers: provs*: ProvidersMessage + of addValue: + addValue*: AddValueMessage + of getValue: + getValue*: GetValueMessage + of respValue: + value*: ValueMessage else: discard @@ -126,6 +138,9 @@ template messageKind*(T: typedesc[SomeMessage]): MessageKind = elif T is AddProviderMessage: MessageKind.addProvider elif T is GetProvidersMessage: MessageKind.getProviders elif T is ProvidersMessage: MessageKind.providers + elif T is AddValueMessage: MessageKind.addValue + elif T is GetValueMessage: MessageKind.getValue + elif T is ValueMessage: MessageKind.respValue proc hash*(reqId: RequestId): Hash = hash(reqId.id) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim b/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim index 6de65961..4d6d989e 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim @@ -14,7 +14,8 @@ import libp2p/routing_record, libp2p/signed_envelope, "."/[messages, spr, node], - ../../../../dht/providers_encoding + ../../../../dht/providers_encoding, + ../../../../dht/value_encoding from stew/objects import checkedEnumAssign @@ -434,6 +435,30 @@ proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] = else: return err("Unable to decode ProvidersMessage") + of addValue: + let res = AddValueMessage.decode(encoded) + if res.isOk: + message.addValue = res.get + return ok(message) + else: + return err "Unable to decode AddValueMessage" + + of getValue: + let res = GetValueMessage.decode(encoded) + if res.isOk: + message.getValue = res.get + return ok(message) + else: + return err("Unable to decode GetValueMessage") + + of respValue: + let res = ValueMessage.decode(encoded) + if res.isOk: + message.value = res.get + return ok(message) + else: + return err("Unable to decode ValueMessage") + of regTopic, ticket, regConfirmation, topicQuery: # We just pass the empty type of this message without attempting to # decode, so that the protocol knows what was received. diff --git a/libp2pdht/private/eth/p2p/discoveryv5/nop.nim b/libp2pdht/private/eth/p2p/discoveryv5/nop.nim new file mode 100644 index 00000000..1c8394d2 --- /dev/null +++ b/libp2pdht/private/eth/p2p/discoveryv5/nop.nim @@ -0,0 +1,74 @@ +# +# +# NimCrypto +# (c) Copyright 2023 Status +# +# See the file "LICENSE", included in this +# distribution, for details about the copyright. +# + +## This module implements a no-op(NOP) crypto that des nothing, +## Do not use for anything else than testing. 
+ +{.deadCodeElim:on.} + +when sizeof(int) == 4: + type + NopContext[bits: static[uint]] = object + skey: array[120, uint32] + nr: int +elif sizeof(int) == 8: + type + NopContext[bits: static[uint]] = object + skey: array[120, uint64] + nr: int + +type + nop128* = NopContext[128] + nop192* = NopContext[192] + nop256* = NopContext[256] + nop* = nop128 | nop192 | nop256 + +proc encrypt*(ctx: NopContext, input: openarray[byte], + output: var openarray[byte]) = + for i, v in input: + output[i] = v + +proc decrypt*(ctx: NopContext, input: openarray[byte], + output: var openarray[byte]) = + for i, v in input: + output[i] = v + +template sizeKey*(ctx: NopContext): int = + (ctx.bits div 8) + +template sizeBlock*(ctx: NopContext): int = + (16) + +template sizeKey*(r: typedesc[nop]): int = + when r is nop128: + (16) + elif r is nop192: + (24) + elif r is nop256: + (32) + +template sizeBlock*(r: typedesc[nop]): int = + (16) + +proc init*(ctx: var NopContext, key: openarray[byte]) {.inline.} = + discard + +proc init*(ctx: var NopContext, key: ptr byte, nkey: int = 0) {.inline.} = + discard + +proc clear*(ctx: var NopContext) {.inline.} = + discard + +proc encrypt*(ctx: var NopContext, inbytes: ptr byte, + outbytes: ptr byte) {.inline.} = + outbytes = inbytes + +proc decrypt*(ctx: var NopContext, inbytes: ptr byte, + outbytes: ptr byte) {.inline.} = + outbytes = inbytes diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index fb0dd2b2..ca6133ba 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -162,6 +162,8 @@ type transport*: Transport[Protocol] # exported for tests routingTable*: RoutingTable awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] + # awaitedNodesMessages: Table[(NodeId, RequestId), (Future[DiscResult[seq[SignedPeerRecord]]], int, seq[SignedPeerRecord])] # for some reason DiscResult is not compiling here, needs to be expanded + awaitedNodesMessages: Table[(NodeId, RequestId), (Future[Result[seq[SignedPeerRecord],cstring]], uint32, seq[SignedPeerRecord])] refreshLoop: Future[void] revalidateLoop: Future[void] ipMajorityLoop: Future[void] @@ -172,6 +174,7 @@ type talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of rng*: ref HmacDrbgContext providers: ProvidersManager + valueStore: Table[NodeId, seq[byte]] TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte] {.gcsafe, raises: [Defect].} @@ -181,6 +184,9 @@ type DiscResult*[T] = Result[T, cstring] +func `$`*(d: Protocol): string = + $d.localNode.id + const defaultDiscoveryConfig* = DiscoveryConfig( tableIpLimits: DefaultTableIpLimits, @@ -397,6 +403,38 @@ proc handleGetProviders( let response = ProvidersMessage(total: 1, provs: provs.get) d.sendResponse(fromId, fromAddr, response, reqId) +proc addValueLocal(p: Protocol, cId: NodeId, value: seq[byte]) {.async.} = + trace "adding value to local db", n = p.localNode, cId, value + + p.valueStore[cId] = value + +proc handleAddValue( + d: Protocol, + fromId: NodeId, + fromAddr: Address, + addValue: AddValueMessage, + reqId: RequestId) = + asyncSpawn d.addValueLocal(addValue.cId, addValue.value) + +proc handleGetValue( + d: Protocol, + fromId: NodeId, + fromAddr: Address, + getValue: GetValueMessage, + reqId: RequestId) {.async.} = + + try: + let value = d.valueStore[getValue.cId] + trace "retrieved value from local db", n = d.localNode, cID = getValue.cId, 
value + ##TODO: handle multiple messages? + let response = ValueMessage(value: value) + d.sendResponse(fromId, fromAddr, response, reqId) + + except KeyError: + # should we respond here? I would say so + trace "no value in local db", n = d.localNode, cID = getValue.cId + # TODO: add noValue response + proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, message: Message) = case message.kind @@ -419,11 +457,38 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, of getProviders: discovery_message_requests_incoming.inc() asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId) + of addValue: + discovery_message_requests_incoming.inc() + #discovery_message_requests_incoming.inc(labelValues = ["no_response"]) + d.handleAddValue(srcId, fromAddr, message.addValue, message.reqId) + of getValue: + discovery_message_requests_incoming.inc() + asyncSpawn d.handleGetValue(srcId, fromAddr, message.getValue, message.reqId) of regTopic, topicQuery: discovery_message_requests_incoming.inc() discovery_message_requests_incoming.inc(labelValues = ["no_response"]) trace "Received unimplemented message kind", kind = message.kind, origin = fromAddr + of nodes: + trace "node-response message received" + + var sprs = message.nodes.sprs + let total = message.nodes.total + trace "waiting for more nodes messages", me=d.localNode, srcId, total + try: + var (waiter, cnt, s) = d.awaitedNodesMessages[(srcId, message.reqId)] + cnt += 1 + s.add(sprs) + d.awaitedNodesMessages[(srcId, message.reqId)] = (waiter, cnt, s) + trace "nodes collected", me=d.localNode, srcId, cnt, s + if cnt == total: + d.awaitedNodesMessages.del((srcId, message.reqId)) + trace "all nodes responses received", me=d.localNode, srcId + waiter.complete(DiscResult[seq[SignedPeerRecord]].ok(s)) + except KeyError: + discovery_unsolicited_messages.inc() + warn "Timed out or unrequested message", kind = message.kind, + origin = fromAddr else: var waiter: Future[Option[Message]] if d.awaitedMessages.take((srcId, message.reqId), waiter): @@ -450,63 +515,20 @@ proc replaceNode(d: Protocol, n: Node) = # peers in the routing table. debug "Message request to bootstrap node failed", src=d.localNode, dst=n - -proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): - Future[Option[Message]] = - result = newFuture[Option[Message]]("waitMessage") - let res = result - let key = (fromNode.id, reqId) - sleepAsync(ResponseTimeout).addCallback() do(data: pointer): - d.awaitedMessages.del(key) - if not res.finished: - res.complete(none(Message)) - d.awaitedMessages[key] = result - -proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): - Future[DiscResult[seq[SignedPeerRecord]]] {.async.} = - ## Wait for one or more nodes replies. - ## - ## The first reply will hold the total number of replies expected, and based - ## on that, more replies will be awaited. - ## If one reply is lost here (timed out), others are ignored too. - ## Same counts for out of order receival. - var op = await d.waitMessage(fromNode, reqId) - if op.isSome: - if op.get.kind == MessageKind.nodes: - var res = op.get.nodes.sprs - let total = op.get.nodes.total - for i in 1 ..< total: - op = await d.waitMessage(fromNode, reqId) - if op.isSome and op.get.kind == MessageKind.nodes: - res.add(op.get.nodes.sprs) - else: - # No error on this as we received some nodes. 
- break - return ok(res) - else: - discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) - return err("Invalid response to find node message") - else: - discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) - return err("Nodes message not received in time") - -proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T): - RequestId = +proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T, + reqId: RequestId) = let - reqId = RequestId.init(d.rng[]) message = encodeMessage(m, reqId) trace "Send message packet", dstId = toId, toAddr, kind = messageKind(T) discovery_message_requests_outgoing.inc() d.transport.sendMessage(toId, toAddr, message) - return reqId -proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T): - RequestId = +proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T, + reqId: RequestId) = doAssert(toNode.address.isSome()) let - reqId = RequestId.init(d.rng[]) message = encodeMessage(m, reqId) trace "Send message packet", dstId = toNode.id, @@ -514,16 +536,57 @@ proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T): discovery_message_requests_outgoing.inc() d.transport.sendMessage(toNode, message) - return reqId + +proc waitResponse*[T: SomeMessage](d: Protocol, node: Node, msg: T): + Future[Option[Message]] = + let reqId = RequestId.init(d.rng[]) + result = d.waitMessage(node, reqId) + sendRequest(d, node, msg, reqId) + +proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId, timeout = ResponseTimeout): + Future[Option[Message]] = + result = newFuture[Option[Message]]("waitMessage") + let res = result + let key = (fromNode.id, reqId) + sleepAsync(timeout).addCallback() do(data: pointer): + d.awaitedMessages.del(key) + if not res.finished: + res.complete(none(Message)) + d.awaitedMessages[key] = result + +proc waitNodeResponses*[T: SomeMessage](d: Protocol, node: Node, msg: T): + Future[DiscResult[seq[SignedPeerRecord]]] = + let reqId = RequestId.init(d.rng[]) + result = d.waitNodes(node, reqId) + sendRequest(d, node, msg, reqId) + +proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId, timeout = ResponseTimeout): + Future[DiscResult[seq[SignedPeerRecord]]] = + ## Wait for one or more nodes replies. + ## + ## The first reply will hold the total number of replies expected, and based + ## on that, more replies will be awaited. + ## If one reply is lost here (timed out), others are ignored too. + ## Same counts for out of order receival. + ## TODO: these are VERY optimistic assumptions here. We need a short timeout if we collect + + result = newFuture[DiscResult[seq[SignedPeerRecord]]]("waitNodesMessages") + let res = result + let key = (fromNode.id, reqId) + sleepAsync(timeout).addCallback() do(data: pointer): + d.awaitedNodesMessages.del(key) + if not res.finished: + res.complete(DiscResult[seq[SignedPeerRecord]].err("waitNodeMessages timed out")) + d.awaitedNodesMessages[key] = (result, 0.uint32, newSeq[SignedPeerRecord]()) proc ping*(d: Protocol, toNode: Node): Future[DiscResult[PongMessage]] {.async.} = ## Send a discovery ping message. ## ## Returns the received pong message or an error. 
- let reqId = d.sendRequest(toNode, - PingMessage(sprSeq: d.localNode.record.seqNum)) - let resp = await d.waitMessage(toNode, reqId) + let + msg = PingMessage(sprSeq: d.localNode.record.seqNum) + resp = await d.waitResponse(toNode, msg) if resp.isSome(): if resp.get().kind == pong: @@ -544,8 +607,9 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): ## ## Returns the received nodes or an error. ## Received SPRs are already validated and converted to `Node`. - let reqId = d.sendRequest(toNode, FindNodeMessage(distances: distances)) - let nodes = await d.waitNodes(toNode, reqId) + let + msg = FindNodeMessage(distances: distances) + nodes = await d.waitNodeResponses(toNode, msg) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances) @@ -561,8 +625,9 @@ proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId): ## ## Returns the received nodes or an error. ## Received SPRs are already validated and converted to `Node`. - let reqId = d.sendRequest(toNode, FindNodeFastMessage(target: target)) - let nodes = await d.waitNodes(toNode, reqId) + let + msg = FindNodeFastMessage(target: target) + nodes = await d.waitNodeResponses(toNode, msg) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit) @@ -578,9 +643,9 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): ## Send a discovery talkreq message. ## ## Returns the received talkresp message or an error. - let reqId = d.sendRequest(toNode, - TalkReqMessage(protocol: protocol, request: request)) - let resp = await d.waitMessage(toNode, reqId) + let + msg = TalkReqMessage(protocol: protocol, request: request) + resp = await d.waitResponse(toNode, msg) if resp.isSome(): if resp.get().kind == talkResp: @@ -704,7 +769,8 @@ proc addProvider*( res.add(d.localNode) for toNode in res: if toNode != d.localNode: - discard d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr)) + let reqId = RequestId.init(d.rng[]) + d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr), reqId) else: asyncSpawn d.addProviderLocal(cId, pr) @@ -717,8 +783,7 @@ proc sendGetProviders(d: Protocol, toNode: Node, trace "sendGetProviders", toNode, msg let - reqId = d.sendRequest(toNode, msg) - resp = await d.waitMessage(toNode, reqId) + resp = await d.waitResponse(toNode, msg) if resp.isSome(): if resp.get().kind == MessageKind.providers: @@ -798,6 +863,92 @@ proc getProviders*( return ok res +proc addValue*( + d: Protocol, + cId: NodeId, + value: seq[byte]): Future[seq[Node]] {.async.} = + + var res = await d.lookup(cId) + trace "lookup returned:", res + # TODO: lookup is specified as not returning local, even if that is the closest. Is this OK? + if res.len == 0: + res.add(d.localNode) + for toNode in res: + if toNode != d.localNode: + let reqId = RequestId.init(d.rng[]) + d.sendRequest(toNode, AddValueMessage(cId: cId, value: value), reqId) + else: + asyncSpawn d.addValueLocal(cId, value) + + return res + +proc sendGetValue(d: Protocol, toNode: Node, + cId: NodeId): Future[DiscResult[ValueMessage]] + {.async.} = + let msg = GetValueMessage(cId: cId) + trace "sendGetValue", toNode, msg + + let + resp = await d.waitResponse(toNode, msg) + + if resp.isSome(): + if resp.get().kind == MessageKind.respValue: + d.routingTable.setJustSeen(toNode) + return ok(resp.get().value) + else: + # TODO: do we need to do something when there is an invalid response? 
+ d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"]) + return err("Invalid response to GetValue message") + else: + # TODO: do we need to do something when there is no response? + d.replaceNode(toNode) + discovery_message_requests_outgoing.inc(labelValues = ["no_response"]) + return err("GetValue response message not received in time") + +proc getValue*( + d: Protocol, + cId: NodeId, + timeout: Duration = 5000.milliseconds # TODO: not used? + ): Future[DiscResult[seq[byte]]] {.async.} = + + # # What value do we know about? + # var res = await d.getProvidersLocal(cId, maxitems) + # trace "local providers:", prov = res.mapIt(it) + + let nodesNearby = await d.lookup(cId) + trace "nearby:", nodesNearby + var providersFut: seq[Future[DiscResult[ValueMessage]]] + for n in nodesNearby: + if n != d.localNode: + providersFut.add(d.sendGetValue(n, cId)) + + while providersFut.len > 0: + let providersMsg = await one(providersFut) + # trace "Got providers response", providersMsg + + let index = providersFut.find(providersMsg) + if index != -1: + providersFut.del(index) + + let providersMsg2 = await providersMsg + + let providersMsgRes = providersMsg.read + if providersMsgRes.isOk: + let value = providersMsgRes.get.value + var res = value + # TODO: validate result before accepting as the right one + # TODO: cancel pending futures! + return ok res + else: + error "Sending of GetValue message failed", error = providersMsgRes.error + # TODO: should we consider this as an error result if all GetProviders + # requests fail?? + + trace "getValue returned no result", cId + + return err "getValue failed" + proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] {.async.} = ## Query k nodes for the given target, returns all nodes found, including the @@ -937,7 +1088,8 @@ proc revalidateLoop(d: Protocol) {.async.} = ## message. 
try: while true: - await sleepAsync(milliseconds(d.rng[].rand(RevalidateMax))) + await sleepAsync(milliseconds(RevalidateMax div 2 + d.rng[].rand(RevalidateMax div 2))) + #echo d.localNode.address.get().port, ": ", d.nodesDiscovered() let n = d.routingTable.nodeToRevalidate() if not n.isNil: traceAsyncErrors d.revalidateNode(n) @@ -1031,7 +1183,7 @@ proc newProtocol*( bootstrapRecords: openArray[SignedPeerRecord] = [], previousRecord = none[SignedPeerRecord](), bindPort: Port, - bindIp = IPv4_any(), + bindIp = IPv4_loopback(), enrAutoUpdate = false, config = defaultDiscoveryConfig, rng = newRng(), @@ -1098,7 +1250,7 @@ proc newProtocol*( bindPort: Port, record: SignedPeerRecord, bootstrapRecords: openArray[SignedPeerRecord] = [], - bindIp = IPv4_any(), + bindIp = IPv4_loopback(), config = defaultDiscoveryConfig, rng = newRng(), providers = ProvidersManager.new(SQLiteDatastore.new(Memory) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/providers/common.nim b/libp2pdht/private/eth/p2p/discoveryv5/providers/common.nim index 4edad412..622ef7f0 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/providers/common.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/providers/common.nim @@ -10,11 +10,11 @@ import std/sequtils import std/strutils -import pkg/chronos -import pkg/libp2p -import pkg/datastore -import pkg/questionable -import pkg/questionable/results +import chronos +import libp2p +import datastore +import questionable +import questionable/results import ../node diff --git a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim index 2cf48dfa..d58f8a1d 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim @@ -6,9 +6,9 @@ # Everything below the handling of ordinary messages import - std/[tables, options], + std/[tables, options, deques], bearssl/rand, - chronos, + chronosim, chronicles, libp2p/crypto/crypto, stew/shims/net, @@ -34,6 +34,7 @@ type message: seq[byte] proc sendToA(t: Transport, a: Address, data: seq[byte]) = + trace "Send packet", myport = t.bindAddress.port, address = a let ta = initTAddress(a.ip, a.port) let f = t.transp.sendTo(ta, data) f.callback = proc(data: pointer) {.gcsafe.} = @@ -109,17 +110,17 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = of OrdinaryMessage: if packet.messageOpt.isSome(): let message = packet.messageOpt.get() - trace "Received message packet", srcId = packet.srcId, address = a, + trace "Received message packet", myport = t.bindAddress.port, srcId = packet.srcId, address = a, kind = message.kind, p = $packet t.client.handleMessage(packet.srcId, a, message) else: - trace "Not decryptable message packet received", + trace "Not decryptable message packet received", myport = t.bindAddress.port, srcId = packet.srcId, address = a t.sendWhoareyou(packet.srcId, a, packet.requestNonce, t.client.getNode(packet.srcId)) of Flag.Whoareyou: - trace "Received whoareyou packet", address = a + trace "Received whoareyou packet", myport = t.bindAddress.port, address = a var pr: PendingRequest if t.pendingRequests.take(packet.whoareyou.requestNonce, pr): let toNode = pr.node @@ -141,7 +142,7 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = else: debug "Timed out or unrequested whoareyou packet", address = a of HandshakeMessage: - trace "Received handshake message packet", srcId = packet.srcIdHs, + trace "Received handshake message packet", myport = t.bindAddress.port, srcId = packet.srcIdHs, address = a, 
kind = packet.message.kind t.client.handleMessage(packet.srcIdHs, a, packet.message) # For a handshake message it is possible that we received an newer SPR. @@ -157,9 +158,9 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = # sending the 'whoareyou' message to. In that case, we can set 'seen' node.seen = true if t.client.addNode(node): - trace "Added new node to routing table after handshake", node + trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered() else: - trace "Packet decoding error", error = decoded.error, address = a + trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a proc processClient[T](transp: DatagramTransport, raddr: TransportAddress): Future[void] {.async.} = @@ -201,7 +202,7 @@ proc newTransport*[T]( privKey: PrivateKey, localNode: Node, bindPort: Port, - bindIp = IPv4_any(), + bindIp = IPv4_loopback(), ## we could use 127.0.0.1 here for local tests rng = newRng()): Transport[T]= # TODO Consider whether this should be a Defect diff --git a/nimble.lock b/nimble.lock index bbeb32cd..40587f1b 100644 --- a/nimble.lock +++ b/nimble.lock @@ -13,24 +13,24 @@ }, "stew": { "version": "0.1.0", - "vcsRevision": "6ad35b876fb6ebe0dfee0f697af173acc47906ee", + "vcsRevision": "e18f5a62af2ade7a1fd1d39635d4e04d944def08", "url": "https://github.com/status-im/nim-stew.git", "downloadMethod": "git", "dependencies": [], "checksums": { - "sha1": "46d58c4feb457f3241e3347778334e325dce5268" + "sha1": "2a80972f66597bf87d820dca8164d89d3bb24c6d" } }, "bearssl": { "version": "0.1.5", - "vcsRevision": "ba80e2a0d7ae8aab666cee013e38ff8d33a3e5e7", + "vcsRevision": "f4c4233de453cb7eac0ce3f3ffad6496295f83ab", "url": "https://github.com/status-im/nim-bearssl", "downloadMethod": "git", "dependencies": [ "unittest2" ], "checksums": { - "sha1": "383abd5becc77bf8e365b780a29d20529e1d9c4c" + "sha1": "dabf4aaac8969fb10281ebd9ff51875d37eeaaa9" } }, "httputils": { @@ -47,7 +47,7 @@ }, "chronos": { "version": "3.0.11", - "vcsRevision": "17fed89c99beac5a92d3668d0d3e9b0e4ac13936", + "vcsRevision": "5b9ec0837c4f6774b68bd9e5ccc36bd79a65e57d", "url": "https://github.com/status-im/nim-chronos.git", "downloadMethod": "git", "dependencies": [ @@ -57,7 +57,7 @@ "unittest2" ], "checksums": { - "sha1": "f6fffc87571e5f76af2a77c4ebcc0e00909ced4e" + "sha1": "f80cedd4b561f5e1dc299afc8babc787c7eb8484" } }, "metrics": { @@ -183,7 +183,7 @@ }, "websock": { "version": "0.1.0", - "vcsRevision": "73edde4417f7b45003113b7a34212c3ccd95b9fd", + "vcsRevision": "7b2ed397d6e4c37ea4df08ae82aeac7ff04cd180", "url": "https://github.com/status-im/nim-websock", "downloadMethod": "git", "dependencies": [ @@ -197,7 +197,7 @@ "zlib" ], "checksums": { - "sha1": "ec2b137543f280298ca48de9ed4461a033ba88d3" + "sha1": "d27f126527be59f5a0dc35303cb37b82d4e2770b" } }, "dnsclient": { @@ -224,8 +224,8 @@ } }, "libp2p": { - "version": "0.0.2", - "vcsRevision": "c7504d2446717a48a79c8b15e0f21bbfc84957ba", + "version": "1.0.0", + "vcsRevision": "a3e9d1ed80c048cd5abc839cbe0863cefcedc702", "url": "https://github.com/status-im/nim-libp2p", "downloadMethod": "git", "dependencies": [ @@ -240,7 +240,7 @@ "websock" ], "checksums": { - "sha1": "ba1aed8860c8771ef23ae7600bbfd459d5651a2c" + "sha1": "65e473566f19f7f9a3529745e7181fb58d30b5ef" } }, "protobuf_serialization": { @@ -269,6 +269,50 @@ "checksums": { "sha1": "0f187a2115315ca898e5f9a30c5e506cf6057062" } + }, + "datastore": { + "version": "0.0.1", + "vcsRevision": "0cde8aeb67c59fd0ac95496dc6b5e1168d6632aa", + "url": 
"https://github.com/status-im/nim-datastore", + "downloadMethod": "git", + "dependencies": [ + ], + "checksums": { + "sha1": "2c03bb47de97962d2a64be1ed0a8161cd9d65159" + } + }, + "questionable": { + "version": "0.10.6", + "vcsRevision": "30e4184a99c8c1ba329925912d2c5d4b09acf8cc", + "url": "https://github.com/status-im/questionable", + "downloadMethod": "git", + "dependencies": [ + ], + "checksums": { + "sha1": "ca2d1e2e0be6566b4bf13261b29645721d01673d" + } + }, + "upraises": { + "version": "0.1.0", + "vcsRevision": "ff4f8108e44fba9b35cac535ab63d3927e8fd3c2", + "url": "https://github.com/markspanbroek/upraises", + "downloadMethod": "git", + "dependencies": [ + ], + "checksums": { + "sha1": "a0243c8039e12d547dbb2e9c73789c16bb8bc956" + } + }, + "sqlite3-abi": { + "version": "3.34.0", + "vcsRevision": "fda455cfea2df707dde052034411ce63de218453", + "url": "https://github.com/arnetheduck/nim-sqlite3-abi", + "downloadMethod": "git", + "dependencies": [ + ], + "checksums": { + "sha1": "720aaffb34259c1a9dd2239c77ee1fbcc2a41346" + } } } } \ No newline at end of file diff --git a/test_helper.nim b/test_helper.nim new file mode 100644 index 00000000..89f5797b --- /dev/null +++ b/test_helper.nim @@ -0,0 +1,131 @@ +import + bearssl/rand, + chronos, + libp2p/crypto/[crypto, secp], + libp2p/multiaddress, + libp2pdht/discv5/[node, routing_table, spr], + libp2pdht/discv5/crypto as dhtcrypto, + libp2pdht/discv5/protocol as discv5_protocol, + stew/shims/net + +export net + +proc localAddress*(port: int): Address = + Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port)) + +proc example*(T: type PrivateKey, rng: ref HmacDrbgContext): PrivateKey = + PrivateKey.random(rng[]).expect("Valid rng for private key") + +proc example*(T: type NodeId, rng: ref HmacDrbgContext): NodeId = + let + privKey = PrivateKey.example(rng) + pubKey = privKey.getPublicKey.expect("Valid private key for public key") + pubKey.toNodeId().expect("Public key valid for node id") + +proc initDiscoveryNode*( + rng: ref HmacDrbgContext, + privKey: PrivateKey, + address: Address, + bootstrapRecords: openArray[SignedPeerRecord] = [], + localEnrFields: openArray[(string, seq[byte])] = [], + previousRecord = none[SignedPeerRecord]()): + discv5_protocol.Protocol = + # set bucketIpLimit to allow bucket split + let config = DiscoveryConfig.init(1000, 24, 5) + + let protocol = newProtocol( + privKey, + some(address.ip), + some(address.port), + some(address.port), + bindPort = address.port, + bootstrapRecords = bootstrapRecords, + localEnrFields = localEnrFields, + previousRecord = previousRecord, + config = config, + rng = rng) + + protocol.open() + + protocol + +proc nodeIdInNodes*(id: NodeId, nodes: openArray[Node]): bool = + for n in nodes: + if id == n.id: return true + +proc generateNode*(privKey: PrivateKey, port: int = 20302, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node = + + let + port = Port(port) + spr = SignedPeerRecord.init(1, privKey, some(ip), some(port), some(port)) + .expect("Properly intialized private key") + result = newNode(spr).expect("Properly initialized node") + +proc generateNRandomNodes*(rng: ref HmacDrbgContext, n: int): seq[Node] = + var res = newSeq[Node]() + for i in 1..n: + let + privKey = PrivateKey.example(rng) + node = privKey.generateNode() + res.add(node) + res + +proc nodeAndPrivKeyAtDistance*(n: Node, rng: var HmacDrbgContext, d: uint32, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): (Node, PrivateKey) = + while true: + let + privKey = PrivateKey.random(rng).expect("Valid rng 
for private key") + node = privKey.generateNode(ip = ip) + if logDistance(n.id, node.id) == d: + return (node, privKey) + +proc nodeAtDistance*(n: Node, rng: var HmacDrbgContext, d: uint32, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node = + let (node, _) = n.nodeAndPrivKeyAtDistance(rng, d, ip) + node + +proc nodesAtDistance*( + n: Node, rng: var HmacDrbgContext, d: uint32, amount: int, + ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): seq[Node] = + for i in 0.. 0: - await sleepAsync(chronos.milliseconds(delay)) + try: + let privKey = PrivateKey.example(rng) + let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes) + await node.start() + result.add((node, privKey)) + if delay > 0: + await sleepAsync(chronos.milliseconds(delay)) + except TransportOsError as e: + echo "skipping node ",i ,":", e.msg + #await allFutures(result.mapIt(it.bootstrap())) # this waits for bootstrap based on bootENode, which includes bonding with all its ping pongs @@ -175,7 +179,12 @@ suite "Providers Tests: two nodes": debug "Providers:", providers check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId) -suite "Providers Tests: 20 nodes": +suite "Providers Tests: many nodes": + + let + nodecount = 1000 + delay_pernode = 10 # in millisec + delay_init = 15*1000 # in millisec var rng: ref HmacDrbgContext @@ -188,19 +197,19 @@ suite "Providers Tests: 20 nodes": setupAll: rng = newRng() - nodes = await bootstrapNetwork(nodecount=20) + nodes = await bootstrapNetwork(nodecount=nodecount, delay=delay_pernode) targetId = NodeId.example(rng) (node0, privKey0) = nodes[0] signedPeerRec0 = privKey0.toSignedPeerRecord peerRec0 = signedPeerRec0.data - await sleepAsync(chronos.seconds(15)) + await sleepAsync(chronos.milliseconds(delay_init)) teardownAll: for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here await n.closeWait() - test "20 nodes, store and retrieve from same": + test $nodecount & " nodes, store and retrieve from same": debug "---- ADDING PROVIDERS ---" let addedTo = await node0.addProvider(targetId, signedPeerRec0) @@ -214,7 +223,7 @@ suite "Providers Tests: 20 nodes": debug "Providers:", providers check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId) - test "20 nodes, retrieve from other": + test $nodecount & " nodes, retrieve from other": debug "---- STARTING PROVIDERS LOOKUP ---" let (node19, _) = nodes[^2] let providersRes = await node19.getProviders(targetId) @@ -224,7 +233,7 @@ suite "Providers Tests: 20 nodes": debug "Providers:", providers check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId) - test "20 nodes, retrieve after bootnodes dies": + test $nodecount & " nodes, retrieve after bootnodes dies": debug "---- KILLING BOOTSTRAP NODE ---" let (node0, _) = nodes[0] let (node18, _) = nodes[^2] @@ -238,3 +247,56 @@ suite "Providers Tests: 20 nodes": let providers = providersRes.get debug "Providers:", providers check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId) + + test $nodecount & " nodes, lookup each other": + debug "---- STARTING NODE LOOKUP ---" + var + tested = 0 + passed = 0 + for (n, _) in nodes[1..^1]: + for (target, _) in nodes[1..^1]: + if n != target: # TODO: fix self-lookup + info "Start lookup", src = n.localNode, dst = target.localNode + let startTime = Moment.now() + let discovered = await n.lookup(target.localNode.id, fast = true) + let pass = (discovered[0] == target.localNode) + info "Lookup", pass, src = n.localNode, dst = target.localNode, 
time = Moment.now() - startTime + check pass + tested += 1 + passed += int(pass) + info "Lookup ratio", passed, tested + + test $nodecount & " nodes, lookup random": + debug "---- STARTING NODE LOOKUP ---" + var + tested = 0 + passed = 0 + for (n, _) in nodes[1..^1]: + for (target, _) in nodes[1..^1]: + if n != target: # TODO: fix self-lookup + info "Start lookup", src = n.localNode, dst = target.localNode + let startTime = Moment.now() + let discovered = await n.lookup(target.localNode.id, fast = true) + let pass = (discovered[0] == target.localNode) + info "Lookup", pass, src = n.localNode, dst = target.localNode, time = Moment.now() - startTime + check pass + tested += 1 + passed += int(pass) + info "Lookup ratio", passed, tested, ratio = passed/tested + + test $nodecount & " nodes, addValue and getValue": + let + key = NodeId.example(rng) + v = @[byte 1,2,3] + + debug "---- ADDING VALUE ---" + let addedTo = await node0.addValue(key, v) + debug "Value added to: ", addedTo + + debug "---- STARTING VALUE LOOKUP ---" + let res = await node0.getValue(key) + + debug "---- STARTING CHECKS ---" + let v2 = res.get + debug "Value:", v2 + check (v2 == v)
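
For orientation, here is a minimal sketch (not part of the patch) of how the value API added in protocol.nim is driven end to end; it mirrors das.nim's main proc and the addValue/getValue test above. `bootstrapNetwork` and `toNodeId` are the helpers introduced by this patch (das.nim / test_helper.nim); `valueDemo` and the node count are illustrative only.

```
# Sketch only: assumes the helpers from this patch (bootstrapNetwork, toNodeId)
# are importable; `valueDemo` and the node count are illustrative.
import chronos
import libp2pdht/discv5/protocol as discv5_protocol

proc valueDemo() {.async.} =
  # bring up a small local network; returns (Protocol, PrivateKey) pairs
  let nodes = await bootstrapNetwork(nodecount = 10)

  let
    segment = @[byte 1, 2, 3]
    key = toNodeId(segment)   # keccak256-based key, as in das.nim

  # the builder (node 0) stores the segment on the nodes closest to `key`
  let storedOn = await nodes[0][0].addValue(key, segment)
  echo "value stored on ", storedOn.len, " nodes"

  # any other node can sample it back through the DHT
  let res = await nodes[1][0].getValue(key)
  assert res.isOk and res.get() == segment

when isMainModule:
  waitFor valueDemo()
```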