diff --git a/kafka.go b/kafka.go index e80e1d5..86373df 100644 --- a/kafka.go +++ b/kafka.go @@ -15,7 +15,6 @@ Interfaces. type KafkaConsumer interface { Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error ReadMessage(timeout time.Duration) (*kafka.Message, error) - Events() chan kafka.Event Close() error } diff --git a/kafka_mock_test.go b/kafka_mock_test.go index 1a0c57a..43e9d6a 100644 --- a/kafka_mock_test.go +++ b/kafka_mock_test.go @@ -10,19 +10,16 @@ import ( type consMockFuncs struct { Subscribe func(m *consMock, topic string, rebalanceCb kafka.RebalanceCb) error ReadMessage func(m *consMock, timeout time.Duration) (*kafka.Message, error) - Events func(m *consMock) chan kafka.Event Close func(m *consMock) error } type consMockCounts struct { Subscribe, ReadMessage, - Events, Close concurrent.AtomicCounter } type consMock struct { - events chan kafka.Event rebalanceCallback kafka.RebalanceCb rebalanceEvents chan kafka.Event messages chan *kafka.Message @@ -50,20 +47,12 @@ func (m *consMock) ReadMessage(timeout time.Duration) (*kafka.Message, error) { return m.f.ReadMessage(m, timeout) } -func (m *consMock) Events() chan kafka.Event { - defer m.c.Events.Inc() - return m.f.Events(m) -} - func (m *consMock) Close() error { defer m.c.Close.Inc() return m.f.Close(m) } func (m *consMock) fillDefaults() { - if m.events == nil { - m.events = make(chan kafka.Event) - } if m.rebalanceEvents == nil { m.rebalanceEvents = make(chan kafka.Event) } @@ -85,17 +74,11 @@ func (m *consMock) fillDefaults() { } } } - if m.f.Events == nil { - m.f.Events = func(m *consMock) chan kafka.Event { - return m.events - } - } if m.f.Close == nil { m.f.Close = func(m *consMock) error { return nil } } - m.c.Events = concurrent.NewAtomicCounter() m.c.Subscribe = concurrent.NewAtomicCounter() m.c.ReadMessage = concurrent.NewAtomicCounter() m.c.Close = concurrent.NewAtomicCounter() diff --git a/neli.go b/neli.go index 304a517..87ef264 100644 --- a/neli.go +++ b/neli.go @@ -372,10 +372,10 @@ func (n *neli) Close() error { // the Events channel. So we delegate this to a separate goroutine — better an orphaned goroutine than a // frozen harvester. (The rest of the battery will still unwind normally.) const closeTimeout = 10 * time.Second - _, _ = performTimed(void(n.producer.Close), closeTimeout) + _, _ = performTimed(n.logger().W(), "producer close", void(n.producer.Close), closeTimeout) // Similarly to the above, Consumer.Close() may also hang, and we need to cope with this until #463 is resolved. - _, err := performTimed(n.consumer.Close, closeTimeout) + _, err := performTimed(n.logger().W(), "consumer close", n.consumer.Close, closeTimeout) return err } diff --git a/timed.go b/timed.go index 6f92099..8b69447 100644 --- a/timed.go +++ b/timed.go @@ -5,6 +5,7 @@ import ( "time" "github.com/obsidiandynamics/libstdgo/concurrent" + "github.com/obsidiandynamics/libstdgo/scribe" ) var errPerformWithNoError = errors.New("") @@ -16,10 +17,10 @@ func void(f func()) func() error { } } -func performTimed(f func() error, timeout time.Duration) (bool, error) { +func performTimed(logger scribe.Logger, opName string, op func() error, timeout time.Duration) (bool, error) { errorRef := concurrent.NewAtomicReference() go func() { - err := f() + err := op() if err != nil { errorRef.Set(err) } else { @@ -29,6 +30,7 @@ func performTimed(f func() error, timeout time.Duration) (bool, error) { res := errorRef.Await(concurrent.RefNot(concurrent.RefNil()), timeout) if res == nil { + logger("Operation '%s' failed to complete within %v", opName, timeout) return false, nil } if err := res.(error); err != errPerformWithNoError { diff --git a/timed_test.go b/timed_test.go index e1f2ebf..1d63715 100644 --- a/timed_test.go +++ b/timed_test.go @@ -6,33 +6,46 @@ import ( "time" "github.com/obsidiandynamics/libstdgo/check" + "github.com/obsidiandynamics/libstdgo/scribe" "github.com/stretchr/testify/require" ) func TestPerformTimed_noError(t *testing.T) { - done, err := performTimed(void(func() {}), 10*time.Second) + m := scribe.NewMock() + scr := scribe.New(m.Factories()) + done, err := performTimed(scr.W(), "some-op", void(func() {}), 10*time.Second) require.True(t, done) require.Nil(t, err) + m.Entries().Assert(t, scribe.Count(0)) } func TestPerformTimed_withError(t *testing.T) { - done, err := performTimed(func() error { + m := scribe.NewMock() + scr := scribe.New(m.Factories()) + done, err := performTimed(scr.W(), "some-op", func() error { return check.ErrSimulated }, 10*time.Second) require.True(t, done) require.Equal(t, check.ErrSimulated, err) + m.Entries().Assert(t, scribe.Count(0)) } func TestPerformTimed_withTimeout(t *testing.T) { + m := scribe.NewMock() + scr := scribe.New(m.Factories()) wg := sync.WaitGroup{} wg.Add(1) - done, err := performTimed(func() error { + done, err := performTimed(scr.W(), "some-op", func() error { wg.Wait() return nil }, 1*time.Millisecond) require.False(t, done) require.Nil(t, err) + m.Entries(). + Having(scribe.LogLevel(scribe.Warn)). + Having(scribe.MessageEqual("Operation 'some-op' failed to complete within 1ms")). + Assert(t, scribe.Count(1)) wg.Done() }