diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index 349fd82fc7f..8596dbd605d 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -266,7 +266,10 @@ func TestKafkaPublish(t *testing.T) { cfg := makeConfig(t, defaultConfig) if test.config != nil { - cfg.Merge(makeConfig(t, test.config)) + err := cfg.Merge(makeConfig(t, test.config)) + if err != nil { + t.Fatal(err) + } } t.Run(name, func(t *testing.T) { @@ -275,7 +278,8 @@ func TestKafkaPublish(t *testing.T) { t.Fatal(err) } - output := grp.Clients[0].(*client) + output, ok := grp.Clients[0].(*client) + assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client") if err := output.Connect(); err != nil { t.Fatal(err) } @@ -291,7 +295,10 @@ func TestKafkaPublish(t *testing.T) { } wg.Add(1) - output.Publish(context.Background(), batch) + err := output.Publish(context.Background(), batch) + if err != nil { + t.Fatal(err) + } } // wait for all published batches to be ACKed @@ -347,7 +354,8 @@ func validateJSON(t *testing.T, value []byte, events []beat.Event) string { return "" } - msg := decoded["message"].(string) + msg, ok := decoded["message"].(string) + assert.True(t, ok, "type of decoded message was not string") event := findEvent(events, msg) if event == nil { t.Errorf("could not find expected event with message: %v", msg)