Skip to content

Commit

Permalink
chore(changelog): reduce mem footprint
Browse files Browse the repository at this point in the history
The current Changelog structure consumes a significant amount of memory
due to the allocation of metadata for each field/instance. As the number
of fields increases, the memory usage grows linearly. Approximately 240
bytes per field were observed just for metadata, excluding the actual
data and pointers for each field.

To reduce memory consumption, the new changelog.Entries[T]
implementation keeps all changes in a single flat slice of entries
holding values of type T, instead of storing metadata for each field
separately.

---

| Caches | GOGC | Branch | *Heap Use | *Heap   | Diff of | Proctree |
|        |      |        | (Avg)     | Growth  | main    |          |
|--------|------|--------|-----------|---------|---------|----------|
| -      | -    | main   | 28        | -       | -       | off      |
| 16384  | -    | main   | 199       | 171     | -       | on       |
| 32768  | -    | main   | 331       | 303     | -       | on       |
| -      | 5    | main   | 18        | -       | -       | off      |
| 16384  | 5    | main   | 125       | 107     | -       | on       |
| 32768  | 5    | main   | 209       | 191     | -       | on       |
|--------|------|--------|-----------|---------|---------|----------|
| -      | -    | new    | 28        | -       | -       | off      |
| 16384  | -    | new    | 111       | 83      | -51.46% | on       |
| 32768  | -    | new    | 158       | 130     | -57.10% | on       |
| -      | 5    | new    | 18        | -       | -       | off      |
| 16384  | 5    | new    | 72        | 54      | -49.53% | on       |
| 32768  | 5    | new    | 102       | 84      | -56.02% | on       |

* in MB

With GOGC set to 5, the new implementation reduces heap growth (the
"Diff of main" column) by approximately 49.5% when using a cache size of
16,384. For a cache size of 32,768, the reduction is around 56%.

When GOGC is left at its default value, the heap growth reductions are
roughly 51% and 57% for cache sizes of 16,384 and 32,768, respectively.

The "Heap Use" and "Heap Growth" columns serve as a good indicator of
memory consumption and can assist in determining optimal cache sizes.
  • Loading branch information
geyslan committed Dec 4, 2024
1 parent ec0e756 commit 3d0c7a4
Show file tree
Hide file tree
Showing 6 changed files with 529 additions and 701 deletions.
315 changes: 149 additions & 166 deletions pkg/changelog/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,219 +6,202 @@ import (
"github.com/aquasecurity/tracee/pkg/logger"
)

// comparable is the type constraint for values stored in the changelog: any type whose
// underlying type is int, float64 or string.
//
// NOTE: this declaration shadows Go's predeclared `comparable` constraint for the whole
// package, so every `comparable` used in this package refers to this narrower set.
type comparable interface {
~int | ~float64 | ~string
}

// item is a single recorded change: a value of type T paired with the timestamp of the change.
type item[T comparable] struct {
timestamp time.Time // timestamp of the change
value T // value of the change
}
//
// Entries
//

// The changelog package provides a changelog data structure. It is a list of changes, each with a
// timestamp. The changelog can be queried for the value at a given time.
// MemberKind represents the unique identifier for each kind of entry in the Entries.
// It is used to categorize different kinds of changes tracked by the Entries.
// A MemberKind is also used as the index into the per-kind configuration slice held by
// Entries, so a kind is only valid when it is smaller than the length of that slice.
type MemberKind uint8

// ATTENTION: You should use Changelog within a struct and provide methods to access it,
// coordinating access through your struct mutexes. DO NOT EXPOSE the changelog object directly to
// the outside world as it is not thread-safe.
// MaxEntries represents the maximum number of entries that can be stored for a given kind of entry.
// Being a uint8, it cannot exceed 255. NewEntries calls logger.Fatalw when given a value of 0.
type MaxEntries uint8

