Skip to content

Commit

Permalink
Fix flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Nov 28, 2023
1 parent abd79cf commit d7798da
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 36 deletions.
31 changes: 13 additions & 18 deletions heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,20 @@ func (self *heartbeater) HandleReceive(*Message, Channel) {
// ignore incoming heartbeat events, everything is handled by the transformer
}

func (self *heartbeater) queueEvent(event heartbeatEvent) {
select {
case self.events <- event:
default:
}
}

func (self *heartbeater) Rx(m *Message, _ Channel) {
if val, found := m.GetUint64Header(HeartbeatHeader); found {
select {
case self.events <- heartbeatRxEvent(val):
default:
}
self.queueEvent(heartbeatRxEvent(val))
}

if val, found := m.GetUint64Header(HeartbeatResponseHeader); found {
select {
case self.events <- heartbeatRespRxEvent(val):
default:
}
self.queueEvent(heartbeatRespRxEvent(val))
}
}

Expand All @@ -138,19 +139,13 @@ func (self *heartbeater) Tx(m *Message, _ Channel) {
if now-self.lastHeartbeatTx > self.heartBeatIntervalNs {
m.PutUint64Header(HeartbeatHeader, uint64(now))
atomic.StoreInt64(&self.lastHeartbeatTx, now)
select {
case self.events <- heartbeatTxEvent(now):
default:
}
self.queueEvent(heartbeatTxEvent(now))
}

if unrespondedHeartbeat := atomic.LoadInt64(&self.unrespondedHeartbeat); unrespondedHeartbeat != 0 {
m.PutUint64Header(HeartbeatResponseHeader, uint64(unrespondedHeartbeat))
atomic.StoreInt64(&self.unrespondedHeartbeat, 0)
select {
case self.events <- heartbeatRespTxEvent(now):
default:
}
self.queueEvent(heartbeatRespTxEvent(now))
}
}

Expand Down Expand Up @@ -179,7 +174,7 @@ func (self *heartbeater) sendHeartbeat() {
if err := m.WithTimeout(time.Second).SendAndWaitForWire(self.ch); err != nil && !self.ch.IsClosed() {
logrus.WithError(err).
WithField("channelId", self.ch.Label()).
Error("failed to send heartbeat")
Error("pulse failed to send heartbeat")
}
}

Expand All @@ -188,7 +183,7 @@ func (self *heartbeater) sendHeartbeatIfQueueFree() {
if err := m.WithTimeout(10 * time.Millisecond).Send(self.ch); err != nil && !self.ch.IsClosed() {
logrus.WithError(err).
WithField("channelId", self.ch.Label()).
Error("failed to send heartbeat")
Error("handleUnresponded failed to send heartbeat")
}
}

Expand Down
53 changes: 35 additions & 18 deletions heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package channel

