diff --git a/kafka/message.go b/kafka/message.go index d473771a7..7537a1120 100644 --- a/kafka/message.go +++ b/kafka/message.go @@ -70,14 +70,15 @@ func (t TimestampType) String() string { // Message represents a Kafka message type Message struct { - TopicPartition TopicPartition - Value []byte - Key []byte - Timestamp time.Time - TimestampType TimestampType - Opaque interface{} - Headers []Header - LeaderEpoch *int32 // Deprecated: LeaderEpoch or nil if not available. Use m.TopicPartition.LeaderEpoch instead. + TopicPartition TopicPartition + Value []byte + Key []byte + Timestamp time.Time + TimestampType TimestampType + Opaque interface{} + Headers []Header + LeaderEpoch *int32 // Deprecated: LeaderEpoch or nil if not available. Use m.TopicPartition.LeaderEpoch instead. + ProducerLatency *int64 } // String returns a human readable representation of a Message. @@ -167,13 +168,16 @@ func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) { msg.LeaderEpoch = &leaderEpoch msg.TopicPartition.LeaderEpoch = &leaderEpoch } + producerLatency := int64(C.rd_kafka_message_latency(cmsg)) + if producerLatency >= 0 { + msg.ProducerLatency = &producerLatency + } } // newMessageFromC creates a new message object from a C rd_kafka_message_t // NOTE: For use with Producer: does not set message timestamp fields. func (h *handle) newMessageFromC(cmsg *C.rd_kafka_message_t) (msg *Message) { msg = &Message{} - h.setupMessageFromC(msg, cmsg) return msg diff --git a/kafka/producer_test.go b/kafka/producer_test.go index 634746202..7750d3739 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -136,6 +136,9 @@ func TestProducerAPIs(t *testing.T) { switch e := ev.(type) { case *Message: msgCnt++ + if *e.ProducerLatency <= 0 { + t.Errorf("Producer Latency should be included in delivery reports, instead got %v", *e.ProducerLatency) + } if (string)(e.Value) == "ProducerChannel" { s := e.Opaque.(*string) if s != &myOpq {