Skip to content

Commit

Permalink
feat: add manuel commit implementation as default
Browse files Browse the repository at this point in the history
* Add manuel commit to prevent early commit

* Fix closed reader exception

* feat: Add context cancellation support for consume to handle shutdown gracefully
  • Loading branch information
mhmtszr authored Apr 25, 2023
1 parent 665e6a8 commit d965996
Showing 1 changed file with 19 additions and 8 deletions.
27 changes: 19 additions & 8 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type consumer struct {
retryTopic string

logger LoggerInterface

cancelFn context.CancelFunc
}

var _ Consumer = (*consumer)(nil)
Expand Down Expand Up @@ -109,19 +111,20 @@ func (c *consumer) Consume() {
c.cronsumer.Start()
}

go c.consume()
ctx, cancel := context.WithCancel(context.Background())
c.cancelFn = cancel

go c.consume(ctx)

for i := 0; i < c.concurrency; i++ {
c.wg.Add(1)
go func() {
defer c.wg.Done()

for message := range c.messageCh {
if err := c.consumeFn(message); err == nil {
continue
}
err := c.consumeFn(message)

if c.retryEnabled {
if err != nil && c.retryEnabled {
retryableMsg := convertToRetryableMessage(c.retryTopic, message)
if err := c.retryFn(retryableMsg); err != nil {
if err = c.cronsumer.Produce(retryableMsg); err != nil {
Expand All @@ -130,6 +133,10 @@ func (c *consumer) Consume() {
}
}
}
if err = c.r.CommitMessages(context.Background(), kafka.Message(message)); err != nil {
c.logger.Errorf("Error Committing message %s",
string(message.Value))
}
}
}()
}
Expand Down Expand Up @@ -179,7 +186,7 @@ func convertFromRetryableMessage(message kcronsumer.Message) Message {
}
}

func (c *consumer) consume() {
func (c *consumer) consume(ctx context.Context) {
c.logger.Debug("Consuming is starting")
c.wg.Add(1)
defer c.wg.Done()
Expand All @@ -189,8 +196,11 @@ func (c *consumer) consume() {
case <-c.quit:
return
default:
message, err := c.r.ReadMessage(context.Background())
message, err := c.r.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
continue
}
c.logger.Errorf("Message could not read, err %s", err.Error())
continue
}
Expand All @@ -206,10 +216,11 @@ func (c *consumer) Stop() error {
if c.retryEnabled {
c.cronsumer.Stop()
}
err = c.r.Close()
c.cancelFn()
c.quit <- struct{}{}
close(c.messageCh)
c.wg.Wait()
err = c.r.Close()
})

return err
Expand Down

0 comments on commit d965996

Please sign in to comment.