Skip to content

Commit

Permalink
added remote es ping to self.go
Browse files Browse the repository at this point in the history
  • Loading branch information
juliaElastic committed Nov 15, 2023
1 parent 60a3122 commit 8797823
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 1 deletion.
2 changes: 1 addition & 1 deletion internal/pkg/policy/policy_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (p *Output) prepareElasticsearch(
// document on ES does not have OutputPermissionsHash for any other output
// besides the default one. It seems to me error-prone to rely on the default
// output permissions hash to generate new API keys for other outputs.
zlog.Debug().Msg("must generate api key as policy output permissions changed")
zlog.Debug().Msg("must update api key as policy output permissions changed")
needUpdateKey = true
default:
zlog.Debug().Msg("policy output permissions are the same")
Expand Down
24 changes: 24 additions & 0 deletions internal/pkg/policy/self.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,36 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error
if value != "" {
hasError = true
remoteESPayload[key] = value
break
}
}
if hasError {
m.state = client.UnitStateDegraded
m.reporter.UpdateState(client.UnitStateDegraded, "Could not connect to remote ES output", remoteESPayload) //nolint:errcheck // not clear what to do in failure cases
return client.UnitStateDegraded, nil
} else {
bulkerMap := m.bulker.GetBulkerMap()
for outputName, outputBulker := range bulkerMap {
res, err := outputBulker.Client().Ping(outputBulker.Client().Ping.WithContext(ctx))
if err != nil {
m.log.Error().Err(err).Msg("error calling remote es ping")
m.state = client.UnitStateDegraded
message := fmt.Sprintf("Could not ping remote ES: %s, error: %s", outputName, err.Error())
m.reporter.UpdateState(m.state, message, nil) //nolint:errcheck // not clear what to do in failure cases
hasError = true
break
} else if res.StatusCode != 200 {
m.state = client.UnitStateDegraded
message := fmt.Sprintf("Could not connect to remote ES output: %s, status code: %d", outputName, res.StatusCode)
m.log.Debug().Msg(message)
m.reporter.UpdateState(m.state, message, nil) //nolint:errcheck // not clear what to do in failure cases
hasError = true
break
}
}
if hasError {
return m.state, nil
}
}

state := client.UnitStateHealthy
Expand Down
150 changes: 150 additions & 0 deletions internal/pkg/policy/self_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"testing"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/model"
mmock "github.com/elastic/fleet-server/v7/internal/pkg/monitor/mock"
ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing"
"github.com/elastic/fleet-server/v7/internal/pkg/testing/esutil"
)

func TestSelfMonitor_DefaultPolicy(t *testing.T) {
Expand All @@ -49,6 +51,8 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) {
bulker := ftesting.NewMockBulk()
emptyMap := make(map[string]string)
bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
emptyBulkerMap := make(map[string]bulk.Bulk)
bulker.On("GetBulkerMap").Return(emptyBulkerMap)

monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
sm := monitor.(*selfMonitorT)
Expand Down Expand Up @@ -188,6 +192,8 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) {

emptyMap := make(map[string]string)
bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
emptyBulkerMap := make(map[string]bulk.Bulk)
bulker.On("GetBulkerMap").Return(emptyBulkerMap)

monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
sm := monitor.(*selfMonitorT)
Expand Down Expand Up @@ -347,6 +353,8 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) {
bulker := ftesting.NewMockBulk()
emptyMap := make(map[string]string)
bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
emptyBulkerMap := make(map[string]bulk.Bulk)
bulker.On("GetBulkerMap").Return(emptyBulkerMap)

monitor := NewSelfMonitor(cfg, bulker, mm, policyID, reporter)
sm := monitor.(*selfMonitorT)
Expand Down Expand Up @@ -485,6 +493,8 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) {
bulker := ftesting.NewMockBulk()
emptyMap := make(map[string]string)
bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)
emptyBulkerMap := make(map[string]bulk.Bulk)
bulker.On("GetBulkerMap").Return(emptyBulkerMap)

