diff --git a/changelog/fragments/1690563780-Add-kafka-output-type-for-agent-policies.yaml b/changelog/fragments/1690563780-Add-kafka-output-type-for-agent-policies.yaml new file mode 100644 index 000000000..3817836dd --- /dev/null +++ b/changelog/fragments/1690563780-Add-kafka-output-type-for-agent-policies.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Add kafka output type for agent policies + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent-shipper/issues/116 diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index 4f5b99ae5..645c3eb81 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -25,6 +25,7 @@ import ( const ( OutputTypeElasticsearch = "elasticsearch" OutputTypeLogstash = "logstash" + OutputTypeKafka = "kafka" ) var ( @@ -54,6 +55,9 @@ func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.B case OutputTypeLogstash: zlog.Debug().Msg("preparing logstash output") zlog.Info().Msg("no actions required for logstash output preparation") + case OutputTypeKafka: + zlog.Debug().Msg("preparing kafka output") + zlog.Info().Msg("no actions required for kafka output preparation") default: zlog.Error().Msgf("unknown output type: %s; skipping preparation", p.Type) return fmt.Errorf("encountered unexpected output type while preparing outputs: %s", p.Type) diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index f74d57b3e..bb4cbad9b 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -69,6 +69,37 @@ func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { bulker.AssertExpectations(t) } +func TestPolicyKafkaOutputPrepare(t *testing.T) { + logger := testlog.SetLogger(t) + bulker := ftesting.NewMockBulk() + po := Output{ + Type: OutputTypeKafka, + Name: "test output", + Role: &RoleT{ + Sha2: "fake sha", + Raw: TestPayload, + }, + } + + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + require.Nil(t, err, "expected prepare to pass") + bulker.AssertExpectations(t) +} +func TestPolicyKafkaOutputPrepareNoRole(t *testing.T) { + logger := testlog.SetLogger(t) + bulker := ftesting.NewMockBulk() + po := Output{ + Type: OutputTypeKafka, + Name: "test output", + Role: nil, + } + + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + // No permissions are required by kafka currently + require.Nil(t, err, "expected prepare to pass") + bulker.AssertExpectations(t) +} + func TestPolicyESOutputPrepareNoRole(t *testing.T) { logger := testlog.SetLogger(t) bulker := ftesting.NewMockBulk()