This repository has been archived by the owner on May 26, 2022. It is now read-only.

handle relay addr dialing intelligently #118

Open · wants to merge 2 commits into master
176 changes: 162 additions & 14 deletions swarm_dial.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

@@ -16,6 +17,7 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
transport "github.com/libp2p/go-libp2p-transport"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr-net"
)

// Diagram of dial sync:
@@ -58,6 +60,10 @@ const ConcurrentFdDials = 160
// per peer
const DefaultPerPeerRateLimit = 8

// this is the protocol code for relay; we can't import go-libp2p-circuit because
// of a circular dependency.
const P_CIRCUIT = 290

// dialbackoff is a struct used to avoid over-dialing the same, dead peers.
// Whenever we totally time out on a peer (all three attempts), we add them
// to dialbackoff. Then, whenever goroutines would _wait_ (dialsync), they
@@ -283,14 +289,6 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
log.Debug("Dial not given PrivateKey, so WILL NOT SECURE conn.")
}

//////
/*
This slice-to-chan code is temporary, the peerstore can currently provide
a channel as an interface for receiving addresses, but more thought
needs to be put into the execution. For now, this allows us to use
the improved rate limiter, while maintaining the outward behaviour
that we previously had (halting a dial when we run out of addrs)
*/
peerAddrs := s.peers.Addrs(p)
if len(peerAddrs) == 0 {
return nil, errors.New("no addresses")
@@ -299,12 +297,11 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
if len(goodAddrs) == 0 {
return nil, errors.New("no good addresses")
}
goodAddrsChan := make(chan ma.Multiaddr, len(goodAddrs))
for _, a := range goodAddrs {
goodAddrsChan <- a
}
close(goodAddrsChan)
/////////

// schedule the dial
ctx, cancel := context.WithCancel(ctx)
defer cancel()
goodAddrsChan := s.scheduleDialAddrs(ctx, goodAddrs)

// try to get a connection to any addr
connC, err := s.dialAddrs(ctx, p, goodAddrsChan)
@@ -352,6 +349,157 @@ func (s *Swarm) filterKnownUndialables(addrs []ma.Multiaddr) []ma.Multiaddr {
)
}

func (s *Swarm) scheduleDialAddrs(ctx context.Context, addrs []ma.Multiaddr) <-chan ma.Multiaddr {
var immediateAddrs []ma.Multiaddr
relayAddrs := make(map[peer.ID][]ma.Multiaddr)

for _, a := range addrs {
_, err := a.ValueForProtocol(P_CIRCUIT)
if err != nil {
immediateAddrs = append(immediateAddrs, a)
continue
}

relayS, err := a.ValueForProtocol(ma.P_P2P)
if err != nil {
// unspecific relay addr; push it to immediate -- note that these addrs should not be
// advertised any more, but we don't want to break older nodes or tests
immediateAddrs = append(immediateAddrs, a)
Review thread:

Member: Shouldn't we just drop this?

Contributor Author: Maybe, I really don't know what to do with these!

Contributor Author: Maybe we should schedule them for last, after all specific relay addrs have been scheduled.

Member: Do we really gain anything? This literally means: dial me through random relays.

Contributor Author: Probably not. Only concern is that we do "support" users requesting dials with ipfs swarm connect /p2p-circuit/ipfs/QmTarget, and we may not want to completely break that.

Member: Gah. What if we just hack that into a higher layer? Or just drop it? Nobody should be relying on that in production and I don't feel all that bad about breaking demos.

Contributor Author (@vyzo, Apr 30, 2019): Maybe we should drop it -- but we might want to disable circuit.OptDiscovery in ipfs as well. We can substitute it with /ipfs/QmRelay/p2p-circuit/ipfs/QmTarget anyway.

continue
}

relay, err := peer.IDB58Decode(relayS)
if err != nil {
log.Debugf("malformed relay peer ID %s: %s", relayS, err.Error())
continue
}

relayAddrs[relay] = append(relayAddrs[relay], a)
}

ch := make(chan ma.Multiaddr)
go s.emitDialAddrs(ctx, ch, immediateAddrs, relayAddrs)

return ch
}
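
For reference, a minimal sketch of the classification rule above as a standalone predicate. This is an illustration, not code from the diff: isRelayAddr is a hypothetical helper, and the example addresses assume peerstore-style addrs (QmRelay stands in for a valid base58 peer ID).

// isRelayAddr reports whether a dial addr goes through a relay, i.e. carries
// a /p2p-circuit component. The three shapes scheduleDialAddrs can see:
//
//	/ip4/1.2.3.4/tcp/4001     no P_CIRCUIT component -> immediate dial
//	/p2p/QmRelay/p2p-circuit  circuit with relay ID  -> grouped under QmRelay
//	/p2p-circuit              circuit, no relay ID   -> unspecific, kept immediate
func isRelayAddr(a ma.Multiaddr) bool {
	_, err := a.ValueForProtocol(P_CIRCUIT)
	return err == nil
}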

func (s *Swarm) emitDialAddrs(ctx context.Context, ch chan ma.Multiaddr, immediate []ma.Multiaddr, relays map[peer.ID][]ma.Multiaddr) {
defer close(ch)

// first issue the immediate addrs without delay; track public addrs to determine the delay
// before dialing relays
isPublic := false
for _, a := range immediate {
select {
case ch <- a:
case <-ctx.Done():
return
}

if manet.IsPublicAddr(a) {
isPublic = true
}
}

if len(relays) == 0 {
return
}

// split relays into connected and unknown
var connected, unknown []peer.ID

for relay := range relays {
if s.Connectedness(relay) == inet.Connected {
connected = append(connected, relay)
} else {
unknown = append(unknown, relay)
}
}

shufflePeers(connected)
shufflePeers(unknown)

// wait for delay before emitting relay addrs
var delay time.Duration
if len(immediate) > 0 {
if isPublic {
delay = 1000 * time.Millisecond
} else {
delay = 250 * time.Millisecond
}
}

if delay > 0 {
select {
case <-time.After(delay):
case <-ctx.Done():
return
}
}

// issue connected relay addrs first, aggregated by relay
for i, relay := range connected {
relayAddr, err := ma.NewMultiaddr(fmt.Sprintf("/p2p/%s/p2p-circuit", relay.Pretty()))
if err != nil {
log.Errorf("Error constructing relay addr: %s", err.Error())
continue
}

select {
case ch <- relayAddr:
case <-ctx.Done():
return
}

if len(unknown) > 0 || i < len(connected)-1 {
delay = 1500 * time.Millisecond
} else {
delay = 0
}

if delay > 0 {
select {
case <-time.After(delay):
case <-ctx.Done():
return
}
}
}

// now issue the addrs for non-connected relays; we use the whole address set
for i, relay := range unknown {
addrs := relays[relay]
for _, a := range addrs {
select {
case ch <- a:
case <-ctx.Done():
return
}
}

if i < len(unknown)-1 {
delay = 2000 * time.Millisecond
} else {
delay = 0
}

if delay > 0 {
select {
case <-time.After(delay):
case <-ctx.Done():
return
}
}
}
}
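
The delay schedule above, gathered in one place for reference. The constant names are illustrative and do not appear in the PR; the durations are taken verbatim from emitDialAddrs.

const (
	// delay before the first relay addr when public direct addrs were emitted
	delayAfterPublicDirect = 1000 * time.Millisecond
	// delay before the first relay addr when only private direct addrs were emitted
	delayAfterPrivateDirect = 250 * time.Millisecond
	// delay between successive connected relays
	delayBetweenConnectedRelays = 1500 * time.Millisecond
	// delay between successive non-connected relays
	delayBetweenUnknownRelays = 2000 * time.Millisecond
)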

// shufflePeers randomly permutes ps in place (a Fisher-Yates shuffle).
func shufflePeers(ps []peer.ID) {
for i := range ps {
j := rand.Intn(i + 1)
ps[i], ps[j] = ps[j], ps[i]
}
}

func (s *Swarm) dialAddrs(ctx context.Context, p peer.ID, remoteAddrs <-chan ma.Multiaddr) (transport.Conn, error) {
log.Debugf("%s swarm dialing %s", s.local, p)
