diff --git a/internal/pkg/server/remote_es_output_integration_test.go b/internal/pkg/server/remote_es_output_integration_test.go index 4c408f3d8..9b78eb293 100644 --- a/internal/pkg/server/remote_es_output_integration_test.go +++ b/internal/pkg/server/remote_es_output_integration_test.go @@ -20,17 +20,17 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/apikey" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/gofrs/uuid" "github.com/hashicorp/go-cleanhttp" "github.com/stretchr/testify/require" ) -func Checkin(t *testing.T, ctx context.Context, srv *tserver, agentID, key string) string { - str := agentID +func Checkin(t *testing.T, ctx context.Context, srv *tserver, agentID, key string, shouldHaveRemoveES bool) (string, string) { cli := cleanhttp.DefaultClient() var obj map[string]interface{} - t.Logf("Fake a checkin for agent %s", str) - req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+str+"/checkin", strings.NewReader(checkinBody)) + t.Logf("Fake a checkin for agent %s", agentID) + req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+agentID+"/checkin", strings.NewReader(checkinBody)) require.NoError(t, err) req.Header.Set("Authorization", "ApiKey "+key) req.Header.Set("User-Agent", "elastic agent "+serverVersion) @@ -49,9 +49,15 @@ func Checkin(t *testing.T, ctx context.Context, srv *tserver, agentID, key strin require.True(t, ok, "expected actions is missing") actions, ok := actionsRaw.([]interface{}) require.True(t, ok, "expected actions to be an array") - require.Greater(t, len(actions), 0, "expected at least 1 action") + require.Equal(t, len(actions), 1, "expected 1 action") action, ok := actions[0].(map[string]interface{}) require.True(t, ok, "expected action to be an object") + + aIDRaw, ok := action["id"] + require.True(t, ok, "expected action id attribute missing") + actionID, ok := aIDRaw.(string) + require.True(t, ok, "expected action id to be string") + typeRaw := action["type"] require.Equal(t, "POLICY_CHANGE", typeRaw) dataRaw := action["data"] @@ -61,21 +67,57 @@ func Checkin(t *testing.T, ctx context.Context, srv *tserver, agentID, key strin require.True(t, ok, "expected policy to be map") outputs, ok := policy["outputs"].(map[string]interface{}) require.True(t, ok, "expected outputs to be map") - remoteES, ok := outputs["remoteES"].(map[string]interface{}) - require.True(t, ok, "expected remoteES to be map") - oType, ok := remoteES["type"].(string) - require.True(t, ok, "expected type to be string") - require.Equal(t, "elasticsearch", oType) - serviceToken := remoteES["service_token"] - require.Equal(t, nil, serviceToken) - remoteAPIKey, ok := remoteES["api_key"].(string) - require.True(t, ok, "expected remoteAPIKey to be string") + var remoteAPIKey string + if shouldHaveRemoveES { + remoteES, ok := outputs["remoteES"].(map[string]interface{}) + require.True(t, ok, "expected remoteES to be map") + oType, ok := remoteES["type"].(string) + require.True(t, ok, "expected type to be string") + require.Equal(t, "elasticsearch", oType) + serviceToken := remoteES["service_token"] + require.Equal(t, nil, serviceToken) + remoteAPIKey, ok = remoteES["api_key"].(string) + require.True(t, ok, "expected remoteAPIKey to be string") + } defaultOutput, ok := outputs["default"].(map[string]interface{}) require.True(t, ok, "expected default to be map") defaultAPIKey, ok := defaultOutput["api_key"].(string) require.True(t, ok, "expected defaultAPIKey to be string") require.NotEqual(t, remoteAPIKey, defaultAPIKey, "expected remote api key to be different than default") - return remoteAPIKey + return remoteAPIKey, actionID +} + +func Ack(t *testing.T, ctx context.Context, srv *tserver, actionID, agentID, key string) { + t.Logf("Fake an ack for action %s for agent %s", actionID, agentID) + body := fmt.Sprintf(`{ + "events": [{ + "action_id": "%s", + "agent_id": "%s", + "message": "test-message", + "type": "ACTION_RESULT", + "subtype": "ACKNOWLEDGED" + }] + }`, actionID, agentID) + req, err := http.NewRequestWithContext(ctx, "POST", srv.baseURL()+"/api/fleet/agents/"+agentID+"/acks", strings.NewReader(body)) + require.NoError(t, err) + req.Header.Set("Authorization", "ApiKey "+key) + req.Header.Set("Content-Type", "application/json") + cli := cleanhttp.DefaultClient() + res, err := cli.Do(req) + require.NoError(t, err) + + require.Equal(t, http.StatusOK, res.StatusCode) + t.Log("Ack successful, verify body") + p, _ := io.ReadAll(res.Body) + res.Body.Close() + var ackObj map[string]interface{} + err = json.Unmarshal(p, &ackObj) + require.NoError(t, err) + + // NOTE the checkin response will only have the errors attribute if it's set to true in the response. + // When decoding to a (typed) struct, the default will implicitly be false if it's missing + _, ok := ackObj["errors"] + require.Falsef(t, ok, "expected response to have no errors attribute, errors are present: %+v", ackObj) } func Test_Agent_Remote_ES_Output(t *testing.T) { @@ -98,7 +140,7 @@ func Test_Agent_Remote_ES_Output(t *testing.T) { t.Log("Create policy with remote ES output") - var policyRemoteID = "policyRemoteID" + var policyRemoteID = uuid.Must(uuid.NewV4()).String() remoteESHost := "localhost:9201" var policyDataRemoteES = model.PolicyData{ Outputs: map[string]map[string]interface{}{ @@ -113,6 +155,7 @@ func Test_Agent_Remote_ES_Output(t *testing.T) { }, OutputPermissions: json.RawMessage(`{"default": {}, "remoteES": {}}`), Inputs: []map[string]interface{}{}, + Agent: json.RawMessage(`{"monitoring": {"use_output":"remoteES"}}`), } _, err = dl.CreatePolicy(ctx, srv.bulker, model.Policy{ @@ -165,15 +208,52 @@ func Test_Agent_Remote_ES_Output(t *testing.T) { // cleanup defer func() { - err2 := srv.bulker.Delete(ctx, dl.FleetAgents, agentID) - if err2 != nil { + err = srv.bulker.Delete(ctx, dl.FleetAgents, agentID) + if err != nil { t.Log("could not clean up agent") } }() - remoteAPIKey := Checkin(t, ctx, srvCopy, agentID, key) + remoteAPIKey, actionID := Checkin(t, ctx, srvCopy, agentID, key, true) apiKeyID := strings.Split(remoteAPIKey, ":")[0] + verifyRemoteAPIKey(t, ctx, remoteESHost, apiKeyID, false) + + Ack(t, ctx, srvCopy, actionID, agentID, key) + + t.Log("Update policy to remove remote ES output") + + var policyData = model.PolicyData{ + Outputs: map[string]map[string]interface{}{ + "default": { + "type": "elasticsearch", + }, + }, + OutputPermissions: json.RawMessage(`{"default": {}}`), + Inputs: []map[string]interface{}{}, + } + + _, err = dl.CreatePolicy(ctx, srv.bulker, model.Policy{ + PolicyID: policyRemoteID, + RevisionIdx: 2, + DefaultFleetServer: false, + Data: &policyData, + }) + if err != nil { + t.Fatal(err) + } + + t.Log("Checkin so that agent gets new policy revision") + _, actionID = Checkin(t, ctx, srvCopy, agentID, key, false) + + t.Log("Ack so that fleet triggers remote api key invalidate") + Ack(t, ctx, srvCopy, actionID, agentID, key) + + verifyRemoteAPIKey(t, ctx, remoteESHost, apiKeyID, true) + +} + +func verifyRemoteAPIKey(t *testing.T, ctx context.Context, remoteESHost, apiKeyID string, invalidated bool) { // need to wait a bit before querying the api key time.Sleep(time.Second) @@ -194,5 +274,5 @@ func Test_Agent_Remote_ES_Output(t *testing.T) { respString, err := io.ReadAll(res.Body) require.NoError(t, err, "did not expect error when parsing api key response") - require.Contains(t, string(respString), "\"invalidated\":false") + require.Contains(t, string(respString), fmt.Sprintf("\"invalidated\":%t", invalidated)) }