diff --git a/plugins/input/kafka/input_kafka.go b/plugins/input/kafka/input_kafka.go index 5d5cf37cc0..e80775c477 100644 --- a/plugins/input/kafka/input_kafka.go +++ b/plugins/input/kafka/input_kafka.go @@ -23,6 +23,7 @@ import ( "github.com/IBM/sarama" "github.com/alibaba/ilogtail/pkg/logger" + "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/pipeline/extensions" "github.com/alibaba/ilogtail/pkg/protocol/decoder" @@ -239,7 +240,11 @@ func (k *InputKafka) onMessage(msg *sarama.ConsumerMessage) { switch k.version { case v1: fields := make(map[string]string) - fields[string(msg.Key)] = string(msg.Value) + if len(msg.Key) == 0 { + fields[models.ContentKey] = string(msg.Value) + } else { + fields[string(msg.Key)] = string(msg.Value) + } k.collectorV1.AddData(nil, fields) case v2: data, err := k.decoder.DecodeV2(msg.Value, nil)