Skip to content

Commit

Permalink
rpc: use in-memory buffered stream instead of synchronous net.Pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
zllovesuki committed Mar 29, 2024
1 parent 10b2e93 commit 3b76686
Show file tree
Hide file tree
Showing 15 changed files with 47 additions and 31 deletions.
11 changes: 6 additions & 5 deletions gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.miragespace.co/specter/spec/tun"

"github.com/go-chi/chi/v5"
"github.com/iangudger/memnet"
"github.com/libp2p/go-yamux/v4"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
Expand Down Expand Up @@ -393,7 +394,7 @@ func TestH1HTTPFound(t *testing.T) {
testHost := "hello"
testResponse := "this is fine"

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
ch := make(chan net.Conn, 1)
go serveMiniClient(as, ch, testResponse)
defer close(ch)
Expand Down Expand Up @@ -437,7 +438,7 @@ func TestH2HTTPFound(t *testing.T) {
testHost := "hello"
testResponse := "this is fine"

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
ch := make(chan net.Conn, 1)
go serveMiniClient(as, ch, testResponse)
defer close(ch)
Expand Down Expand Up @@ -479,7 +480,7 @@ func TestH3HTTPFound(t *testing.T) {
testHost := "hello"
testResponse := "this is fine from h3"

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
ch := make(chan net.Conn, 1)
go serveMiniClient(as, ch, testResponse)
defer close(ch)
Expand Down Expand Up @@ -625,7 +626,7 @@ func TestH2TCPFound(t *testing.T) {
testHost := "hello"
bufLength := 20

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()

go func() {
tun.SendStatusProto(c2, nil)
Expand Down Expand Up @@ -689,7 +690,7 @@ func TestH3TCPFound(t *testing.T) {
testHost := "hello"
bufLength := 20

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()

go func() {
tun.SendStatusProto(c2, nil)
Expand Down
8 changes: 5 additions & 3 deletions gateway/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.miragespace.co/specter/spec/protocol"
"go.miragespace.co/specter/spec/tun"

"github.com/iangudger/memnet"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestHTTPRedirect(t *testing.T) {
Expand Down Expand Up @@ -54,7 +56,7 @@ func TestHTTPConnectProxy(t *testing.T) {
testHost := "hello"
bufLength := 16

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()

go func() {
tun.SendStatusProto(c2, nil)
Expand Down
3 changes: 2 additions & 1 deletion gateway/internal_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"go.miragespace.co/specter/util/acceptor"

"github.com/go-chi/chi/v5"
"github.com/iangudger/memnet"
"github.com/quic-go/quic-go"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -118,7 +119,7 @@ func TestInternalProxy(t *testing.T) {
acc := acceptor.NewH2Acceptor(nil)
go fakeServer.Serve(acc)

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()

mockS.On("DialInternal", mock.Anything, mock.MatchedBy(func(node *protocol.Node) bool {
return node.GetAddress() == fakeNode.GetAddress() && node.GetId() == fakeNode.GetId()
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/getsentry/sentry-go v0.27.0
github.com/go-chi/chi/v5 v5.0.12
github.com/gorilla/websocket v1.5.1
github.com/iangudger/memnet v0.0.0-20220731214234-823edbed1e9d
github.com/jedib0t/go-pretty/v6 v6.5.6
github.com/libp2p/go-buffer-pool v0.1.0
github.com/libp2p/go-yamux/v4 v4.0.1
Expand Down Expand Up @@ -44,6 +45,8 @@ require (
moul.io/zapfilter v1.7.0
)

replace gitub.com/iangudger/memnet v0.0.0-20220731214234-823edbed1e9d => github.com/miragespace/memnet v0.0.0-20240328231002-8045afef8f5b

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ github.com/google/pprof v0.0.0-20230131232505-5a9e8f65f08f h1:gl1DCiSk+mrXXBGPm6
github.com/google/pprof v0.0.0-20230131232505-5a9e8f65f08f/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/iangudger/memnet v0.0.0-20220731214234-823edbed1e9d h1:zxTobvB1wfgBuesc/+kQhUtgPebXoCm6DCUxj0N4n4g=
github.com/iangudger/memnet v0.0.0-20220731214234-823edbed1e9d/go.mod h1:xfj2/uCICUpyj7m+6A/2pXVpnG+hKxggFBoAuIl6hZc=
github.com/jedib0t/go-pretty/v6 v6.5.6 h1:nKXVLqPfAwY7sWcYXdNZZZ2fjqDpAtj9UeWupgfUxSg=
github.com/jedib0t/go-pretty/v6 v6.5.6/go.mod h1:5LQIxa52oJ/DlDSLv0HEkWOFMDGoWkJb9ss5KqPpJBg=
github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c=
Expand Down
3 changes: 2 additions & 1 deletion integrations/tunnel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/iangudger/memnet"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -413,7 +414,7 @@ func TestIntegrationTunnel(t *testing.T) {
xApp, xLogs := compileApp(client.Generate())
xApp.Metadata["connectOverride"] = hostMap["tcp"]

leftConn, rightConn := net.Pipe()
leftConn, rightConn := memnet.NewBufferedStreamConnPair()
xApp.Metadata[client.PipeInKey] = leftConn
xApp.Metadata[client.PipeOutKey] = leftConn

Expand Down
2 changes: 1 addition & 1 deletion overlay/quic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
KeepAlivePeriod: time.Second * 5,
HandshakeIdleTimeout: timing.TLSHandshakeTimeout,
MaxIdleTimeout: time.Second * 30,
MaxIncomingStreams: 250,
MaxIncomingStreams: 500,
EnableDatagrams: true,
}
)
Expand Down
3 changes: 2 additions & 1 deletion overlay/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.miragespace.co/specter/util/atomic"

"github.com/avast/retry-go/v4"
"github.com/iangudger/memnet"
"github.com/quic-go/quic-go"
"github.com/zhangyunhao116/skipmap"
uberAtomic "go.uber.org/atomic"
Expand Down Expand Up @@ -184,7 +185,7 @@ func (t *QUIC) DialStream(ctx context.Context, peer *protocol.Node, kind protoco
}

if peer.GetAddress() == t.Endpoint.GetAddress() && t.VirtualTransport {
c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
t.streamChan <- &transport.StreamDelegate{
Identity: &protocol.Node{
Address: t.Endpoint.GetAddress(),
Expand Down
3 changes: 2 additions & 1 deletion spec/mocks/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.miragespace.co/specter/spec/protocol"
"go.miragespace.co/specter/spec/transport"

"github.com/iangudger/memnet"
"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -68,7 +69,7 @@ func (t *MemoryTransport) Identity() *protocol.Node {
}

func (t *MemoryTransport) DialStream(ctx context.Context, peer *protocol.Node, kind protocol.Stream_Type) (net.Conn, error) {
c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
select {
case t.Other <- &transport.StreamDelegate{
Conn: c1,
Expand Down
8 changes: 4 additions & 4 deletions spec/proto/transport.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import "spec/proto/node.proto";
message Stream {
enum Type {
UNKNOWN_TYPE = 0;
RPC = 1;
DIRECT = 2;
PROXY = 3;
INTERNAL = 4;
RPC = 1; // used for rpc between nodes and between client/server
DIRECT = 2; // used for establishing the reverse proxy from the tunneling node to client
PROXY = 3; // used when client is not directly connected to the tunneling node
INTERNAL = 4; // used for /_internal endpoint proxying between nodes
}

Type type = 1;
Expand Down
8 changes: 4 additions & 4 deletions spec/protocol/transport.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions spec/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"go.miragespace.co/specter/spec/protocol"
"go.miragespace.co/specter/spec/transport"
"go.miragespace.co/specter/timing"
"go.miragespace.co/specter/util/ratecounter"

pool "github.com/libp2p/go-buffer-pool"
Expand Down Expand Up @@ -76,10 +77,10 @@ func DynamicChordClient(baseContext context.Context, chordTransport transport.Tr

// default to http client pooling
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxConnsPerHost = 15
t.MaxConnsPerHost = 50
t.MaxIdleConnsPerHost = 5
t.DisableCompression = true
t.IdleConnTimeout = time.Minute
t.IdleConnTimeout = timing.RPCIdleTimeout
t.DialTLSContext = getDynamicDialer(baseContext, chordTransport)
c := &http.Client{
Transport: t,
Expand Down Expand Up @@ -118,10 +119,10 @@ func DynamicTunnelClient(baseContext context.Context, tunnelTransport transport.

// default to http client pooling
t := http.DefaultTransport.(*http.Transport).Clone()
t.MaxConnsPerHost = 3
t.MaxConnsPerHost = 10
t.MaxIdleConnsPerHost = 1
t.DisableCompression = true
t.IdleConnTimeout = time.Minute
t.IdleConnTimeout = timing.RPCIdleTimeout
t.DialTLSContext = getDynamicDialer(baseContext, tunnelTransport)
c := &http.Client{
Transport: t,
Expand Down
2 changes: 2 additions & 0 deletions timing/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ const (

ChordRPCTimeout = time.Second * 10
ChordPingTimeout = time.Second * 3

RPCIdleTimeout = time.Second * 10
)
3 changes: 2 additions & 1 deletion tun/server/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.miragespace.co/specter/spec/tun"

"github.com/go-chi/chi/v5"
"github.com/iangudger/memnet"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ func TestHandlerListClientTunnels(t *testing.T) {
hostnameBytes[i] = []byte(h)
}

c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()

clientChan := make(chan *transport.StreamDelegate)
clientT.On("AcceptStream").Return(clientChan)
Expand Down
10 changes: 5 additions & 5 deletions tun/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"net"
"testing"
"time"

Expand All @@ -15,6 +14,7 @@ import (
"go.miragespace.co/specter/spec/transport"
"go.miragespace.co/specter/spec/tun"

"github.com/iangudger/memnet"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestLookupSuccessDirect(t *testing.T) {

// 3. once we figure out that it is connected to us,
// attempt to dial
c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
go func() {
l := &protocol.Link{}
err := rpc.Receive(c2, l)
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestLookupSuccessRemote(t *testing.T) {

// 3. once we figure out that it is NOT connected to us,
// attempt to dial via chord
c1, c2 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
go func() {
// the remote node should receive the bundle
bundle := &protocol.TunnelRoute{}
Expand Down Expand Up @@ -283,8 +283,8 @@ func TestHandleRemoteConnection(t *testing.T) {

// since the "client" is connected to us, we should expect a DialDirect
// to the client
c1, c2 := net.Pipe()
c3, c4 := net.Pipe()
c1, c2 := memnet.NewBufferedStreamConnPair()
c3, c4 := memnet.NewBufferedStreamConnPair()
clientT.On("DialStream", mock.Anything, mock.MatchedBy(func(n *protocol.Node) bool {
return n.GetId() == cli.GetId()
}), protocol.Stream_DIRECT).Return(c3, nil)
Expand Down

0 comments on commit 3b76686

Please sign in to comment.