diff --git a/consumer.go b/consumer.go index 9b34229..581b5e8 100644 --- a/consumer.go +++ b/consumer.go @@ -32,6 +32,8 @@ type consumer struct { retryTopic string logger LoggerInterface + + cancelFn context.CancelFunc } var _ Consumer = (*consumer)(nil) @@ -109,7 +111,10 @@ func (c *consumer) Consume() { c.cronsumer.Start() } - go c.consume() + ctx, cancel := context.WithCancel(context.Background()) + c.cancelFn = cancel + + go c.consume(ctx) for i := 0; i < c.concurrency; i++ { c.wg.Add(1) @@ -117,11 +122,9 @@ func (c *consumer) Consume() { defer c.wg.Done() for message := range c.messageCh { - if err := c.consumeFn(message); err == nil { - continue - } + err := c.consumeFn(message) - if c.retryEnabled { + if err != nil && c.retryEnabled { retryableMsg := convertToRetryableMessage(c.retryTopic, message) if err := c.retryFn(retryableMsg); err != nil { if err = c.cronsumer.Produce(retryableMsg); err != nil { @@ -130,6 +133,10 @@ func (c *consumer) Consume() { } } } + if err = c.r.CommitMessages(context.Background(), kafka.Message(message)); err != nil { + c.logger.Errorf("Error Committing message %s", + string(message.Value)) + } } }() } @@ -179,7 +186,7 @@ func convertFromRetryableMessage(message kcronsumer.Message) Message { } } -func (c *consumer) consume() { +func (c *consumer) consume(ctx context.Context) { c.logger.Debug("Consuming is starting") c.wg.Add(1) defer c.wg.Done() @@ -189,8 +196,11 @@ func (c *consumer) consume() { case <-c.quit: return default: - message, err := c.r.ReadMessage(context.Background()) + message, err := c.r.FetchMessage(ctx) if err != nil { + if ctx.Err() != nil { + continue + } c.logger.Errorf("Message could not read, err %s", err.Error()) continue } @@ -206,10 +216,11 @@ func (c *consumer) Stop() error { if c.retryEnabled { c.cronsumer.Stop() } - err = c.r.Close() + c.cancelFn() c.quit <- struct{}{} close(c.messageCh) c.wg.Wait() + err = c.r.Close() }) return err