From cab3468d33db698f43982bccec3d4dbd3e7a0944 Mon Sep 17 00:00:00 2001 From: Maxim Merzhanov Date: Mon, 27 May 2024 14:21:24 +0900 Subject: [PATCH] tunnel: add option to write packets in multiple network streams with dedicated connections --- application_test.go | 2 +- config/config.go | 3 +++ config/other.go | 5 ++-- go.mod | 2 +- p2p/p2p.go | 61 ++++++++++++++++++++++++++++++++++++++++++ service/auth_status.go | 10 ++++--- service/tunnel.go | 14 ++++++++-- 7 files changed, 87 insertions(+), 10 deletions(-) diff --git a/application_test.go b/application_test.go index cf9f96b..464800f 100644 --- a/application_test.go +++ b/application_test.go @@ -311,7 +311,7 @@ func BenchmarkTunnelPackets(b *testing.B) { peer1.tun.Outbound <- packet atomic.AddInt64(&packetsSent, 1) // to have packet_loss at reasonable level (but more than 0) - const sleepEvery = 150 + const sleepEvery = 100 if i != 0 && i%sleepEvery == 0 { time.Sleep(1 * time.Millisecond) } diff --git a/config/config.go b/config/config.go index 14d9ed9..c21999c 100644 --- a/config/config.go +++ b/config/config.go @@ -62,6 +62,9 @@ type ( ListenAddresses []string `json:"listenAddresses"` ReconnectionIntervalSec time.Duration `json:"reconnectionIntervalSec" swaggertype:"primitive,integer"` AutoAcceptAuthRequests bool `json:"autoAcceptAuthRequests"` + + UseDedicatedConnForEachStream bool `json:"useDedicatedConnForEachStream"` + ParallelSendingStreamsCount int `json:"parallelSendingStreamsCount"` } VPNConfig struct { InterfaceName string `json:"interfaceName"` diff --git a/config/other.go b/config/other.go index 2c648f1..ada1123 100644 --- a/config/other.go +++ b/config/other.go @@ -144,6 +144,9 @@ func setDefaults(conf *Config, bus awlevent.Bus) { if conf.P2pNode.ReconnectionIntervalSec == 0 { conf.P2pNode.ReconnectionIntervalSec = 10 } + if conf.P2pNode.ParallelSendingStreamsCount == 0 { + conf.P2pNode.ParallelSendingStreamsCount = 1 + } // Other if conf.LoggerLevel == "" { @@ -155,8 +158,6 @@ func setDefaults(conf *Config, bus awlevent.Bus) { if isEmptyConfig { conf.HttpListenOnAdminHost = true } - // TODO: remove in next release - conf.HttpListenOnAdminHost = true if conf.VPNConfig.IPNet == "" { conf.VPNConfig.IPNet = defaultNetworkSubnet diff --git a/go.mod b/go.mod index 9284afd..3b196c9 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/milosgajdos/tenus v0.0.3 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multiaddr v0.12.0 + github.com/multiformats/go-multistream v0.5.0 github.com/olekukonko/tablewriter v0.0.5 github.com/quic-go/quic-go v0.39.4 github.com/stretchr/testify v1.8.4 @@ -106,7 +107,6 @@ require ( github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multihash v0.2.3 // indirect - github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.13.0 // indirect github.com/opencontainers/runtime-spec v1.1.0 // indirect diff --git a/p2p/p2p.go b/p2p/p2p.go index a59fdbc..3a139bd 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "errors" "fmt" + "io" "net" "sync" "sync/atomic" @@ -28,6 +29,7 @@ import ( libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" "github.com/libp2p/go-libp2p/p2p/transport/tcp" "github.com/multiformats/go-multiaddr" + msmux "github.com/multiformats/go-multistream" "go.uber.org/multierr" ) @@ -209,6 +211,34 @@ func (p *P2p) NewStream(ctx context.Context, id peer.ID, proto protocol.ID) (net return p.host.NewStream(ctx, id, proto) } +func (p *P2p) NewStreamWithDedicatedConn(ctx context.Context, id peer.ID, proto protocol.ID) (network.Stream, error) { + ctx = network.WithUseTransient(ctx, "awl") + + // mostly copied from NewStream() + // github.com/libp2p/go-libp2p@v0.32.2/p2p/host/basic/basic_host.go:634 + conn, err := p.host.Network().DialPeer(ctx, id) + if err != nil { + return nil, fmt.Errorf("failed to dial: %v", err) + } + + stream, err := conn.NewStream(ctx) + if err != nil { + _ = conn.Close() + return nil, fmt.Errorf("failed to create new stream: %v", err) + } + + err = stream.SetProtocol(proto) + if err != nil { + return nil, fmt.Errorf("failed to set protocol to stream: %v", err) + } + lzcon := msmux.NewMSSelect(stream, proto) + + return &streamWrapper{ + Stream: stream, + rw: lzcon, + }, nil +} + func (p *P2p) IsConnected(peerID peer.ID) bool { return p.host.Network().Connectedness(peerID) == network.Connected } @@ -376,3 +406,34 @@ func DefaultListenAddrs() []multiaddr.Multiaddr { multiaddr.StringCast(fmt.Sprintf("/ip6/::/udp/%d/quic-v1", defaultP2pPort)), } } + +// copied from +// github.com/libp2p/go-libp2p@v0.32.2/p2p/host/basic/basic_host.go:1050 +type streamWrapper struct { + network.Stream + rw io.ReadWriteCloser +} + +func (s *streamWrapper) Read(b []byte) (int, error) { + return s.rw.Read(b) +} + +func (s *streamWrapper) Write(b []byte) (int, error) { + return s.rw.Write(b) +} + +func (s *streamWrapper) Close() error { + return s.rw.Close() +} + +func (s *streamWrapper) CloseWrite() error { + // Flush the handshake before closing, but ignore the error. The other + // end may have closed their side for reading. + // + // If something is wrong with the stream, the user will get on error on + // read instead. + if flusher, ok := s.rw.(interface{ Flush() error }); ok { + _ = flusher.Flush() + } + return s.Stream.CloseWrite() +} diff --git a/service/auth_status.go b/service/auth_status.go index 726dd89..0347e12 100644 --- a/service/auth_status.go +++ b/service/auth_status.go @@ -7,14 +7,15 @@ import ( "sync" "time" - "github.com/anywherelan/awl/awldns" - "github.com/anywherelan/awl/awlevent" - "github.com/anywherelan/awl/config" - "github.com/anywherelan/awl/protocol" "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + + "github.com/anywherelan/awl/awldns" + "github.com/anywherelan/awl/awlevent" + "github.com/anywherelan/awl/config" + "github.com/anywherelan/awl/protocol" ) const ( @@ -25,6 +26,7 @@ const ( type P2p interface { ConnectPeer(ctx context.Context, peerID peer.ID) error NewStream(ctx context.Context, id peer.ID, proto libp2pProtocol.ID) (network.Stream, error) + NewStreamWithDedicatedConn(ctx context.Context, id peer.ID, proto libp2pProtocol.ID) (network.Stream, error) SubscribeConnectionEvents(onConnected, onDisconnected func(network.Network, network.Conn)) ProtectPeer(id peer.ID) } diff --git a/service/tunnel.go b/service/tunnel.go index 016a4af..cca6747 100644 --- a/service/tunnel.go +++ b/service/tunnel.go @@ -176,10 +176,16 @@ func (t *Tunnel) makeTunnelStream(ctx context.Context, peerID peer.ID) (network. return nil, err } - stream, err := t.p2p.NewStream(ctx, peerID, protocol.TunnelPacketMethod) + newStreamFunc := t.p2p.NewStream + if t.conf.P2pNode.UseDedicatedConnForEachStream { + newStreamFunc = t.p2p.NewStreamWithDedicatedConn + } + + stream, err := newStreamFunc(ctx, peerID, protocol.TunnelPacketMethod) if err != nil { return nil, err } + return stream, nil } @@ -193,7 +199,10 @@ type VpnPeer struct { // TODO: remove Tunnel from VpnPeer dependencies func (vp *VpnPeer) Start(t *Tunnel) { go vp.backgroundInboundHandler(t) - go vp.backgroundOutboundHandler(t) + + for i := 0; i < t.conf.P2pNode.ParallelSendingStreamsCount; i++ { + go vp.backgroundOutboundHandler(t) + } } func (vp *VpnPeer) Close(t *Tunnel) { @@ -282,6 +291,7 @@ func (vp *VpnPeer) backgroundInboundHandler(t *Tunnel) { t.device.PutTempPacket(packet) continue } + // TODO: add batching err := t.device.WritePacket(packet, vp.localIP) if err != nil { t.logger.Warnf("write packet to vpn: %v", err)