diff --git a/Makefile b/Makefile index 37cc4e9ac..c54a228ee 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ vendor: go.mod go.sum .PHONY: fmt fmt: - @$(GOTOOL) gofumpt -w -s -extra "$(ROOT_DIR)" + @$(GOTOOL) gofumpt -w -extra "$(ROOT_DIR)" .PHONY: test test: @@ -95,4 +95,4 @@ install-tools-goreleaser: .bin .PHONY: update-deps update-deps: - @go get -u && go mod tidy + @go get -u && go mod tidy -go=1.17 diff --git a/essentials/conns.go b/essentials/conns.go index 432591aa4..1b3cbf052 100644 --- a/essentials/conns.go +++ b/essentials/conns.go @@ -8,14 +8,12 @@ import ( // CloseableReader is a reader interface that can close its reading end. type CloseableReader interface { io.Reader - CloseRead() error } // CloseableWriter is a writer that can close its writing end. type CloseableWriter interface { io.Writer - CloseWrite() error } diff --git a/go.mod b/go.mod index 2ef0dbca3..ce52f2a4d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jarcoal/httpmock v1.0.8 github.com/kentik/patricia v0.0.0-20210909164817-21603333b70e github.com/mccutchen/go-httpbin v1.1.1 - github.com/panjf2000/ants/v2 v2.4.6 + github.com/panjf2000/ants/v2 v2.4.7 github.com/pelletier/go-toml v1.9.4 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.32.1 // indirect @@ -24,7 +24,7 @@ require ( github.com/stretchr/objx v0.3.0 // indirect github.com/stretchr/testify v1.7.0 github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43 - golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 + golang.org/x/crypto v0.0.0-20211202192323-5770296d904e golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 google.golang.org/protobuf v1.27.1 // indirect diff --git a/go.sum b/go.sum index 0fdfd030d..6e70dd9e7 100644 --- a/go.sum +++ b/go.sum @@ -193,8 +193,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ= -github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= +github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M= +github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= @@ -285,8 +285,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI= -golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20211202192323-5770296d904e h1:MUP6MR3rJ7Gk9LEia0LP2ytiH6MuCfs7qYz+47jGdD8= +golang.org/x/crypto v0.0.0-20211202192323-5770296d904e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= diff --git a/mtglib/internal/relay/init.go b/mtglib/internal/relay/init.go index 6278c48f4..a0dcb0c3f 100644 --- a/mtglib/internal/relay/init.go +++ b/mtglib/internal/relay/init.go @@ -1,11 +1,7 @@ package relay -import "time" - const ( - copyBufferSize = 64 * 1024 - writerBufferSize = 128 * 1024 - readTimeout = 10 * time.Millisecond + copyBufferSize = 64 * 1024 ) type Logger interface { diff --git a/mtglib/internal/relay/pools.go b/mtglib/internal/relay/pools.go index 2adff99e3..7b9371e2a 100644 --- a/mtglib/internal/relay/pools.go +++ b/mtglib/internal/relay/pools.go @@ -1,31 +1,19 @@ package relay -import ( - "bufio" - "io" - "net" - "sync" -) +import "sync" -var syncPairPool = sync.Pool{ +var copyBufferPool = sync.Pool{ New: func() interface{} { - return &syncPair{ - writer: bufio.NewWriterSize(nil, writerBufferSize), - copyBuf: make([]byte, copyBufferSize), - } + rv := make([]byte, copyBufferSize) + + return &rv }, } -func acquireSyncPair(reader net.Conn, writer io.Writer) *syncPair { - sp := syncPairPool.Get().(*syncPair) // nolint: forcetypeassert - sp.writer.Reset(writer) - sp.reader = reader - - return sp +func acquireCopyBuffer() *[]byte { + return copyBufferPool.Get().(*[]byte) } -func releaseSyncPair(sp *syncPair) { - sp.writer.Reset(nil) - sp.reader = nil - syncPairPool.Put(sp) +func releaseCopyBuffer(buf *[]byte) { + copyBufferPool.Put(buf) } diff --git a/mtglib/internal/relay/relay.go b/mtglib/internal/relay/relay.go index 6f9db4f3d..0350373f5 100644 --- a/mtglib/internal/relay/relay.go +++ b/mtglib/internal/relay/relay.go @@ -4,7 +4,6 @@ import ( "context" "errors" "io" - "sync" "github.com/9seconds/mtg/v2/essentials" ) @@ -22,28 +21,27 @@ func Relay(ctx context.Context, log Logger, telegramConn, clientConn essentials. clientConn.Close() }() - wg := &sync.WaitGroup{} - wg.Add(2) // nolint: gomnd + closeChan := make(chan struct{}) - go pump(log, telegramConn, clientConn, wg, "client -> telegram") + go func() { + defer close(closeChan) + + pump(log, telegramConn, clientConn, "client -> telegram") + }() - pump(log, clientConn, telegramConn, wg, "telegram -> client") + pump(log, clientConn, telegramConn, "telegram -> client") - wg.Wait() + <-closeChan } -func pump(log Logger, src, dst essentials.Conn, wg *sync.WaitGroup, direction string) { - syncer := acquireSyncPair(src, dst) +func pump(log Logger, src, dst essentials.Conn, direction string) { + defer src.CloseRead() // nolint: errcheck + defer dst.CloseWrite() // nolint: errcheck - defer func() { - syncer.Flush() - releaseSyncPair(syncer) - src.CloseRead() // nolint: errcheck - dst.CloseWrite() // nolint: errcheck - wg.Done() - }() + copyBuffer := acquireCopyBuffer() + defer releaseCopyBuffer(copyBuffer) - n, err := syncer.Sync() + n, err := io.CopyBuffer(src, dst, *copyBuffer) switch { case err == nil: diff --git a/mtglib/internal/relay/relay_test.go b/mtglib/internal/relay/relay_test.go index 368469f7d..390af9da8 100644 --- a/mtglib/internal/relay/relay_test.go +++ b/mtglib/internal/relay/relay_test.go @@ -42,14 +42,12 @@ func (suite *RelayTestSuite) TestExit() { suite.telegramConnMock.On("CloseWrite").Return(nil).Once() suite.telegramConnMock.On("Read", mock.Anything).Return(10, io.EOF).Once() suite.telegramConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe() - suite.telegramConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe() suite.clientConnMock.On("Read", mock.Anything).Return(0, io.EOF).Once() suite.clientConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe() suite.clientConnMock.On("Close").Return(nil) suite.clientConnMock.On("CloseRead").Return(nil).Once() suite.clientConnMock.On("CloseWrite").Return(nil).Once() - suite.clientConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe() relay.Relay(suite.ctx, suite.loggerMock, suite.telegramConnMock, suite.clientConnMock) } diff --git a/mtglib/internal/relay/sync_pair.go b/mtglib/internal/relay/sync_pair.go deleted file mode 100644 index 035b05e39..000000000 --- a/mtglib/internal/relay/sync_pair.go +++ /dev/null @@ -1,77 +0,0 @@ -package relay - -import ( - "bufio" - "errors" - "fmt" - "io" - "net" - "os" - "sync" - "time" -) - -type syncPair struct { - writer *bufio.Writer - copyBuf []byte - - mutex sync.Mutex - reader net.Conn -} - -func (s *syncPair) Sync() (int64, error) { - return io.CopyBuffer(s, s, s.copyBuf) // nolint: wrapcheck -} - -func (s *syncPair) Read(p []byte) (int, error) { - n, err := s.readBlocking(p, false) - - // nothing has been delivered for readTimeout time. Let's flush. - if errors.Is(err, os.ErrDeadlineExceeded) { - if err := s.Flush(); err != nil { - return 0, fmt.Errorf("cannot flush writer hand-side: %w", err) - } - - return s.readBlocking(p, true) - } - - return n, err -} - -func (s *syncPair) Write(p []byte) (int, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - - n, err := s.writer.Write(p) - - // optimization for a case when we have a small package and want to avoid a - // delay in readTimeout. In that case, we assume that peer has finished to - // sent a data it wants to send so we can flush without waiting for anything - // else. - if err == nil && n < copyBufferSize { - err = s.writer.Flush() - } - - return n, err // nolint: wrapcheck -} - -func (s *syncPair) Flush() error { - s.mutex.Lock() - defer s.mutex.Unlock() - - return s.writer.Flush() // nolint: wrapcheck -} - -func (s *syncPair) readBlocking(p []byte, blocking bool) (int, error) { - var deadline time.Time - - if !blocking { - deadline = time.Now().Add(readTimeout) - } - - if err := s.reader.SetReadDeadline(deadline); err != nil { - return 0, fmt.Errorf("cannot set read deadline: %w", err) - } - - return s.reader.Read(p) // nolint: wrapcheck -}