Skip to content

Commit

Permalink
Cache and limit defaults are derived from system memory.
Browse files Browse the repository at this point in the history
Allow the fleet-server to get matching limits settings based off system
memory if no agent max is provided. Allow default settings to be used
if a negative value is specified as the agents max setting.
Change ServerLimits and Cache InitDefault functions to nops to fix an
undiscovered bug with loading limits.
  • Loading branch information
michel-laterman committed Nov 10, 2023
1 parent 0718a55 commit a39e6c7
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 42 deletions.
4 changes: 1 addition & 3 deletions internal/pkg/config/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ type Cache struct {
APIKeyJitter time.Duration `config:"jitter_api_key"`
}

func (c *Cache) InitDefaults() {
c.LoadLimits(loadLimits(0))
}
func (c *Cache) InitDefaults() {}

// LoadLimits loads envLimits for any attribute that is not defined in Cache
func (c *Cache) LoadLimits(limits *envLimits) {
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func FromConfig(c *ucfg.Config) (*Config, error) {
}

// LoadFile take a path and load the file and return a new configuration.
// Only used in tests
func LoadFile(path string) (*Config, error) {
c, err := yaml.NewConfigWithFile(path, DefaultOptions...)
if err != nil {
Expand Down
86 changes: 79 additions & 7 deletions internal/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/gofrs/uuid"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -125,15 +126,15 @@ func TestConfig(t *testing.T) {
},
CompressionLevel: 1,
CompressionThresh: 1024,
Limits: generateServerLimits(12500),
Limits: generateServerLimits(0),
Bulk: defaultServerBulk(),
GC: defaultServerGC(),
PGP: PGP{
UpstreamURL: defaultPGPUpstreamURL,
Dir: filepath.Join(retrieveExecutableDir(), defaultPGPDirectoryName),
},
},
Cache: generateCache(12500),
Cache: generateCache(0),
Monitor: Monitor{
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
Expand All @@ -160,7 +161,8 @@ func TestConfig(t *testing.T) {

for name, test := range testcases {
t.Run(name, func(t *testing.T) {
_ = testlog.SetLogger(t)
l := testlog.SetLogger(t)
zerolog.DefaultContextLogger = &l
path := filepath.Join("testdata", name+".yml")
cfg, err := LoadFile(path)
if test.err != "" {
Expand All @@ -184,6 +186,43 @@ func TestConfig(t *testing.T) {
}
})
}

t.Run("config specifies agent count", func(t *testing.T) {
l := testlog.SetLogger(t)
zerolog.DefaultContextLogger = &l
path := filepath.Join("testdata", "input-specify-agents.yml")
cfg, err := LoadFile(path)
t.Logf("cfg fileread: %+v", cfg.Inputs[0].Server.Limits)
require.NoError(t, err)
err = cfg.LoadServerLimits()
require.NoError(t, err)
t.Logf("cfg loaded: %+v", cfg.Inputs[0].Server.Limits)

t.Log("Before expect")
expected := Config{
Fleet: defaultFleet(),
Output: Output{
Elasticsearch: defaultElastic(),
},
Inputs: []Input{
{
Type: "fleet-server",
Server: defaultServer(),
Cache: generateCache(2500),
Monitor: Monitor{
FetchSize: defaultFetchSize,
PollTimeout: defaultPollTimeout,
},
},
},
Logging: defaultLogging(),
HTTP: defaultHTTP(),
}
expected.Inputs[0].Server.Limits = generateServerLimits(2500)
t.Log("After expect")
assert.EqualExportedValues(t, expected, *cfg)

})
}

func TestLoadStandaloneAgentMetadata(t *testing.T) {
Expand All @@ -203,8 +242,39 @@ func TestLoadServerLimits(t *testing.T) {
c := &Config{Inputs: []Input{{}}}
err := c.LoadServerLimits()
assert.NoError(t, err)
assert.Equal(t, int64(defaultCheckinMaxBody), c.Inputs[0].Server.Limits.CheckinLimit.MaxBody)
assert.Equal(t, defaultActionTTL, c.Inputs[0].Cache.ActionTTL)
assert.NotZero(t, c.Inputs[0].Server.Limits.CheckinLimit.MaxBody)
assert.NotZero(t, c.Inputs[0].Cache.ActionTTL)
})
t.Run("agent count limits load", func(t *testing.T) {
c := &Config{Inputs: []Input{{
Server: Server{
Limits: ServerLimits{
MaxAgents: 2500,
},
},
}}}
err := c.LoadServerLimits()
assert.NoError(t, err)
assert.NotZero(t, c.Inputs[0].Server.Limits.CheckinLimit.MaxBody)
assert.Equal(t, time.Millisecond*5, c.Inputs[0].Server.Limits.CheckinLimit.Interval)

})
t.Run("agent count limits load does not override", func(t *testing.T) {
c := &Config{Inputs: []Input{{
Server: Server{
Limits: ServerLimits{
MaxAgents: 2500,
ActionLimit: Limit{
Interval: time.Millisecond,
},
},
},
}}}
err := c.LoadServerLimits()
assert.NoError(t, err)
assert.NotZero(t, c.Inputs[0].Server.Limits.CheckinLimit.MaxBody)
assert.Equal(t, time.Millisecond, c.Inputs[0].Server.Limits.ActionLimit.Interval)

})
t.Run("existing values are not overridden", func(t *testing.T) {
c := &Config{
Expand All @@ -224,8 +294,8 @@ func TestLoadServerLimits(t *testing.T) {
err := c.LoadServerLimits()
assert.NoError(t, err)
assert.Equal(t, int64(5*defaultCheckinMaxBody), c.Inputs[0].Server.Limits.CheckinLimit.MaxBody)
assert.Equal(t, defaultCheckinBurst, c.Inputs[0].Server.Limits.CheckinLimit.Burst)
assert.Equal(t, time.Minute, c.Inputs[0].Cache.ActionTTL)
assert.NotZero(t, c.Inputs[0].Server.Limits.CheckinLimit.Burst)
assert.NotZero(t, c.Inputs[0].Cache.ActionTTL)
})

}
Expand All @@ -235,6 +305,7 @@ func TestLoadServerLimits(t *testing.T) {
func defaultCache() Cache {
var d Cache
d.InitDefaults()
d.LoadLimits(loadLimits(0))
return d
}

Expand Down Expand Up @@ -299,6 +370,7 @@ func defaultElastic() Elasticsearch {
func defaultServer() Server {
var d Server
d.InitDefaults()
d.Limits.LoadLimits(loadLimits(0))
return d
}

Expand Down
87 changes: 63 additions & 24 deletions internal/pkg/config/env_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,43 +41,44 @@ const (
defaultArtifactMaxBody = 0

defaultEnrollInterval = time.Millisecond * 10
defaultEnrollBurst = 100
defaultEnrollMax = 50
defaultEnrollBurst = 50
defaultEnrollMax = 100
defaultEnrollMaxBody = 1024 * 512

defaultAckInterval = time.Millisecond * 10
defaultAckBurst = 100
defaultAckMax = 50
defaultAckBurst = 50
defaultAckMax = 100
defaultAckMaxBody = 1024 * 1024 * 2

defaultStatusInterval = time.Millisecond * 5
defaultStatusBurst = 25
defaultStatusMax = 50
defaultStatusMaxBody = 0

defaultUploadStartInterval = time.Second * 3
defaultUploadStartBurst = 8
defaultUploadStartMax = 3
defaultUploadStartInterval = time.Second * 2
defaultUploadStartBurst = 5
defaultUploadStartMax = 10
defaultUploadStartMaxBody = 1024 * 1024 * 5

defaultUploadEndInterval = time.Second * 2
defaultUploadEndBurst = 5
defaultUploadEndMax = 2
defaultUploadEndMax = 10
defaultUploadEndMaxBody = 1024

defaultUploadChunkInterval = time.Millisecond * 3
defaultUploadChunkBurst = 10
defaultUploadChunkMax = 5
defaultUploadChunkBurst = 5
defaultUploadChunkMax = 10
defaultUploadChunkMaxBody = 1024 * 1024 * 4 // this is also enforced in handler, a chunk MAY NOT be larger than 4 MiB

defaultFileDelivInterval = time.Millisecond * 100
defaultFileDelivBurst = 8
defaultFileDelivMax = 5
defaultFileDelivMaxBody = 1024 * 1024 * 5
defaultFileDelivBurst = 5
defaultFileDelivMax = 10
defaultFileDelivMaxBody = 0

defaultPGPRetrievalInterval = time.Millisecond * 10
defaultPGPRetrievalBurst = 100
defaultPGPRetrievalInterval = time.Millisecond * 5
defaultPGPRetrievalBurst = 25
defaultPGPRetrievalMax = 50
defaultPGPRetrievalMaxBody = 0
)

type valueRange struct {
Expand Down Expand Up @@ -116,10 +117,20 @@ func defaultCacheLimits() *cacheLimits {
}

type limit struct {
// Interval is the rate limiter's max frequency of requests (1s means 1req/s, 1ms means 1req/ms)
// A rate of 0 disables the rate limiter
Interval time.Duration `config:"interval"`
Burst int `config:"burst"`
Max int64 `config:"max"`
MaxBody int64 `config:"max_body_byte_size"`
// Burst is the rate limiter's burst allocation that allows for spikes of traffic.
// Having a burst value > max is functionally setting it to the same as max.
// A burst of 0 allows no requests.
Burst int `config:"burst"`
// Max is the total number of requests allowed to an endpoint.
// A zero value disables the max limiter
Max int64 `config:"max"`
// MaxBody is the request body size limit.
// Used in the ack, checkin, and enroll endpoints.
// A zero value disabled the check.
MaxBody int64 `config:"max_body_byte_size"`
}

type serverLimitDefaults struct {
Expand Down Expand Up @@ -206,7 +217,7 @@ func defaultserverLimitDefaults() *serverLimitDefaults {
Interval: defaultPGPRetrievalInterval,
Burst: defaultPGPRetrievalBurst,
Max: defaultPGPRetrievalMax,
MaxBody: 0,
MaxBody: defaultPGPRetrievalMaxBody,
},
}
}
Expand Down Expand Up @@ -248,13 +259,16 @@ func init() {
}
}

// loadLimits loads cache and server_limit settings based on the passed agentLimit number.
// If agentLimit < 0 the default settings are used.
// If agentLimit > 0 the settings from the matching default/*.yml file are used based off agent count.
// If agentLimit == 0 then the settings from default/*.yml are used based off system memory.
// If a lookup fails, default settings are used.
func loadLimits(agentLimit int) *envLimits {
return loadLimitsForAgents(agentLimit)
}

func loadLimitsForAgents(agentLimit int) *envLimits {
if agentLimit == 0 {
if agentLimit < 0 {
return defaultEnvLimits()
} else if agentLimit == 0 {
return memEnvLimits()
}
log := zerolog.Ctx(context.TODO())
for _, l := range defaults {
Expand All @@ -272,6 +286,31 @@ func loadLimitsForAgents(agentLimit int) *envLimits {
return defaultEnvLimits()
}

// memMB returns the system total memory in MB
// It wraps memory.TotalMemory() so that we can replace the var in unit tests.
var memMB func() int = func() int {
return int(memory.TotalMemory() / 1024 / 1024)
}

func memEnvLimits() *envLimits {
mem := memMB()
k := 0
recRAM := 0
log := zerolog.Ctx(context.TODO())
for i, l := range defaults {
if mem >= l.RecommendedRAM && l.RecommendedRAM > recRAM {
k = i
recRAM = l.RecommendedRAM
}
}
if recRAM == 0 {
log.Warn().Int("memory_mb", mem).Msg("No settings with recommended ram found, using default.")
return defaultEnvLimits()
}
log.Info().Int("memory_mb", mem).Int("recommended_mb", recRAM).Msg("Found settings with recommended ram.")
return defaults[k]
}

func getMaxInt() int64 {
if strings.HasSuffix(runtime.GOARCH, "64") {
return math.MaxInt64
Expand Down
5 changes: 2 additions & 3 deletions internal/pkg/config/env_defaults_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions internal/pkg/config/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ type ServerLimits struct {
}

// InitDefaults initializes the defaults for the configuration.
func (c *ServerLimits) InitDefaults() {
c.LoadLimits(loadLimits(0))
}
func (c *ServerLimits) InitDefaults() {}

func (c *ServerLimits) LoadLimits(limits *envLimits) {
l := limits.Server
Expand Down
2 changes: 0 additions & 2 deletions internal/pkg/config/testdata/input-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ inputs:
server:
host: "localhost"
port: 8888
limits:
max_agents: 12500
timeouts:
read: 20s
write: 5s
Expand Down
13 changes: 13 additions & 0 deletions internal/pkg/config/testdata/input-specify-agents.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
output:
elasticsearch:
hosts: ["localhost:9200"]
service_token: "test-token"
fleet:
agent:
id: 1e4954ce-af37-4731-9f4a-407b08e69e42
inputs:
- type: fleet-server
server:
limits:
max_agents: 2500

0 comments on commit a39e6c7

Please sign in to comment.