From 93bed24a0b9f2ebf756e1774b0b9e20d2d8adf99 Mon Sep 17 00:00:00 2001 From: 9seconds Date: Fri, 3 Dec 2021 07:39:45 +0300 Subject: [PATCH 1/4] Remove all cleverness that broke uploads --- essentials/conns.go | 2 - mtglib/internal/relay/init.go | 6 +-- mtglib/internal/relay/pools.go | 30 ++++------- mtglib/internal/relay/relay.go | 30 ++++++----- mtglib/internal/relay/relay_test.go | 2 - mtglib/internal/relay/sync_pair.go | 77 ----------------------------- 6 files changed, 24 insertions(+), 123 deletions(-) delete mode 100644 mtglib/internal/relay/sync_pair.go diff --git a/essentials/conns.go b/essentials/conns.go index 432591aa4..1b3cbf052 100644 --- a/essentials/conns.go +++ b/essentials/conns.go @@ -8,14 +8,12 @@ import ( // CloseableReader is a reader interface that can close its reading end. type CloseableReader interface { io.Reader - CloseRead() error } // CloseableWriter is a writer that can close its writing end. type CloseableWriter interface { io.Writer - CloseWrite() error } diff --git a/mtglib/internal/relay/init.go b/mtglib/internal/relay/init.go index 6278c48f4..a0dcb0c3f 100644 --- a/mtglib/internal/relay/init.go +++ b/mtglib/internal/relay/init.go @@ -1,11 +1,7 @@ package relay -import "time" - const ( - copyBufferSize = 64 * 1024 - writerBufferSize = 128 * 1024 - readTimeout = 10 * time.Millisecond + copyBufferSize = 64 * 1024 ) type Logger interface { diff --git a/mtglib/internal/relay/pools.go b/mtglib/internal/relay/pools.go index 2adff99e3..7b9371e2a 100644 --- a/mtglib/internal/relay/pools.go +++ b/mtglib/internal/relay/pools.go @@ -1,31 +1,19 @@ package relay -import ( - "bufio" - "io" - "net" - "sync" -) +import "sync" -var syncPairPool = sync.Pool{ +var copyBufferPool = sync.Pool{ New: func() interface{} { - return &syncPair{ - writer: bufio.NewWriterSize(nil, writerBufferSize), - copyBuf: make([]byte, copyBufferSize), - } + rv := make([]byte, copyBufferSize) + + return &rv }, } -func acquireSyncPair(reader net.Conn, writer io.Writer) *syncPair { - sp := syncPairPool.Get().(*syncPair) // nolint: forcetypeassert - sp.writer.Reset(writer) - sp.reader = reader - - return sp +func acquireCopyBuffer() *[]byte { + return copyBufferPool.Get().(*[]byte) } -func releaseSyncPair(sp *syncPair) { - sp.writer.Reset(nil) - sp.reader = nil - syncPairPool.Put(sp) +func releaseCopyBuffer(buf *[]byte) { + copyBufferPool.Put(buf) } diff --git a/mtglib/internal/relay/relay.go b/mtglib/internal/relay/relay.go index 6f9db4f3d..0350373f5 100644 --- a/mtglib/internal/relay/relay.go +++ b/mtglib/internal/relay/relay.go @@ -4,7 +4,6 @@ import ( "context" "errors" "io" - "sync" "github.com/9seconds/mtg/v2/essentials" ) @@ -22,28 +21,27 @@ func Relay(ctx context.Context, log Logger, telegramConn, clientConn essentials. clientConn.Close() }() - wg := &sync.WaitGroup{} - wg.Add(2) // nolint: gomnd + closeChan := make(chan struct{}) - go pump(log, telegramConn, clientConn, wg, "client -> telegram") + go func() { + defer close(closeChan) + + pump(log, telegramConn, clientConn, "client -> telegram") + }() - pump(log, clientConn, telegramConn, wg, "telegram -> client") + pump(log, clientConn, telegramConn, "telegram -> client") - wg.Wait() + <-closeChan } -func pump(log Logger, src, dst essentials.Conn, wg *sync.WaitGroup, direction string) { - syncer := acquireSyncPair(src, dst) +func pump(log Logger, src, dst essentials.Conn, direction string) { + defer src.CloseRead() // nolint: errcheck + defer dst.CloseWrite() // nolint: errcheck - defer func() { - syncer.Flush() - releaseSyncPair(syncer) - src.CloseRead() // nolint: errcheck - dst.CloseWrite() // nolint: errcheck - wg.Done() - }() + copyBuffer := acquireCopyBuffer() + defer releaseCopyBuffer(copyBuffer) - n, err := syncer.Sync() + n, err := io.CopyBuffer(src, dst, *copyBuffer) switch { case err == nil: diff --git a/mtglib/internal/relay/relay_test.go b/mtglib/internal/relay/relay_test.go index 368469f7d..390af9da8 100644 --- a/mtglib/internal/relay/relay_test.go +++ b/mtglib/internal/relay/relay_test.go @@ -42,14 +42,12 @@ func (suite *RelayTestSuite) TestExit() { suite.telegramConnMock.On("CloseWrite").Return(nil).Once() suite.telegramConnMock.On("Read", mock.Anything).Return(10, io.EOF).Once() suite.telegramConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe() - suite.telegramConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe() suite.clientConnMock.On("Read", mock.Anything).Return(0, io.EOF).Once() suite.clientConnMock.On("Write", mock.Anything).Return(10, io.EOF).Maybe() suite.clientConnMock.On("Close").Return(nil) suite.clientConnMock.On("CloseRead").Return(nil).Once() suite.clientConnMock.On("CloseWrite").Return(nil).Once() - suite.clientConnMock.On("SetReadDeadline", mock.Anything).Return(nil).Maybe() relay.Relay(suite.ctx, suite.loggerMock, suite.telegramConnMock, suite.clientConnMock) } diff --git a/mtglib/internal/relay/sync_pair.go b/mtglib/internal/relay/sync_pair.go deleted file mode 100644 index 035b05e39..000000000 --- a/mtglib/internal/relay/sync_pair.go +++ /dev/null @@ -1,77 +0,0 @@ -package relay - -import ( - "bufio" - "errors" - "fmt" - "io" - "net" - "os" - "sync" - "time" -) - -type syncPair struct { - writer *bufio.Writer - copyBuf []byte - - mutex sync.Mutex - reader net.Conn -} - -func (s *syncPair) Sync() (int64, error) { - return io.CopyBuffer(s, s, s.copyBuf) // nolint: wrapcheck -} - -func (s *syncPair) Read(p []byte) (int, error) { - n, err := s.readBlocking(p, false) - - // nothing has been delivered for readTimeout time. Let's flush. - if errors.Is(err, os.ErrDeadlineExceeded) { - if err := s.Flush(); err != nil { - return 0, fmt.Errorf("cannot flush writer hand-side: %w", err) - } - - return s.readBlocking(p, true) - } - - return n, err -} - -func (s *syncPair) Write(p []byte) (int, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - - n, err := s.writer.Write(p) - - // optimization for a case when we have a small package and want to avoid a - // delay in readTimeout. In that case, we assume that peer has finished to - // sent a data it wants to send so we can flush without waiting for anything - // else. - if err == nil && n < copyBufferSize { - err = s.writer.Flush() - } - - return n, err // nolint: wrapcheck -} - -func (s *syncPair) Flush() error { - s.mutex.Lock() - defer s.mutex.Unlock() - - return s.writer.Flush() // nolint: wrapcheck -} - -func (s *syncPair) readBlocking(p []byte, blocking bool) (int, error) { - var deadline time.Time - - if !blocking { - deadline = time.Now().Add(readTimeout) - } - - if err := s.reader.SetReadDeadline(deadline); err != nil { - return 0, fmt.Errorf("cannot set read deadline: %w", err) - } - - return s.reader.Read(p) // nolint: wrapcheck -} From ad8c09a2a3dd46e895cf23129b7f89a4e0869f40 Mon Sep 17 00:00:00 2001 From: 9seconds Date: Fri, 3 Dec 2021 09:26:31 +0300 Subject: [PATCH 2/4] Fix gofumpt cli --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 37cc4e9ac..e0a559bc9 100644 --- a/Makefile +++ b/Makefile @@ -39,7 +39,7 @@ vendor: go.mod go.sum .PHONY: fmt fmt: - @$(GOTOOL) gofumpt -w -s -extra "$(ROOT_DIR)" + @$(GOTOOL) gofumpt -w -extra "$(ROOT_DIR)" .PHONY: test test: From 15bb5be6c41c9d77cbba996c1c6bf11d9dd64f13 Mon Sep 17 00:00:00 2001 From: 9seconds Date: Fri, 3 Dec 2021 09:27:58 +0300 Subject: [PATCH 3/4] Update go mod tidy command --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index e0a559bc9..c54a228ee 100644 --- a/Makefile +++ b/Makefile @@ -95,4 +95,4 @@ install-tools-goreleaser: .bin .PHONY: update-deps update-deps: - @go get -u && go mod tidy + @go get -u && go mod tidy -go=1.17 From 7d38fec74e824e946d2da3a7537e218305659dae Mon Sep 17 00:00:00 2001 From: 9seconds Date: Fri, 3 Dec 2021 09:28:05 +0300 Subject: [PATCH 4/4] Update dependencies --- go.mod | 4 ++-- go.sum | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 2ef0dbca3..ce52f2a4d 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jarcoal/httpmock v1.0.8 github.com/kentik/patricia v0.0.0-20210909164817-21603333b70e github.com/mccutchen/go-httpbin v1.1.1 - github.com/panjf2000/ants/v2 v2.4.6 + github.com/panjf2000/ants/v2 v2.4.7 github.com/pelletier/go-toml v1.9.4 github.com/prometheus/client_golang v1.11.0 github.com/prometheus/common v0.32.1 // indirect @@ -24,7 +24,7 @@ require ( github.com/stretchr/objx v0.3.0 // indirect github.com/stretchr/testify v1.7.0 github.com/tylertreat/BoomFilters v0.0.0-20210315201527-1a82519a3e43 - golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 + golang.org/x/crypto v0.0.0-20211202192323-5770296d904e golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect golang.org/x/sys v0.0.0-20211124211545-fe61309f8881 google.golang.org/protobuf v1.27.1 // indirect diff --git a/go.sum b/go.sum index 0fdfd030d..6e70dd9e7 100644 --- a/go.sum +++ b/go.sum @@ -193,8 +193,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/panjf2000/ants/v2 v2.4.6 h1:drmj9mcygn2gawZ155dRbo+NfXEfAssjZNU1qoIb4gQ= -github.com/panjf2000/ants/v2 v2.4.6/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= +github.com/panjf2000/ants/v2 v2.4.7 h1:MZnw2JRyTJxFwtaMtUJcwE618wKD04POWk2gwwP4E2M= +github.com/panjf2000/ants/v2 v2.4.7/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM= @@ -285,8 +285,8 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI= -golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20211202192323-5770296d904e h1:MUP6MR3rJ7Gk9LEia0LP2ytiH6MuCfs7qYz+47jGdD8= +golang.org/x/crypto v0.0.0-20211202192323-5770296d904e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=