diff --git a/go.mod b/go.mod
index 67833582a..b4ceb6b2e 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,8 @@ require (
 	github.com/mailru/easyjson v0.7.7
 	github.com/miolini/datacounter v1.0.3
 	github.com/oapi-codegen/runtime v1.1.1
+	github.com/oklog/ulid/v2 v2.1.0
+	github.com/open-telemetry/opamp-go v0.15.0
 	github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
 	github.com/prometheus/client_golang v1.19.0
 	github.com/rs/xid v1.5.0
@@ -38,7 +40,7 @@ require (
 	golang.org/x/sync v0.8.0
 	golang.org/x/time v0.5.0
 	google.golang.org/grpc v1.63.2
-	google.golang.org/protobuf v1.33.0
+	google.golang.org/protobuf v1.34.1
 	gopkg.in/yaml.v3 v3.0.1
 )
@@ -64,6 +66,7 @@ require (
 	github.com/golang/glog v1.2.0 // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/uuid v1.6.0 // indirect
+	github.com/gorilla/websocket v1.5.1 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
 	github.com/josharian/intern v1.0.0 // indirect
 	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
diff --git a/go.sum b/go.sum
index aeff5e921..1122b07ef 100644
--- a/go.sum
+++ b/go.sum
@@ -90,6 +90,8 @@ github.com/google/pprof v0.0.0-20230426061923-93006964c1fc h1:AGDHt781oIcL4EFk7c
 github.com/google/pprof v0.0.0-20230426061923-93006964c1fc/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk=
 github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
 github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
+github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
 github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
 github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
 github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
@@ -125,12 +127,17 @@ github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6U
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
 github.com/oapi-codegen/runtime v1.1.1/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
+github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
+github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
+github.com/open-telemetry/opamp-go v0.15.0 h1:X2TWhEsGQ8GP7Uos3Ic9v/1aFUqoECZXKS7xAF5HqsA=
+github.com/open-telemetry/opamp-go v0.15.0/go.mod h1:QyPeN56JXlcZt5yG5RMdZ50Ju+zMFs1Ihy/hwHyF8Oo=
 github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
 github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
 github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
+github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
 github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -287,8 +294,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
 google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
 google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
-google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
-google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
+google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
+google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
diff --git a/internal/pkg/api/router.go b/internal/pkg/api/router.go
index 5b81c6576..9fbb0a71e 100644
--- a/internal/pkg/api/router.go
+++ b/internal/pkg/api/router.go
@@ -9,24 +9,34 @@ import (
 	"regexp"
 	"strings"
 
-	"github.com/elastic/fleet-server/v7/internal/pkg/config"
-	"github.com/elastic/fleet-server/v7/internal/pkg/limit"
-	"github.com/elastic/fleet-server/v7/internal/pkg/logger"
 	"github.com/go-chi/chi/v5"
 	"github.com/go-chi/chi/v5/middleware"
 	"github.com/rs/zerolog"
 	"go.elastic.co/apm/module/apmchiv5/v2"
 	"go.elastic.co/apm/v2"
+
+	"github.com/elastic/fleet-server/v7/internal/pkg/config"
+	"github.com/elastic/fleet-server/v7/internal/pkg/limit"
+	"github.com/elastic/fleet-server/v7/internal/pkg/logger"
+	"github.com/elastic/fleet-server/v7/internal/pkg/opamp"
+
+	opampserver "github.com/open-telemetry/opamp-go/server"
 )
 
-func newRouter(cfg *config.ServerLimits, si ServerInterface, tracer *apm.Tracer) http.Handler {
+func newRouter(cfg *config.ServerLimits, si ServerInterface, tracer *apm.Tracer, handlerFn opampserver.HTTPHandlerFunc) http.Handler {
 	r := chi.NewRouter()
 	if tracer != nil {
 		r.Use(apmchiv5.Middleware(apmchiv5.WithTracer(tracer)))
 	}
+	r.Use(logger.Middleware) // Attach middlewares to the router directly so they occur before any request parsing/validation
 	r.Use(middleware.Recoverer)
 	r.Use(Limiter(cfg).middleware)
+	r.HandleFunc(opamp.DefaultPath, http.HandlerFunc(
+		func(w http.ResponseWriter, r *http.Request) {
+			handlerFn(w, r)
+		},
+	))
 	return HandlerWithOptions(si, ChiServerOptions{
 		BaseRouter:       r,
 		ErrorHandlerFunc: ErrorResp,
@@ -79,6 +89,9 @@ func pathToOperation(path string) string {
 	if path == "/api/status" {
 		return "status"
 	}
+	if path == opamp.DefaultPath {
+		return "opamp"
+	}
 	if path == "/api/fleet/uploads" {
 		return "uploadBegin"
 	}
diff --git a/internal/pkg/api/server.go b/internal/pkg/api/server.go
index 8c65ea70e..be1f9b3d6 100644
--- a/internal/pkg/api/server.go
+++ b/internal/pkg/api/server.go
@@ -13,29 +13,37 @@ import (
 	"net"
 	"net/http"
 
+	"go.elastic.co/apm/v2"
+
 	"github.com/elastic/elastic-agent-libs/transport/tlscommon"
 
 	"github.com/elastic/fleet-server/v7/internal/pkg/build"
 	"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
+	"github.com/elastic/fleet-server/v7/internal/pkg/cache"
 	"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl" "github.com/elastic/fleet-server/v7/internal/pkg/limit" "github.com/elastic/fleet-server/v7/internal/pkg/logger" + "github.com/elastic/fleet-server/v7/internal/pkg/opamp" "github.com/elastic/fleet-server/v7/internal/pkg/policy" - "go.elastic.co/apm/v2" + "github.com/open-telemetry/opamp-go/protobufs" + opampserver "github.com/open-telemetry/opamp-go/server" + "github.com/open-telemetry/opamp-go/server/types" "github.com/rs/zerolog" ) type server struct { - cfg *config.Server - addr string - handler http.Handler + cfg *config.Server + addr string + handler http.Handler + contextWithConn opampserver.ConnContext } // NewServer creates a new HTTP api for the passed addr. // // The server has a listener specific conn limit and endpoint specific rate-limits. // The underlying API structs (such as *CheckinT) may be shared between servers. -func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, sm policy.SelfMonitor, bi build.Info, ut *UploadT, ft *FileDeliveryT, pt *PGPRetrieverT, audit *AuditT, bulker bulk.Bulk, tracer *apm.Tracer) *server { +func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, sm policy.SelfMonitor, bi build.Info, ut *UploadT, ft *FileDeliveryT, pt *PGPRetrieverT, audit *AuditT, bulker bulk.Bulk, cache cache.Cache, pm policy.Monitor, tracer *apm.Tracer) *server { // this is messy, we have an open issue to refactor a := &apiServer{ ct: ct, et: et, @@ -50,10 +58,87 @@ func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at audit: audit, bulker: bulker, } + + ompampServer := opampserver.New(nil) + op := opamp.NewHandler(bulker, cache, pm) + handlerFn, contextWithConn, _ := ompampServer.Attach(opampserver.Settings{ + Callbacks: opampserver.CallbacksStruct{ + OnConnectingFunc: func(request *http.Request) types.ConnectionResponse { + opts := make([]opamp.Option, 0) + agent, err := authAgent(request, nil, bulker, cache) + if errors.Is(err, ErrAgentNotFound) { // No agent associated, get the enrollment token's associated policyID for a register on first use flow + enrollKey, err := authAPIKey(request, bulker, cache) + if err != nil { + zerolog.Ctx(request.Context()).Warn().Err(err).Msg("Opamp registration api key auth failed.") + return types.ConnectionResponse{ + Accept: false, + HTTPStatusCode: http.StatusUnauthorized, + } + } + // TODO handle static enrollment tokens + key, ok := cache.GetEnrollmentAPIKey(enrollKey.ID) + if !ok { + rec, err := dl.FindEnrollmentAPIKey(request.Context(), bulker, dl.QueryEnrollmentAPIKeyByID, dl.FieldAPIKeyID, enrollKey.ID) + if err != nil { + return types.ConnectionResponse{ + Accept: false, + HTTPStatusCode: http.StatusInternalServerError, + } + } + if !rec.Active { + return types.ConnectionResponse{ + Accept: false, + HTTPStatusCode: http.StatusUnauthorized, + } + } + cache.SetEnrollmentAPIKey(enrollKey.ID, rec, int64(len(rec.APIKey))) + key = rec + } + opts = append(opts, opamp.WithPolicyID(key.PolicyID), opamp.WithNamespaces(key.Namespaces)) + } else if err != nil { + zerolog.Ctx(request.Context()).Warn().Err(err).Msg("Opamp request api key auth failed.") + return types.ConnectionResponse{ + Accept: false, + HTTPStatusCode: http.StatusUnauthorized, + } + } else { + opts = append(opts, opamp.WithAgent(agent)) + } + return types.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: opampserver.ConnectionCallbacksStruct{ + OnConnectedFunc: func(ctx 
context.Context, _ types.Connection) { + zerolog.Ctx(ctx).Info().Msg("Opamp connection started.") + }, + OnMessageFunc: func(ctx context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + zerolog.Ctx(ctx).Info().Msg("Opamp message received.") + response, err := op.Process(ctx, message, opts...) + if err != nil { + zerolog.Ctx(ctx).Error().Err(err).Msg("Error processing opamp request.") + return &protobufs.ServerToAgent{ + InstanceUid: message.InstanceUid, + ErrorResponse: &protobufs.ServerErrorResponse{ + ErrorMessage: err.Error(), + }, + } + } + return response + }, + OnConnectionCloseFunc: func(_ types.Connection) { + zerolog.Ctx(request.Context()).Info().Msg("Opamp connection ended.") + }, + }, + } + + }, + }, + }) + return &server{ - addr: addr, - cfg: cfg, - handler: newRouter(&cfg.Limits, a, tracer), + addr: addr, + cfg: cfg, + handler: newRouter(&cfg.Limits, a, tracer, handlerFn), + contextWithConn: contextWithConn, } } @@ -75,6 +160,7 @@ func (s *server) Run(ctx context.Context) error { BaseContext: func(net.Listener) context.Context { return ctx }, ErrorLog: errLogger(ctx), ConnState: diagConn, + ConnContext: s.contextWithConn, } var listenCfg net.ListenConfig diff --git a/internal/pkg/opamp/opamp.go b/internal/pkg/opamp/opamp.go new file mode 100644 index 000000000..b5f4301ee --- /dev/null +++ b/internal/pkg/opamp/opamp.go @@ -0,0 +1,420 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// opamp provides a poc of fleet-server serving the opamp spec +// It can serve new policies + +package opamp + +import ( + "context" + "crypto/sha256" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/oklog/ulid/v2" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/rs/zerolog" + + "github.com/elastic/fleet-server/v7/internal/pkg/apikey" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" + "github.com/elastic/fleet-server/v7/internal/pkg/cache" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + "github.com/elastic/fleet-server/v7/internal/pkg/policy" + "github.com/elastic/fleet-server/v7/internal/pkg/sqn" +) + +const DefaultPath = "/v1/opamp" + +var healthToStatus = map[bool]string{ + true: "healthy", + false: "unhealthy", +} + +const serverCapabilities uint64 = 0x00000001 | 0x00000002 | 0x00000004 | 0x00000020 // status, offers remote config, accepts effective config, offers connection settings + +const kFleetAccessRolesJSON = ` +{ + "fleet-apikey-access": { + "cluster": [], + "applications": [{ + "application": "fleet", + "privileges": ["no-privileges"], + "resources": ["*"] + }] + } +} +` + +type opamp struct { + bulk bulk.Bulk + cache cache.Cache + pm policy.Monitor +} + +func NewHandler(bulk bulk.Bulk, cache cache.Cache, pm policy.Monitor) *opamp { + return &opamp{ + bulk: bulk, + cache: cache, + pm: pm, + } +} + +type processArgs struct { + agent *model.Agent + policyID string + namespaces []string +} + +type Option func(*processArgs) + +func WithAgent(agent *model.Agent) Option { + return func(p *processArgs) { + p.agent = agent + } +} + +func WithPolicyID(id string) Option { + return func(p *processArgs) { + p.policyID = id + } +} + +func WithNamespaces(namespaces []string) Option { + return func(p *processArgs) { + p.namespaces = namespaces + } +} + +// Process 
+// Process handles AgentToServer messages.
+func (o *opamp) Process(ctx context.Context, message *protobufs.AgentToServer, opts ...Option) (*protobufs.ServerToAgent, error) {
+	if message.GetCapabilities()&0x00000001 == 0 { // ReportsStatus must be set on all agents
+		return nil, fmt.Errorf("capability: ReportsStatus is unset")
+	}
+	args := &processArgs{}
+	for _, opt := range opts {
+		opt(args)
+	}
+
+	if args.agent == nil && args.policyID != "" {
+		return o.register(ctx, message, args.policyID, args.namespaces)
+	}
+	if args.agent == nil || args.agent.Id != string(message.InstanceUid) {
+		return nil, fmt.Errorf("API key's associated agent does not match InstanceUid")
+	}
+	return o.process(ctx, message, args.agent)
+}
+
+// process is similar to the api checkin path.
+// It updates the agent's health status (but not metadata yet) and dispatches new configs.
+// A config is dispatched if the config the agent sent has a lower revision number than the current policy or, when no config is sent, if the agent doc has a lower revision number.
+func (o *opamp) process(ctx context.Context, message *protobufs.AgentToServer, agent *model.Agent) (*protobufs.ServerToAgent, error) {
+	if agent == nil {
+		return nil, fmt.Errorf("no agent record found")
+	}
+	ts := time.Now().UTC()
+	tsStr := ts.Format(time.RFC3339)
+
+	// update the agent description if the health status has changed, otherwise just do a minimal update
+	updateAgent := false
+	if health := message.GetHealth(); health != nil && agent.LastCheckinStatus != healthToStatus[health.Healthy] {
+		updateAgent = true
+	}
+	update := bulk.UpdateFields{
+		dl.FieldLastCheckin: tsStr,
+		dl.FieldUpdatedAt:   tsStr,
+	}
+	if updateAgent {
+		update[dl.FieldLastCheckinStatus] = healthToStatus[message.GetHealth().Healthy]
+		update[dl.FieldLastCheckinMessage] = message.GetHealth().Status
+		update[dl.FieldComponents] = toComponentList(message.GetHealth().GetComponentHealthMap())
+	}
+	updateBody, err := update.Marshal()
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal agent update: %w", err)
+	}
+	if err := o.bulk.Update(ctx, dl.FleetAgents, agent.Id, updateBody); err != nil {
+		return nil, fmt.Errorf("failed to update agent doc: %w", err)
+	}
+
+	rev := agent.PolicyRevisionIdx
+	// use the revisionIdx from the agent's config if it's sent
+	var cfg *protobufs.AgentConfigFile
+	ecfg := message.GetEffectiveConfig()
+	if ecfg != nil {
+		if cm := ecfg.GetConfigMap(); cm != nil {
+			if cfile, ok := cm.GetConfigMap()[""]; ok {
+				cfg = cfile
+			}
+		}
+	}
+	if cfg != nil && len(cfg.Body) > 0 {
+		switch cfg.ContentType {
+		case "application/json":
+			var policy model.Policy
+			if err := json.Unmarshal(cfg.Body, &policy); err != nil {
+				return nil, fmt.Errorf("unmarshal effective policy failed: %w", err)
+			}
+			rev = policy.RevisionIdx
+		default:
+			zerolog.Ctx(ctx).Warn().Str("Content-Type", cfg.ContentType).Msg("Unknown content type.")
+		}
+	}
+
+	sub, err := o.pm.Subscribe(agent.Id, agent.PolicyID, rev)
+	if err != nil {
+		return nil, fmt.Errorf("unable to get policy subscription for agent: %w", err)
+	}
+	defer func() {
+		err := o.pm.Unsubscribe(sub)
+		if err != nil {
+			zerolog.Ctx(ctx).Error().Err(err).Msg("Unable to unsubscribe from policy.")
+		}
+	}()
+	var remoteConfig *protobufs.AgentRemoteConfig
+	select {
+	case pp := <-sub.Output():
+		remoteConfig, _, err = o.preparePolicy(ctx, agent, pp)
+		// FIXME: We should be sure to handle outputs separately here using the returned data (2nd arg)
+		// At a minimum we need to set ConnectionSettingsOffers.opamp
+		if err != nil {
+			return nil, fmt.Errorf("unable to prepare remote config: %w", err)
+		}
+	default:
+		zerolog.Ctx(ctx).Debug().Msg("No policy update.")
+	}
+
+	return &protobufs.ServerToAgent{
+		InstanceUid:  message.InstanceUid,
+		RemoteConfig: remoteConfig,
+		Capabilities: serverCapabilities,
+	}, nil
+}
+
+func (o *opamp) register(ctx context.Context, message *protobufs.AgentToServer, policyID string, namespaces []string) (*protobufs.ServerToAgent, error) {
+	if message.GetCapabilities()&0x00000100 == 0 {
+		return nil, fmt.Errorf("capability: AcceptsOpAMPConnectionSettings is unset")
+	}
+	// NOTE: message.ConnectionSettingsRequest.Opamp is used for a CSR flow, we don't support this workflow at the moment
+	replaceID := message.GetFlags()&uint64(protobufs.AgentToServerFlags_AgentToServerFlags_RequestInstanceUid) != 0
+	uid := ulid.ULID(message.InstanceUid)
+	agent, err := dl.FindAgent(ctx, o.bulk, dl.QueryAgentByID, dl.FieldID, uid.String())
+	if err == nil {
+		// ID collides with an existing agent that has not checked in
+		if agent.Id != "" && agent.LastCheckin == "" {
+			if err := invalidateAPIKey(ctx, o.bulk, agent.AccessAPIKeyID); err != nil {
+				return nil, fmt.Errorf("agent id collision, unable to invalidate previous API key: %w", err)
+			}
+			if err := o.bulk.Delete(ctx, dl.FleetAgents, agent.Id); err != nil {
+				return nil, fmt.Errorf("agent id collision, unable to delete previous agent doc: %w", err)
+			}
+		} else {
+			zerolog.Ctx(ctx).Debug().Msg("Agent registration has detected uid collision")
+			uid = ulid.Make() // TODO replace this with a better call?
+			replaceID = true
+		}
+	} else if errors.Is(err, dl.ErrNotFound) {
+		zerolog.Ctx(ctx).Trace().Msg("Agent registration no uid collision")
+	} else {
+		return nil, fmt.Errorf("unable to check for uid collision on registration: %w", err)
+	}
+
+	key, err := o.bulk.APIKeyCreate(ctx, uid.String(), "", []byte(kFleetAccessRolesJSON), apikey.NewMetadata(uid.String(), "", apikey.TypeAccess))
+	if err != nil {
+		return nil, fmt.Errorf("registration failed to make ApiKey: %w", err)
+	}
+	// TODO need a way to split agent description into local metadata, tags, and version info
+	var localMeta json.RawMessage
+	if ad := message.GetAgentDescription(); ad != nil {
+		localMeta, err = json.Marshal(ad)
+		if err != nil {
+			zerolog.Ctx(ctx).Error().Err(err).Msg("Unable to marshal agent description")
+		}
+	}
+	// TODO Invalidate key if func returns error after this
+	agent = model.Agent{
+		Active:         true,
+		PolicyID:       policyID,
+		Namespaces:     namespaces,
+		Type:           "opamp", // regular agents use PERMANENT, might be nice to distinguish
+		EnrolledAt:     time.Now().UTC().Format(time.RFC3339),
+		LocalMetadata:  localMeta,
+		AccessAPIKeyID: key.ID,
+		ActionSeqNo:    []int64{sqn.UndefinedSeqNo},
+		Agent: &model.AgentMetadata{
+			ID: uid.String(),
+			// TODO version
+		},
+		// TODO tags
+		// TODO handle enrollmentId
+	}
+	body, err := json.Marshal(agent)
+	if err != nil {
+		return nil, fmt.Errorf("unable to marshal agent doc: %w", err)
+	}
+	if _, err := o.bulk.Create(ctx, dl.FleetAgents, uid.String(), body, bulk.WithRefresh()); err != nil {
+		return nil, fmt.Errorf("unable to index agent doc: %w", err)
+	}
+	// TODO: Set agent to inactive if error is returned below
+	o.cache.SetAPIKey(*key, true)
+
+	sub, err := o.pm.Subscribe(uid.String(), policyID, 0) // subscription should get the latest policy
+	if err != nil {
+		return nil, fmt.Errorf("failed to create policy subscription when registering agent: %w", err)
+	}
+	defer func() {
+		err := o.pm.Unsubscribe(sub)
+		if err != nil {
+			zerolog.Ctx(ctx).Error().Err(err).Msg("Unable to unsubscribe from policy.")
+		}
+	}()
+
+	var remoteConfig *protobufs.AgentRemoteConfig
+	var data *model.PolicyData
+	select {
+	case pp := <-sub.Output():
+		remoteConfig, data, err = o.preparePolicy(ctx, &agent, pp)
+		if err != nil {
+			return nil, fmt.Errorf("unable to prepare remote config: %w", err)
+		}
+	case <-time.After(time.Second * 5): // TODO make configurable
+		return nil, fmt.Errorf("unable to retrieve policy within timeout")
+	}
+	// handle connection settings here
+	// TODO the non-opamp settings
+	hash := sha256.New()
+	hash.Write([]byte(key.ID))
+	hash.Write(data.Fleet)
+
+	fleet := struct {
+		Hosts []string `json:"hosts"`
+	}{}
+	if err := json.Unmarshal(data.Fleet, &fleet); err != nil {
+		return nil, fmt.Errorf("unable to unmarshal fleet hosts: %w", err)
+	}
+	if len(fleet.Hosts) == 0 {
+		return nil, fmt.Errorf("no fleet hosts found")
+	}
+
+	resp := &protobufs.ServerToAgent{
+		InstanceUid:  message.InstanceUid,
+		RemoteConfig: remoteConfig,
+		Capabilities: serverCapabilities,
+		ConnectionSettings: &protobufs.ConnectionSettingsOffers{
+			Hash: hash.Sum(nil),
+			Opamp: &protobufs.OpAMPConnectionSettings{
+				DestinationEndpoint: fleet.Hosts[0] + DefaultPath,
+				Headers: &protobufs.Headers{
+					Headers: []*protobufs.Header{
+						&protobufs.Header{
+							Key:   "Authorization",
+							Value: "ApiKey " + key.Token(),
+						},
+					},
+				},
+			},
+		},
+	}
+	if replaceID {
+		resp.AgentIdentification = &protobufs.AgentIdentification{
+			NewInstanceUid: uid.Bytes(),
+		}
+	}
+
+	return resp, nil
+}
+
+func (o *opamp) preparePolicy(ctx context.Context, agent *model.Agent, pp *policy.ParsedPolicy) (*protobufs.AgentRemoteConfig, *model.PolicyData, error) {
+	zerolog.Ctx(ctx).Debug().Msg("Found policy update.")
+	if len(pp.Policy.Data.Outputs) == 0 {
+		return nil, nil, fmt.Errorf("no outputs defined in policy")
+	}
+	data := model.ClonePolicyData(pp.Policy.Data)
+	for name, output := range data.Outputs {
+		err := policy.ProcessOutputSecret(ctx, output, o.bulk)
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to process output secrets %q: %w", name, err)
+		}
+	}
+	for _, output := range pp.Outputs {
+		err := output.Prepare(ctx, *zerolog.Ctx(ctx), o.bulk, agent, data.Outputs)
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to prepare output %q: %w", output.Name, err)
+		}
+	}
+	data.Inputs = pp.Inputs
+
+	body, err := json.Marshal(data)
+	if err != nil {
+		return nil, nil, fmt.Errorf("unable to marshal policy: %w", err)
+	}
+	hash := sha256.New()
+	hash.Write(body)
+	remoteConfig := &protobufs.AgentRemoteConfig{
+		Config: &protobufs.AgentConfigMap{
+			ConfigMap: map[string]*protobufs.AgentConfigFile{
+				"": &protobufs.AgentConfigFile{
+					Body:        body,
+					ContentType: "application/json",
+				},
+			},
+		},
+		ConfigHash: hash.Sum(nil),
+	}
+	return remoteConfig, data, nil
+}
+
+// toComponentList transforms opamp component health into fleet's components list.
+// It only goes one level down in the opamp map.
+func toComponentList(comps map[string]*protobufs.ComponentHealth) []model.ComponentsItems {
+	if len(comps) == 0 {
+		return nil
+	}
+	arr := make([]model.ComponentsItems, 0, len(comps))
+	for k, v := range comps {
+		status := healthToStatus[v.Healthy]
+		units := make([]model.UnitsItems, 0, len(v.ComponentHealthMap))
+		for uk, uv := range v.ComponentHealthMap {
+			uStatus := healthToStatus[uv.Healthy]
+			units = append(units, model.UnitsItems{
+				ID:      uk,
+				Message: uv.Status,
+				Status:  uStatus,
+			})
+		}
+		arr = append(arr, model.ComponentsItems{
+			ID:      k,
+			Message: v.Status,
+			Status:  status,
+			Units:   units,
+		})
+	}
+	return arr
+}
+
+func invalidateAPIKey(ctx context.Context, bulker bulk.Bulk, id string) error {
+	timer := time.NewTimer(time.Minute)
+	defer timer.Stop()
+LOOP:
+	for {
+		_, err := bulker.APIKeyRead(ctx, id, true)
+		switch {
+		case err == nil:
+			break LOOP
+		case !errors.Is(err, apikey.ErrAPIKeyNotFound):
+			return err
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-timer.C:
+			return fmt.Errorf("apikey index failed to refresh")
+		case <-time.After(time.Second):
+		}
+	}
+	return bulker.APIKeyInvalidate(ctx, id)
+}
diff --git a/internal/pkg/server/fleet.go b/internal/pkg/server/fleet.go
index 677c8027a..56e5cdd18 100644
--- a/internal/pkg/server/fleet.go
+++ b/internal/pkg/server/fleet.go
@@ -14,10 +14,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/elastic/elastic-agent-client/v7/pkg/client"
 	"go.elastic.co/apm/v2"
 	apmtransport "go.elastic.co/apm/v2/transport"
 
+	"github.com/elastic/elastic-agent-client/v7/pkg/client"
+
 	"github.com/elastic/fleet-server/v7/internal/pkg/action"
 	"github.com/elastic/fleet-server/v7/internal/pkg/api"
 	"github.com/elastic/fleet-server/v7/internal/pkg/build"
@@ -543,7 +544,7 @@ func (f *Fleet) runSubsystems(ctx context.Context, cfg *config.Config, g *errgro
 	auditT := api.NewAuditT(&cfg.Inputs[0].Server, bulker, f.cache)
 
 	for _, endpoint := range (&cfg.Inputs[0].Server).BindEndpoints() {
-		apiServer := api.NewServer(endpoint, &cfg.Inputs[0].Server, ct, et, at, ack, st, sm, f.bi, ut, ft, pt, auditT, bulker, tracer)
+		apiServer := api.NewServer(endpoint, &cfg.Inputs[0].Server, ct, et, at, ack, st, sm, f.bi, ut, ft, pt, auditT, bulker, f.cache, pm, tracer)
 		g.Go(loggedRunFunc(ctx, "Http server", func(ctx context.Context) error {
 			return apiServer.Run(ctx)
 		}))
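
Usage sketch (reviewer note, not part of the diff): a minimal OpAMP check-in over the plain-HTTP transport. The host `localhost:8220`, the TLS trust setup, and `<enrollment-token>` are placeholders; the capability bits and message fields mirror the checks in `opamp.Process` and the register flow above. Sending an enrollment API key (rather than an agent access key) in the `Authorization` header should exercise the register-on-first-use path in `OnConnectingFunc`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"

	"github.com/oklog/ulid/v2"
	"github.com/open-telemetry/opamp-go/protobufs"
	"google.golang.org/protobuf/proto"
)

func main() {
	// InstanceUid is a 16-byte ULID, matching the ulid.ULID(message.InstanceUid)
	// conversion performed by the register flow.
	uid := ulid.Make()

	msg := &protobufs.AgentToServer{
		InstanceUid: uid.Bytes(),
		// 0x1 = ReportsStatus (required by Process);
		// 0x100 = AcceptsOpAMPConnectionSettings (required by register).
		Capabilities: 0x00000001 | 0x00000100,
		Health:       &protobufs.ComponentHealth{Healthy: true, Status: "online"},
	}
	body, err := proto.Marshal(msg)
	if err != nil {
		panic(err)
	}

	// The OpAMP plain-HTTP transport is a POST of a protobuf-encoded
	// AgentToServer message; the endpoint path is opamp.DefaultPath.
	req, err := http.NewRequest(http.MethodPost, "https://localhost:8220/v1/opamp", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/x-protobuf")
	req.Header.Set("Authorization", "ApiKey <enrollment-token>") // placeholder token

	resp, err := http.DefaultClient.Do(req) // TLS trust configuration elided
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	var s2a protobufs.ServerToAgent
	if err := proto.Unmarshal(raw, &s2a); err != nil {
		panic(err)
	}
	fmt.Printf("server capabilities: %#x, remote config offered: %v\n", s2a.Capabilities, s2a.RemoteConfig != nil)
}
```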