Skip to content

Commit

Permalink
Timed blocking of Kafka producer/consumer Close()
Browse files Browse the repository at this point in the history
  • Loading branch information
ekoutanov committed May 5, 2020
1 parent b7232d1 commit eded2a5
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 29 deletions.
1 change: 1 addition & 0 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Interfaces.
type KafkaConsumer interface {
Subscribe(topic string, rebalanceCb kafka.RebalanceCb) error
ReadMessage(timeout time.Duration) (*kafka.Message, error)
Events() chan kafka.Event
Close() error
}

Expand Down
17 changes: 17 additions & 0 deletions kafka_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import (
type consMockFuncs struct {
Subscribe func(m *consMock, topic string, rebalanceCb kafka.RebalanceCb) error
ReadMessage func(m *consMock, timeout time.Duration) (*kafka.Message, error)
Events func(m *consMock) chan kafka.Event
Close func(m *consMock) error
}

type consMockCounts struct {
Subscribe,
ReadMessage,
Events,
Close concurrent.AtomicCounter
}

type consMock struct {
events chan kafka.Event
rebalanceCallback kafka.RebalanceCb
rebalanceEvents chan kafka.Event
messages chan *kafka.Message
Expand Down Expand Up @@ -47,12 +50,20 @@ func (m *consMock) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
return m.f.ReadMessage(m, timeout)
}

func (m *consMock) Events() chan kafka.Event {
defer m.c.Events.Inc()
return m.f.Events(m)
}

func (m *consMock) Close() error {
defer m.c.Close.Inc()
return m.f.Close(m)
}

func (m *consMock) fillDefaults() {
if m.events == nil {
m.events = make(chan kafka.Event)
}
if m.rebalanceEvents == nil {
m.rebalanceEvents = make(chan kafka.Event)
}
Expand All @@ -74,11 +85,17 @@ func (m *consMock) fillDefaults() {
}
}
}
if m.f.Events == nil {
m.f.Events = func(m *consMock) chan kafka.Event {
return m.events
}
}
if m.f.Close == nil {
m.f.Close = func(m *consMock) error {
return nil
}
}
m.c.Events = concurrent.NewAtomicCounter()
m.c.Subscribe = concurrent.NewAtomicCounter()
m.c.ReadMessage = concurrent.NewAtomicCounter()
m.c.Close = concurrent.NewAtomicCounter()
Expand Down
59 changes: 30 additions & 29 deletions neli.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ type Neli interface {
}

type neli struct {
config Config
scribe scribe.Scribe
consumer KafkaConsumer
producer KafkaProducer
pollDeadline concurrent.Deadline
lastReceived concurrent.AtomicCounter
isAssigned concurrent.AtomicCounter
isLeader concurrent.AtomicCounter
barrier Barrier
state concurrent.AtomicReference
stateMutex sync.Mutex
config Config
scribe scribe.Scribe
consumer KafkaConsumer
producer KafkaProducer
pollDeadline concurrent.Deadline
lastReceived concurrent.AtomicCounter
isAssigned concurrent.AtomicCounter
isLeader concurrent.AtomicCounter
barrier Barrier
state concurrent.AtomicReference
stateMutex sync.Mutex
deliveryHandlerDone chan int
}

