Skip to content

Commit

Permalink
feat: plumb through context changes (#459)
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert authored Nov 11, 2021
1 parent 5c90105 commit c6dd285
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 161 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ orbs:
executors:
golang:
docker:
- image: circleci/golang:1.15.5
- image: cimg/go:1.17
resource_class: 2xlarge
ubuntu:
docker:
Expand Down
18 changes: 12 additions & 6 deletions floodsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) []host.Host {
var out []host.Host

for i := 0; i < n; i++ {
netw := swarmt.GenSwarm(t, ctx)
netw := swarmt.GenSwarm(t)
h := bhost.NewBlankHost(netw)
t.Cleanup(func() { h.Close() })
out = append(out, h)
}

Expand Down Expand Up @@ -1140,7 +1141,8 @@ func TestWithInvalidMessageAuthor(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h.Close()
_, err := NewFloodSub(ctx, h, WithMessageAuthor("bogotr0n"))
if err == nil {
t.Fatal("expected error")
Expand All @@ -1155,8 +1157,10 @@ func TestPreconnectedNodes(t *testing.T) {
defer cancel()

// Create hosts
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
defer h2.Close()

opts := []Option{WithDiscovery(&dummyDiscovery{})}
// Setup first PubSub
Expand Down Expand Up @@ -1214,8 +1218,10 @@ func TestDedupInboundStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
defer h2.Close()

_, err := NewFloodSub(ctx, h1)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ require (
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-libp2p-blankhost v0.2.0
github.com/libp2p/go-libp2p-connmgr v0.2.4
github.com/libp2p/go-libp2p-core v0.8.6
github.com/libp2p/go-libp2p-discovery v0.5.1
github.com/libp2p/go-libp2p-swarm v0.5.3
github.com/libp2p/go-libp2p-core v0.11.0
github.com/libp2p/go-libp2p-discovery v0.6.0
github.com/libp2p/go-libp2p-swarm v0.8.0
github.com/libp2p/go-msgio v0.0.6
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multiaddr v0.4.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
)
135 changes: 40 additions & 95 deletions go.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion gossipsub_connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
connmgrs[i] = connmgr.NewConnManager(nHonest, connLimit, 0,
connmgr.DecayerConfig(&decayCfg))

netw := swarmt.GenSwarm(t, ctx)
netw := swarmt.GenSwarm(t)
defer netw.Close()
h := bhost.NewBlankHost(netw, bhost.WithConnectionManager(connmgrs[i]))
honestHosts[i] = h
honestPeers[h.ID()] = struct{}{}
Expand Down
3 changes: 2 additions & 1 deletion gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,7 +1601,8 @@ func TestGossipsubPiggybackControl(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h.Close()
ps := getGossipsub(ctx, h)

blah := peer.ID("bogotr0n")
Expand Down
6 changes: 4 additions & 2 deletions trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,10 @@ func TestRemoteTracer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
h1 := bhost.NewBlankHost(swarmt.GenSwarm(t))
h2 := bhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
defer h2.Close()

mrt := &mockRemoteTracer{}
h1.SetStreamHandler(RemoteTracerProtoID, mrt.handleStream)
Expand Down
103 changes: 52 additions & 51 deletions validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,71 +199,72 @@ func TestValidateOverload(t *testing.T) {
},
}

for _, tc := range tcs {
for tci, tc := range tcs {
t.Run(fmt.Sprintf("%d", tci), func(t *testing.T) {
hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)

hosts := getNetHosts(t, ctx, 2)
psubs := getPubsubs(ctx, hosts)
connect(t, hosts[0], hosts[1])
topic := "foobar"

connect(t, hosts[0], hosts[1])
topic := "foobar"
block := make(chan struct{})

block := make(chan struct{})
err := psubs[1].RegisterTopicValidator(topic,
func(ctx context.Context, from peer.ID, msg *Message) bool {
<-block
return true
},
WithValidatorConcurrency(tc.maxConcurrency))

err := psubs[1].RegisterTopicValidator(topic,
func(ctx context.Context, from peer.ID, msg *Message) bool {
<-block
return true
},
WithValidatorConcurrency(tc.maxConcurrency))

if err != nil {
t.Fatal(err)
}

sub, err := psubs[1].Subscribe(topic)
if err != nil {
t.Fatal(err)
}
if err != nil {
t.Fatal(err)
}

time.Sleep(time.Millisecond * 50)
sub, err := psubs[1].Subscribe(topic)
if err != nil {
t.Fatal(err)
}

if len(tc.msgs) != tc.maxConcurrency+1 {
t.Fatalf("expected number of messages sent to be maxConcurrency+1. Got %d, expected %d", len(tc.msgs), tc.maxConcurrency+1)
}
time.Sleep(time.Millisecond * 50)

p := psubs[0]
if len(tc.msgs) != tc.maxConcurrency+1 {
t.Fatalf("expected number of messages sent to be maxConcurrency+1. Got %d, expected %d", len(tc.msgs), tc.maxConcurrency+1)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
for _, tmsg := range tc.msgs {
select {
case msg := <-sub.ch:
if !tmsg.validates {
t.Log(msg)
t.Error("expected message validation to drop the message because all validator goroutines are taken")
}
case <-time.After(time.Second):
if tmsg.validates {
t.Error("expected message validation to accept the message")
p := psubs[0]

var wg sync.WaitGroup
wg.Add(1)
go func() {
for _, tmsg := range tc.msgs {
select {
case msg := <-sub.ch:
if !tmsg.validates {
t.Log(msg)
t.Error("expected message validation to drop the message because all validator goroutines are taken")
}
case <-time.After(time.Second):
if tmsg.validates {
t.Error("expected message validation to accept the message")
}
}
}
}
wg.Done()
}()
wg.Done()
}()

for _, tmsg := range tc.msgs {
err := p.Publish(topic, tmsg.msg)
if err != nil {
t.Fatal(err)
for _, tmsg := range tc.msgs {
err := p.Publish(topic, tmsg.msg)
if err != nil {
t.Fatal(err)
}
}
}

// wait a bit before unblocking the validator goroutines
time.Sleep(500 * time.Millisecond)
close(block)
// wait a bit before unblocking the validator goroutines
time.Sleep(500 * time.Millisecond)
close(block)

wg.Wait()
wg.Wait()
})
}
}

Expand Down

0 comments on commit c6dd285

Please sign in to comment.