From 8797823a149f773f73dae4ce4727168005a94a98 Mon Sep 17 00:00:00 2001
From: Julia Bardi
Date: Wed, 15 Nov 2023 16:20:43 +0100
Subject: [PATCH] added remote es ping to self.go

---
 internal/pkg/policy/policy_output.go |   2 +-
 internal/pkg/policy/self.go          |  24 +++
 internal/pkg/policy/self_test.go     | 150 +++++++++++++++++++++++++++
 3 files changed, 175 insertions(+), 1 deletion(-)

diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go
index 1b6e6a7b4..4cb014116 100644
--- a/internal/pkg/policy/policy_output.go
+++ b/internal/pkg/policy/policy_output.go
@@ -189,7 +189,7 @@ func (p *Output) prepareElasticsearch(
 		// document on ES does not have OutputPermissionsHash for any other output
 		// besides the default one. It seems to me error-prone to rely on the default
 		// output permissions hash to generate new API keys for other outputs.
-		zlog.Debug().Msg("must generate api key as policy output permissions changed")
+		zlog.Debug().Msg("must update api key as policy output permissions changed")
 		needUpdateKey = true
 	default:
 		zlog.Debug().Msg("policy output permissions are the same")
diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go
index 5a14374a6..e922df9dd 100644
--- a/internal/pkg/policy/self.go
+++ b/internal/pkg/policy/self.go
@@ -225,12 +225,36 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
 		if value != "" {
 			hasError = true
 			remoteESPayload[key] = value
+			break
 		}
 	}
 	if hasError {
 		m.state = client.UnitStateDegraded
 		m.reporter.UpdateState(client.UnitStateDegraded, "Could not connect to remote ES output", remoteESPayload) //nolint:errcheck // not clear what to do in failure cases
 		return client.UnitStateDegraded, nil
+	} else {
+		bulkerMap := m.bulker.GetBulkerMap()
+		for outputName, outputBulker := range bulkerMap {
+			res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx))
+			if err != nil {
+				m.log.Error().Err(err).Msg("error calling remote es ping")
+				m.state = client.UnitStateDegraded
+				message := fmt.Sprintf("Could not ping remote ES: %s, error: %s", outputName, err.Error())
+				m.reporter.UpdateState(m.state, message, nil) //nolint:errcheck // not clear what to do in failure cases
+				hasError = true
+				break
+			} else if res.StatusCode != 200 {
+				m.state = client.UnitStateDegraded
+				message := fmt.Sprintf("Could not connect to remote ES output: %s, status code: %d", outputName, res.StatusCode)
+				m.log.Debug().Msg(message)
+				m.reporter.UpdateState(m.state, message, nil) //nolint:errcheck // not clear what to do in failure cases
+				hasError = true
+				break
+			}
+		}
+		if hasError {
+			return m.state, nil
+		}
 	}
 
 	state := client.UnitStateHealthy
diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go
index ab347700b..1b8431ee9 100644
--- a/internal/pkg/policy/self_test.go
+++ b/internal/pkg/policy/self_test.go
@@ -10,6 +10,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"net/http"
 	"sync"
 	"testing"
 	"time"