Expand Down Expand Up @@ -76,14 +76,14 @@ func New(config Config, barrier ...Barrier) (Neli, error) {
return nil, err
}
n := &neli{
config: config,
scribe: config.Scribe,
lastReceived: concurrent.NewAtomicCounter(),
isAssigned: concurrent.NewAtomicCounter(),
isLeader: concurrent.NewAtomicCounter(),
barrier: barrierArg,
pollDeadline: concurrent.NewDeadline(*config.MinPollInterval),
state: concurrent.NewAtomicReference(Live),
config: config,
scribe: config.Scribe,
lastReceived: concurrent.NewAtomicCounter(),
isAssigned: concurrent.NewAtomicCounter(),
isLeader: concurrent.NewAtomicCounter(),
barrier: barrierArg,
pollDeadline: concurrent.NewDeadline(*config.MinPollInterval),
state: concurrent.NewAtomicReference(Live),
deliveryHandlerDone: make(chan int),
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func New(config Config, barrier ...Barrier) (Neli, error) {
n.producer = p
go func() {
defer close(n.deliveryHandlerDone)

for event := range p.Events() {
switch e := event.(type) {
case *kafka.Message:
Expand Down Expand Up @@ -367,15 +367,16 @@ func (n *neli) Close() error {
defer func() {
<-n.deliveryHandlerDone
}()
defer func() {
go func() {
// A bug in confluent-kafka-go (#463) occasionally causes an indefinite syscall hang in Close(), after it closes
// the Events channel. So we delegate this to a separate goroutine — better an orphaned goroutine than a
// frozen harvester. (The rest of the battery will still unwind normally.)
n.producer.Close()
}()
}()
return n.consumer.Close()

// A bug in confluent-kafka-go (#463) occasionally causes an indefinite syscall hang in Close(), after it closes
// the Events channel. So we delegate this to a separate goroutine — better an orphaned goroutine than a
// frozen harvester. (The rest of the battery will still unwind normally.)
const closeTimeout = 10 * time.Second
_, _ = performTimed(void(n.producer.Close), closeTimeout)

// Similarly to the above, Consumer.Close() may also hang, and we need to cope with this until #463 is resolved.
_, err := performTimed(n.consumer.Close, closeTimeout)
return err
}

// Await the closing of this Neli instance.
Expand Down
15 changes: 15 additions & 0 deletions neli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,21 @@ func TestFatalErrorInProduce(t *testing.T) {
assert.Equal(t, "Fatal error: simulated", err.Error())
}

func TestCloseConsumerError(t *testing.T) {
_, cons, _, config, _ := fixtures{}.create()
cons.f.Close = func(m *consMock) error {
return check.ErrSimulated
}

n, err := New(config)
require.Nil(t, err)

err = n.Close()
require.Equal(t, check.ErrSimulated, err)

n.Await()
}

func TestDeliveryReports(t *testing.T) {
m, _, prod, config, _ := fixtures{}.create()

Expand Down
38 changes: 38 additions & 0 deletions timed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package goneli

import (
"errors"
"time"

"github.com/obsidiandynamics/libstdgo/concurrent"
)

var errPerformWithNoError = errors.New("")

func void(f func()) func() error {
return func() error {
f()
return nil
}
}

func performTimed(f func() error, timeout time.Duration) (bool, error) {
errorRef := concurrent.NewAtomicReference()
go func() {
err := f()
if err != nil {
errorRef.Set(err)
} else {
errorRef.Set(errPerformWithNoError)
}
}()

res := errorRef.Await(concurrent.RefNot(concurrent.RefNil()), timeout)
if res == nil {
return false, nil
}
if err := res.(error); err != errPerformWithNoError {
return true, err
}
return true, nil
}
38 changes: 38 additions & 0 deletions timed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package goneli

import (
"sync"
"testing"
"time"

"github.com/obsidiandynamics/libstdgo/check"
"github.com/stretchr/testify/require"
)

func TestPerformTimed_noError(t *testing.T) {
done, err := performTimed(void(func() {}), 10*time.Second)
require.True(t, done)
require.Nil(t, err)
}

func TestPerformTimed_withError(t *testing.T) {
done, err := performTimed(func() error {
return check.ErrSimulated
}, 10*time.Second)
require.True(t, done)
require.Equal(t, check.ErrSimulated, err)
}

func TestPerformTimed_withTimeout(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(1)

done, err := performTimed(func() error {
wg.Wait()
return nil
}, 1*time.Millisecond)
require.False(t, done)
require.Nil(t, err)

wg.Done()
}

0 comments on commit eded2a5

Please sign in to comment.