Skip to content

Commit

Permalink
tcp: add copy closed logs (#144)
Browse files Browse the repository at this point in the history
Adds debug logs when io.Copy returns an error when forwarding traffic
between two TCP conections.

---------

Co-authored-by: Yoofi Quansah <[email protected]>
  • Loading branch information
andydunstall and yquansah authored Jul 26, 2024
1 parent 6c6c2a8 commit a1ecbf5
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 16 deletions.
18 changes: 12 additions & 6 deletions agent/tcpproxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (s *Server) serveConn(c net.Conn) {
}
defer upstream.Close()

forward(c, upstream)
s.forward(c, upstream)
}

func (s *Server) addConn(c net.Conn) {
Expand Down Expand Up @@ -129,18 +129,24 @@ func (s *Server) logConnClosed() {
}
}

func forward(conn1 net.Conn, conn2 net.Conn) {
func (s *Server) forward(conn net.Conn, upstream net.Conn) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
defer conn1.Close()
_, _ = io.Copy(conn1, conn2)
defer conn.Close()
_, err := io.Copy(conn, upstream)
if err != nil {
s.logger.Debug("copy to conn closed", zap.Error(err))
}
}()
go func() {
defer wg.Done()
defer conn2.Close()
_, _ = io.Copy(conn2, conn1)
defer upstream.Close()
_, err := io.Copy(upstream, conn)
if err != nil {
s.logger.Debug("copy to upstream closed", zap.Error(err))
}
}()
wg.Wait()
}
18 changes: 16 additions & 2 deletions client/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,27 @@ func (f *Forwarder) forward(downstream net.Conn) {
go func() {
defer g.Done()
defer downstream.Close()
_, _ = io.Copy(downstream, upstream)
_, err := io.Copy(downstream, upstream)
if err != nil {
f.logger.Debug(
"copy to downstream closed",
zap.String("addr", f.addr),
zap.Error(err),
)
}
}()

go func() {
defer g.Done()
defer upstream.Close()
_, _ = io.Copy(upstream, downstream)
_, err := io.Copy(upstream, downstream)
if err != nil {
f.logger.Debug(
"copy to upstream closed",
zap.String("addr", f.addr),
zap.Error(err),
)
}
}()

g.Wait()
Expand Down
18 changes: 16 additions & 2 deletions forward/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,26 @@ func (f *Forwarder) forwardConn(conn net.Conn) {
go func() {
defer g.Done()
defer conn.Close()
_, _ = io.Copy(conn, upstream)
_, err := io.Copy(conn, upstream)
if err != nil {
f.logger.Debug(
"copy to conn closed",
zap.String("endpoint-id", f.endpointID),
zap.Error(err),
)
}
}()
go func() {
defer g.Done()
defer upstream.Close()
_, _ = io.Copy(upstream, conn)
_, err := io.Copy(upstream, conn)
if err != nil {
f.logger.Debug(
"copy to upstream closed",
zap.String("endpoint-id", f.endpointID),
zap.Error(err),
)
}
}()
g.Wait()
}
18 changes: 12 additions & 6 deletions server/proxy/tcpproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,27 @@ func (p *TCPProxy) ServeHTTP(w http.ResponseWriter, r *http.Request, endpointID
downstreamConn := pikowebsocket.New(wsConn)
defer downstreamConn.Close()

forward(upstreamConn, downstreamConn)
p.forward(upstreamConn, downstreamConn)
}

func forward(conn1 net.Conn, conn2 net.Conn) {
func (p *TCPProxy) forward(upstream net.Conn, downstream net.Conn) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
defer conn1.Close()
_, _ = io.Copy(conn1, conn2)
defer upstream.Close()
_, err := io.Copy(upstream, downstream)
if err != nil {
p.logger.Debug("copy to upstream closed", zap.Error(err))
}
}()
go func() {
defer wg.Done()
defer conn2.Close()
_, _ = io.Copy(conn2, conn1)
defer downstream.Close()
_, err := io.Copy(downstream, upstream)
if err != nil {
p.logger.Debug("copy to downstream closed", zap.Error(err))
}
}()
wg.Wait()
}

0 comments on commit a1ecbf5

Please sign in to comment.