import (
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/stretchr/testify/require"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -49,7 +50,9 @@ func TestBaselineHeartbeat(t *testing.T) {
for !ch.IsClosed() && !done.Load() {
msg := NewMessage(ContentTypePingType, []byte("hello"))
if err := msg.WithTimeout(time.Second).Send(ch); err != nil {
errC <- err
if !ch.IsClosed() {
errC <- err
}
return
}
count++
Expand All @@ -64,18 +67,23 @@ func TestBaselineHeartbeat(t *testing.T) {
options := DefaultOptions()
options.WriteTimeout = 100 * time.Millisecond

fmt.Printf("dialing server\n")
time.Sleep(time.Second * 2)
ch := dialServer(options, t, BindHandlerF(func(binding Binding) error {
binding.AddReceiveHandlerF(ContentTypePingType, func(m *Message, ch Channel) {})
return nil
}))
fmt.Printf("done dialing server\n")

defer func() { _ = ch.Close() }()

var count uint64
for !ch.IsClosed() && !done.Load() {
msg := NewMessage(ContentTypePingType, []byte("hello"))
err := msg.WithTimeout(time.Second).Send(ch)
req.NoError(err)
if !ch.IsClosed() {
req.NoError(err)
}
count++
}

Expand Down Expand Up @@ -126,10 +134,14 @@ func TestBusyHeartbeat(t *testing.T) {
for !ch.IsClosed() && !done.Load() {
msg := NewMessage(ContentTypePingType, []byte("hello"))
if err := msg.WithTimeout(time.Second).Send(ch); err != nil {
errC <- err
if !ch.IsClosed() {
pfxlog.Logger().WithError(err).WithField("side", "server").Error("send error")
errC <- err
}
return
}
count++
time.Sleep(time.Millisecond)
}
}()
}
Expand All @@ -139,7 +151,7 @@ func TestBusyHeartbeat(t *testing.T) {
req := require.New(t)

options := DefaultOptions()
options.WriteTimeout = 100 * time.Millisecond
options.WriteTimeout = time.Second

hb := &heartbeatTracker{id: "client"}
ch := dialServer(options, t, BindHandlerF(func(binding Binding) error {
Expand All @@ -154,7 +166,10 @@ func TestBusyHeartbeat(t *testing.T) {
for !ch.IsClosed() && !done.Load() {
msg := NewMessage(ContentTypePingType, []byte("hello"))
err := msg.WithTimeout(time.Second).Send(ch)
req.NoError(err)
if !ch.IsClosed() {
req.NoError(err)
}
time.Sleep(time.Millisecond)
count++
}

Expand All @@ -173,12 +188,12 @@ func TestBusyHeartbeat(t *testing.T) {

fmt.Printf("count: %v\nremote: %v\nchecks: %v\n", count, remoteCount, hb.checkCount)

req.True(hb.localCount > 8)
req.True(hb.localCount < 12)
req.True(hb.remoteCount > 8)
req.True(hb.remoteCount < 12)
req.True(hb.checkCount > 80)
req.True(hb.remoteCount < 120)
req.True(hb.respRx >= 8, "value: %v", hb.respRx)
req.True(hb.respRx < 12, "value: %v", hb.respRx)
req.True(hb.remoteCount >= 8, "value: %v", hb.remoteCount)
req.True(hb.remoteCount < 12, "value: %v", hb.remoteCount)
req.True(hb.checkCount > 80, "value: %v", hb.checkCount)
req.True(hb.checkCount < 120, "value: %v", hb.checkCount)
}

func TestQuietHeartbeat(t *testing.T) {
Expand Down Expand Up @@ -261,35 +276,37 @@ func TestQuietHeartbeat(t *testing.T) {

fmt.Printf("count: %v\nremote: %v\n", count, remoteCount)

req.True(hb.localCount > 8)
req.True(hb.localCount < 12)
req.True(hb.respRx > 8)
req.True(hb.respRx < 12)
req.True(hb.remoteCount > 8)
req.True(hb.remoteCount < 12)
}

type heartbeatTracker struct {
id string
localCount int
hbTx int
respRx int
remoteCount int
checkCount int
}

func (self *heartbeatTracker) HeartbeatTx(ts int64) {
fmt.Printf("%v: h-> @%v\n", self.id, ts)
self.hbTx++
fmt.Printf("%v: h-> @%v, hbTx: %v\n", self.id, ts, self.hbTx)
}

func (self *heartbeatTracker) HeartbeatRx(ts int64) {
fmt.Printf("%v: <-h @%v\n", self.id, ts)
}

func (self *heartbeatTracker) HeartbeatRespTx(ts int64) {
fmt.Printf("%v: r-> @%v\n", self.id, ts)
self.remoteCount++
fmt.Printf("%v: r-> @%v, rc: %v\n", self.id, ts, self.remoteCount)
}

func (self *heartbeatTracker) HeartbeatRespRx(ts int64) {
fmt.Printf("%v: <-r @%v\n", self.id, ts)
self.localCount++
self.respRx++
fmt.Printf("%v: <-r @%v, respRx: %v\n", self.id, ts, self.respRx)
}

func (self *heartbeatTracker) CheckHeartBeat() {
Expand Down

0 comments on commit d7798da

Please sign in to comment.