WIP: Test DHT with large number of nodes #57

Draft: wants to merge 61 commits into base: master
61 commits
a86a5d6
warn and skip test nodes on busy ports
cskiraly May 9, 2023
3b71e50
increase node count to 1000
cskiraly May 9, 2023
a3092df
wait more to have better logs
cskiraly May 9, 2023
591b270
log routing table size periodically
cskiraly May 9, 2023
4b49586
improve tracing of message exchange
cskiraly May 10, 2023
ab7edc0
add 10ms of delay between node initializations
cskiraly May 10, 2023
7956898
change revalidateLoop to use [RevalidateMax/2, RevalidateMax]
cskiraly May 10, 2023
b1957f4
Fake DatagramTransport: first compile
cskiraly May 10, 2023
e8b019b
store TransportAddress
cskiraly May 10, 2023
2eef69f
try to add global registry of endpoints
cskiraly May 10, 2023
dcdef89
make it compile with gcsafe
cskiraly May 10, 2023
20ff87b
simplify code changes
cskiraly May 10, 2023
6109cba
index by port
cskiraly May 10, 2023
fd317a5
add simple send functionality
cskiraly May 10, 2023
70c2d28
use 127.0.0.1 for these tests
cskiraly May 10, 2023
eb08cbf
introduce waitResponse wrapper
cskiraly May 10, 2023
349127f
add on/off for network emulator
cskiraly May 10, 2023
8b24458
fixup: move sendRequest forward
cskiraly May 10, 2023
5e0cdd1
remove some echo lines
cskiraly May 10, 2023
cd1303f
rename callback
cskiraly May 10, 2023
3e0c283
add recvFrom to fake DatagramTransport
cskiraly May 10, 2023
23733c1
remove some debug output
cskiraly May 10, 2023
ba0d671
add 50ms simulated network latency
cskiraly May 10, 2023
43d71d4
test: make nodecount and delays configurable
cskiraly May 10, 2023
dcb6c24
generalize p2p delay
cskiraly May 10, 2023
349eb01
add stub for egress queuing
cskiraly May 11, 2023
f58f8cb
introducing ChronoSim
cskiraly May 11, 2023
eb74441
add logscope
cskiraly May 11, 2023
d8b0421
chronosim: use const to configure
cskiraly May 11, 2023
527db66
chronosim: add timewarp
cskiraly May 11, 2023
e934dd7
add random loss
cskiraly May 13, 2023
c048362
dht: waitMessage: expose timeout as parameter, keeping default
cskiraly May 17, 2023
8f7d342
encoding: introducing type cipher=aes128
cskiraly Jun 2, 2023
50dd3cd
encoding: introducing the "nop" cipher
cskiraly Jun 2, 2023
d496a47
changing async logic for nodes (multi) response
cskiraly Jun 2, 2023
45e2dea
chronosim: trace instead of info
cskiraly Jun 2, 2023
17e55d4
add long all-to-all test
cskiraly Jun 2, 2023
e4862cd
fixing bearssl version in lockfile and in nimble file
cskiraly Jun 2, 2023
830247f
fix imports: remove pkg/
cskiraly Jun 5, 2023
9665df6
update .nimble and .lock files to work
cskiraly Jun 5, 2023
6aa2707
just a comment on awaitedNodesMessages
cskiraly Jun 6, 2023
c2bb22a
add DHT storage (addValue/getValue) functionality
cskiraly Jun 6, 2023
575a341
addValue/getValue: add minimal test
cskiraly Jun 6, 2023
1ed7813
DAS: add minimal DAS test
cskiraly Jun 6, 2023
15c45be
start sample downloads in parallel
cskiraly Jun 6, 2023
2a8c7e7
add sampling instead of getting all segments
cskiraly Jun 6, 2023
479d640
upload to DHT in parallel
cskiraly Jun 6, 2023
b277446
better sample generation for small samples
cskiraly Jun 9, 2023
0083e1e
generate segments systematically
cskiraly Jun 12, 2023
2076bcd
das: factorize startSampling procedure
cskiraly Jun 12, 2023
3af8b67
DAS: introduce the sample async procedure
cskiraly Jun 12, 2023
6601186
DAS: run sampling in parallel from each node
cskiraly Jun 12, 2023
c768586
DAS: code cleanup - variables
cskiraly Jun 12, 2023
fe38377
DAS: code cleanup - imports
cskiraly Jun 12, 2023
a05c0f8
DAS: rename to sampleDA
cskiraly Jun 12, 2023
7b5e9d1
DAS: add ratio to stats
cskiraly Jun 12, 2023
12c4c97
DAS: check samplesize <= blocksize
cskiraly Jun 12, 2023
8286d8f
DAS: better defaults
cskiraly Jun 12, 2023
3afc006
use ChronoSim version of nim-chronos
cskiraly Jun 12, 2023
d0c777c
making this work with nimble
cskiraly Jun 12, 2023
97d9b37
add minimal README
cskiraly Jun 13, 2023
26 changes: 26 additions & 0 deletions README-DAS.md
@@ -0,0 +1,26 @@
# DAS emulator

Emulate DAS DHT behavior, with a few simple assumptions:
- the block is populated in the DHT by the builder (node 0)
- all nodes start sampling at the same time
- 1-way latency is 50ms (configurable)
- no losses in transmission (configurable)
- scaled-down numbers (nodes, blocksize, etc., all configurable; see the constants sketched below)
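
The scaled-down defaults are plain constants near the top of `main()` in `das.nim`; a sketch of the values used in this PR (1-way latency and random loss are configured separately in the emulated transport layer):

```nim
let
  nodecount     = 100      # number of DHT nodes; node 0 acts as the builder
  delay_pernode = 10       # ms of delay between node initializations
  delay_init    = 15*1000  # ms to let the DHT settle before uploading
  blocksize     = 256      # number of segments in the emulated block
  segmentsize   = 2        # bytes per segment
  samplesize    = 3        # segments each node tries to retrieve
```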

## Compilation

```
# install Nim 1.6

# install Nimble 0.14+
nimble install nimble

# make sure the newly installed nimble is used
export PATH=~/.nimble/bin:$PATH

# install dependencies
nimble install

# compile and run, passing on various flags
nimble run "-d:chronicles_sinks=textlines[stdout,nocolors]" -d:chronicles_log_level=INFO -d:release -d:asyncTimer=virtual das
```
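
For more verbose output from the emulator's `debug`/`trace` logging, the same command can be rebuilt with a more verbose chronicles log level (a minimal variation, assuming the standard chronicles level names):

```
nimble run "-d:chronicles_sinks=textlines[stdout,nocolors]" -d:chronicles_log_level=DEBUG -d:release -d:asyncTimer=virtual das
```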
185 changes: 185 additions & 0 deletions das.nim
@@ -0,0 +1,185 @@
import
std/[random, math],
chronicles,
chronos,
libp2pdht/dht,
libp2pdht/discv5/crypto as dhtcrypto,
libp2pdht/discv5/protocol as discv5_protocol,
test_helper

logScope:
topics = "DAS emulator"

proc bootstrapNodes(
nodecount: int,
bootnodes: seq[SignedPeerRecord],
rng = newRng(),
delay: int = 0
) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} =

debug "---- STARTING BOOSTRAPS ---"
for i in 0..<nodecount:
try:
let privKey = PrivateKey.example(rng)
let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes)
await node.start()
result.add((node, privKey))
if delay > 0:
await sleepAsync(chronos.milliseconds(delay))
except TransportOsError as e:
echo "skipping node ",i ,":", e.msg

#await allFutures(result.mapIt(it.bootstrap())) # this waits for bootstrap based on bootENode, which includes bonding with all its ping pongs

proc bootstrapNetwork(
nodecount: int,
rng = newRng(),
delay: int = 0
) : Future[seq[(discv5_protocol.Protocol, PrivateKey)]] {.async.} =

let
bootNodeKey = PrivateKey.fromHex(
"a2b50376a79b1a8c8a3296485572bdfbf54708bb46d3c25d73d2723aaaf6a617")
.expect("Valid private key hex")
bootNodeAddr = localAddress(20301)
bootNode = initDiscoveryNode(rng, bootNodeKey, bootNodeAddr, @[]) # just a shortcut for new and open

#waitFor bootNode.bootstrap() # immediate, since no bootnodes are defined above

var res = await bootstrapNodes(nodecount - 1,
@[bootnode.localNode.record],
rng,
delay)
res.insert((bootNode, bootNodeKey), 0)
return res

proc toNodeId(data: openArray[byte]): NodeId =
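  ## Derive a 256-bit DHT key by hashing the given bytes with keccak256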
readUintBE[256](keccak256.digest(data).data)

proc segmentData(s: int, segmentsize: int) : seq[byte] =
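  ## Encode segment index s as segmentsize bytes, least significant byte first (zero-padded)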
result = newSeq[byte](segmentsize)
var
r = s
i = 0
while r > 0:
assert(i<segmentsize)
result[i] = byte(r mod 256)
r = r div 256
i+=1

proc sample(s: Slice[int], len: int): seq[int] =
# random sample without replacement
# TODO: not the best for small len
assert s.a <= s.b
var all = s.b - s.a + 1
var count = len
if len >= all div 10: # add better algo selector
var generated = newSeq[bool](all) # Initialized to false.
while count != 0:
let n = rand(s)
if not generated[n - s.a]:
generated[n - s.a] = true
result.add n
dec count
else:
while count != 0:
let n = rand(s)
if not (n in result):
result.add n
dec count


when isMainModule:
proc main() {.async.} =
let
nodecount = 100
delay_pernode = 10 # in millisec
delay_init = 15*1000 # in millisec
blocksize = 256
segmentsize = 2
samplesize = 3
upload_timeout = 5.seconds
sampling_timeout = 5.seconds
assert(log2(blocksize.float).ceil.int <= segmentsize * 8 )
assert(samplesize <= blocksize)

var
segmentIDs = newSeq[NodeId](blocksize)

# start network
let
rng = newRng()
nodes = await bootstrapNetwork(nodecount=nodecount, delay=delay_pernode)

# wait for network to settle
await sleepAsync(chronos.milliseconds(delay_init))

# generate block and push data
info "starting upload to DHT"
let startTime = Moment.now()
var futs = newSeq[Future[seq[Node]]]()
for s in 0 ..< blocksize:
let
segment = segmentData(s, segmentsize)
key = toNodeId(segment)

segmentIDs[s] = key

futs.add(nodes[0][0].addValue(key, segment))

let pass = await allFutures(futs).withTimeout(upload_timeout)
info "uploaded to DHT", by = 0, pass, time = Moment.now() - startTime

# sample
proc startSamplingDA(n: discv5_protocol.Protocol): seq[Future[DiscResult[seq[byte]]]] =
## Generate random sample and start the sampling process
var futs = newSeq[Future[DiscResult[seq[byte]]]]()

let sample = sample(0 ..< blocksize, samplesize)
debug "starting sampling", by = n, sample
for s in sample:
let fut = n.getValue(segmentIDs[s])
futs.add(fut)
return futs

proc sampleDA(n: discv5_protocol.Protocol): Future[(bool, int, Duration)] {.async.} =
## Sample and return detailed results of sampling
let startTime = Moment.now()
var futs = startSamplingDA(n)

# test is passed if all segments are retrieved in time
let pass = await allFutures(futs).withTimeout(sampling_timeout)
var passcount: int
for f in futs:
if f.finished():
passcount += 1

let time = Moment.now() - startTime
info "sample", by = n.localNode, pass, cnt = passcount, time
return (pass, passcount, time)

# all nodes start sampling in parallel
var samplings = newSeq[Future[(bool, int, Duration)]]()
for n in 1 ..< nodecount:
samplings.add(sampleDA(nodes[n][0]))
await allFutures(samplings)

# print statistics
var
passed = 0
for f in samplings:
if f.finished():
let (pass, passcount, time) = await f
passed += pass.int
debug "sampleStats", pass, cnt = passcount, time
else:
error "This should not happen!"
info "sampleStats", passed, total = samplings.len, ratio = passed/samplings.len

waitfor main()

# proc teardownAll() =
# for (n, _) in nodes: # if last test is enabled, we need nodes[1..^1] here
# await n.closeWait()


6 changes: 4 additions & 2 deletions libp2pdht.nimble
@@ -6,11 +6,12 @@ description = "DHT based on the libp2p Kademlia spec"
license = "MIT"
skipDirs = @["tests"]

bin = @["das"]

# Dependencies
requires "nim >= 1.2.0",
"nimcrypto >= 0.5.4 & < 0.6.0",
"bearssl#head",
"bearssl#f4c4233de453cb7eac0ce3f3ffad6496295f83ab",
"chronicles >= 0.10.2 & < 0.11.0",
"chronos >= 3.0.11 & < 3.1.0",
"libp2p#unstable",
@@ -21,7 +22,8 @@ requires "nim >= 1.2.0",
"stint",
"asynctest >= 0.3.1 & < 0.4.0",
"https://github.com/status-im/nim-datastore#head",
"questionable"
"questionable",
"datastore"

task coverage, "generates code coverage report":
var (output, exitCode) = gorgeEx("which lcov")
6 changes: 4 additions & 2 deletions libp2pdht/dht.nim
@@ -1,4 +1,6 @@
import
./dht/[providers_encoding, providers_messages]
./dht/[providers_encoding, providers_messages],
./dht/[value_encoding, value_messages]

export providers_encoding, providers_messages
export providers_encoding, providers_messages
export value_encoding, value_messages
77 changes: 77 additions & 0 deletions libp2pdht/dht/value_encoding.nim
@@ -0,0 +1,77 @@
import
../discv5/[node],
libp2p/protobuf/minprotobuf,
./value_messages

func getField(pb: ProtoBuffer, field: int,
nid: var NodeId): ProtoResult[bool] {.inline.} =
## Read ``NodeId`` from ProtoBuf's message and validate it
var buffer: seq[byte]
let res = ? pb.getField(field, buffer)
if not(res):
ok(false)
else:
nid = readUintBE[256](buffer)
ok(true)

func write(pb: var ProtoBuffer, field: int, nid: NodeId) =
  ## Write NodeId value ``nid`` to object ``pb`` using ProtoBuf's encoding.
write(pb, field, nid.toBytesBE())

proc decode*(
T: typedesc[AddValueMessage],
buffer: openArray[byte]): Result[AddValueMessage, ProtoError] =

let pb = initProtoBuffer(buffer)
var msg = AddValueMessage()

? pb.getRequiredField(1, msg.cId)
? pb.getRequiredField(2, msg.value)

ok(msg)

proc encode*(msg: AddValueMessage): seq[byte] =
var pb = initProtoBuffer()

pb.write(1, msg.cId)
pb.write(2, msg.value)

pb.finish()
pb.buffer

proc decode*(
T: typedesc[GetValueMessage],
buffer: openArray[byte]): Result[GetValueMessage, ProtoError] =

let pb = initProtoBuffer(buffer)
var msg = GetValueMessage()

? pb.getRequiredField(1, msg.cId)

ok(msg)

proc encode*(msg: GetValueMessage): seq[byte] =
var pb = initProtoBuffer()

pb.write(1, msg.cId)

pb.finish()
pb.buffer

proc decode*(
T: typedesc[ValueMessage],
buffer: openArray[byte]): Result[ValueMessage, ProtoError] =

let pb = initProtoBuffer(buffer)
var msg = ValueMessage()
? pb.getRequiredField(1, msg.value)

ok(msg)

proc encode*(msg: ValueMessage): seq[byte] =
var pb = initProtoBuffer()

pb.write(1, msg.value)

pb.finish()
pb.buffer
14 changes: 14 additions & 0 deletions libp2pdht/dht/value_messages.nim
@@ -0,0 +1,14 @@
import
../discv5/[node]

type
AddValueMessage* = object
cId*: NodeId
value*: seq[byte]

GetValueMessage* = object
cId*: NodeId

ValueMessage* = object
#total*: uint32
value*: seq[byte]