type Changelog[T comparable] struct {
changes []item[T] // list of changes
timestamps map[time.Time]struct{} // set of timestamps (used to avoid duplicates)
maxSize int // maximum amount of changes to keep track of
// entry is an internal structure representing a single change in the Entries.
// It includes the kind of the entry, the timestamp of the change, and the value of the change.
// Entries of every kind are stored together in one slice, so k is what ties an entry back
// to the member it belongs to.
type entry[T comparable] struct {
k MemberKind // Kind of the member, used to categorize the entry.
t time.Time // Timestamp of when the change occurred.
value T // Value of the change.
}

// NewChangelog creates a new changelog.
func NewChangelog[T comparable](maxSize int) *Changelog[T] {
return &Changelog[T]{
changes: []item[T]{},
timestamps: map[time.Time]struct{}{},
maxSize: maxSize,
}
// Entries is the main structure that manages a list of changes (entries).
// It keeps track of specifically configured members indicated by MemberKind identifiers.
// When instantiating an Entries struct, one must supply a slice, indexed by MemberKind, giving
// the maximum amount of changes each member can track.
//
// ATTENTION: You should use Entries within a struct and provide methods to access it,
// coordinating access through your struct mutexes. DO NOT EXPOSE the Entries object directly to
// the outside world as it is not thread-safe.
type Entries[T comparable] struct {
entryFlags []MaxEntries // Maximum number of entries per member kind, indexed by MemberKind.
entries []entry[T] // Recorded entries of all kinds, stored together in one flat slice.
}

// Getters
// NewEntries initializes a new `Entries` structure using the provided flags.
func NewEntries[T comparable](f []MaxEntries) *Entries[T] {
flags := make([]MaxEntries, 0, len(f))
for _, maxEntries := range f {
if maxEntries == 0 {
logger.Fatalw("maxEntries must be greater than 0")
}

// GetCurrent: Observation on single element changelog.
//
// If there's one element in the changelog, after the loop, left would be set to 1 if the single
// timestamp is before the targetTime, and 0 if it's equal or after.
//
// BEFORE: If the single timestamp is before the targetTime, when we return
// clv.changes[left-1].value, returns clv.changes[0].value, which is the expected behavior.
//
// AFTER: If the single timestamp is equal to, or after the targetTime, the current logic would
// return a "zero" value because of the condition if left == 0.
//
// We need to find the last change that occurred before or exactly at the targetTime. The binary
// search loop finds the position where a new entry with the targetTime timestamp would be inserted
// to maintain chronological order:
//
// This position is stored in "left".
//
// So, to get the last entry that occurred before the targetTime, we need to access the previous
// position, which is left-1.
//
// GetCurrent returns the latest value of the changelog.
func (clv *Changelog[T]) GetCurrent() T {
if len(clv.changes) == 0 {
return returnZero[T]()
flags = append(flags, maxEntries)
}

return clv.changes[len(clv.changes)-1].value
}

// Get returns the value of the changelog at the given time.
func (clv *Changelog[T]) Get(targetTime time.Time) T {
if len(clv.changes) == 0 {
return returnZero[T]()
return &Entries[T]{
entryFlags: flags,
entries: []entry[T]{},
}
}

idx := clv.findIndex(targetTime)
if idx == 0 {
return returnZero[T]()
// Set adds or updates an entry in the Entries for the specified `MemberKind` ordered by timestamp.
// If the new entry has the same value as the latest one, only the timestamp is updated.
// If there are already the maximum number of entries for this kind, it reuses or replaces an existing entry.
//
// ATTENTION: Make sure to pass a value of the correct type for the specified `MemberKind`.
func (e *Entries[T]) Set(k MemberKind, value T, t time.Time) {
if k >= MemberKind(len(e.entryFlags)) {
logger.Errorw("kind is not present in the entryFlags", "kind", k)
}

return clv.changes[idx-1].value
}
maxEntries := e.entryFlags[k]
maxSize := int(maxEntries)
indexes := make([]int, 0)

// GetAll returns all the values of the changelog.
func (clv *Changelog[T]) GetAll() []T {
values := make([]T, 0, len(clv.changes))
for _, change := range clv.changes {
values = append(values, change.value)
// collect indexes of entries equal to kind
for idx, entry := range e.entries {
if entry.k == k {
indexes = append(indexes, idx)
}
}
return values
}

// Setters

// SetCurrent sets the latest value of the changelog.
func (clv *Changelog[T]) SetCurrent(value T) {
clv.setAt(value, time.Now())
}
// if there are entries for kind check if the last entry has the same value
if len(indexes) > 0 {
lastIdx := indexes[len(indexes)-1]
if e.entries[lastIdx].value == value && t.After(e.entries[lastIdx].t) {
// only update timestamp and return
e.entries[lastIdx].t = t
return
}
}

// Set sets the value of the changelog at the given time.
func (clv *Changelog[T]) Set(value T, targetTime time.Time) {
clv.setAt(value, targetTime)
}
newEntry := entry[T]{
k: k,
t: t,
value: value,
}

// private
//
// if there is space, insert the new entry at the correct position
//

// setAt sets the value of the changelog at the given time.
func (clv *Changelog[T]) setAt(value T, targetTime time.Time) {
// If the timestamp is already set, update that value only.
_, ok := clv.timestamps[targetTime]
if ok {
index := clv.findIndex(targetTime) - 1
if index < 0 {
logger.Debugw("changelog internal error: illegal index for existing timestamp")
}
if !clv.changes[index].timestamp.Equal(targetTime) { // sanity check only (time exists already)
logger.Debugw("changelog internal error: timestamp mismatch")
if len(indexes) < maxSize {
insertPos := e.findInsertIdx(indexes, t)
if insertPos == len(e.entries) {
e.entries = append(e.entries, newEntry)
return
}
if clv.changes[index].value != value {
logger.Debugw("changelog error: value mismatch for same timestamp")
}
clv.changes[index].value = value
return
}

entry := item[T]{
timestamp: targetTime,
value: value,
e.insertAt(insertPos, newEntry)
return
}

idx := clv.findIndex(entry.timestamp)
// If the changelog has reached its maximum size and the new change would be inserted as the oldest,
// there is no need to add the new change. We can simply return without making any modifications.
if len(clv.changes) >= clv.maxSize && idx == 0 {
return
//
// as there is no space, replace an entry
//

replaceIdx := indexes[len(indexes)-1] // default index to replace
if t.After(e.entries[replaceIdx].t) {
// reallocate values to the left
e.shiftLeft(indexes)
} else {
// find the correct position to store the entry
replaceIdx = e.findInsertIdx(indexes, t) - 1
if replaceIdx == -1 {
replaceIdx = 0
}
}
// Insert the new entry in the changelog, keeping the list sorted by timestamp.
clv.changes = append(clv.changes, item[T]{})
copy(clv.changes[idx+1:], clv.changes[idx:])
clv.changes[idx] = entry
// Mark the timestamp as set.
clv.timestamps[targetTime] = struct{}{}

clv.enforceSizeBoundary()
e.entries[replaceIdx] = newEntry
}

// findIndex returns the index of the first item in the changelog that is after the given time.
func (clv *Changelog[T]) findIndex(target time.Time) int {
left, right := 0, len(clv.changes)
// Get retrieves the value of the entry for the specified `MemberKind` at or before the given timestamp.
// If no matching entry is found, it returns the default value for the entry type.
func (e *Entries[T]) Get(k MemberKind, timestamp time.Time) T {
for i := len(e.entries) - 1; i >= 0; i-- {
if e.entries[i].k != k {
continue
}

for left < right {
middle := (left + right) / 2
if clv.changes[middle].timestamp.After(target) {
right = middle
} else {
left = middle + 1
if e.entries[i].t.Before(timestamp) || e.entries[i].t.Equal(timestamp) {
return e.entries[i].value
}
}

return left
return getZero[T]()
}

// enforceSizeBoundary ensures that the size of the inner array doesn't exceed the limit.
// It applies two methods to reduce the log size to the maximum allowed:
// 1. Unite duplicate values that are trailing one another, removing the oldest of the pair.
// 2. Remove the oldest logs as they are likely less important.

func (clv *Changelog[T]) enforceSizeBoundary() {
if len(clv.changes) <= clv.maxSize {
// Nothing to do
return
// GetCurrent retrieves the most recent value for the specified `MemberKind`.
// If no entry is found, it returns the default value for the entry type.
func (e *Entries[T]) GetCurrent(k MemberKind) T {
for i := len(e.entries) - 1; i >= 0; i-- {
if e.entries[i].k == k {
return e.entries[i].value
}
}

boundaryDiff := len(clv.changes) - clv.maxSize
changed := false

// Compact the slice in place
writeIdx := 0
for readIdx := 0; readIdx < len(clv.changes); readIdx++ {
nextIdx := readIdx + 1
if nextIdx < len(clv.changes) &&
clv.changes[nextIdx].value == clv.changes[readIdx].value &&
boundaryDiff > 0 {
// Remove the oldest (readIdx) from the duplicate pair
delete(clv.timestamps, clv.changes[readIdx].timestamp)
boundaryDiff--
changed = true
continue
}
return getZero[T]()
}

// If elements have been removed or moved, update the map and the slice
if changed {
clv.changes[writeIdx] = clv.changes[readIdx]
// GetAll retrieves all values for the specified `MemberKind`, from the newest to the oldest.
func (e *Entries[T]) GetAll(k MemberKind) []T {
values := make([]T, e.Count(k))
for i := len(e.entries) - 1; i >= 0; i-- {
if e.entries[i].k == k {
values = append(values, e.entries[i].value)
}

writeIdx++
}

if changed {
clear(clv.changes[writeIdx:])
clv.changes = clv.changes[:writeIdx]
return values
}

// Count returns the number of entries recorded for the specified `MemberKind`.
func (e *Entries[T]) Count(k MemberKind) int {
count := 0
for _, entry := range e.entries {
if entry.k == k {
count++
}
}

if len(clv.changes) <= clv.maxSize {
// Size is within limits after compaction
return
return count
}

// findInsertIdx finds the correct index to insert a new entry based on its timestamp.
func (e *Entries[T]) findInsertIdx(indexes []int, t time.Time) int {
for i := len(indexes) - 1; i >= 0; i-- {
if e.entries[indexes[i]].t.Before(t) {
return indexes[i] + 1
}
}

// As it still exceeds maxSize, remove the oldest entries in the remaining slice
boundaryDiff = len(clv.changes) - clv.maxSize
for i := 0; i < boundaryDiff; i++ {
delete(clv.timestamps, clv.changes[i].timestamp)
return len(indexes)
}

// insertAt inserts newEntry at position idx of the entries list, shifting the entries at
// idx and beyond one slot to the right.
//
// idx must be in the range [0, len(e.entries)]; idx == len(e.entries) appends.
//
// The list is grown by one slot and the tail is moved with copy, so no temporary slice is
// built per insertion (only the amortized growth of the backing array remains).
func (e *Entries[T]) insertAt(idx int, newEntry entry[T]) {
	// Make room for one more entry.
	e.entries = append(e.entries, entry[T]{})
	// Shift the tail one slot to the right; copy handles overlapping slices correctly.
	copy(e.entries[idx+1:], e.entries[idx:])
	e.entries[idx] = newEntry
}

// shiftLeft shifts entries within the given indexes to the left, discarding the oldest entry.
func (e *Entries[T]) shiftLeft(indexes []int) {
for i := 0; i < len(indexes)-1; i++ {
e.entries[indexes[i]] = e.entries[indexes[i+1]]
}
clear(clv.changes[:boundaryDiff])
clv.changes = clv.changes[boundaryDiff:]
}

// returnZero returns the zero value of the type T.
func returnZero[T any]() T {
// getZero returns the zero value for the type `T`.
func getZero[T comparable]() T {
var zero T
return zero
}
Loading

0 comments on commit 3d0c7a4

Please sign in to comment.