Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for remote ES output #3051

Merged
merged 99 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
44b6985
first draft of remote output
juliaElastic Oct 24, 2023
f870e55
remove commented code
juliaElastic Oct 24, 2023
4605e04
storing remote es clients in bulker
juliaElastic Oct 26, 2023
b6b47af
remote service_token from policy sent to agent
juliaElastic Oct 26, 2023
d82a117
fix lint
juliaElastic Oct 26, 2023
0ceb201
rename remoteEsList and type
juliaElastic Oct 26, 2023
c63fb1e
added missing check
juliaElastic Oct 26, 2023
6795a82
Merge branch 'main' into remote-es
juliaElastic Oct 30, 2023
6cd8491
fixes after merge main
juliaElastic Oct 30, 2023
4b72d71
fix lint
juliaElastic Oct 30, 2023
1d6c31f
added changelog
juliaElastic Oct 30, 2023
53de755
create bulker for remote es
juliaElastic Oct 30, 2023
5bbc1f1
cleanup unused remoteEsClients and functions
juliaElastic Oct 30, 2023
ab7487d
fix linter
juliaElastic Oct 30, 2023
e901a80
renamed to remote_elasticsearch
juliaElastic Oct 30, 2023
2465b2a
added tracer, removed opts
juliaElastic Nov 1, 2023
1fd90a8
enable intrumentation on remote es client
juliaElastic Nov 1, 2023
8db4da2
Merge branch 'main' into remote-es
juliaElastic Nov 1, 2023
7953456
updated changelog
juliaElastic Nov 1, 2023
9452494
Merge branch 'main' into remote-es
juliaElastic Nov 2, 2023
11bc90c
try copy policyData
juliaElastic Nov 2, 2023
8312727
revert policyData, printing out value of policy
juliaElastic Nov 2, 2023
8cae7a4
setting policyData in other integration tests
juliaElastic Nov 2, 2023
bff9015
added missing param
juliaElastic Nov 2, 2023
a52b32b
watching for remote output cfg changes and restart
juliaElastic Nov 2, 2023
41f4ced
fixes
juliaElastic Nov 2, 2023
649bfdb
fix linter
juliaElastic Nov 2, 2023
f6994bd
fix
juliaElastic Nov 2, 2023
84787bd
copy map instead of reference
juliaElastic Nov 3, 2023
d36f83a
added integration test
juliaElastic Nov 3, 2023
255c28b
unit test on CheckRemoteOutputChanged
juliaElastic Nov 3, 2023
108fe47
fix lint
juliaElastic Nov 3, 2023
fd7e406
fix lint
juliaElastic Nov 3, 2023
4f99d62
added tests on policy_output remote ES
juliaElastic Nov 3, 2023
cc6a02e
added test on parsed_policy change
juliaElastic Nov 3, 2023
6dc08cd
fix lint
juliaElastic Nov 6, 2023
9a0e8b1
invalidate api key if remote output is removed
juliaElastic Nov 7, 2023
830b401
fix lint
juliaElastic Nov 7, 2023
fa4698a
Merge branch 'main' into remote-es
juliaElastic Nov 8, 2023
62a49c6
fix after conflicts
juliaElastic Nov 8, 2023
0f1ed68
reporting remote es error in fleet-server state
juliaElastic Nov 8, 2023
1d865d8
ignore lint error
juliaElastic Nov 8, 2023
72a7249
fix test
juliaElastic Nov 9, 2023
6ee9086
fix test
juliaElastic Nov 9, 2023
374b1ae
fix test
juliaElastic Nov 9, 2023
dad52b0
fixed test
juliaElastic Nov 9, 2023
8d4dbbc
start new bulker if remote config changed
juliaElastic Nov 10, 2023
de6724a
fix tests
juliaElastic Nov 10, 2023
e465d3d
added test for retire remote api key
juliaElastic Nov 10, 2023
bf6c9e3
monitor tests
juliaElastic Nov 10, 2023
2cc1e7d
Merge branch 'main' into remote-es
juliaElastic Nov 13, 2023
608ba26
remote es ping in standalone self monitor
juliaElastic Nov 14, 2023
699ac62
added break
juliaElastic Nov 14, 2023
81899b3
fix lint
juliaElastic Nov 14, 2023
14f9507
fix issue
juliaElastic Nov 14, 2023
963b87a
license header
juliaElastic Nov 14, 2023
180cae8
fix lint
juliaElastic Nov 14, 2023
3989856
cleanup, more test
juliaElastic Nov 15, 2023
6b727b8
added test on engine
juliaElastic Nov 15, 2023
60a3122
use output bulker when read update api key
juliaElastic Nov 15, 2023
8797823
added remote es ping to self.go
juliaElastic Nov 15, 2023
1be0575
review comments
juliaElastic Nov 16, 2023
dcf39f9
invalidate all ids with corresponding output
juliaElastic Nov 16, 2023
bb32a41
only add toRetireAPIKeys if does not exist
juliaElastic Nov 16, 2023
3261472
added a retry loop to integration test
juliaElastic Nov 16, 2023
991c689
fixed monitor tests
juliaElastic Nov 16, 2023
75fdb8f
remove break loop in self monitor
juliaElastic Nov 20, 2023
a8447b2
fix lint
juliaElastic Nov 20, 2023
9c7d6fe
openapi spec: added degraded state to desc
juliaElastic Nov 20, 2023
8df35d1
added 2nd elasticsearch to integration test
juliaElastic Nov 20, 2023
3b2dc11
Merge branch 'main' into remote-es
juliaElastic Nov 20, 2023
10021a8
added build.Info to remote bulkers from fleet
juliaElastic Nov 20, 2023
4c99949
added semaphore for updating bulkerMap
juliaElastic Nov 21, 2023
8182bab
revert self monitor degraded on remote es error
juliaElastic Nov 21, 2023
37106b0
fix tests
juliaElastic Nov 21, 2023
64277ff
verify api key exists in remote es
juliaElastic Nov 21, 2023
ea1d3bd
fix lint
juliaElastic Nov 21, 2023
e15f995
remote remoteOutputErrorMap as not used
juliaElastic Nov 21, 2023
6d4f87e
fix lint
juliaElastic Nov 21, 2023
85142e2
revert openapi description about degraded state
juliaElastic Nov 22, 2023
727daa4
reading output from policies index if bulker not found
juliaElastic Nov 22, 2023
462c063
unit test on handleAck
juliaElastic Nov 22, 2023
96bdb42
added test to query policies
juliaElastic Nov 22, 2023
053fbed
fix test
juliaElastic Nov 23, 2023
60cd6ee
fix integration test
juliaElastic Nov 23, 2023
760ea2f
added integration test for invalidate api key
juliaElastic Nov 23, 2023
9382975
test for child bulker cancel
juliaElastic Nov 23, 2023
0006533
fixed test, replace semaphore with mutex
juliaElastic Nov 27, 2023
41219f0
removed unused arg
juliaElastic Nov 27, 2023
75c890b
added warning log if api keys orphaned
juliaElastic Nov 27, 2023
4d2d67d
removed unused error
juliaElastic Nov 27, 2023
cd877f1
fix lint
juliaElastic Nov 27, 2023
aa7bf11
Merge branch 'main' into remote-es
juliaElastic Nov 27, 2023
d1ffe3f
try to fix test
juliaElastic Nov 27, 2023
5885068
read output secret before prepare remote es
juliaElastic Nov 27, 2023
4d5b906
try to fix test
juliaElastic Nov 27, 2023
d39141a
removed test
juliaElastic Nov 28, 2023
299cdae
removed unused imports
juliaElastic Nov 28, 2023
6fdba62
Merge branch 'main' into remote-es
juliaElastic Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions changelog/fragments/1698660041-remote-es-output.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Support for remote elasticsearch output

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3051

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
36 changes: 29 additions & 7 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,15 @@ func (ack *AckT) handlePolicyChange(ctx context.Context, zlog zerolog.Logger, ag
return nil
}

