Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Sep 20, 2023
1 parent c2425d8 commit 19dc846
Show file tree
Hide file tree
Showing 13 changed files with 310 additions and 2 deletions.
10 changes: 10 additions & 0 deletions internal/pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type apiServer struct {
bi build.Info
ut *UploadT
ft *FileDeliveryT
pt *PGPRetrieverT
bulker bulk.Bulk
}

Expand Down Expand Up @@ -138,6 +139,15 @@ func (a *apiServer) GetFile(w http.ResponseWriter, r *http.Request, id string) {
}
}

func (a *apiServer) GetPGPKey(w http.ResponseWriter, r *http.Request, major, minor, patch int) {
zlog := hlog.FromRequest(r).With().Logger()
if err := a.pt.handlePGPKey(zlog, w, r, major, minor, patch); err != nil {
cntGetPGP.IncError(err)
w.Header().Set("Content-Type", "application/json")
ErrorResp(w, r, err)
}
}

func (a *apiServer) Status(w http.ResponseWriter, r *http.Request, params StatusParams) {
zlog := hlog.FromRequest(r).With().
Str("mod", kStatusMod).
Expand Down
18 changes: 18 additions & 0 deletions internal/pkg/api/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,24 @@ func NewHTTPErrResp(err error) HTTPErrResp {
zerolog.InfoLevel,
},
},
{
ErrTLSRequired,
HTTPErrResp{
http.StatusNotImplemented,
"ErrTLSRequired",
"server must run with tls to use this endpoint",
zerolog.InfoLevel,
},
},
{
ErrPGPPermissions,
HTTPErrResp{
http.StatusInternalServerError,
"ErrPGPPermissions",
"fleet-server PGP key has incorrect permissions",
zerolog.ErrorLevel,
},
},
// apikey
{
apikey.ErrNoAuthHeader,
Expand Down
168 changes: 168 additions & 0 deletions internal/pkg/api/handlePGPRequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package api

import (
"context"
"errors"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/cache"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/rs/zerolog"
"go.elastic.co/apm/v2"
)

const defaultKeyName = "default.pgp"

var (
ErrTLSRequired = errors.New("api call requires a TLS connection")
ErrPGPPermissions = errors.New("pgp key permissions are not 0700")
)

type PGPRetrieverT struct {
bulker bulk.Bulk
cache cache.Cache
cfg config.PGP
}

func NewPGPRetrieverT(cfg *config.Server, bulker bulk.Bulk, c cache.Cache) *PGPRetrieverT {
return &PGPRetrieverT{
bulker: bulker,
cache: c,
cfg: cfg.PGP,
}
}

func (pt *PGPRetrieverT) handlePGPKey(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, _, _, _ int) error {
if r.TLS == nil {
return ErrTLSRequired
}
key, err := authAPIKey(r, pt.bulker, pt.cache)
if err != nil {
return err
}
zlog = zlog.With().Str(LogEnrollAPIKeyID, key.ID).Logger()
ctx := zlog.WithContext(r.Context())
r = r.WithContext(ctx)

Check failure on line 50 in internal/pkg/api/handlePGPRequest.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA4017: WithContext doesn't have side effects and its return value is ignored (staticcheck)

p, err := pt.getPGPKey(ctx, zlog)
if err != nil {
return err
}

_, err = w.Write(p)
return err
}

// getPGPKey will return the PGP key bytes
//
// First the local cache will be checked

Check failure on line 63 in internal/pkg/api/handlePGPRequest.go

View workflow job for this annotation

GitHub Actions / lint (linux)

File is not `goimports`-ed (goimports)
//
// If it's not found in the cache, we attempt to read from disk
// If it's found we set the cache and return the bytes
//
// If it's not found on disk we attempt to retrieve the upstream key
// If that succeeds we set the cache then write to disk (with best effort).
func (pt *PGPRetrieverT) getPGPKey(ctx context.Context, zlog zerolog.Logger) ([]byte, error) {
// key that will be retrieved, if this ever changes we should ensure that it can't be used as part of an injection attack as it is joined as part of the filepath for operations.
key := defaultKeyName

span, ctx := apm.StartSpan(ctx, "getPGPKey", "process")
span.Context.SetLabel("key", key)
defer span.End()

p, ok := pt.cache.GetPGPKey(key)
if ok {
return p, nil
}
p, err := pt.getPGPFromDir(ctx, key)

// sucessfully retrieved from disk

Check failure on line 84 in internal/pkg/api/handlePGPRequest.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`sucessfully` is a misspelling of `successfully` (misspell)
if err == nil {
pt.cache.SetPGPKey(key, p)
return p, nil
}

// file exists but the read failed
if !errors.Is(err, fs.ErrNotExist) {
return nil, err
}

// file does not exist
p, err = pt.getPGPFromUpstream(ctx)
if err != nil {
return nil, err
}
pt.cache.SetPGPKey(key, p)
pt.writeKeyToDir(ctx, zlog, key, p)
return p, nil
}

// getPGPFromDir will return the PGP contents if found in the directory
//
// Key contents are only returned if the key has valid permission bits.
func (pt *PGPRetrieverT) getPGPFromDir(ctx context.Context, key string) ([]byte, error) {
span, ctx := apm.StartSpan(ctx, "getPGPFromDir", "process")

Check failure on line 109 in internal/pkg/api/handlePGPRequest.go

View workflow job for this annotation

GitHub Actions / lint (linux)

ineffectual assignment to ctx (ineffassign)
defer span.End()

stat, err := os.Stat(filepath.Join(pt.cfg.Dir, key))
if err != nil {
return nil, err
}
if stat.Mode().Perm() != 0700 { // TODO determine what permission bits we want to check
return nil, ErrPGPPermissions
}
return os.ReadFile(filepath.Join(pt.cfg.Dir, key))
}

// getPGPFromUpstream returns the PGP key contentents from the key specified in the upstream URL.
func (pt *PGPRetrieverT) getPGPFromUpstream(ctx context.Context) ([]byte, error) {
span, ctx := apm.StartSpan(ctx, "getPGPFromUpstream", "process")
defer span.End()

// NOTE: If we are concerned about this making too many requests we can add a lock, or use something like sync.Once
req, err := http.NewRequestWithContext(ctx, http.MethodGet, pt.cfg.UpstreamURL, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}

// writeKeyToDir will write the specified key to the keys directory
//
// If the directory does not exist it will create it
// Otherwise it is treated as a best-effort attempt
func (pt *PGPRetrieverT) writeKeyToDir(ctx context.Context, zlog zerolog.Logger, key string, p []byte) {
span, ctx := apm.StartSpan(ctx, "writeKeyToDir", "process")

Check failure on line 145 in internal/pkg/api/handlePGPRequest.go

View workflow job for this annotation

GitHub Actions / lint (linux)

ineffectual assignment to ctx (ineffassign)
defer span.End()

_, err := os.Stat(pt.cfg.Dir)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
if err := os.Mkdir(pt.cfg.Dir, 0700); err != nil {
zlog.Error().Err(err).Str("path", pt.cfg.Dir).Msgf("Unable to create directory")
return
}
} else {
zlog.Error().Err(err).Str("path", pt.cfg.Dir).Msgf("Unable to verify if directory exists")
return
}
}

err = os.WriteFile(filepath.Join(pt.cfg.Dir, key), p, 0700)

Check failure on line 161 in internal/pkg/api/handlePGPRequest.go

View workflow job for this annotation

GitHub Actions / lint (linux)

G306: Expect WriteFile permissions to be 0600 or less (gosec)
if err != nil {
zlog.Error().Err(err).Str("path", filepath.Join(pt.cfg.Dir, key)).Msg("Unable to write file.")
return
}
zlog.Info().Str("path", filepath.Join(pt.cfg.Dir, key)).Msg("Key written to storage.")

}
2 changes: 2 additions & 0 deletions internal/pkg/api/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
cntUploadChunk routeStats
cntUploadEnd routeStats
cntFileDeliv routeStats
cntGetPGP routeStats
cntArtifacts artifactStats

