Support for remote ES output #3051
Conversation
I need to reacquaint myself with output policy generation in order to better review the actual work being done.
Is there an open issue that we can track?
Looks good, great test coverage!
I just think we need to warn if we are going to orphan keys, and we should decide on how much concurrency control we need for the bulker map.
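To make the concurrency concern concrete, here is a minimal sketch of guarding a per-output bulker map with a sync.RWMutex; the type and method names are hypothetical and not the fleet-server implementation, just one way to let check-ins look up child bulkers while config changes replace them.

```go
// Hypothetical sketch: a concurrency-safe map of child bulkers keyed by output
// name. BulkerMap, Bulker, Get, and Set are illustrative names only.
package bulker

import "sync"

type Bulker interface {
	Run(done <-chan struct{})
}

type BulkerMap struct {
	mu      sync.RWMutex
	bulkers map[string]Bulker
}

func NewBulkerMap() *BulkerMap {
	return &BulkerMap{bulkers: make(map[string]Bulker)}
}

// Get returns the child bulker for an output, if one exists.
func (m *BulkerMap) Get(outputName string) (Bulker, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	b, ok := m.bulkers[outputName]
	return b, ok
}

// Set installs a new child bulker for an output and returns the previous one
// so the caller can stop it (e.g. after a remote output config change).
func (m *BulkerMap) Set(outputName string, b Bulker) (old Bulker) {
	m.mu.Lock()
	defer m.mu.Unlock()
	old = m.bulkers[outputName]
	m.bulkers[outputName] = b
	return old
}
```

A sync.Map would also work, but an explicit RWMutex makes the replace-then-stop sequence on config change easier to reason about.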
internal/pkg/api/handleAck.go (outdated)
// read output config from .fleet-policies, not filtering by policy id as agent could be reassigned
policy, err := dl.QueryOutputFromPolicy(ctx, ack.bulk, outputName)
if err != nil || policy == nil {
	zlog.Debug().Str("outputName", outputName).Msg("Output policy not found")
If we can't find the policy associated with an output and need to invalidate the API key, that means it's an orphaned key, right?
Should we emit a WARN log about the key for the agent being orphaned?
Changed to a warning log.
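For reference, a minimal sketch of what that warning could look like at the spot shown above, using the zerolog calls already present in handleAck.go; the exact message text here is made up and may differ from the PR.

```go
// Sketch: warn instead of debug when the output policy is missing, since the
// agent's API key for that output is effectively orphaned. Message text is
// illustrative only.
if err != nil || policy == nil {
	zlog.Warn().
		Err(err).
		Str("outputName", outputName).
		Msg("Output policy not found; API key for this output is orphaned")
	// ... existing handling continues ...
}
```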
remoteBulker2.AssertExpectations(t)
}

func TestInvalidateAPIKeysRemoteOutputReadFromPolicies(t *testing.T) {
👍
great test cases
internal/pkg/bulk/bulk_test.go (outdated)
go func() {
	defer wg.Done()

	_, err := childBulker.APIKeyAuth(ctx, apikey.APIKey{})
I think that `APIKeyAuth` calls are synchronous; they don't actually get bulked or use the `bulkCtx` that is created for the child context.
I see, I might be able to test it with `APIKeyUpdate`.
It doesn't seem to work with `APIKeyUpdate` either; now the test times out after 10m. I tried to run the bulker, but it doesn't seem to help.
I'm still getting the error with the invalid ES host; do you know how we can mock the ES client in the bulker for this test?
dial tcp: lookup remote-es: no such host","message":"Error sending bulk API Key update request to Elasticsearch
EDIT: I managed to mock the ES client, and now I'm not getting any errors. Shouldn't `APIKeyUpdate` use `bulkCtx`? It calls `waitBulkAction`.
engine.go:256: {"level":"info","name":"remote","message":"remote output configuration has changed"}
engine.go:187: {"level":"debug","outputName":"remote","message":"Bulk context done"}
engine.go:176: {"level":"debug","outputName":"remote","message":"Bulker started"}
opApiKey.go:231: {"level":"debug","IDs":[""],"role":,"message":"API Keys updated."}
bulk_test.go:372: <nil>
bulk_test.go:375: Expected context cancel err: <nil>
--- FAIL: TestCancelCtxChildBulkerReplaced (5.00s)
Maybe there is a way to validate that "Bulk context done" is logged?
I feel like I'm spending too much time on this test.
I'm also seeing a data race warning in Buildkite for this test.
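In case it helps for a future attempt, one general way to assert that "Bulk context done" was logged is to point zerolog at a buffer and check its contents after cancelling the context. This is a rough sketch that assumes the code under test can be handed this logger, which may not hold for the real bulker.

```go
// Sketch: capture zerolog output in a buffer so a test can assert on a log
// message. The Debug call below stands in for running the child bulker with
// this logger and cancelling its context; it is not the real test body.
package bulk

import (
	"bytes"
	"strings"
	"testing"

	"github.com/rs/zerolog"
)

func TestBulkContextDoneLogged(t *testing.T) {
	var buf bytes.Buffer
	logger := zerolog.New(&buf)

	// ... start the child bulker with this logger, then cancel its context ...
	logger.Debug().Str("outputName", "remote").Msg("Bulk context done")

	if !strings.Contains(buf.String(), "Bulk context done") {
		t.Errorf("expected 'Bulk context done' to be logged, got: %q", buf.String())
	}
}
```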
@michel-laterman I removed this test for now as I couldn't get it working.
toRetireAPIKeys = &model.ToRetireAPIKeyIdsItems{
	ID:        agentOutput.APIKeyID,
	RetiredAt: time.Now().UTC().Format(time.RFC3339),
	Output:    agentOutputName,
This seems like a good solution to get this out, thanks for all the effort!
This is done here in Fleet Server's "agent" module for interacting with the control protocol: fleet-server/internal/pkg/server/agent.go Line 170 in 0dbce8c
EDIT: I think the state is actually updated in the fleet module, see these examples: fleet-server/internal/pkg/server/fleet.go Line 199 in 0dbce8c
I believe we could modify the output unit's state to degraded with an informative message whenever one of the remote outputs is not accessible.
Yes, Craig gave some pointers last week, though it's not as easy as it sounds, as the outputs are not assigned to the Fleet Server policy, so these units should be added as extra units to the state reporter. I'll have to dig into this to understand how it works, which I'll do in a follow-up PR.
Yeah, we might have to hack it somehow and just use the single output Fleet Server is assigned to report on all remote outputs it connects to. Agree on a follow-up 👍
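For illustration only, the idea being discussed could look roughly like the sketch below; the Reporter interface and state constants are hypothetical stand-ins for however fleet-server exposes unit state to the control protocol, not actual APIs.

```go
// Hypothetical sketch of reporting a degraded unit state when a remote output
// is unreachable. Reporter, UnitState, and the constants are illustrative.
package status

type UnitState int

const (
	StateHealthy UnitState = iota
	StateDegraded
)

type Reporter interface {
	UpdateState(state UnitState, message string) error
}

func ReportRemoteOutputHealth(r Reporter, outputName string, reachable bool) error {
	if !reachable {
		return r.UpdateState(StateDegraded,
			"remote elasticsearch output "+outputName+" is not accessible")
	}
	return r.UpdateState(StateHealthy, "")
}
```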
@@ -748,6 +748,13 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a
}

data := model.ClonePolicyData(pp.Policy.Data)
for policyName, policyOutput := range data.Outputs {
	err := policy.ProcessOutputSecret(ctx, policyOutput, bulker)
This is needed to read the remote ES output's `service_token` from `.fleet-secrets` before calling `Prepare`.
Tested together with the Kibana changes to save `service_token` as a secret: elastic/kibana#171875
This function is a no-op if output secrets are not enabled.
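As I read it, the general shape of that step is to swap a secret reference for the value stored in .fleet-secrets before Prepare runs. The sketch below uses made-up helper names and a guessed reference shape, so treat it as an outline rather than the actual ProcessOutputSecret logic.

```go
// Hypothetical sketch of resolving a service_token secret reference in an
// output config. Field shapes and the readSecret callback are illustrative.
package policy

import "context"

func resolveServiceToken(ctx context.Context, out map[string]interface{},
	readSecret func(ctx context.Context, id string) (string, error)) error {
	secrets, ok := out["secrets"].(map[string]interface{})
	if !ok {
		return nil // no secret references; no-op, e.g. when output secrets are disabled
	}
	ref, ok := secrets["service_token"].(map[string]interface{})
	if !ok {
		return nil
	}
	id, _ := ref["id"].(string)
	if id == "" {
		return nil
	}
	token, err := readSecret(ctx, id) // fetch the value from .fleet-secrets
	if err != nil {
		return err
	}
	out["service_token"] = token // inline the resolved token before Prepare is called
	return nil
}
```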
@joshdover I'm coming back to this, and if we start reporting a degraded state on at least one unit, Fleet Server will go to an overall unhealthy state and show degraded state on
EDIT: I'm planning to continue with the data stream output health reporting instead, supposing it is required for 8.12: #3116
## Summary

Related to elastic#104986

Making remote ES output's service_token a secret. fleet-server change here: elastic/fleet-server#3051 (comment)

Steps to verify:
- Enable remote ES output and output secrets in `kibana.dev.yml` locally:
```
xpack.fleet.enableExperimental: ['remoteESOutput', 'outputSecretsStorage']
```
- Start ES, Kibana, and fleet-server locally, and start a second ES locally - see detailed steps here: elastic/fleet-server#3051
- Create a remote ES output and verify that the service_token is stored as a secret reference:
```
GET .kibana_ingest/_search?q=type:ingest-outputs
```
- Verify that the enrolled agent sends data to the remote ES successfully (screenshots: https://github.com/elastic/kibana/assets/90178898/122d9800-a2ec-47f8-97a7-acf64b87172a, https://github.com/elastic/kibana/assets/90178898/e1751bdd-5aaf-4f68-9f92-7076b306cdfe)

### Checklist
- [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios
Changes in this PR:
- Fleet Server handles remote_elasticsearch outputs: it creates a new child bulker for each one and uses that bulker to read, create, and update API keys on the remote ES
- if the remote output config changes, the old child bulker is stopped and a new one is started (sketched after this list)
- API keys are invalidated if the remote ES output is removed from the agent policy
- added unit and integration tests to cover the changes
- will be added in another PR: report an error state if the remote ES is not accessible
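The restart-on-change bullet above could be implemented by fingerprinting each remote output's config per policy revision; this is a minimal sketch under that assumption, with made-up names, not the actual fleet-server code.

```go
// Hypothetical sketch: fingerprint a remote output config so a change (new
// hosts, service_token, etc.) can be detected and the child bulker restarted.
package bulker

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
)

// hashConfig produces a stable fingerprint of an output config map.
// json.Marshal sorts map keys, so equal configs hash identically.
func hashConfig(cfg map[string]interface{}) (string, error) {
	b, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}
```

When the fingerprint for an output differs from the one recorded at the previous revision, the old child bulker's context would be cancelled and a new bulker started with the new config.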
What is the problem this PR solves?
Remote ES output support in Fleet Server.
How does this PR solve the problem?
Generates API keys for agents using the remote ES output host and a service_token created for fleet-server-remote.
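To make the key-generation step concrete, here is a rough sketch using the public go-elasticsearch client; the host, token, role, and key name are placeholders, and the real fleet-server path goes through its internal bulker/apikey code rather than this client.

```go
// Sketch only: create an agent API key on a remote Elasticsearch cluster,
// authenticating with a service token. All values are placeholders.
package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	es, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses:    []string{"https://remote-es:9243"},    // remote ES output host
		ServiceToken: "<fleet-server-remote service token>", // placeholder
	})
	if err != nil {
		log.Fatal(err)
	}

	// Minimal role descriptor; the real key permissions depend on the agent policy.
	body := `{"name":"agent-remote-output-key","role_descriptors":{"logs":{"indices":[{"names":["logs-*"],"privileges":["auto_configure","create_doc"]}]}}}`
	res, err := es.Security.CreateAPIKey(strings.NewReader(body),
		es.Security.CreateAPIKey.WithContext(context.Background()))
	if err != nil {
		log.Fatal(err)
	}
	defer res.Body.Close()
	fmt.Println(res.Status())
}
```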
How to test this PR locally
Design Checklist
Checklist
- Changelog entry added in ./changelog/fragments using the changelog tool
Related issues
Closes https://github.com/elastic/ingest-dev/issues/1017