Skip to content

Commit

Permalink
tunnel: add option to write packets in multiple network streams with …
Browse files Browse the repository at this point in the history
…dedicated connections
  • Loading branch information
pymq committed May 27, 2024
1 parent 82d3268 commit cab3468
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 10 deletions.
2 changes: 1 addition & 1 deletion application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func BenchmarkTunnelPackets(b *testing.B) {
peer1.tun.Outbound <- packet
atomic.AddInt64(&packetsSent, 1)
// to have packet_loss at reasonable level (but more than 0)
const sleepEvery = 150
const sleepEvery = 100
if i != 0 && i%sleepEvery == 0 {
time.Sleep(1 * time.Millisecond)
}
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type (
ListenAddresses []string `json:"listenAddresses"`
ReconnectionIntervalSec time.Duration `json:"reconnectionIntervalSec" swaggertype:"primitive,integer"`
AutoAcceptAuthRequests bool `json:"autoAcceptAuthRequests"`

UseDedicatedConnForEachStream bool `json:"useDedicatedConnForEachStream"`
ParallelSendingStreamsCount int `json:"parallelSendingStreamsCount"`
}
VPNConfig struct {
InterfaceName string `json:"interfaceName"`
Expand Down
5 changes: 3 additions & 2 deletions config/other.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func setDefaults(conf *Config, bus awlevent.Bus) {
if conf.P2pNode.ReconnectionIntervalSec == 0 {
conf.P2pNode.ReconnectionIntervalSec = 10
}
if conf.P2pNode.ParallelSendingStreamsCount == 0 {
conf.P2pNode.ParallelSendingStreamsCount = 1
}

// Other
if conf.LoggerLevel == "" {
Expand All @@ -155,8 +158,6 @@ func setDefaults(conf *Config, bus awlevent.Bus) {
if isEmptyConfig {
conf.HttpListenOnAdminHost = true
}
// TODO: remove in next release
conf.HttpListenOnAdminHost = true

if conf.VPNConfig.IPNet == "" {
conf.VPNConfig.IPNet = defaultNetworkSubnet
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/milosgajdos/tenus v0.0.3
github.com/mr-tron/base58 v1.2.0
github.com/multiformats/go-multiaddr v0.12.0
github.com/multiformats/go-multistream v0.5.0
github.com/olekukonko/tablewriter v0.0.5
github.com/quic-go/quic-go v0.39.4
github.com/stretchr/testify v1.8.4
Expand Down Expand Up @@ -106,7 +107,6 @@ require (
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
Expand Down
61 changes: 61 additions & 0 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/rand"
"errors"
"fmt"
"io"
"net"
"sync"
"sync/atomic"
Expand All @@ -28,6 +29,7 @@ import (
libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/multiformats/go-multiaddr"
msmux "github.com/multiformats/go-multistream"
"go.uber.org/multierr"
)

Expand Down Expand Up @@ -209,6 +211,34 @@ func (p *P2p) NewStream(ctx context.Context, id peer.ID, proto protocol.ID) (net
return p.host.NewStream(ctx, id, proto)
}

func (p *P2p) NewStreamWithDedicatedConn(ctx context.Context, id peer.ID, proto protocol.ID) (network.Stream, error) {
ctx = network.WithUseTransient(ctx, "awl")

// mostly copied from NewStream()
// github.com/libp2p/[email protected]/p2p/host/basic/basic_host.go:634
conn, err := p.host.Network().DialPeer(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to dial: %v", err)
}

stream, err := conn.NewStream(ctx)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("failed to create new stream: %v", err)
}

err = stream.SetProtocol(proto)
if err != nil {
return nil, fmt.Errorf("failed to set protocol to stream: %v", err)
}
lzcon := msmux.NewMSSelect(stream, proto)

return &streamWrapper{
Stream: stream,
rw: lzcon,
}, nil
}

func (p *P2p) IsConnected(peerID peer.ID) bool {
return p.host.Network().Connectedness(peerID) == network.Connected
}
Expand Down Expand Up @@ -376,3 +406,34 @@ func DefaultListenAddrs() []multiaddr.Multiaddr {
multiaddr.StringCast(fmt.Sprintf("/ip6/::/udp/%d/quic-v1", defaultP2pPort)),
}
}

// copied from
// github.com/libp2p/[email protected]/p2p/host/basic/basic_host.go:1050
type streamWrapper struct {
network.Stream
rw io.ReadWriteCloser
}

func (s *streamWrapper) Read(b []byte) (int, error) {
return s.rw.Read(b)
}

func (s *streamWrapper) Write(b []byte) (int, error) {
return s.rw.Write(b)
}

func (s *streamWrapper) Close() error {
return s.rw.Close()
}

func (s *streamWrapper) CloseWrite() error {
// Flush the handshake before closing, but ignore the error. The other
// end may have closed their side for reading.
//
// If something is wrong with the stream, the user will get on error on
// read instead.
if flusher, ok := s.rw.(interface{ Flush() error }); ok {
_ = flusher.Flush()
}
return s.Stream.CloseWrite()
}
10 changes: 6 additions & 4 deletions service/auth_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"sync"
"time"

"github.com/anywherelan/awl/awldns"
"github.com/anywherelan/awl/awlevent"
"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/protocol"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"

"github.com/anywherelan/awl/awldns"
"github.com/anywherelan/awl/awlevent"
"github.com/anywherelan/awl/config"
"github.com/anywherelan/awl/protocol"
)

const (
Expand All @@ -25,6 +26,7 @@ const (
type P2p interface {
ConnectPeer(ctx context.Context, peerID peer.ID) error
NewStream(ctx context.Context, id peer.ID, proto libp2pProtocol.ID) (network.Stream, error)
NewStreamWithDedicatedConn(ctx context.Context, id peer.ID, proto libp2pProtocol.ID) (network.Stream, error)
SubscribeConnectionEvents(onConnected, onDisconnected func(network.Network, network.Conn))
ProtectPeer(id peer.ID)
}
Expand Down
14 changes: 12 additions & 2 deletions service/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,16 @@ func (t *Tunnel) makeTunnelStream(ctx context.Context, peerID peer.ID) (network.
return nil, err
}

stream, err := t.p2p.NewStream(ctx, peerID, protocol.TunnelPacketMethod)
newStreamFunc := t.p2p.NewStream
if t.conf.P2pNode.UseDedicatedConnForEachStream {
newStreamFunc = t.p2p.NewStreamWithDedicatedConn
}

stream, err := newStreamFunc(ctx, peerID, protocol.TunnelPacketMethod)
if err != nil {
return nil, err
}

return stream, nil
}

Expand All @@ -193,7 +199,10 @@ type VpnPeer struct {
// TODO: remove Tunnel from VpnPeer dependencies
func (vp *VpnPeer) Start(t *Tunnel) {
go vp.backgroundInboundHandler(t)
go vp.backgroundOutboundHandler(t)

for i := 0; i < t.conf.P2pNode.ParallelSendingStreamsCount; i++ {
go vp.backgroundOutboundHandler(t)
}
}

func (vp *VpnPeer) Close(t *Tunnel) {
Expand Down Expand Up @@ -282,6 +291,7 @@ func (vp *VpnPeer) backgroundInboundHandler(t *Tunnel) {
t.device.PutTempPacket(packet)
continue
}
// TODO: add batching
err := t.device.WritePacket(packet, vp.localIP)
if err != nil {
t.logger.Warnf("write packet to vpn: %v", err)
Expand Down

0 comments on commit cab3468

Please sign in to comment.