Skip to content

Commit

Permalink
uds: add exponential backoff for reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
iksaif committed May 24, 2024
1 parent 1139efe commit 0649aa0
Showing 1 changed file with 31 additions and 3 deletions.
34 changes: 31 additions & 3 deletions statsd/uds.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type udsWriter struct {
conn net.Conn
// write timeout
writeTimeout time.Duration
// current write timeout
currentWriteTimeout time.Duration
// connect timeout
connectTimeout time.Duration
sync.RWMutex // used to lock conn / writer can replace it
Expand All @@ -29,7 +31,14 @@ type udsWriter struct {
// newUDSWriter returns a pointer to a new udsWriter given a socket file path as addr.
func newUDSWriter(addr string, writeTimeout time.Duration, connectTimeout time.Duration, transport string) (*udsWriter, error) {
// Defer connection to first Write
writer := &udsWriter{addr: addr, transport: transport, conn: nil, writeTimeout: writeTimeout, connectTimeout: connectTimeout}
writer := &udsWriter{
addr: addr,
transport: transport,
conn: nil,
writeTimeout: writeTimeout,
currentWriteTimeout: writeTimeout,
connectTimeout: connectTimeout,
}
return writer, nil
}

Expand Down Expand Up @@ -70,7 +79,7 @@ func (w *udsWriter) Write(data []byte) (int, error) {

// When using streams the deadline will only make us drop the packet if we can't write it at all,
// once we've started writing we need to finish.
conn.SetWriteDeadline(time.Now().Add(w.writeTimeout))
_ = conn.SetWriteDeadline(time.Now().Add(w.currentWriteTimeout))

// When using streams, we append the length of the packet to the data
if stream {
Expand All @@ -82,7 +91,7 @@ func (w *udsWriter) Write(data []byte) (int, error) {

// W need to be able to finish to write partially written packets once we have started.
// But we will reset the connection if we can't write anything at all for a long time.
w.conn.SetWriteDeadline(time.Now().Add(w.connectTimeout))
_ = w.conn.SetWriteDeadline(time.Now().Add(w.connectTimeout))

// Continue writing only if we've written the length of the packet
if err == nil {
Expand All @@ -96,11 +105,28 @@ func (w *udsWriter) Write(data []byte) (int, error) {
}

if w.shouldCloseConnection(err, partialWrite) {
w.maybeIncreaseTimeout(stream)
w.unsetConnection()
}
return n, err
}

func (w *udsWriter) maybeIncreaseTimeout(stream bool) {
if stream {
// For uds-stream we want to gradually increase the write timeout
// up to the connect timeout to avoid overloading the agent with new connections.
// We increase by 20% or 500ms up to the connection timeout.
incr := w.currentWriteTimeout / 100 * 20
if incr == time.Duration(0) {
incr = time.Millisecond * 500
}
w.currentWriteTimeout += incr
if w.currentWriteTimeout > w.connectTimeout {
w.currentWriteTimeout = w.connectTimeout
}
}
}

func (w *udsWriter) Close() error {
if w.conn != nil {
return w.conn.Close()
Expand Down Expand Up @@ -154,6 +180,8 @@ func (w *udsWriter) ensureConnection() (net.Conn, error) {
if err != nil {
return nil, err
}
// reset the write timeout
w.currentWriteTimeout = w.writeTimeout
w.conn = newConn
w.transport = newConn.RemoteAddr().Network()
return newConn, nil
Expand Down

0 comments on commit 0649aa0

Please sign in to comment.