Skip to content

Commit

Permalink
Return connection settings and remote config on registration
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Sep 24, 2024
1 parent e5c4f8c commit 2cc57c4
Showing 1 changed file with 100 additions and 36 deletions.
136 changes: 100 additions & 36 deletions internal/pkg/opamp/opamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package opamp

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -149,42 +150,11 @@ func (o *opamp) process(ctx context.Context, agent *model.Agent, message *protob
var remoteConfig *protobufs.AgentRemoteConfig
select {
case pp := <-sub.Output():
zerolog.Ctx(ctx).Debug().Msg("Found policy update.")
if len(pp.Policy.Data.Outputs) == 0 {
return nil, fmt.Errorf("no outputs defined in policy")
}
data := model.ClonePolicyData(pp.Policy.Data)
for name, output := range data.Outputs {
err := policy.ProcessOutputSecret(ctx, output, o.bulk)
if err != nil {
return nil, fmt.Errorf("failed to process output secrets %q: %w", name, err)
}
}
for _, output := range pp.Outputs {
err := output.Prepare(ctx, *zerolog.Ctx(ctx), o.bulk, agent, data.Outputs)
if err != nil {
return nil, fmt.Errorf("failed to pepare output %q: %w", output.Name, err)
}
}
data.Inputs = pp.Inputs

// FIXME: We should be sure to handle outputs seperately here
remoteConfig, _, err = o.preparePolicy(ctx, agent, pp)
// FIXME: We should be sure to handle outputs seperately here using the returned data (2nd arg)

Check failure on line 154 in internal/pkg/opamp/opamp.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`seperately` is a misspelling of `separately` (misspell)
// At a minimum we need to set ConnectionSettingsOffers.opamp

body, err := json.Marshal(data)
if err != nil {
return nil, fmt.Errorf("unable to marshal policy: %w", err)
}
remoteConfig = &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": &protobufs.AgentConfigFile{
Body: body,
ContentType: "application/json",
},
},
},
ConfigHash: []byte{}, // TODO
return nil, fmt.Errorf("unable to prepare remote config: %w", err)
}
default:
zerolog.Ctx(ctx).Debug().Msg("No policy update.")
Expand Down Expand Up @@ -256,14 +226,108 @@ func (o *opamp) register(ctx context.Context, policyID string, namespaces []stri
// TODO: Set agent to inactive if error is returned below
o.cache.SetAPIKey(*key, true)

resp, err := o.process(ctx, &agent, message)
sub, err := o.pm.Subscribe(uid.String(), policyID, 0) // subscription should get the latest policy
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create policy subscription when registering agent: %w", err)
}
defer func() {
err := o.pm.Unsubscribe(sub)
if err != nil {
zerolog.Ctx(ctx).Error().Err(err).Msg("Unable to unsubscribe from policy.")
}
}()

var remoteConfig *protobufs.AgentRemoteConfig
var data *model.PolicyData
select {
case pp := <-sub.Output():
remoteConfig, data, err = o.preparePolicy(ctx, &agent, pp)
if err != nil {
return nil, fmt.Errorf("unable to prepare remote config: %w", err)
}
case <-time.After(time.Second * 5): // TODO make configurable
return nil, fmt.Errorf("unable to retrieve policy within timeout")
}
// handle connection settings here
// TODO the non-opamp settings
hash := sha256.New()
hash.Write([]byte(key.ID))
hash.Write(data.Fleet)

fleet := struct {
Hosts []string `json:"hosts"`
}{}
if err := json.Unmarshal(data.Fleet, &fleet); err != nil {
return nil, fmt.Errorf("unable to unmarshal fleet hosts: %w", err)
}
if len(fleet.Hosts) == 0 {
return nil, fmt.Errorf("no fleet hosts found")
}

resp := &protobufs.ServerToAgent{
InstanceUid: message.InstanceUid,
RemoteConfig: remoteConfig,
Capabilities: serverCapabilities,
ConnectionSettings: &protobufs.ConnectionSettingsOffers{
Hash: hash.Sum(nil),
Opamp: &protobufs.OpAMPConnectionSettings{
DestinationEndpoint: fleet.Hosts[0],
Headers: &protobufs.Headers{
Headers: []*protobufs.Header{
&protobufs.Header{
Key: "Authorization",
Value: "ApiKey " + key.Token(),
},
},
},
},
},
}
if replaceID {
resp.AgentIdentification = &protobufs.AgentIdentification{
NewInstanceUid: uid.Bytes(),
}
}

return resp, nil
}

func (o *opamp) preparePolicy(ctx context.Context, agent *model.Agent, pp *policy.ParsedPolicy) (*protobufs.AgentRemoteConfig, *model.PolicyData, error) {
zerolog.Ctx(ctx).Debug().Msg("Found policy update.")
if len(pp.Policy.Data.Outputs) == 0 {
return nil, nil, fmt.Errorf("no outputs defined in policy")
}
data := model.ClonePolicyData(pp.Policy.Data)
for name, output := range data.Outputs {
err := policy.ProcessOutputSecret(ctx, output, o.bulk)
if err != nil {
return nil, nil, fmt.Errorf("failed to process output secrets %q: %w", name, err)
}
}
for _, output := range pp.Outputs {
err := output.Prepare(ctx, *zerolog.Ctx(ctx), o.bulk, agent, data.Outputs)
if err != nil {
return nil, nil, fmt.Errorf("failed to pepare output %q: %w", output.Name, err)
}
}
data.Inputs = pp.Inputs

body, err := json.Marshal(data)
if err != nil {
return nil, nil, fmt.Errorf("unable to marshal policy: %w", err)
}
hash := sha256.New()
hash.Write(body)
remoteConfig := &protobufs.AgentRemoteConfig{
Config: &protobufs.AgentConfigMap{
ConfigMap: map[string]*protobufs.AgentConfigFile{
"": &protobufs.AgentConfigFile{
Body: body,
ContentType: "application/json",
},
},
},
ConfigHash: hash.Sum(nil),
}
return remoteConfig, data, nil
}

0 comments on commit 2cc57c4

Please sign in to comment.