From 08318370b738df50bc550cde0031c924a7e771d0 Mon Sep 17 00:00:00 2001 From: Michel Laterman <82832767+michel-laterman@users.noreply.github.com> Date: Thu, 26 Oct 2023 12:11:41 -0700 Subject: [PATCH] Add top level keys for policy definition into specs (#3048) Define agent policy structure as part of the openapi and json schema specs. Use the generated json schema struct when fleet-server interacts with Elasticsearch. --- .../1698276551-Define-policy-data-schema.yaml | 35 +++ internal/pkg/api/handleCheckin.go | 39 +-- .../coordinator/monitor_integration_test.go | 4 +- internal/pkg/coordinator/v0.go | 8 +- internal/pkg/coordinator/v0_test.go | 6 +- internal/pkg/dl/policies_integration_test.go | 2 +- internal/pkg/model/schema.go | 119 +++++--- internal/pkg/policy/monitor.go | 1 - .../pkg/policy/monitor_integration_test.go | 10 +- internal/pkg/policy/monitor_test.go | 77 ++--- internal/pkg/policy/parsed_policy.go | 81 ++--- internal/pkg/policy/parsed_policy_test.go | 132 +++----- internal/pkg/policy/policy_output.go | 38 +-- .../policy/policy_output_integration_test.go | 3 +- internal/pkg/policy/policy_output_test.go | 25 +- internal/pkg/policy/secret.go | 42 +-- internal/pkg/policy/secret_test.go | 43 ++- internal/pkg/policy/self.go | 26 +- internal/pkg/policy/self_test.go | 74 ++--- internal/pkg/server/agent_integration_test.go | 25 +- internal/pkg/server/fleet_integration_test.go | 4 +- .../server/fleet_secrets_integration_test.go | 21 +- model/openapi.yml | 37 ++- model/schema.json | 289 +++++++++++------- 24 files changed, 573 insertions(+), 568 deletions(-) create mode 100644 changelog/fragments/1698276551-Define-policy-data-schema.yaml diff --git a/changelog/fragments/1698276551-Define-policy-data-schema.yaml b/changelog/fragments/1698276551-Define-policy-data-schema.yaml new file mode 100644 index 000000000..78ffd8970 --- /dev/null +++ b/changelog/fragments/1698276551-Define-policy-data-schema.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. +summary: Define policy data schema + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + Define the policy.Data section as part of the JSON schema used to interact with Elasticsearch. + All top-level properties have been kept generic as this is the first definition. + The only exception to this is the secret_references property which is fully defined. + +# Affected component; a word indicating the component this changeset affects. +component: + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/api/handleCheckin.go b/internal/pkg/api/handleCheckin.go index fb72fe381..79f029ec3 100644 --- a/internal/pkg/api/handleCheckin.go +++ b/internal/pkg/api/handleCheckin.go @@ -28,7 +28,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" "github.com/elastic/fleet-server/v7/internal/pkg/policy" - "github.com/elastic/fleet-server/v7/internal/pkg/smap" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/hashicorp/go-version" @@ -685,54 +684,32 @@ func processPolicy(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, a return nil, err } - // Parse the outputs maps in order to prepare the outputs - const outputsProperty = "outputs" - outputs, err := smap.Parse(pp.Fields[outputsProperty]) - if err != nil { - return nil, err - } - - if outputs == nil { + if len(pp.Policy.Data.Outputs) == 0 { return nil, ErrNoPolicyOutput } // Iterate through the policy outputs and prepare them for _, policyOutput := range pp.Outputs { - err = policyOutput.Prepare(ctx, zlog, bulker, &agent, outputs) + err = policyOutput.Prepare(ctx, zlog, bulker, &agent, pp.Policy.Data.Outputs) if err != nil { return nil, fmt.Errorf("failed to prepare output %q:: %w", policyOutput.Name, err) } } + // Add replace inputs with prepared version + pp.Policy.Data.Inputs = pp.Inputs - outputRaw, err := json.Marshal(outputs) + p, err := json.Marshal(pp.Policy.Data) if err != nil { return nil, err } - // Dupe field map; pp is immutable - fields := make(map[string]json.RawMessage, len(pp.Fields)) - - for k, v := range pp.Fields { - fields[k] = v - } - - // Update only the output fields to avoid duping the whole map - fields[outputsProperty] = json.RawMessage(outputRaw) - - // replace agent policy inputs with the processed inputs where the secret references were replaced with the secret values - inputsRaw, err := json.Marshal(pp.Inputs) - if err != nil { - return nil, err - } - - fields["inputs"] = json.RawMessage(inputsRaw) - rewrittenPolicy := struct { - Policy map[string]json.RawMessage `json:"policy"` - }{fields} + Policy json.RawMessage `json:"policy"` + }{p} r := policy.RevisionFromPolicy(pp.Policy) + // FIXME use better action definition from open api spec resp := Action{ AgentId: agent.Id, CreatedAt: pp.Policy.Timestamp, diff --git a/internal/pkg/coordinator/monitor_integration_test.go b/internal/pkg/coordinator/monitor_integration_test.go index f2a9a680e..902d1d708 100644 --- a/internal/pkg/coordinator/monitor_integration_test.go +++ b/internal/pkg/coordinator/monitor_integration_test.go @@ -54,7 +54,7 @@ func TestMonitorLeadership(t *testing.T) { policy1 := model.Policy{ PolicyID: policy1Id, CoordinatorIdx: 0, - Data: []byte("{}"), + Data: nil, RevisionIdx: 1, } _, err = dl.CreatePolicy(ctx, bulker, policy1, dl.WithIndexName(policiesIndex)) @@ -85,7 +85,7 @@ func TestMonitorLeadership(t *testing.T) { policy2 := model.Policy{ PolicyID: policy2Id, CoordinatorIdx: 0, - Data: []byte("{}"), + Data: nil, RevisionIdx: 1, } _, err = dl.CreatePolicy(ctx, bulker, policy2, dl.WithIndexName(policiesIndex)) diff --git a/internal/pkg/coordinator/v0.go b/internal/pkg/coordinator/v0.go index 5550357ce..28ec56918 100644 --- a/internal/pkg/coordinator/v0.go +++ b/internal/pkg/coordinator/v0.go @@ -6,7 +6,6 @@ package coordinator import ( "context" - "encoding/json" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -73,13 +72,12 @@ func (c *coordinatorZeroT) Output() <-chan model.Policy { // updatePolicy performs the working of incrementing the coordinator idx. func (c *coordinatorZeroT) updatePolicy(p model.Policy) error { - newData, err := c.handlePolicy(p.Data) + _, err := c.handlePolicy(p.Data) if err != nil { return err } - if p.CoordinatorIdx == 0 || string(newData) != string(p.Data) { + if p.CoordinatorIdx == 0 { p.CoordinatorIdx += 1 - p.Data = newData c.policy = p c.out <- p } @@ -89,6 +87,6 @@ func (c *coordinatorZeroT) updatePolicy(p model.Policy) error { // handlePolicy performs the actual work of coordination. // // Does nothing at the moment. -func (c *coordinatorZeroT) handlePolicy(data json.RawMessage) (json.RawMessage, error) { +func (c *coordinatorZeroT) handlePolicy(data *model.PolicyData) (*model.PolicyData, error) { return data, nil } diff --git a/internal/pkg/coordinator/v0_test.go b/internal/pkg/coordinator/v0_test.go index 1fbd65ad4..d00ec096e 100644 --- a/internal/pkg/coordinator/v0_test.go +++ b/internal/pkg/coordinator/v0_test.go @@ -23,7 +23,7 @@ func TestCoordinatorZero(t *testing.T) { policy := model.Policy{ PolicyID: policyId, CoordinatorIdx: 0, - Data: []byte("{}"), + Data: nil, RevisionIdx: 1, } coord, err := NewCoordinatorZero(policy) @@ -55,7 +55,7 @@ func TestCoordinatorZero(t *testing.T) { policy = model.Policy{ PolicyID: policyId, CoordinatorIdx: 0, - Data: []byte("{}"), + Data: nil, RevisionIdx: 2, } if err := coord.Update(ctx, policy); err != nil { @@ -77,7 +77,7 @@ func TestCoordinatorZero(t *testing.T) { policy = model.Policy{ PolicyID: policyId, CoordinatorIdx: 1, - Data: []byte("{}"), + Data: nil, RevisionIdx: 2, } if err := coord.Update(ctx, policy); err != nil { diff --git a/internal/pkg/dl/policies_integration_test.go b/internal/pkg/dl/policies_integration_test.go index aafc3719a..18cdf4011 100644 --- a/internal/pkg/dl/policies_integration_test.go +++ b/internal/pkg/dl/policies_integration_test.go @@ -25,7 +25,7 @@ func createRandomPolicy(id string, revisionIdx int) model.Policy { PolicyID: id, RevisionIdx: int64(revisionIdx), CoordinatorIdx: 0, - Data: []byte("{}"), + Data: nil, DefaultFleetServer: false, Timestamp: now.Format(time.RFC3339), } diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index b237a3f55..edbf33e06 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -52,10 +52,8 @@ type Action struct { MinimumExecutionDuration int64 `json:"minimum_execution_duration,omitempty"` // The rollout duration (in seconds) provided for an action execution when scheduled by fleet-server. - RolloutDurationSeconds int64 `json:"rollout_duration_seconds,omitempty"` - - // The action signed data and signature. - Signed *Signed `json:"signed,omitempty"` + RolloutDurationSeconds int64 `json:"rollout_duration_seconds,omitempty"` + Signed *Signed `json:"signed,omitempty"` // The action start date/time StartTime string `json:"start_time,omitempty"` @@ -76,14 +74,6 @@ type Action struct { UserID string `json:"user_id,omitempty"` } -// ActionData The opaque payload. -type ActionData struct { -} - -// ActionResponse The custom action response payload. -type ActionResponse struct { -} - // ActionResult An Elastic Agent action results type ActionResult struct { ESDocument @@ -266,16 +256,53 @@ type Artifact struct { PackageName string `json:"package_name,omitempty"` } -// Body Encoded artifact data -type Body struct { +// Checkin An Elastic Agent checkin to Fleet +type Checkin struct { + ESDocument + Agent *AgentMetadata `json:"agent"` + Host *HostMetadata `json:"host,omitempty"` + + // The current overall status message of the Elastic Agent + Message string `json:"message"` + + // The current status of the applied policy + Policy *CheckinPolicy `json:"policy,omitempty"` + Server *ServerMetadata `json:"server,omitempty"` + + // The current overall status of the Elastic Agent + Status string `json:"status"` + + // Date/time the checkin was created + Timestamp string `json:"@timestamp,omitempty"` } -// Components Elastic Agent components detailed status information -type Components struct { +// CheckinPolicy The current status of the applied policy +type CheckinPolicy struct { + + // The ID for the policy + ID string `json:"id"` + + // The current input status per policy + Inputs []CheckinPolicyInputItems `json:"inputs"` + + // The revision of the policy + Revision int64 `json:"revision"` } -// Data The opaque payload. -type Data struct { +// CheckinPolicyInputItems +type CheckinPolicyInputItems struct { + + // The ID for the input + ID string `json:"id"` + + // The current status message of the intput + Message string `json:"message"` + + // The current status of the input + Status string `json:"status"` + + // The template ID for the input + TemplateID string `json:"template_id"` } // EnrollmentAPIKey An Elastic Agent enrollment API key @@ -315,19 +342,13 @@ type HostMetadata struct { Name string `json:"name"` } -// LocalMetadata Local metadata information for the Elastic Agent -type LocalMetadata struct { -} - // Policy A policy that an Elastic Agent is attached to type Policy struct { ESDocument // The coordinator index of the policy - CoordinatorIdx int64 `json:"coordinator_idx"` - - // The opaque payload. - Data json.RawMessage `json:"data"` + CoordinatorIdx int64 `json:"coordinator_idx"` + Data *PolicyData `json:"data"` // True when this policy is the default policy to start Fleet Server DefaultFleetServer bool `json:"default_fleet_server"` @@ -345,6 +366,35 @@ type Policy struct { UnenrollTimeout int64 `json:"unenroll_timeout,omitempty"` } +// PolicyData The policy data that an agent needs to run +type PolicyData struct { + + // The policy's agent configuration details + Agent json.RawMessage `json:"agent,omitempty"` + + // The policy's fleet configuration details + Fleet json.RawMessage `json:"fleet,omitempty"` + + // The policy's ID + ID string `json:"id"` + + // A list of all inputs the agent should run + Inputs []map[string]interface{} `json:"inputs,omitempty"` + + // The Elasticsearch permissions needed to run the policy + OutputPermissions json.RawMessage `json:"output_permissions,omitempty"` + + // A map of all outputs that the agent running the policy can use to send data to. + Outputs map[string]map[string]interface{} `json:"outputs"` + + // The policy revision number. Should match revision_idx + Revision int64 `json:"revision"` + + // A list of all secrets fleet-server needs to inject into the policy before passing it to the agent. This attribute is removed when policy data is send to an agent. + SecretReferences []SecretReferencesItems `json:"secret_references,omitempty"` + Signed *Signed `json:"signed,omitempty"` +} + // PolicyLeader The current leader Fleet Server for a policy type PolicyLeader struct { ESDocument @@ -374,6 +424,11 @@ type PolicyOutput struct { Type string `json:"type"` } +// SecretReferencesItems +type SecretReferencesItems struct { + ID string `json:"id"` +} + // Server A Fleet Server type Server struct { ESDocument @@ -399,10 +454,10 @@ type ServerMetadata struct { type Signed struct { // The base64 encoded, UTF-8 JSON serialized action bytes that are signed. - Data string `json:"data,omitempty"` + Data string `json:"data"` // The base64 encoded signature. - Signature string `json:"signature,omitempty"` + Signature string `json:"signature"` } // ToRetireAPIKeyIdsItems the Output API Keys that were replaced and should be retired @@ -414,11 +469,3 @@ type ToRetireAPIKeyIdsItems struct { // Date/time the API key was retired RetiredAt string `json:"retired_at,omitempty"` } - -// UpgradeDetails Additional upgrade status details. -type UpgradeDetails struct { -} - -// UserProvidedMetadata User provided metadata information for the Elastic Agent -type UserProvidedMetadata struct { -} diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 2c851f28d..ef6d8e0f7 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -173,7 +173,6 @@ LOOP: } func unmarshalHits(hits []es.HitT) ([]model.Policy, error) { - policies := make([]model.Policy, len(hits)) for i, hit := range hits { err := hit.Unmarshal(&policies[i]) diff --git a/internal/pkg/policy/monitor_integration_test.go b/internal/pkg/policy/monitor_integration_test.go index 9b854c65d..fd904fecb 100644 --- a/internal/pkg/policy/monitor_integration_test.go +++ b/internal/pkg/policy/monitor_integration_test.go @@ -21,7 +21,13 @@ import ( ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" ) -var policyBytes = []byte(`{"outputs":{"default":{"type":"elasticsearch"}}}`) +var intPolData = model.PolicyData{ + Outputs: map[string]map[string]interface{}{ + "default": { + "type": "elasticsearch", + }, + }, +} func TestMonitor_Integration(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -75,7 +81,7 @@ func TestMonitor_Integration(t *testing.T) { policy := model.Policy{ PolicyID: policyID, CoordinatorIdx: 1, - Data: policyBytes, + Data: &intPolData, RevisionIdx: 1, } ch := make(chan error, 1) diff --git a/internal/pkg/policy/monitor_test.go b/internal/pkg/policy/monitor_test.go index b12c9a86a..8edcd3c60 100644 --- a/internal/pkg/policy/monitor_test.go +++ b/internal/pkg/policy/monitor_test.go @@ -17,6 +17,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/rs/xid" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" @@ -27,7 +28,13 @@ import ( testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) -var policyBytes = []byte(`{"outputs":{"default":{"type":"elasticsearch"}}}`) +var policyDataDefault = &model.PolicyData{ + Outputs: map[string]map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + }, + }, +} func TestMonitor_NewPolicy(t *testing.T) { _ = testlog.SetLogger(t) @@ -57,17 +64,14 @@ func TestMonitor_NewPolicy(t *testing.T) { merr = monitor.Run(ctx) }() - if err := monitor.(*monitorT).waitStart(ctx); err != nil { - t.Fatal(err) - } + err := monitor.(*monitorT).waitStart(ctx) + require.NoError(t, err) agentId := uuid.Must(uuid.NewV4()).String() policyID := uuid.Must(uuid.NewV4()).String() s, err := monitor.Subscribe(agentId, policyID, 0, 0) defer monitor.Unsubscribe(s) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) rId := xid.New().String() policy := model.Policy{ @@ -78,13 +82,12 @@ func TestMonitor_NewPolicy(t *testing.T) { }, PolicyID: policyID, CoordinatorIdx: 1, - Data: policyBytes, + Data: policyDataDefault, RevisionIdx: 1, } policyData, err := json.Marshal(&policy) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + chHitT <- []es.HitT{{ ID: rId, SeqNo: 1, @@ -98,9 +101,7 @@ func TestMonitor_NewPolicy(t *testing.T) { case subPolicy := <-s.Output(): tm.Stop() diff := cmp.Diff(policy, subPolicy.Policy) - if diff != "" { - t.Fatal(diff) - } + require.Empty(t, diff) case <-tm.C: timedout = true } @@ -110,9 +111,7 @@ func TestMonitor_NewPolicy(t *testing.T) { if merr != nil && merr != context.Canceled { t.Fatal(merr) } - if timedout { - t.Fatal("never got policy update; timed out after 2s") - } + require.False(t, timedout, "never got policy update; timed out after 2s") ms.AssertExpectations(t) mm.AssertExpectations(t) } @@ -149,9 +148,7 @@ func TestMonitor_SamePolicy(t *testing.T) { policyId := uuid.Must(uuid.NewV4()).String() s, err := monitor.Subscribe(agentId, policyId, 1, 1) defer monitor.Unsubscribe(s) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) rId := xid.New().String() policy := model.Policy{ @@ -162,13 +159,12 @@ func TestMonitor_SamePolicy(t *testing.T) { }, PolicyID: policyId, CoordinatorIdx: 1, - Data: policyBytes, + Data: policyDataDefault, RevisionIdx: 1, } policyData, err := json.Marshal(&policy) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + chHitT <- []es.HitT{{ ID: rId, SeqNo: 1, @@ -190,9 +186,7 @@ func TestMonitor_SamePolicy(t *testing.T) { if merr != nil && merr != context.Canceled { t.Fatal(merr) } - if gotPolicy { - t.Fatal("got policy update when it was the same rev/coord idx") - } + require.False(t, gotPolicy, "got policy update when it was the same rev/coord idx") ms.AssertExpectations(t) mm.AssertExpectations(t) } @@ -229,9 +223,7 @@ func TestMonitor_NewPolicyUncoordinated(t *testing.T) { policyId := uuid.Must(uuid.NewV4()).String() s, err := monitor.Subscribe(agentId, policyId, 1, 1) defer monitor.Unsubscribe(s) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) rId := xid.New().String() policy := model.Policy{ @@ -242,13 +234,12 @@ func TestMonitor_NewPolicyUncoordinated(t *testing.T) { }, PolicyID: policyId, CoordinatorIdx: 0, - Data: policyBytes, + Data: policyDataDefault, RevisionIdx: 2, } policyData, err := json.Marshal(&policy) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + chHitT <- []es.HitT{{ ID: rId, SeqNo: 1, @@ -270,9 +261,7 @@ func TestMonitor_NewPolicyUncoordinated(t *testing.T) { if merr != nil && merr != context.Canceled { t.Fatal(merr) } - if gotPolicy { - t.Fatal("got policy update when it had coordinator_idx set to 0") - } + require.False(t, gotPolicy, "got policy update when it had coordinator_idx set to 0") ms.AssertExpectations(t) mm.AssertExpectations(t) } @@ -325,7 +314,7 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { }, PolicyID: policyId, CoordinatorIdx: 1, - Data: policyBytes, + Data: policyDataDefault, RevisionIdx: 2, } @@ -344,9 +333,7 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { s, err := monitor.Subscribe(agentId, policyId, 1, 1) defer monitor.Unsubscribe(s) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) timedout := false tm := time.NewTimer(2 * time.Second) @@ -354,9 +341,7 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { case subPolicy := <-s.Output(): tm.Stop() diff := cmp.Diff(policy, subPolicy.Policy) - if diff != "" { - t.Fatal(diff) - } + require.Empty(t, diff) case <-tm.C: timedout = true } @@ -366,7 +351,5 @@ func runTestMonitor_NewPolicyExists(t *testing.T, delay time.Duration) { if merr != nil && merr != context.Canceled { t.Fatal(merr) } - if timedout { - t.Fatal("never got policy update; timed out after 500ms") - } + require.False(t, timedout, "never got policy update; timed out after 500ms") } diff --git a/internal/pkg/policy/parsed_policy.go b/internal/pkg/policy/parsed_policy.go index 87f8c5050..53bf53e85 100644 --- a/internal/pkg/policy/parsed_policy.go +++ b/internal/pkg/policy/parsed_policy.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" @@ -43,7 +44,6 @@ type ParsedPolicyDefaults struct { type ParsedPolicy struct { Policy model.Policy - Fields map[string]json.RawMessage Roles RoleMapT Outputs map[string]Output Default ParsedPolicyDefaults @@ -53,35 +53,21 @@ type ParsedPolicy struct { func NewParsedPolicy(ctx context.Context, bulker bulk.Bulk, p model.Policy) (*ParsedPolicy, error) { var err error - - var fields map[string]json.RawMessage - if err = json.Unmarshal(p.Data, &fields); err != nil { - return nil, err - } - // Interpret the output permissions if available var roles map[string]RoleT - if perms := fields[FieldOutputPermissions]; len(perms) != 0 { - if roles, err = parsePerms(perms); err != nil { - return nil, err - } - } - - // Find the default role. - outputs, ok := fields[FieldOutputs] - if !ok { - return nil, ErrOutputsNotFound + if roles, err = parsePerms(p.Data.OutputPermissions); err != nil { + return nil, err } - policyOutputs, err := constructPolicyOutputs(outputs, roles) + policyOutputs, err := constructPolicyOutputs(p.Data.Outputs, roles) if err != nil { return nil, err } - defaultName, err := findDefaultOutputName(outputs) + defaultName, err := findDefaultOutputName(p.Data.Outputs) if err != nil { return nil, err } - policyInputs, err := getPolicyInputsWithSecrets(ctx, fields, bulker) + policyInputs, err := getPolicyInputsWithSecrets(ctx, p.Data, bulker) if err != nil { return nil, err } @@ -89,7 +75,6 @@ func NewParsedPolicy(ctx context.Context, bulker bulk.Bulk, p model.Policy) (*Pa // We are cool and the gang pp := &ParsedPolicy{ Policy: p, - Fields: fields, Roles: roles, Outputs: policyOutputs, Default: ParsedPolicyDefaults{ @@ -109,20 +94,17 @@ func NewParsedPolicy(ctx context.Context, bulker bulk.Bulk, p model.Policy) (*Pa return pp, nil } -func constructPolicyOutputs(outputsRaw json.RawMessage, roles map[string]RoleT) (map[string]Output, error) { - result := make(map[string]Output) - - outputsMap, err := smap.Parse(outputsRaw) - if err != nil { - return result, err - } - - for k := range outputsMap { - v := outputsMap.GetMap(k) +func constructPolicyOutputs(outputs map[string]map[string]interface{}, roles map[string]RoleT) (map[string]Output, error) { + result := make(map[string]Output, len(outputs)) + for k, v := range outputs { + typeStr, ok := v["type"].(string) + if !ok { + return nil, fmt.Errorf("missing or invalid output type: %+v", v) + } p := Output{ Name: k, - Type: v.GetString(FieldOutputType), + Type: typeStr, } if role, ok := roles[k]; ok { @@ -166,34 +148,33 @@ func parsePerms(permsRaw json.RawMessage) (RoleMapT, error) { return m, nil } -func findDefaultOutputName(outputsRaw json.RawMessage) (string, error) { - outputsMap, err := smap.Parse(outputsRaw) - if err != nil { - return "", err - } - +// findDefaultName returns the name of the 1st output with the "elasticsearch" type or falls back to behaviour that relies on deprecated fields. +// +// Previous fleet-server and elastic-agent released had a default output which was removed Sept 2021. +func findDefaultOutputName(outputs map[string]map[string]interface{}) (string, error) { // iterate across the keys finding the defaults var defaults []string var ESdefaults []string - for k := range outputsMap { - - v := outputsMap.GetMap(k) - + for k, v := range outputs { if v != nil { - outputType := v.GetString(FieldOutputType) - if outputType == OutputTypeElasticsearch { + typeStr, ok := v["type"].(string) + if ok && typeStr == OutputTypeElasticsearch { ESdefaults = append(ESdefaults, k) continue } - fleetServer := v.GetMap(FieldOutputFleetServer) - if fleetServer == nil { + + fleetServer, ok := v[FieldOutputFleetServer] + if !ok { defaults = append(defaults, k) continue } - serviceToken := fleetServer.GetString(FieldOutputServiceToken) - if serviceToken == "" { - defaults = append(defaults, k) - continue + fsMap, ok := fleetServer.(map[string]interface{}) + if ok { + str, ok := fsMap[FieldOutputServiceToken].(string) + if ok && str == "" { + defaults = append(defaults, k) + continue + } } } } diff --git a/internal/pkg/policy/parsed_policy_test.go b/internal/pkg/policy/parsed_policy_test.go index 6941efa1e..72f7aec6f 100644 --- a/internal/pkg/policy/parsed_policy_test.go +++ b/internal/pkg/policy/parsed_policy_test.go @@ -11,107 +11,65 @@ import ( "encoding/json" "testing" + "github.com/stretchr/testify/require" + "github.com/elastic/fleet-server/v7/internal/pkg/model" ) func TestNewParsedPolicy(t *testing.T) { // Run two formatting of the same payload to validate that the sha2 remains the same - payloads := []string{ - testPolicy, - minified, - } - - for _, payload := range payloads { - // Load the model into the policy object - var m model.Policy - if err := json.Unmarshal([]byte(payload), &m); err != nil { - t.Fatal(err) - } - - m.Data = json.RawMessage(testPolicy) - - pp, err := NewParsedPolicy(context.TODO(), nil, m) - if err != nil { - t.Fatal(err) - } - - fields := []string{ - "id", - "revision", - "outputs", - "output_permissions", - "agent", - "inputs", - "fleet", - } - - // Validate the fields; Expect the following top level items - if len(pp.Fields) != len(fields) { - t.Error("Expected N fields") - } - - for _, f := range fields { - if _, ok := pp.Fields[f]; !ok { - t.Errorf("Missing field %s", f) - } - } - - // Now validate output perms hash - if len(pp.Roles) != 1 { - t.Error("Only expected one role") - } - - // Validate that default was found - if pp.Default.Name != "other" { - t.Error("other output should be identified as default") - } - defaultOutput := pp.Outputs[pp.Default.Name] - if defaultOutput.Role == nil { - t.Error("other output role should be identified") - } - - expectedSha2 := "d4d0840fe28ca4900129a749b56cee729562c0a88c935192c659252b5b0d762a" - if defaultOutput.Role.Sha2 != expectedSha2 { - t.Fatalf("Expected sha2: '%s', got '%s'.", expectedSha2, defaultOutput.Role.Sha2) - } + testcases := []struct { + name string + payload string + defaultName string + }{{ + name: "test policy", + payload: testPolicy, + defaultName: "other", + }, { + name: "minified", + payload: minified, + defaultName: "default", + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // Load the model into the policy object + var m model.Policy + var d model.PolicyData + err := json.Unmarshal([]byte(tc.payload), &d) + require.NoError(t, err) + m.Data = &d + + pp, err := NewParsedPolicy(context.TODO(), nil, m) + require.NoError(t, err) + + // Now validate output perms hash + require.Len(t, pp.Roles, 1, "Only expected one role") + + // Validate that default was found + require.Equal(t, tc.defaultName, pp.Default.Name) + defaultOutput := pp.Outputs[pp.Default.Name] + require.NotNil(t, defaultOutput.Role, "output role should be identified") + + expectedSha2 := "d4d0840fe28ca4900129a749b56cee729562c0a88c935192c659252b5b0d762a" + require.Equal(t, expectedSha2, defaultOutput.Role.Sha2) + }) } } func TestNewParsedPolicyNoES(t *testing.T) { // Load the model into the policy object var m model.Policy - if err := json.Unmarshal([]byte(logstashOutputPolicy), &m); err != nil { - t.Fatal(err) - } + var d model.PolicyData + err := json.Unmarshal([]byte(logstashOutputPolicy), &d) + require.NoError(t, err) - m.Data = json.RawMessage(logstashOutputPolicy) + m.Data = &d pp, err := NewParsedPolicy(context.TODO(), nil, m) - if err != nil { - t.Fatal(err) - } - fields := []string{ - "id", - "revision", - "outputs", - "agent", - "inputs", - "fleet", - } - - // Validate the fields; Expect the following top level items - if len(pp.Fields) != len(fields) { - t.Errorf("Expected %d fields, got %d", len(fields), len(pp.Fields)) - } - - for _, f := range fields { - if _, ok := pp.Fields[f]; !ok { - t.Errorf("Missing field %s", f) - } - } + require.NoError(t, err) // Validate that default was found - if pp.Default.Name != "remote_not_es" { - t.Error("other output should be identified as default") - } + require.Equal(t, "remote_not_es", pp.Default.Name) } diff --git a/internal/pkg/policy/policy_output.go b/internal/pkg/policy/policy_output.go index f558ae0a4..b5b8ccbe3 100644 --- a/internal/pkg/policy/policy_output.go +++ b/internal/pkg/policy/policy_output.go @@ -42,7 +42,7 @@ type Output struct { // Prepare prepares the output p to be sent to the elastic-agent // The agent might be mutated for an elasticsearch output -func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap smap.Map) error { +func (p *Output) Prepare(ctx context.Context, zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, outputMap map[string]map[string]interface{}) error { span, ctx := apm.StartSpan(ctx, "prepareOutput", "process") defer span.End() span.Context.SetLabel("output_type", p.Type) @@ -74,13 +74,17 @@ func (p *Output) prepareElasticsearch( zlog zerolog.Logger, bulker bulk.Bulk, agent *model.Agent, - outputMap smap.Map) error { + outputMap map[string]map[string]interface{}) error { // The role is required to do api key management if p.Role == nil { zlog.Error(). Msg("policy does not contain required output permission section") return ErrNoOutputPerms } + if _, ok := outputMap[p.Name]; !ok { + zlog.Error().Err(ErrFailInjectAPIKey).Msg("Unable to find output in map") + return ErrFailInjectAPIKey + } output, foundOutput := agent.Outputs[p.Name] if !foundOutput { @@ -238,10 +242,7 @@ func (p *Output) prepareElasticsearch( // in place to reduce number of agent policy allocation when sending the updated // agent policy to multiple agents. // See: https://github.com/elastic/fleet-server/issues/1301 - if err := setMapObj(outputMap, output.APIKey, p.Name, "api_key"); err != nil { - return err - } - + outputMap[p.Name]["api_key"] = output.APIKey return nil } @@ -410,28 +411,3 @@ func generateOutputAPIKey( apikey.NewMetadata(agentID, outputName, apikey.TypeOutput), ) } - -func setMapObj(obj map[string]interface{}, val interface{}, keys ...string) error { - if len(keys) == 0 { - return fmt.Errorf("no key to be updated: %w", ErrFailInjectAPIKey) - } - - for _, k := range keys[:len(keys)-1] { - v, ok := obj[k] - if !ok { - return fmt.Errorf("no key %q not present on MapObj: %w", - k, ErrFailInjectAPIKey) - } - - obj, ok = v.(map[string]interface{}) - if !ok { - return fmt.Errorf("cannot cast %T to map[string]interface{}: %w", - obj, ErrFailInjectAPIKey) - } - } - - k := keys[len(keys)-1] - obj[k] = val - - return nil -} diff --git a/internal/pkg/policy/policy_output_integration_test.go b/internal/pkg/policy/policy_output_integration_test.go index 8b38ec96d..e1328cc8e 100644 --- a/internal/pkg/policy/policy_output_integration_test.go +++ b/internal/pkg/policy/policy_output_integration_test.go @@ -20,7 +20,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/elastic/fleet-server/v7/internal/pkg/smap" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" ) @@ -148,7 +147,7 @@ func TestPolicyOutputESPrepareRealES(t *testing.T) { Raw: TestPayload, }, } - policyMap := smap.Map{ + policyMap := map[string]map[string]interface{}{ "test output": map[string]interface{}{}, } diff --git a/internal/pkg/policy/policy_output_test.go b/internal/pkg/policy/policy_output_test.go index d8103f4d9..8791bbc37 100644 --- a/internal/pkg/policy/policy_output_test.go +++ b/internal/pkg/policy/policy_output_test.go @@ -16,7 +16,6 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/model" - "github.com/elastic/fleet-server/v7/internal/pkg/smap" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) @@ -35,7 +34,7 @@ func TestPolicyLogstashOutputPrepare(t *testing.T) { }, } - err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, map[string]map[string]interface{}{}) require.Nil(t, err, "expected prepare to pass") bulker.AssertExpectations(t) } @@ -48,7 +47,7 @@ func TestPolicyLogstashOutputPrepareNoRole(t *testing.T) { Role: nil, } - err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, map[string]map[string]interface{}{}) // No permissions are required by logstash currently require.Nil(t, err, "expected prepare to pass") bulker.AssertExpectations(t) @@ -66,7 +65,7 @@ func TestPolicyDefaultLogstashOutputPrepare(t *testing.T) { }, } - err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, map[string]map[string]interface{}{}) require.Nil(t, err, "expected prepare to pass") bulker.AssertExpectations(t) } @@ -83,7 +82,7 @@ func TestPolicyKafkaOutputPrepare(t *testing.T) { }, } - err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, map[string]map[string]interface{}{}) require.Nil(t, err, "expected prepare to pass") bulker.AssertExpectations(t) } @@ -96,7 +95,7 @@ func TestPolicyKafkaOutputPrepareNoRole(t *testing.T) { Role: nil, } - err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, map[string]map[string]interface{}{}) // No permissions are required by kafka currently require.Nil(t, err, "expected prepare to pass") bulker.AssertExpectations(t) @@ -111,7 +110,7 @@ func TestPolicyESOutputPrepareNoRole(t *testing.T) { Role: nil, } - err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, smap.Map{}) + err := po.Prepare(context.Background(), logger, bulker, &model.Agent{}, map[string]map[string]interface{}{}) require.NotNil(t, err, "expected prepare to error") bulker.AssertExpectations(t) } @@ -133,7 +132,7 @@ func TestPolicyOutputESPrepare(t *testing.T) { }, } - policyMap := smap.Map{ + policyMap := map[string]map[string]interface{}{ "test output": map[string]interface{}{}, } @@ -153,7 +152,7 @@ func TestPolicyOutputESPrepare(t *testing.T) { err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + key, ok := policyMap[output.Name]["api_key"].(string) gotOutput := testAgent.Outputs[output.Name] require.True(t, ok, "api key not present on policy map") @@ -204,7 +203,7 @@ func TestPolicyOutputESPrepare(t *testing.T) { }, } - policyMap := smap.Map{ + policyMap := map[string]map[string]interface{}{ "test output": map[string]interface{}{}, } @@ -224,7 +223,7 @@ func TestPolicyOutputESPrepare(t *testing.T) { err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + key, ok := policyMap[output.Name]["api_key"].(string) gotOutput := testAgent.Outputs[output.Name] require.True(t, ok, "unable to case api key") @@ -266,7 +265,7 @@ func TestPolicyOutputESPrepare(t *testing.T) { }, } - policyMap := smap.Map{ + policyMap := map[string]map[string]interface{}{ "test output": map[string]interface{}{}, } @@ -275,7 +274,7 @@ func TestPolicyOutputESPrepare(t *testing.T) { err := output.Prepare(context.Background(), logger, bulker, testAgent, policyMap) require.NoError(t, err, "expected prepare to pass") - key, ok := policyMap.GetMap(output.Name)["api_key"].(string) + key, ok := policyMap[output.Name]["api_key"].(string) gotOutput := testAgent.Outputs[output.Name] require.True(t, ok, "unable to case api key") diff --git a/internal/pkg/policy/secret.go b/internal/pkg/policy/secret.go index a5f15c4e0..d4c499309 100644 --- a/internal/pkg/policy/secret.go +++ b/internal/pkg/policy/secret.go @@ -6,35 +6,25 @@ package policy import ( "context" - "encoding/json" "regexp" "strings" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/model" ) -type SecretReference struct { - ID string `json:"id"` -} - var ( secretRegex = regexp.MustCompile(`\$co\.elastic\.secret{(.*)}`) ) // read secret values that belong to the agent policy's secret references, returns secrets as id:value map -func getSecretValues(ctx context.Context, secretRefsRaw json.RawMessage, bulker bulk.Bulk) (map[string]string, error) { - if secretRefsRaw == nil { +func getSecretValues(ctx context.Context, secretRefs []model.SecretReferencesItems, bulker bulk.Bulk) (map[string]string, error) { + if len(secretRefs) == 0 { return nil, nil } - var secretValues []SecretReference - err := json.Unmarshal([]byte(secretRefsRaw), &secretValues) - if err != nil { - return nil, err - } - - ids := make([]string, 0) - for _, ref := range secretValues { + ids := make([]string, 0, len(secretRefs)) + for _, ref := range secretRefs { ids = append(ids, ref.ID) } @@ -48,33 +38,27 @@ func getSecretValues(ctx context.Context, secretRefsRaw json.RawMessage, bulker // read inputs and secret_references from agent policy // replace values of secret refs in inputs and input streams properties -func getPolicyInputsWithSecrets(ctx context.Context, fields map[string]json.RawMessage, bulker bulk.Bulk) ([]map[string]interface{}, error) { - if fields["inputs"] == nil { +func getPolicyInputsWithSecrets(ctx context.Context, data *model.PolicyData, bulker bulk.Bulk) ([]map[string]interface{}, error) { + if len(data.Inputs) == 0 { return nil, nil } - var inputs []map[string]interface{} - err := json.Unmarshal([]byte(fields["inputs"]), &inputs) - if err != nil { - return nil, err - } - - if fields["secret_references"] == nil { - return inputs, nil + if len(data.SecretReferences) == 0 { + return data.Inputs, nil } - secretValues, err := getSecretValues(ctx, fields["secret_references"], bulker) + secretValues, err := getSecretValues(ctx, data.SecretReferences, bulker) if err != nil { return nil, err } result := make([]map[string]interface{}, 0) - for _, input := range inputs { + for _, input := range data.Inputs { newInput := make(map[string]interface{}) for k, v := range input { // replace secret refs in input stream fields if k == "streams" { - if streams, ok := input[k].([]any); ok { + if streams, ok := v.([]any); ok { newInput[k] = processStreams(streams, secretValues) } // replace secret refs in input fields @@ -89,7 +73,7 @@ func getPolicyInputsWithSecrets(ctx context.Context, fields map[string]json.RawM } result = append(result, newInput) } - delete(fields, "secret_references") + data.SecretReferences = nil return result, nil } diff --git a/internal/pkg/policy/secret_test.go b/internal/pkg/policy/secret_test.go index 78766743a..bd4b58fa0 100644 --- a/internal/pkg/policy/secret_test.go +++ b/internal/pkg/policy/secret_test.go @@ -8,10 +8,11 @@ package policy import ( "context" - "encoding/json" "testing" + "github.com/elastic/fleet-server/v7/internal/pkg/model" ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/stretchr/testify/assert" ) @@ -56,11 +57,10 @@ func TestReplaceSecretRefNotFound(t *testing.T) { } func TestGetSecretValues(t *testing.T) { - secretRefsJSON := []SecretReference{{ID: "ref1"}, {ID: "ref2"}} - secretRefsRaw, _ := json.Marshal(secretRefsJSON) + refs := []model.SecretReferencesItems{{ID: "ref1"}, {ID: "ref2"}} bulker := ftesting.NewMockBulk() - secretRefs, _ := getSecretValues(context.TODO(), secretRefsRaw, bulker) + secretRefs, _ := getSecretValues(context.TODO(), refs, bulker) expectedRefs := map[string]string{ "ref1": "ref1_value", @@ -70,13 +70,12 @@ func TestGetSecretValues(t *testing.T) { } func TestGetPolicyInputsWithSecretsAndStreams(t *testing.T) { - secretRefsJSON := []SecretReference{{ID: "ref1"}, {ID: "ref2"}, {ID: "ref3"}} - secretRefsRaw, _ := json.Marshal(secretRefsJSON) - inputsJSON := []map[string]interface{}{ + refs := []model.SecretReferencesItems{{ID: "ref1"}, {ID: "ref2"}, {ID: "ref3"}} + inputs := []map[string]interface{}{ {"id": "input1", "package_var_secret": "$co.elastic.secret{ref1}", "input_var_secret": "$co.elastic.secret{ref2}"}, - {"id": "input2", "streams": []map[string]interface{}{ - { + {"id": "input2", "streams": []interface{}{ + map[string]interface{}{ "id": "stream1", "package_var_secret": "$co.elastic.secret{ref1}", "input_var_secret": "$co.elastic.secret{ref2}", @@ -84,10 +83,9 @@ func TestGetPolicyInputsWithSecretsAndStreams(t *testing.T) { }, }}, } - inputsRaw, _ := json.Marshal(inputsJSON) - fields := map[string]json.RawMessage{ - "secret_references": secretRefsRaw, - "inputs": inputsRaw, + pData := model.PolicyData{ + SecretReferences: refs, + Inputs: inputs, } bulker := ftesting.NewMockBulk() expectedStream := map[string]interface{}{ @@ -102,26 +100,23 @@ func TestGetPolicyInputsWithSecretsAndStreams(t *testing.T) { {"id": "input2", "streams": []interface{}{expectedStream}}, } - result, _ := getPolicyInputsWithSecrets(context.TODO(), fields, bulker) + result, _ := getPolicyInputsWithSecrets(context.TODO(), &pData, bulker) assert.Equal(t, expectedResult, result) - var refs any - json.Unmarshal(fields["secret_references"], &refs) - assert.Equal(t, nil, refs) + assert.Nil(t, pData.SecretReferences) } func TestGetPolicyInputsNoopWhenNoSecrets(t *testing.T) { - inputsJSON := []map[string]interface{}{ + inputs := []map[string]interface{}{ {"id": "input1"}, - {"id": "input2", "streams": []map[string]interface{}{ - { + {"id": "input2", "streams": []interface{}{ + map[string]interface{}{ "id": "stream1", }, }}, } - inputsRaw, _ := json.Marshal(inputsJSON) - fields := map[string]json.RawMessage{ - "inputs": inputsRaw, + pData := model.PolicyData{ + Inputs: inputs, } bulker := ftesting.NewMockBulk() expectedStream := map[string]interface{}{ @@ -132,7 +127,7 @@ func TestGetPolicyInputsNoopWhenNoSecrets(t *testing.T) { {"id": "input2", "streams": []interface{}{expectedStream}}, } - result, _ := getPolicyInputsWithSecrets(context.TODO(), fields, bulker) + result, _ := getPolicyInputsWithSecrets(context.TODO(), &pData, bulker) assert.Equal(t, expectedResult, result) } diff --git a/internal/pkg/policy/self.go b/internal/pkg/policy/self.go index e44e8c637..8fc9b28e7 100644 --- a/internal/pkg/policy/self.go +++ b/internal/pkg/policy/self.go @@ -6,7 +6,6 @@ package policy import ( "context" - "encoding/json" "errors" "fmt" "sync" @@ -209,12 +208,7 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return client.UnitStateStarting, nil } - var data policyData - err := json.Unmarshal(m.policy.Data, &data) - if err != nil { - return client.UnitStateFailed, err - } - if !data.HasType("fleet-server") { + if !HasFleetServerInput(m.policy.Data.Inputs) { // no fleet-server input m.state = client.UnitStateStarting if m.policyID == "" { @@ -260,17 +254,13 @@ func (m *selfMonitorT) updateState(ctx context.Context) (client.UnitState, error return state, nil } -type policyData struct { - Inputs []policyInput `json:"inputs"` -} - -type policyInput struct { - Type string `json:"type"` -} - -func (d *policyData) HasType(val string) bool { - for _, input := range d.Inputs { - if input.Type == val { +func HasFleetServerInput(inputs []map[string]interface{}) bool { + for _, input := range inputs { + attr, ok := input["type"].(string) + if !ok { + return false + } + if attr == "fleet-server" { return true } } diff --git a/internal/pkg/policy/self_test.go b/internal/pkg/policy/self_test.go index c762a2420..bfd314a79 100644 --- a/internal/pkg/policy/self_test.go +++ b/internal/pkg/policy/self_test.go @@ -80,10 +80,7 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { policyID := uuid.Must(uuid.NewV4()).String() rId := xid.New().String() - policyContents, err := json.Marshal(&policyData{Inputs: []policyInput{}}) - if err != nil { - t.Fatal(err) - } + pData := model.PolicyData{Inputs: []map[string]interface{}{}} policy := model.Policy{ ESDocument: model.ESDocument{ Id: rId, @@ -92,11 +89,11 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { }, PolicyID: policyID, CoordinatorIdx: 1, - Data: policyContents, + Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, } - pData, err := json.Marshal(&policy) + p, err := json.Marshal(&policy) if err != nil { t.Fatal(err) } @@ -104,7 +101,7 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { ID: rId, SeqNo: 1, Version: 1, - Source: pData, + Source: p, }} // should still be set to starting @@ -120,14 +117,11 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { }) rId = xid.New().String() - policyContents, err = json.Marshal(&policyData{Inputs: []policyInput{ + pData = model.PolicyData{Inputs: []map[string]interface{}{ { - Type: "fleet-server", + "type": "fleet-server", }, - }}) - if err != nil { - t.Fatal(err) - } + }} policy = model.Policy{ ESDocument: model.ESDocument{ Id: rId, @@ -136,11 +130,11 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { }, PolicyID: policyID, CoordinatorIdx: 1, - Data: policyContents, + Data: &pData, RevisionIdx: 2, DefaultFleetServer: true, } - pData, err = json.Marshal(&policy) + p, err = json.Marshal(&policy) if err != nil { t.Fatal(err) } @@ -148,7 +142,7 @@ func TestSelfMonitor_DefaultPolicy(t *testing.T) { ID: rId, SeqNo: 2, Version: 1, - Source: pData, + Source: p, }} // should now be set to healthy @@ -236,14 +230,11 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { policyID := uuid.Must(uuid.NewV4()).String() rId := xid.New().String() - policyContents, err := json.Marshal(&policyData{Inputs: []policyInput{ + pData := model.PolicyData{Inputs: []map[string]interface{}{ { - Type: "fleet-server", + "type": "fleet-server", }, - }}) - if err != nil { - t.Fatal(err) - } + }} policy := model.Policy{ ESDocument: model.ESDocument{ Id: rId, @@ -252,7 +243,7 @@ func TestSelfMonitor_DefaultPolicy_Degraded(t *testing.T) { }, PolicyID: policyID, CoordinatorIdx: 1, - Data: policyContents, + Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, } @@ -381,10 +372,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { }, ftesting.RetrySleep(1*time.Second)) rId := xid.New().String() - policyContents, err := json.Marshal(&policyData{Inputs: []policyInput{}}) - if err != nil { - t.Fatal(err) - } + pData := model.PolicyData{Inputs: []map[string]interface{}{}} policy := model.Policy{ ESDocument: model.ESDocument{ Id: rId, @@ -393,11 +381,11 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { }, PolicyID: policyID, CoordinatorIdx: 1, - Data: policyContents, + Data: &pData, RevisionIdx: 2, DefaultFleetServer: true, } - pData, err := json.Marshal(&policy) + p, err := json.Marshal(&policy) if err != nil { t.Fatal(err) } @@ -405,7 +393,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { ID: rId, SeqNo: 1, Version: 1, - Source: pData, + Source: p, }} // should still be set to starting @@ -421,14 +409,11 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { }, ftesting.RetrySleep(1*time.Second)) rId = xid.New().String() - policyContents, err = json.Marshal(&policyData{Inputs: []policyInput{ + pData = model.PolicyData{Inputs: []map[string]interface{}{ { - Type: "fleet-server", + "type": "fleet-server", }, - }}) - if err != nil { - t.Fatal(err) - } + }} policy = model.Policy{ ESDocument: model.ESDocument{ Id: rId, @@ -437,11 +422,11 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { }, PolicyID: policyID, CoordinatorIdx: 1, - Data: policyContents, + Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, } - pData, err = json.Marshal(&policy) + p, err = json.Marshal(&policy) if err != nil { t.Fatal(err) } @@ -449,7 +434,7 @@ func TestSelfMonitor_SpecificPolicy(t *testing.T) { ID: rId, SeqNo: 2, Version: 1, - Source: pData, + Source: p, }} // should now be set to healthy @@ -537,14 +522,11 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { }, ftesting.RetrySleep(1*time.Second)) rId := xid.New().String() - policyContents, err := json.Marshal(&policyData{Inputs: []policyInput{ + pData := model.PolicyData{Inputs: []map[string]interface{}{ { - Type: "fleet-server", + "type": "fleet-server", }, - }}) - if err != nil { - t.Fatal(err) - } + }} policy := model.Policy{ ESDocument: model.ESDocument{ Id: rId, @@ -553,7 +535,7 @@ func TestSelfMonitor_SpecificPolicy_Degraded(t *testing.T) { }, PolicyID: policyID, CoordinatorIdx: 1, - Data: policyContents, + Data: &pData, RevisionIdx: 1, DefaultFleetServer: true, } diff --git a/internal/pkg/server/agent_integration_test.go b/internal/pkg/server/agent_integration_test.go index ea2e71499..29cc0cc4b 100644 --- a/internal/pkg/server/agent_integration_test.go +++ b/internal/pkg/server/agent_integration_test.go @@ -8,6 +8,7 @@ package server import ( "context" + "encoding/json" "errors" "fmt" "net" @@ -43,23 +44,17 @@ var biInfo = build.Info{ Commit: "integration", } -var policyData = []byte(` -{ - "outputs": { +var policyData = model.PolicyData{ + Outputs: map[string]map[string]interface{}{ "default": { - "type": "elasticsearch" - } - }, - "output_permissions": { - "default": {} + "type": "elasticsearch", + }, }, - "inputs": [ - { - "type": "fleet-server" - } - ] + OutputPermissions: json.RawMessage(`{"default": {}}`), + Inputs: []map[string]interface{}{{ + "type": "fleet-server", + }}, } -`) func TestAgent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -74,7 +69,7 @@ func TestAgent(t *testing.T) { PolicyID: policyID, RevisionIdx: 1, DefaultFleetServer: true, - Data: policyData, + Data: &policyData, }) require.NoError(t, err) diff --git a/internal/pkg/server/fleet_integration_test.go b/internal/pkg/server/fleet_integration_test.go index 3fecdb99c..d6b716165 100644 --- a/internal/pkg/server/fleet_integration_test.go +++ b/internal/pkg/server/fleet_integration_test.go @@ -120,7 +120,7 @@ func startTestServer(t *testing.T, ctx context.Context, opts ...Option) (*tserve PolicyID: policyID, RevisionIdx: 1, DefaultFleetServer: true, - Data: policyData, + Data: &policyData, }) if err != nil { return nil, err @@ -300,7 +300,7 @@ func TestServerConfigErrorReload(t *testing.T) { PolicyID: policyID, RevisionIdx: 1, DefaultFleetServer: true, - Data: policyData, + Data: &policyData, }) require.NoError(t, err) diff --git a/internal/pkg/server/fleet_secrets_integration_test.go b/internal/pkg/server/fleet_secrets_integration_test.go index 6f5c9d653..41ad71f9a 100644 --- a/internal/pkg/server/fleet_secrets_integration_test.go +++ b/internal/pkg/server/fleet_secrets_integration_test.go @@ -82,30 +82,27 @@ func createSecret(t *testing.T, ctx context.Context, bulker bulk.Bulk) string { func createAgentPolicyWithSecrets(t *testing.T, ctx context.Context, bulker bulk.Bulk, secretID string, secretRef string) string { policyID := uuid.Must(uuid.NewV4()).String() - var policyDataWSecrets = map[string]interface{}{ - "name": "TestPolicy " + policyID, - "outputs": map[string]interface{}{ - "default": map[string]string{ + var policyData = model.PolicyData{ + Outputs: map[string]map[string]interface{}{ + "default": { "type": "elasticsearch", - }}, - "output_permissions": map[string]interface{}{ - "default": map[string]string{}, + }, }, - "inputs": []map[string]string{{ + OutputPermissions: json.RawMessage(`{"default":{}}`), + Inputs: []map[string]interface{}{{ "type": "fleet-server", "package_var_secret": secretRef, }}, - "secret_references": []map[string]string{{ - "id": secretID, + SecretReferences: []model.SecretReferencesItems{{ + ID: secretID, }}, } - policyDataJSON, _ := json.Marshal(policyDataWSecrets) _, err := dl.CreatePolicy(ctx, bulker, model.Policy{ PolicyID: policyID, RevisionIdx: 1, DefaultFleetServer: true, - Data: policyDataJSON, + Data: &policyData, }) if err != nil { t.Fatal(err) diff --git a/model/openapi.yml b/model/openapi.yml index e5a7a5280..b1e63b4a1 100644 --- a/model/openapi.yml +++ b/model/openapi.yml @@ -490,7 +490,42 @@ components: yaml: "timeout" signed: $ref: "#/components/schemas/actionSignature" - + policyData: + type: object + description: The full policy that an agent should run after combining with local configuration/env vars. + properties: + id: + description: The policy's ID. + type: string + outputs: + description: A map of all outputs that the agent running the policy can use to send data to. + type: object + inputs: + description: A list of all inputs that the agent should run. + type: array + items: + type: object + secret_references: + description: | + A list of secrets that the fleet-server must inject into the policy before sending the policy to the agent. + The secret_references attribute is not communicated to agents. + type: array + items: + type: object + revision: + description: The revision number of the policy. Should match revision_idx. + type: integer + agent: + description: Agent configuration details associated with the policy. May include configuration toggling monitoring, uninstallation protection, etc. + type: object + signed: + $ref: "#/components/schemas/actionSignature" + output_permissions: + description: Elasticsearch permissions that the agent requires in order to run the policy. + type: object + fleet: + description: Agent configuration to describe how to connect to fleet-server. + type: object checkinResponse: type: object required: diff --git a/model/schema.json b/model/schema.json index 0f5616d15..7d8880983 100644 --- a/model/schema.json +++ b/model/schema.json @@ -72,22 +72,10 @@ }, "data": { "description": "The opaque payload.", - "type": "object", "format": "raw" }, "signed": { - "description": "The action signed data and signature.", - "type": "object", - "properties": { - "data": { - "description": "The base64 encoded, UTF-8 JSON serialized action bytes that are signed.", - "type": "string" - }, - "signature": { - "description": "The base64 encoded signature.", - "type": "string" - } - } + "$ref": "#/definitions/signed" } }, "required": [ @@ -95,6 +83,25 @@ ] }, + "signed": { + "description": "The action signed data and signature.", + "type": "object", + "properties": { + "data": { + "description": "The base64 encoded, UTF-8 JSON serialized action bytes that are signed.", + "type": "string" + }, + "signature": { + "description": "The base64 encoded signature.", + "type": "string" + } + }, + "required": [ + "data", + "signature" + ] + }, + "action-result": { "title": "Agent action results", "description": "An Elastic Agent action results", @@ -129,12 +136,10 @@ }, "action_data": { "description": "The opaque payload.", - "type": "object", "format": "raw" }, "action_response": { "description": "The custom action response payload.", - "type": "object", "format": "raw" }, "error": { @@ -143,7 +148,6 @@ }, "data": { "description": "The opaque payload.", - "type": "object", "format": "raw" } }, @@ -215,7 +219,6 @@ }, "body": { "description": "Encoded artifact data", - "type": "object", "format": "raw" }, "package_name": { @@ -333,9 +336,7 @@ "type": "integer" }, "data": { - "description": "The opaque payload.", - "type": "object", - "format": "raw" + "$ref": "#/definitions/policy-data" }, "default_fleet_server": { "description": "True when this policy is the default policy to start Fleet Server", @@ -496,7 +497,6 @@ "agent": { "$ref": "#/definitions/agent-metadata" }, "user_provided_metadata": { "description": "User provided metadata information for the Elastic Agent", - "type": "object", "format": "raw" }, "tags": { @@ -508,7 +508,6 @@ }, "local_metadata": { "description": "Local metadata information for the Elastic Agent", - "type": "object", "format": "raw" }, "policy_id": { @@ -552,7 +551,6 @@ }, "components": { "description": "Elastic Agent components detailed status information", - "type": "object", "format": "raw" }, "default_api_key_id": { @@ -593,7 +591,6 @@ }, "upgrade_details": { "description": "Additional upgrade status details.", - "type": "object", "format": "raw" } }, @@ -647,99 +644,171 @@ "api_key_id", "api_key" ] - } - }, - "checkin": { - "title": "Checkin", - "description": "An Elastic Agent checkin to Fleet", - "type": "object", - "properties": { - "_id": { - "description": "The unique identifier for the Elastic Agent checkin", - "type": "string", - "format": "uuid" - }, - "@timestamp": { - "description": "Date/time the checkin was created", - "type": "string", - "format": "date-time" - }, - "agent": { "$ref": "#/definitions/agent-metadata" }, - "host": { "$ref": "#/definitions/host-metadata" }, - "server": { "$ref": "#/definitions/server-metadata" }, - "status": { - "description": "The current overall status of the Elastic Agent", - "type": "string", - "enum": ["enrolling", "healthy", "error", "degraded", "offline", "unenrolling", "upgrading"] - }, - "message": { - "description": "The current overall status message of the Elastic Agent", - "type": "string" + }, + + "checkin": { + "title": "Checkin", + "description": "An Elastic Agent checkin to Fleet", + "type": "object", + "properties": { + "_id": { + "description": "The unique identifier for the Elastic Agent checkin", + "type": "string", + "format": "uuid" + }, + "@timestamp": { + "description": "Date/time the checkin was created", + "type": "string", + "format": "date-time" + }, + "agent": { "$ref": "#/definitions/agent-metadata" }, + "host": { "$ref": "#/definitions/host-metadata" }, + "server": { "$ref": "#/definitions/server-metadata" }, + "status": { + "description": "The current overall status of the Elastic Agent", + "type": "string", + "enum": ["enrolling", "healthy", "error", "degraded", "offline", "unenrolling", "upgrading"] + }, + "message": { + "description": "The current overall status message of the Elastic Agent", + "type": "string" + }, + "policy": { + "title": "Checkin Policy", + "description": "The current status of the applied policy", + "type": "object", + "properties": { + "id": { + "description": "The ID for the policy", + "type": "string", + "format": "uuid" + }, + "revision": { + "description": "The revision of the policy", + "type": "integer" + }, + "inputs": { + "title": "Checkin Policy Input", + "description": "The current input status per policy", + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "description": "The ID for the input", + "type": "string", + "format": "uuid" + }, + "template_id": { + "description": "The template ID for the input", + "type": "string", + "format": "uuid" + }, + "status": { + "description": "The current status of the input", + "type": "string", + "enum": ["healthy", "error", "degraded"] + }, + "message": { + "description": "The current status message of the intput", + "type": "string" + } + }, + "required": [ + "id", + "template_id", + "status", + "message" + ] + } + } + }, + "required": [ + "id", + "revision", + "inputs" + ] + } }, - "policy": { - "title": "Checkin Policy", - "description": "The current status of the applied policy", + "required": [ + "_id", + "agent", + "status", + "message", + "enrolled_at", + "updated_at" + ] + }, + + "policy-data": { + "title": "Policy Data", + "description": "The policy data that an agent needs to run", "type": "object", "properties": { - "id": { - "description": "The ID for the policy", - "type": "string", - "format": "uuid" - }, - "revision": { - "description": "The revision of the policy", - "type": "integer" - }, - "inputs": { - "title": "Checkin Policy Input", - "description": "The current input status per policy", - "type": "array", - "items": { - "type": "object", - "properties": { - "id": { - "description": "The ID for the input", - "type": "string", - "format": "uuid" - }, - "template_id": { - "description": "The template ID for the input", - "type": "string", - "format": "uuid" - }, - "status": { - "description": "The current status of the input", - "type": "string", - "enum": ["healthy", "error", "degraded"] - }, - "message": { - "description": "The current status message of the intput", - "type": "string" + "id": { + "description": "The policy's ID", + "type": "string" + }, + "outputs": { + "description": "A map of all outputs that the agent running the policy can use to send data to.", + "type": "object", + "additionalProperties": { + "type": "object", + "additionalProperties": { + "$comment": "results in map[string]map[string]interface{}. We use a dynamic output objects because it would be a larger task to define the properties we need to support for each output type, this may be done as future work" + } } - }, - "required": [ - "id", - "template_id", - "status", - "message" - ] + }, + "inputs": { + "description": "A list of all inputs the agent should run", + "type": "array", + "items": { + "type": "object", + "additionalProperties":{ + "$comment": "embedded additionalProperties type results in `outputs map[string]interface{}`" + } + } + }, + "secret_references": { + "description": "A list of all secrets fleet-server needs to inject into the policy before passing it to the agent. This attribute is removed when policy data is send to an agent.", + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "type": "string" + } + }, + "required": [ + "id" + ] + } + }, + "revision": { + "description": "The policy revision number. Should match revision_idx", + "type": "integer" + }, + "agent": { + "description": "The policy's agent configuration details", + "format": "raw" + }, + "signed": { + "$ref": "#/definitions/signed" + }, + "output_permissions": { + "description": "The Elasticsearch permissions needed to run the policy", + "format": "raw" + }, + "fleet": { + "description": "The policy's fleet configuration details", + "format": "raw" } - } }, "required": [ - "id", - "revision", - "inputs" + "id", + "revision", + "outputs" ] - } - }, - "required": [ - "_id", - "agent", - "status", - "message", - "enrolled_at", - "updated_at" - ] + } } }