Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: optimize proctree memory consumption #4384

Merged
merged 5 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/docs/advanced/data-sources/builtin/process-tree.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ The underlying structure is populated using the core `sched_process_fork`, `sche

> Introducing this secondary event source is strategic: it reduces interference with actively traced events, leading to more accurate and granular updates in the process tree.

The number of processes retained in the tree hinges on cache size. We have two separate caches at play: one for processes and another for threads. Both default to a size of 32K, supporting tracking for up to 32,768 processes and the same number of threads. It's worth noting that these are LRU caches: once full, they'll evict the least recently accessed entries to accommodate fresh ones.
The number of processes retained in the tree hinges on cache size. We have two separate caches at play: one for processes and another for threads. The default cache size for processes is 16K, supporting tracking for up to 16,384 processes, while the thread cache is 32K, supporting tracking for up to 32,768 threads. On average, a configuration ratio of 2:1 (thread:cache) is defined, as one thread is created for every process. It's worth noting that these are LRU caches: once full, they'll evict the least recently accessed entries to accommodate fresh ones.

The process tree query the procfs upon initialization and during runtime to fill missing data:
* During initialization, it runs over all procfs to fill all existing processes and threads
Expand All @@ -34,7 +34,7 @@ Example:
signals | process tree is built from signals.
both | process tree is built from both events and signals.
--proctree process-cache=8192 | will cache up to 8192 processes in the tree (LRU cache).
--proctree thread-cache=4096 | will cache up to 4096 threads in the tree (LRU cache).
--proctree thread-cache=16384 | will cache up to 16384 threads in the tree (LRU cache).
--proctree process-cache-ttl=60 | will set the process cache element TTL to 60 seconds.
--proctree thread-cache-ttl=60 | will set the thread cache element TTL to 60 seconds.
--proctree disable-procfs-query | Will disable procfs quering during runtime
Expand Down
231 changes: 34 additions & 197 deletions pkg/changelog/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,219 +6,56 @@ import (
"github.com/aquasecurity/tracee/pkg/logger"
)

type comparable interface {
~int | ~float64 | ~string
}

type item[T comparable] struct {
timestamp time.Time // timestamp of the change
value T // value of the change
}

// The changelog package provides a changelog data structure. It is a list of changes, each with a
// timestamp. The changelog can be queried for the value at a given time.

// Changelog manages a list of changes (entries) for a single type.
// When instantiating a Changelog struct, one must supply the maximum amount of changes
// that can be tracked.
//
// ATTENTION: You should use Changelog within a struct and provide methods to access it,
// coordinating access through your struct mutexes. DO NOT EXPOSE the changelog object directly to
// coordinating access through your struct mutexes. DO NOT EXPOSE the Changelog object directly to
// the outside world as it is not thread-safe.

type Changelog[T comparable] struct {
changes []item[T] // list of changes
timestamps map[time.Time]struct{} // set of timestamps (used to avoid duplicates)
maxSize int // maximum amount of changes to keep track of
list entryList[T]
}

// NewChangelog creates a new changelog.
func NewChangelog[T comparable](maxSize int) *Changelog[T] {
return &Changelog[T]{
changes: []item[T]{},
timestamps: map[time.Time]struct{}{},
maxSize: maxSize,
// NewChangelog initializes a new `Changelog` with the specified maximum number of entries.
func NewChangelog[T comparable](maxEntries MaxEntries) *Changelog[T] {
if maxEntries <= 0 {
logger.Fatalw("maxEntries must be greater than 0")
}
}

// Getters

// GetCurrent: Observation on single element changelog.
//
// If there's one element in the changelog, after the loop, left would be set to 1 if the single
// timestamp is before the targetTime, and 0 if it's equal or after.
//
// BEFORE: If the single timestamp is before the targetTime, when we return
// clv.changes[left-1].value, returns clv.changes[0].value, which is the expected behavior.
//
// AFTER: If the single timestamp is equal to, or after the targetTime, the current logic would
// return a "zero" value because of the condition if left == 0.
//
// We need to find the last change that occurred before or exactly at the targetTime. The binary
// search loop finds the position where a new entry with the targetTime timestamp would be inserted
// to maintain chronological order:
//
// This position is stored in "left".
//
// So, to get the last entry that occurred before the targetTime, we need to access the previous
// position, which is left-1.
//
// GetCurrent returns the latest value of the changelog.
func (clv *Changelog[T]) GetCurrent() T {
if len(clv.changes) == 0 {
return returnZero[T]()
}

return clv.changes[len(clv.changes)-1].value
}

// Get returns the value of the changelog at the given time.
func (clv *Changelog[T]) Get(targetTime time.Time) T {
if len(clv.changes) == 0 {
return returnZero[T]()
}

idx := clv.findIndex(targetTime)
if idx == 0 {
return returnZero[T]()
}

return clv.changes[idx-1].value
}

// GetAll returns all the values of the changelog.
func (clv *Changelog[T]) GetAll() []T {
values := make([]T, 0, len(clv.changes))
for _, change := range clv.changes {
values = append(values, change.value)
newList := newEntryList[T](maxEntries)
// DEBUG: uncomment this to populate entries to measure memory footprint.
// newList.populateEntries()
return &Changelog[T]{
list: newList,
}
return values
}

// Setters

// SetCurrent sets the latest value of the changelog.
func (clv *Changelog[T]) SetCurrent(value T) {
clv.setAt(value, time.Now())
}

// Set sets the value of the changelog at the given time.
func (clv *Changelog[T]) Set(value T, targetTime time.Time) {
clv.setAt(value, targetTime)
// Set adds or updates an entry in the Changelog, ordered by timestamp.
// If the new entry has the same value as the latest one, only the timestamp is updated.
// If there are already the maximum number of entries, it reuses or replaces an existing entry.
func (c *Changelog[T]) Set(value T, timestamp time.Time) {
c.list = c.list.set(value, timestamp)
}

// private

// setAt sets the value of the changelog at the given time.
func (clv *Changelog[T]) setAt(value T, targetTime time.Time) {
// If the timestamp is already set, update that value only.
_, ok := clv.timestamps[targetTime]
if ok {
index := clv.findIndex(targetTime) - 1
if index < 0 {
logger.Debugw("changelog internal error: illegal index for existing timestamp")
}
if !clv.changes[index].timestamp.Equal(targetTime) { // sanity check only (time exists already)
logger.Debugw("changelog internal error: timestamp mismatch")
return
}
if clv.changes[index].value != value {
logger.Debugw("changelog error: value mismatch for same timestamp")
}
clv.changes[index].value = value
return
}

entry := item[T]{
timestamp: targetTime,
value: value,
}

idx := clv.findIndex(entry.timestamp)
// If the changelog has reached its maximum size and the new change would be inserted as the oldest,
// there is no need to add the new change. We can simply return without making any modifications.
if len(clv.changes) >= clv.maxSize && idx == 0 {
return
}
// Insert the new entry in the changelog, keeping the list sorted by timestamp.
clv.changes = append(clv.changes, item[T]{})
copy(clv.changes[idx+1:], clv.changes[idx:])
clv.changes[idx] = entry
// Mark the timestamp as set.
clv.timestamps[targetTime] = struct{}{}

clv.enforceSizeBoundary()
// Get retrieves the value of the entry at or before the given timestamp.
// If no matching entry is found, it returns the default value for the entry type.
func (c *Changelog[T]) Get(timestamp time.Time) T {
return c.list.get(timestamp)
}

// findIndex returns the index of the first item in the changelog that is after the given time.
func (clv *Changelog[T]) findIndex(target time.Time) int {
left, right := 0, len(clv.changes)

for left < right {
middle := (left + right) / 2
if clv.changes[middle].timestamp.After(target) {
right = middle
} else {
left = middle + 1
}
}

return left
// GetCurrent retrieves the most recent value.
// If no entry is found, it returns the default value for the entry type.
func (c *Changelog[T]) GetCurrent() T {
return c.list.getCurrent()
}

// enforceSizeBoundary ensures that the size of the inner array doesn't exceed the limit.
// It applies two methods to reduce the log size to the maximum allowed:
// 1. Unite duplicate values that are trailing one another, removing the oldest of the pair.
// 2. Remove the oldest logs as they are likely less important.

func (clv *Changelog[T]) enforceSizeBoundary() {
if len(clv.changes) <= clv.maxSize {
// Nothing to do
return
}

boundaryDiff := len(clv.changes) - clv.maxSize
changed := false

// Compact the slice in place
writeIdx := 0
for readIdx := 0; readIdx < len(clv.changes); readIdx++ {
nextIdx := readIdx + 1
if nextIdx < len(clv.changes) &&
clv.changes[nextIdx].value == clv.changes[readIdx].value &&
boundaryDiff > 0 {
// Remove the oldest (readIdx) from the duplicate pair
delete(clv.timestamps, clv.changes[readIdx].timestamp)
boundaryDiff--
changed = true
continue
}

// If elements have been removed or moved, update the map and the slice
if changed {
clv.changes[writeIdx] = clv.changes[readIdx]
}

writeIdx++
}

if changed {
clear(clv.changes[writeIdx:])
clv.changes = clv.changes[:writeIdx]
}

if len(clv.changes) <= clv.maxSize {
// Size is within limits after compaction
return
}

// As it still exceeds maxSize, remove the oldest entries in the remaining slice
boundaryDiff = len(clv.changes) - clv.maxSize
for i := 0; i < boundaryDiff; i++ {
delete(clv.timestamps, clv.changes[i].timestamp)
}
clear(clv.changes[:boundaryDiff])
clv.changes = clv.changes[boundaryDiff:]
// GetAll retrieves all values, from the newest to the oldest.
func (c *Changelog[T]) GetAll() []T {
return c.list.getAll()
}

// returnZero returns the zero value of the type T.
func returnZero[T any]() T {
var zero T
return zero
// Count returns the number of entries recorded.
func (c *Changelog[T]) Count() int {
return len(c.list.entries)
}
Loading
Loading