@@ -26,6 +27,7 @@ import (
 	"github.com/elastic/fleet-server/v7/internal/pkg/model"
 	mmock "github.com/elastic/fleet-server/v7/internal/pkg/monitor/mock"
 	ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
+	"github.com/elastic/fleet-server/v7/internal/pkg/testing/esutil"
 )
 
 func TestSelfMonitor_DefaultPolicy(t *testing.T) {
@@ -49,6 +51,8 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) {
 	bulker := ftesting.NewMockBulk()
 	emptyMap := make(map[string]string)
 	bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
+	emptyBulkerMap := make(map[string]bulk.Bulk)
+	bulker.On("GetBulkerMap").Return(emptyBulkerMap)
 
 	monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
 	sm := monitor.(*selfMonitorT)
@@ -188,6 +192,8 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) {
 
 	emptyMap := make(map[string]string)
 	bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
+	emptyBulkerMap := make(map[string]bulk.Bulk)
+	bulker.On("GetBulkerMap").Return(emptyBulkerMap)
 
 	monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
 	sm := monitor.(*selfMonitorT)
@@ -347,6 +353,8 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) {
 	bulker := ftesting.NewMockBulk()
 	emptyMap := make(map[string]string)
 	bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
+	emptyBulkerMap := make(map[string]bulk.Bulk)
+	bulker.On("GetBulkerMap").Return(emptyBulkerMap)
 
 	monitor := NewSelfMonitor(cfg, bulker, mm, policyID, reporter)
 	sm := monitor.(*selfMonitorT)
@@ -485,6 +493,8 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) {
 	bulker := ftesting.NewMockBulk()
 	emptyMap := make(map[string]string)
 	bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
+	emptyBulkerMap := make(map[string]bulk.Bulk)
+	bulker.On("GetBulkerMap").Return(emptyBulkerMap)
 
 	monitor := NewSelfMonitor(cfg, bulker, mm, policyID, reporter)
 	sm := monitor.(*selfMonitorT)
@@ -665,6 +675,8 @@ func TestSelfMonitor_RemoteOutput_Degraded(t *testing.T) {
 	errorMap := make(map[string]string)
 	errorMap["remote output"] = "error connecting to remote output"
 	bulker.On("GetRemoteOutputErrorMap").Return(errorMap)
+	emptyBulkerMap := make(map[string]bulk.Bulk)
+	bulker.On("GetBulkerMap").Return(emptyBulkerMap)
 
 	monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
 	sm := monitor.(*selfMonitorT)
@@ -793,6 +805,9 @@ func TestSelfMonitor_RemoteOutput_Back_To_Healthy(t *testing.T) {
 	emptyMap := make(map[string]string)
 	bulker.On("GetRemoteOutputErrorMap").Return(emptyMap).Once()
 
+	emptyBulkerMap := make(map[string]bulk.Bulk)
+	bulker.On("GetBulkerMap").Return(emptyBulkerMap)
+
 	monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
 	sm := monitor.(*selfMonitorT)
 	sm.checkTime = 100 * time.Millisecond
@@ -891,3 +906,138 @@
 		t.Fatal(merr)
 	}
 }
+
+func TestSelfMonitor_RemoteOutput_Ping_Degraded(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	cfg := config.Fleet{
+		Agent: config.Agent{
+			ID: "agent-id",
+		},
+	}
+	reporter := &FakeReporter{}
+
+	chHitT := make(chan []es.HitT, 1)
+	defer close(chHitT)
+	ms := mmock.NewMockSubscription()
+	ms.On("Output").Return((<-chan []es.HitT)(chHitT))
+	mm := mmock.NewMockMonitor()
+	mm.On("Subscribe").Return(ms).Once()
+	mm.On("Unsubscribe", mock.Anything).Return().Once()
+	bulker := ftesting.NewMockBulk()
+
+	emptyMap := make(map[string]string)
+	bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
+
+	bulkerMap := make(map[string]bulk.Bulk)
+	outputBulker := ftesting.NewMockBulk()
+	mockES, mocktrans := esutil.MockESClient(t)
+
+	mocktrans.Response = &http.Response{
+		StatusCode: http.StatusInternalServerError,
+		Body:       nil,
+	}
+
+	outputBulker.On("Client").Return(mockES)
+	bulkerMap["output1"] = outputBulker
+	bulker.On("GetBulkerMap").Return(bulkerMap)
+
+	monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
+	sm := monitor.(*selfMonitorT)
+	sm.checkTime = 100 * time.Millisecond
+
+	var policyLock sync.Mutex
+	var policyResult []model.Policy
+	sm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
+		policyLock.Lock()
+		defer policyLock.Unlock()
+		return policyResult, nil
+	}
+
+	var tokenLock sync.Mutex
+	var tokenResult []model.EnrollmentAPIKey
+	sm.enrollmentTokenF = func(ctx context.Context, bulker bulk.Bulk, policyID string) ([]model.EnrollmentAPIKey, error) {
+		tokenLock.Lock()
+		defer tokenLock.Unlock()
+		return tokenResult, nil
+	}
+
+	var merr error
+	var mwg sync.WaitGroup
+	mwg.Add(1)
+	go func() {
+		defer mwg.Done()
+		merr = monitor.Run(ctx)
+	}()
+
+	if err := monitor.(*selfMonitorT).waitStart(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	// should be set to starting
+	ftesting.Retry(t, ctx, func(ctx context.Context) error {
+		state, msg, _ := reporter.Current()
+		if state != client.UnitStateStarting {
+			return fmt.Errorf("should be reported as starting; instead its %s", state)
+		}
+		if msg != "Waiting on default policy with Fleet Server integration" {
+			return fmt.Errorf("should be matching with default policy")
+		}
+		return nil
+	}, ftesting.RetrySleep(1*time.Second))
+
+	policyID := uuid.Must(uuid.NewV4()).String()
+	rId := xid.New().String()
+	pData := model.PolicyData{Inputs: []map[string]interface{}{
+		{
+			"type": "fleet-server",
+		},
+	}}
+	policy := model.Policy{
+		ESDocument: model.ESDocument{
+			Id:      rId,
+			Version: 1,
+			SeqNo:   1,
+		},
+		PolicyID:           policyID,
+		CoordinatorIdx:     1,
+		Data:               &pData,
+		RevisionIdx:        1,
+		DefaultFleetServer: true,
+	}
+	policyData, err := json.Marshal(&policy)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	go func() {
+		chHitT <- []es.HitT{{
+			ID:      rId,
+			SeqNo:   1,
+			Version: 1,
+			Source:  policyData,
+		}}
+		policyLock.Lock()
+		defer policyLock.Unlock()
+		policyResult = append(policyResult, policy)
+	}()
+
+	// should be set to degraded because of remote output error
+	ftesting.Retry(t, ctx, func(ctx context.Context) error {
+		state, msg, _ := reporter.Current()
+		if state != client.UnitStateDegraded {
+			return fmt.Errorf("should be reported as degraded; instead its %s", state)
+		}
+		if msg != "Could not connect to remote ES output: output1, status code: 500" {
+			return fmt.Errorf("expected remote ES error")
+		}
+		return nil
+	}, ftesting.RetrySleep(1*time.Second))
+
+	cancel()
+	mwg.Wait()
+	if merr != nil && merr != context.Canceled {
+		t.Fatal(merr)
+	}
+}