for _, output := range agent.Outputs {
for outputName, output := range agent.Outputs {
if output.Type != policy.OutputTypeElasticsearch {
continue
}

err := ack.updateAPIKey(ctx,
zlog,
agent.Id,
output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds)
output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName)
if err != nil {
return err
}
Expand All @@ -445,20 +445,31 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
zlog zerolog.Logger,
agentID string,
apiKeyID, permissionHash string,
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems) error {
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
bulk := ack.bulk
// use output bulker if exists
if outputName != "" {
outputBulk := ack.bulk.GetBulker(outputName)
if outputBulk != nil {
zlog.Debug().Str("outputName", outputName).Msg("Using output bulker in updateAPIKey")
bulk = outputBulk
}
}
if apiKeyID != "" {
res, err := ack.bulk.APIKeyRead(ctx, apiKeyID, true)
res, err := bulk.APIKeyRead(ctx, apiKeyID, true)
if err != nil {
if isAgentActive(ctx, zlog, ack.bulk, agentID) {
zlog.Error().
Err(err).
Str(LogAPIKeyID, apiKeyID).
Str("outputName", outputName).
Msg("Failed to read API Key roles")
} else {
// race when API key was invalidated before acking
zlog.Info().
Err(err).
Str(LogAPIKeyID, apiKeyID).
Str("outputName", outputName).
Msg("Failed to read invalidated API Key roles")

// prevents future checks
Expand All @@ -473,14 +484,15 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
Str(LogAPIKeyID, apiKeyID).
Msg("Failed to cleanup roles")
} else if removedRolesCount > 0 {
if err := ack.bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Msg("Failed to update API Key")
if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str("outputName", outputName).Msg("Failed to update API Key")
} else {
zlog.Debug().
Str("hash.sha256", permissionHash).
Str(LogAPIKeyID, apiKeyID).
RawJSON("roles", clean).
Int("removedRoles", removedRolesCount).
Str("outputName", outputName).
Msg("Updating agent record to pick up reduced roles.")
}
}
Expand Down Expand Up @@ -556,16 +568,26 @@ func cleanRoles(roles json.RawMessage) (json.RawMessage, int, error) {

func (ack *AckT) invalidateAPIKeys(ctx context.Context, zlog zerolog.Logger, toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, skip string) {
ids := make([]string, 0, len(toRetireAPIKeyIDs))
output := ""
for _, k := range toRetireAPIKeyIDs {
if k.ID == skip || k.ID == "" {
continue
}
ids = append(ids, k.ID)
output = k.Output
}
// using remote es bulker to invalidate api key - supposing all retire api key ids belong to the same remote es
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
bulk := ack.bulk
if output != "" {
outputBulk := ack.bulk.GetBulker(output)
if outputBulk != nil {
bulk = outputBulk
}
}

if len(ids) > 0 {
zlog.Info().Strs("fleet.policy.apiKeyIDsToRetire", ids).Msg("Invalidate old API keys")
if err := ack.bulk.APIKeyInvalidate(ctx, ids...); err != nil {
if err := bulk.APIKeyInvalidate(ctx, ids...); err != nil {
zlog.Info().Err(err).Strs("ids", ids).Msg("Failed to invalidate API keys")
}
}
Expand Down
40 changes: 40 additions & 0 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,46 @@ func TestInvalidateAPIKeys(t *testing.T) {
}
}

func TestInvalidateAPIKeysRemoteOutput(t *testing.T) {
toRetire1 := []model.ToRetireAPIKeyIdsItems{{
ID: "toRetire1",
Output: "remote1",
}}

wants := map[string][]string{
"1": {"toRetire1"},
}

agent := model.Agent{
Outputs: map[string]*model.PolicyOutput{
"1": {ToRetireAPIKeyIds: toRetire1},
},
}

for i, out := range agent.Outputs {
want := wants[i]

bulker := ftesting.NewMockBulk()
remoteBulker := ftesting.NewMockBulk()
bulker.On("GetBulker", mock.AnythingOfType("string")).Return(remoteBulker)
if len(want) > 0 {
remoteBulker.On("APIKeyInvalidate",
context.Background(), mock.MatchedBy(func(ids []string) bool {
// if A contains B and B contains A => A = B
return assert.Subset(t, ids, want) &&
assert.Subset(t, want, ids)
})).
Return(nil)
}

logger := testlog.SetLogger(t)
ack := &AckT{bulk: bulker}
ack.invalidateAPIKeys(context.Background(), logger, out.ToRetireAPIKeyIds, "")

bulker.AssertExpectations(t)
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
}
}

func TestAckHandleUpgrade(t *testing.T) {
tests := []struct {
name string
Expand Down
146 changes: 146 additions & 0 deletions internal/pkg/bulk/bulk_remote_output_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !integration

package bulk

import (
"testing"

testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
"github.com/stretchr/testify/assert"
)

func Test_CheckRemoteOutputChanged(t *testing.T) {
testcases := []struct {
name string
cfg map[string]interface{}
newCfg map[string]interface{}
changed bool
}{
{
name: "initial nil",
cfg: nil,
newCfg: map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []string{"https://remote-es:443"},
"service_token": "token1",
},
changed: false,
},
{
name: "no changes",
cfg: map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []string{"https://remote-es:443"},
"service_token": "token1",
},
newCfg: map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []string{"https://remote-es:443"},
"service_token": "token1",
},
changed: false,
},
{
name: "change to service token",
cfg: map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []string{"https://remote-es:443"},
"service_token": "token1",
},
newCfg: map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []string{"https://remote-es:443"},
"service_token": "token2",
},
changed: true,
},
{
name: "change to advanced config",
cfg: map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []string{"https://remote-es:443"},
"service_token": "token1",
"server.memory_limit": "4",
},
newCfg: map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []string{"https://remote-es:443"},
"service_token": "token1",
"server.memory_limit": "5",
},
changed: true,
}}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
bulker.remoteOutputConfigMap["remote1"] = tc.cfg
hasChanged := bulker.CheckRemoteOutputChanged(log, "remote1", tc.newCfg)
assert.Equal(t, tc.changed, hasChanged)
juliaElastic marked this conversation as resolved.
Show resolved Hide resolved
})
}
}