monitor := NewSelfMonitor(cfg, bulker, mm, policyID, reporter)
sm := monitor.(*selfMonitorT)
Expand Down Expand Up @@ -665,6 +675,8 @@ func TestSelfMonitor_RemoteOutput_Degraded(t *testing.T) {
errorMap := make(map[string]string)
errorMap["remote output"] = "error connecting to remote output"
bulker.On("GetRemoteOutputErrorMap").Return(errorMap)
emptyBulkerMap := make(map[string]bulk.Bulk)
bulker.On("GetBulkerMap").Return(emptyBulkerMap)

monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
sm := monitor.(*selfMonitorT)
Expand Down Expand Up @@ -793,6 +805,9 @@ func TestSelfMonitor_RemoteOutput_Back_To_Healthy(t *testing.T) {
emptyMap := make(map[string]string)
bulker.On("GetRemoteOutputErrorMap").Return(emptyMap).Once()

emptyBulkerMap := make(map[string]bulk.Bulk)
bulker.On("GetBulkerMap").Return(emptyBulkerMap)

monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
sm := monitor.(*selfMonitorT)
sm.checkTime = 100 * time.Millisecond
Expand Down Expand Up @@ -891,3 +906,138 @@ func TestSelfMonitor_RemoteOutput_Back_To_Healthy(t *testing.T) {
t.Fatal(merr)
}
}

// TestSelfMonitor_RemoteOutput_Ping_Degraded verifies that the self monitor
// reports UnitStateDegraded when pinging a remote ES output returns a
// non-200 status code, even though the remote-output error map is empty.
func TestSelfMonitor_RemoteOutput_Ping_Degraded(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg := config.Fleet{
		Agent: config.Agent{
			ID: "agent-id",
		},
	}
	reporter := &FakeReporter{}

	// Channel feeding policy documents into the mocked policy-monitor
	// subscription; closed when the test ends.
	chHitT := make(chan []es.HitT, 1)
	defer close(chHitT)
	ms := mmock.NewMockSubscription()
	ms.On("Output").Return((<-chan []es.HitT)(chHitT))
	mm := mmock.NewMockMonitor()
	mm.On("Subscribe").Return(ms).Once()
	mm.On("Unsubscribe", mock.Anything).Return().Once()
	bulker := ftesting.NewMockBulk()

	// No pre-recorded remote output errors: the degraded state must come
	// from the ping below, not from this error map.
	emptyMap := make(map[string]string)
	bulker.On("GetRemoteOutputErrorMap").Return(emptyMap)

	// One remote output ("output1") whose mocked ES client always answers
	// the ping with HTTP 500.
	bulkerMap := make(map[string]bulk.Bulk)
	outputBulker := ftesting.NewMockBulk()
	mockES, mocktrans := esutil.MockESClient(t)

	mocktrans.Response = &http.Response{
		StatusCode: http.StatusInternalServerError,
		Body:       nil,
	}

	outputBulker.On("Client").Return(mockES)
	bulkerMap["output1"] = outputBulker
	bulker.On("GetBulkerMap").Return(bulkerMap)

	monitor := NewSelfMonitor(cfg, bulker, mm, "", reporter)
	sm := monitor.(*selfMonitorT)
	sm.checkTime = 100 * time.Millisecond

	// Stub the policy lookup; policyResult is appended to later from a
	// goroutine, so it is guarded by a mutex.
	var policyLock sync.Mutex
	var policyResult []model.Policy
	sm.policyF = func(ctx context.Context, bulker bulk.Bulk, opt ...dl.Option) ([]model.Policy, error) {
		policyLock.Lock()
		defer policyLock.Unlock()
		return policyResult, nil
	}

	// Stub the enrollment-token lookup (always empty in this test).
	var tokenLock sync.Mutex
	var tokenResult []model.EnrollmentAPIKey
	sm.enrollmentTokenF = func(ctx context.Context, bulker bulk.Bulk, policyID string) ([]model.EnrollmentAPIKey, error) {
		tokenLock.Lock()
		defer tokenLock.Unlock()
		return tokenResult, nil
	}

	// Run the monitor in the background; merr is checked after cancel().
	var merr error
	var mwg sync.WaitGroup
	mwg.Add(1)
	go func() {
		defer mwg.Done()
		merr = monitor.Run(ctx)
	}()

	if err := monitor.(*selfMonitorT).waitStart(ctx); err != nil {
		t.Fatal(err)
	}

	// should be set to starting
	ftesting.Retry(t, ctx, func(ctx context.Context) error {
		state, msg, _ := reporter.Current()
		if state != client.UnitStateStarting {
			return fmt.Errorf("should be reported as starting; instead its %s", state)
		}
		if msg != "Waiting on default policy with Fleet Server integration" {
			return fmt.Errorf("should be matching with default policy")
		}
		return nil
	}, ftesting.RetrySleep(1*time.Second))

	// Build a default fleet-server policy and publish it through the
	// mocked subscription so the monitor proceeds past "starting".
	policyID := uuid.Must(uuid.NewV4()).String()
	rId := xid.New().String()
	pData := model.PolicyData{Inputs: []map[string]interface{}{
		{
			"type": "fleet-server",
		},
	}}
	policy := model.Policy{
		ESDocument: model.ESDocument{
			Id:      rId,
			Version: 1,
			SeqNo:   1,
		},
		PolicyID:           policyID,
		CoordinatorIdx:     1,
		Data:               &pData,
		RevisionIdx:        1,
		DefaultFleetServer: true,
	}
	policyData, err := json.Marshal(&policy)
	if err != nil {
		t.Fatal(err)
	}

	go func() {
		chHitT <- []es.HitT{{
			ID:      rId,
			SeqNo:   1,
			Version: 1,
			Source:  policyData,
		}}
		policyLock.Lock()
		defer policyLock.Unlock()
		policyResult = append(policyResult, policy)
	}()

	// should be set to degraded because of remote output error
	ftesting.Retry(t, ctx, func(ctx context.Context) error {
		state, msg, _ := reporter.Current()
		if state != client.UnitStateDegraded {
			return fmt.Errorf("should be reported as degraded; instead its %s", state)
		}
		// Message format must match the one produced by updateState when a
		// ping returns a non-200 status.
		if msg != "Could not connect to remote ES output: output1, status code: 500" {
			return fmt.Errorf("expected remote ES error")
		}
		return nil
	}, ftesting.RetrySleep(1*time.Second))

	// Stop the monitor and ensure it exited only because of cancellation.
	cancel()
	mwg.Wait()
	if merr != nil && merr != context.Canceled {
		t.Fatal(merr)
	}
}

0 comments on commit 8797823

Please sign in to comment.