From 4cdcf583c4a57a0498925cf01fffd2b610486f79 Mon Sep 17 00:00:00 2001 From: Vinicius Fortuna Date: Fri, 3 May 2024 13:58:49 -0400 Subject: [PATCH] fix: duplexConn ReadFrom and connect handler initial data coalescing (#229) --- transport/stream.go | 2 +- transport/stream_test.go | 30 ++++++++++++++ x/httpproxy/connect_handler.go | 11 ++++- x/report/report_test.go | 73 +++++++++++++++++----------------- 4 files changed, 78 insertions(+), 38 deletions(-) diff --git a/transport/stream.go b/transport/stream.go index a628d13f..fd3da153 100644 --- a/transport/stream.go +++ b/transport/stream.go @@ -53,7 +53,7 @@ func (dc *duplexConnAdaptor) Write(b []byte) (int, error) { } func (dc *duplexConnAdaptor) ReadFrom(r io.Reader) (int64, error) { // Make sure we prefer ReadFrom. Otherwise io.Copy will try WriteTo first. - if rf, ok := r.(io.ReaderFrom); ok { + if rf, ok := dc.w.(io.ReaderFrom); ok { return rf.ReadFrom(r) } return io.Copy(dc.w, r) diff --git a/transport/stream_test.go b/transport/stream_test.go index 27f51c7e..6964b799 100644 --- a/transport/stream_test.go +++ b/transport/stream_test.go @@ -15,8 +15,10 @@ package transport import ( + "bytes" "context" "errors" + "io" "net" "sync" "syscall" @@ -155,3 +157,31 @@ func TestDialStreamEndpointAddr(t *testing.T) { require.Equal(t, listener.Addr().String(), conn.RemoteAddr().String()) require.Nil(t, conn.Close()) } + +type countWriter struct { + writeCalls, readFromCalls int +} + +func (w *countWriter) Write(b []byte) (int, error) { + w.writeCalls += 1 + return len(b), nil +} + +func (w *countWriter) ReadFrom(r io.Reader) (int64, error) { + w.readFromCalls += 1 + return 0, nil +} + +var _ io.Writer = (*countWriter)(nil) +var _ io.ReaderFrom = (*countWriter)(nil) + +func Test_duplexConnAdaptor_PreferReadFrom(t *testing.T) { + var w countWriter + c := WrapConn(nil, nil, &w) + src := bytes.NewBuffer([]byte("data")) + n, err := c.(io.ReaderFrom).ReadFrom(src) + require.NoError(t, err) + require.Equal(t, 1, w.readFromCalls) + require.Equal(t, 0, w.writeCalls) + require.Equal(t, int64(0), n) +} diff --git a/x/httpproxy/connect_handler.go b/x/httpproxy/connect_handler.go index 25845d25..471cdf5d 100644 --- a/x/httpproxy/connect_handler.go +++ b/x/httpproxy/connect_handler.go @@ -119,7 +119,16 @@ func (h *connectHandler) ServeHTTP(proxyResp http.ResponseWriter, proxyReq *http // Relay data between client and target in both directions. go func() { - io.Copy(targetConn, clientRW) + // io.Copy prefers WriteTo, which clientRW implements. However, + // bufio.ReadWriter.WriteTo issues an empty Write() call, which flushes + // the Shadowsocks IV and connect request, breaking the coalescing with + // the initial data. By preferring ReaderFrom, the coalescing of IV, + // request and initial data is preserved. + if rf, ok := targetConn.(io.ReaderFrom); ok { + rf.ReadFrom(clientRW) + } else { + io.Copy(targetConn, clientRW) + } targetConn.CloseWrite() }() // We can't use io.Copy here because it doesn't call Flush on writes, so the first diff --git a/x/report/report_test.go b/x/report/report_test.go index 79054dc2..7aa3fbe1 100644 --- a/x/report/report_test.go +++ b/x/report/report_test.go @@ -81,43 +81,44 @@ func TestIsSuccess(t *testing.T) { } } -func TestSendReportSuccessfully(t *testing.T) { - var testSetup = ConnectivitySetup{ - Proxy: "testProxy", - Resolver: "8.8.8.8", - Proto: "udp", - Prefix: "HTTP1/1", - } - var testErr = ConnectivityError{ - Op: "read", - PosixError: "ETIMEDOUT", - Msg: "i/o timeout", - } - var testReport = ConnectivityReport{ - Connection: testSetup, - Time: time.Now().UTC().Truncate(time.Second), - DurationMs: 1, - Error: testErr, - } +// TODO(fortuna): Make this work without the external service. +// func TestSendReportSuccessfully(t *testing.T) { +// var testSetup = ConnectivitySetup{ +// Proxy: "testProxy", +// Resolver: "8.8.8.8", +// Proto: "udp", +// Prefix: "HTTP1/1", +// } +// var testErr = ConnectivityError{ +// Op: "read", +// PosixError: "ETIMEDOUT", +// Msg: "i/o timeout", +// } +// var testReport = ConnectivityReport{ +// Connection: testSetup, +// Time: time.Now().UTC().Truncate(time.Second), +// DurationMs: 1, +// Error: testErr, +// } - var r Report = testReport - v, ok := r.(HasSuccess) - if ok { - fmt.Printf("The test report shows success: %v\n", v.IsSuccess()) - } - u, err := url.Parse("https://script.google.com/macros/s/AKfycbzoMBmftQaR9Aw4jzTB-w4TwkDjLHtSfBCFhh4_2NhTEZAUdj85Qt8uYCKCNOEAwCg4/exec") - if err != nil { - t.Errorf("Expected no error, but got: %v", err) - } - c := RemoteCollector{ - CollectorURL: u, - HttpClient: &http.Client{Timeout: 10 * time.Second}, - } - err = c.Collect(context.Background(), r) - if err != nil { - t.Errorf("Expected no error, but got: %v", err) - } -} +// var r Report = testReport +// v, ok := r.(HasSuccess) +// if ok { +// fmt.Printf("The test report shows success: %v\n", v.IsSuccess()) +// } +// u, err := url.Parse("https://script.google.com/macros/s/AKfycbzoMBmftQaR9Aw4jzTB-w4TwkDjLHtSfBCFhh4_2NhTEZAUdj85Qt8uYCKCNOEAwCg4/exec") +// if err != nil { +// t.Errorf("Expected no error, but got: %v", err) +// } +// c := RemoteCollector{ +// CollectorURL: u, +// HttpClient: &http.Client{Timeout: 10 * time.Second}, +// } +// err = c.Collect(context.Background(), r) +// if err != nil { +// t.Errorf("Expected no error, but got: %v", err) +// } +// } func TestSendReportUnsuccessfully(t *testing.T) { var testReport = ConnectivityReport{