func Test_CreateAndGetBulkerNew(t *testing.T) {
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
outputMap := make(map[string]map[string]interface{})
outputMap["remote1"] = map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []interface{}{"https://remote-es:443"},
"service_token": "token1",
}
newBulker, hasChanged, err := bulker.CreateAndGetBulker(log, "remote1", "token1", outputMap)
assert.NotNil(t, newBulker)
assert.Equal(t, false, hasChanged)
assert.Nil(t, err)
}

func Test_CreateAndGetBulkerExisting(t *testing.T) {
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
outputBulker := NewBulker(nil, nil)
bulker.bulkerMap["remote1"] = outputBulker
outputMap := make(map[string]map[string]interface{})
cfg := map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []interface{}{"https://remote-es:443"},
"service_token": "token1",
}
bulker.remoteOutputConfigMap["remote1"] = cfg
outputMap["remote1"] = cfg
newBulker, hasChanged, err := bulker.CreateAndGetBulker(log, "remote1", "token1", outputMap)
assert.Equal(t, outputBulker, newBulker)
assert.Equal(t, false, hasChanged)
assert.Nil(t, err)
}

func Test_CreateAndGetBulkerChanged(t *testing.T) {
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
outputBulker := NewBulker(nil, nil)
bulker.bulkerMap["remote1"] = outputBulker
outputMap := make(map[string]map[string]interface{})
bulker.remoteOutputConfigMap["remote1"] = map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []interface{}{"https://remote-es:443"},
"service_token": "token1",
}
outputMap["remote1"] = map[string]interface{}{
"type": "remote_elasticsearch",
"hosts": []interface{}{"https://remote-es:443"},
"service_token": "token2",
}
cancelFnCalled := false
outputBulker.cancelFn = func() { cancelFnCalled = true }
newBulker, hasChanged, err := bulker.CreateAndGetBulker(log, "remote1", "token2", outputMap)
assert.NotEqual(t, outputBulker, newBulker)
assert.Equal(t, true, hasChanged)
assert.Nil(t, err)
assert.Equal(t, true, cancelFnCalled)
}
Loading
Loading