infoReg sync.Once
Expand Down Expand Up @@ -73,6 +74,7 @@ func init() {
cntUploadChunk.Register(routesRegistry.newRegistry("uploadChunk"))
cntUploadEnd.Register(routesRegistry.newRegistry("uploadEnd"))
cntFileDeliv.Register(routesRegistry.newRegistry("deliverFile"))
cntGetPGP.Register(routesRegistry.newRegistry("getPGPKey"))

}

Expand Down
11 changes: 11 additions & 0 deletions internal/pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api

import (
"net/http"
"regexp"
"strings"

"github.com/elastic/fleet-server/v7/internal/pkg/config"
Expand Down Expand Up @@ -47,6 +48,7 @@ type limiter struct {
uploadChunk *limit.Limiter
uploadComplete *limit.Limiter
deliverFile *limit.Limiter
getPGPKey *limit.Limiter
}

func Limiter(cfg *config.ServerLimits) *limiter {
Expand All @@ -60,9 +62,12 @@ func Limiter(cfg *config.ServerLimits) *limiter {
uploadChunk: limit.NewLimiter(&cfg.UploadChunkLimit),
uploadComplete: limit.NewLimiter(&cfg.UploadEndLimit),
deliverFile: limit.NewLimiter(&cfg.DeliverFileLimit),
getPGPKey: limit.NewLimiter(&cfg.GetPGPKey),
}
}

var pgpReg = regexp.MustCompile(`\/api\/agents\/upgrades\/[0-9]+\.[0-9]+\.[0-9]+\/pgp-public-key`)

// pathToOperation determines the endpoint passed on the request path.
// idealy we would be able to use chi's route context, but it is not ready this early in the stack
//
Expand All @@ -75,6 +80,10 @@ func pathToOperation(path string) string {
if path == "/api/fleet/uploads" {
return "uploadBegin"
}
if pgpReg.MatchString(path) {
return "getPGPKey"
}

if strings.HasPrefix(path, "/api/fleet/") {
pp := strings.Split(strings.TrimPrefix(path, "/"), "/")
if len(pp) == 4 {
Expand Down Expand Up @@ -119,6 +128,8 @@ func (l *limiter) middleware(next http.Handler) http.Handler {
l.uploadChunk.Wrap("uploadChunk", &cntUploadChunk, zerolog.DebugLevel)(next).ServeHTTP(w, r)
case "deliverFile":
l.deliverFile.Wrap("deliverFile", &cntFileDeliv, zerolog.DebugLevel)(next).ServeHTTP(w, r)
case "getPGPKey":
l.getPGPKey.Wrap("getPGPKey", &cntGetPGP, zerolog.DebugLevel)(next).ServeHTTP(w, r)
case "status":
l.status.Wrap("status", &cntStatus, zerolog.DebugLevel)(next).ServeHTTP(w, r)
default:
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type server struct {
//
// The server has a listener specific conn limit and endpoint specific rate-limits.
// The underlying API structs (such as *CheckinT) may be shared between servers.
func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, sm policy.SelfMonitor, bi build.Info, ut *UploadT, ft *FileDeliveryT, bulker bulk.Bulk, tracer *apm.Tracer) *server {
func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at *ArtifactT, ack *AckT, st *StatusT, sm policy.SelfMonitor, bi build.Info, ut *UploadT, ft *FileDeliveryT, pt *PGPRetrieverT, bulker bulk.Bulk, tracer *apm.Tracer) *server {
a := &apiServer{
ct: ct,
et: et,
Expand All @@ -45,6 +45,7 @@ func NewServer(addr string, cfg *config.Server, ct *CheckinT, et *EnrollerT, at
bi: bi,
ut: ut,
ft: ft,
pt: pt,
bulker: bulker,
}
return &server{
Expand Down
35 changes: 35 additions & 0 deletions internal/pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type Cache interface {

SetUpload(id string, info file.Info)
GetUpload(id string) (file.Info, bool)

SetPGPKey(id string, p []byte)
GetPGPKey(id string) ([]byte, bool)
}

type APIKey = apikey.APIKey
Expand Down Expand Up @@ -309,3 +312,35 @@ func (c *CacheT) GetUpload(id string) (file.Info, bool) { //nolint:dupl // a lit
log.Trace().Str("id", id).Msg("upload info cache MISS")
return file.Info{}, false
}

func (c *CacheT) SetPGPKey(id string, p []byte) {
c.mut.RLock()
defer c.mut.RUnlock()

scopedKey := "pgp:" + id
ttl := 30 * time.Minute // @todo: add to configurable
ok := c.cache.SetWithTTL(scopedKey, p, int64(len(p)), ttl)
log.Trace().
Bool("ok", ok).
Str("id", id).
Int("cost", len(p)).
Dur("ttl", ttl).
Msg("PGP key cache SET")

}
func (c *CacheT) GetPGPKey(id string) ([]byte, bool) {
c.mut.RLock()
defer c.mut.RUnlock()

scopedKey := "pgp:" + id
if v, ok := c.cache.Get(scopedKey); ok {
log.Trace().Str("id", id).Msg("PGP key cache HIT")
key, ok := v.([]byte)
if !ok {
log.Error().Str("id", id).Msg("upload info cache cast fail")
return nil, false
}
return key, ok
}
return nil, false
}
11 changes: 11 additions & 0 deletions internal/pkg/config/env_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ const (
defaultFileDelivBurst = 8
defaultFileDelivMax = 5
defaultFileDelivMaxBody = 1024 * 1024 * 5

defaultPGPRetrievalInterval = time.Millisecond * 10
defaultPGPRetrievalBurst = 100
defaultPGPRetrievalMax = 50
)

type valueRange struct {
Expand Down Expand Up @@ -127,6 +131,7 @@ type serverLimitDefaults struct {
UploadEndLimit limit `config:"upload_end_limit"`
UploadChunkLimit limit `config:"upload_chunk_limit"`
DeliverFileLimit limit `config:"file_delivery_limit"`
GetPGPKeyLimit limit `config:"pgp_retrieval_limit"`
}

func defaultserverLimitDefaults() *serverLimitDefaults {
Expand Down Expand Up @@ -188,6 +193,12 @@ func defaultserverLimitDefaults() *serverLimitDefaults {
Max: defaultFileDelivMax,
MaxBody: defaultFileDelivMaxBody,
},
GetPGPKeyLimit: limit{
Interval: defaultPGPRetrievalInterval,
Burst: defaultPGPRetrievalBurst,
Max: defaultPGPRetrievalMax,
MaxBody: 0,
},
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/config/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ type (
GC GC `config:"gc"`
Instrumentation Instrumentation `config:"instrumentation"`
StaticPolicyTokens StaticPolicyTokens `config:"static_policy_tokens"`
PGP PGP `config:"pgp"`
}

StaticPolicyTokens struct {
Expand Down Expand Up @@ -104,6 +105,7 @@ func (c *Server) InitDefaults() {
c.Runtime.InitDefaults()
c.Bulk.InitDefaults()
c.GC.InitDefaults()
c.PGP.InitDefaults()
}

// BindEndpoints returns the binding address for the all HTTP server listeners.
Expand Down
Loading

0 comments on commit 19dc846

Please sign in to comment.