-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaccumulator.go
131 lines (110 loc) · 3.1 KB
/
accumulator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package main
import (
"errors"
"sync"
)
// Sentinel errors reported by the accumulators in this file.
var (
	// ErrWrongSize reports that a size which has to be positive turned out
	// to be zero or negative. NewAcc panics with this value.
	ErrWrongSize = errors.New("size can not be less than one")

	// ErrBigWindow reports that the requested window is larger than the
	// accumulator itself, so the request can never be satisfied.
	ErrBigWindow = errors.New("averaging window is bigger than accumulator")

	// ErrSmallSample reports that fewer values have been stored so far than
	// the requested window needs. This is a normal situation right after
	// start-up.
	ErrSmallSample = errors.New("not enough data saved")
)
// Acc is a ring buffer of int64 values with a fixed capacity. Values can only
// be added (via Push); once the buffer has wrapped, each new value overwrites
// the oldest one. Sum and Average report on the last n values pushed.
type Acc struct {
	// head is the index in vals that the next Push will write to. No tail
	// index is kept because values are never removed.
	head int
	// vals is the backing storage; NewAcc allocates it at the requested size.
	vals []int64
	// mu guards the fields above and below: Push takes the write lock, Sum
	// takes the read lock.
	mu sync.RWMutex
	// full becomes true the first time head wraps back to zero, i.e. once
	// every slot of vals has been written at least once.
	full bool
}
// NewAcc builds an accumulator with room for s values.
// It panics with ErrWrongSize when s is smaller than one.
func NewAcc(s int) *Acc {
	if s <= 0 {
		panic(ErrWrongSize)
	}
	acc := new(Acc)
	acc.vals = make([]int64, s)
	return acc
}
// Push stores v in the slot under head and advances head by one. When head
// runs off the end of the buffer it wraps back to the first slot, and the
// accumulator is marked as full from then on.
func (a *Acc) Push(v int64) {
	a.mu.Lock()
	defer a.mu.Unlock()

	a.vals[a.head] = v
	a.head++
	if a.head < cap(a.vals) {
		return
	}
	// Walked past the last slot: start over from the beginning.
	a.head = 0
	a.full = true
}
// Sum returns the sum of the w most recently pushed values.
//
// Errors:
//   - ErrWrongSize if w is negative,
//   - ErrBigWindow if w is larger than the accumulator's capacity,
//   - ErrSmallSample if fewer than w values have been pushed so far.
//
// A window of zero is accepted and yields a sum of zero.
func (a *Acc) Sum(w int) (sum int64, err error) {
	// A negative window would slip past the checks below and produce inverted
	// slice bounds (a.head-w > a.head), which panics at run time.
	if w < 0 {
		return 0, ErrWrongSize
	}
	// A window bigger than the buffer can never be satisfied. No code in this
	// file resizes vals after NewAcc, so its capacity is read without the lock.
	if w > cap(a.vals) {
		return 0, ErrBigWindow
	}

	// Critical section: the buffer must not change while we read from it.
	a.mu.RLock()
	defer a.mu.RUnlock()

	// Before the first wrap-around only the slots below head hold data.
	if !a.full && a.head < w {
		return 0, ErrSmallSample
	}

	if a.head >= w {
		// The whole window lies in one contiguous run just below head.
		for _, v := range a.vals[a.head-w : a.head] {
			sum += v
		}
		return sum, nil
	}

	// The window wraps: the newest a.head values sit at the front of the
	// slice, the remaining w-a.head values at its very end.
	for _, v := range a.vals[:a.head] {
		sum += v
	}
	for _, v := range a.vals[cap(a.vals)+a.head-w:] {
		sum += v
	}
	return sum, nil
}
// Average returns the arithmetic mean of the w most recently pushed values.
//
// It returns ErrWrongSize when w is less than one, because a mean over zero
// values would be 0/0 (NaN). Any error from Sum is passed through unchanged.
// On error the returned average is always zero.
func (a *Acc) Average(w int) (avg float32, err error) {
	if w < 1 {
		return 0, ErrWrongSize
	}
	sum, err := a.Sum(w)
	if err != nil {
		return 0, err
	}
	return float32(sum) / float32(w), nil
}
// DeltaAcc accumulates the differences between consecutively pushed values
// rather than the values themselves. The first value pushed only sets the
// baseline and stores nothing, so the number of remembered deltas is the
// number of pushes minus one, up to the capacity of the embedded Acc.
type DeltaAcc struct {
	// last is the value passed to the most recent Push.
	last uint64
	// Acc stores the deltas; its Sum and Average methods are promoted, while
	// its Push is shadowed by DeltaAcc.Push.
	Acc
	// initd is false until the first Push has been seen.
	initd bool
}
// NewDeltaAcc returns a delta-accumulator able to remember s deltas.
// It panics with ErrWrongSize when s is smaller than one.
//
// The embedded Acc is built in place instead of being copied out of the
// pointer NewAcc returns: Acc contains a sync.RWMutex, and copying a struct
// that holds a lock is reported by go vet (copylocks).
func NewDeltaAcc(s int) *DeltaAcc {
	if s < 1 {
		panic(ErrWrongSize)
	}
	return &DeltaAcc{
		Acc: Acc{vals: make([]int64, s)},
	}
}
// Push records v as the newest value and stores its difference from the
// previously pushed value in the underlying accumulator, overwriting the
// oldest delta once capacity is reached. The very first call only sets the
// baseline, so the first delta appears after the second call.
func (a *DeltaAcc) Push(v uint64) {
	prev, primed := a.last, a.initd
	a.last = v
	if !primed {
		a.initd = true
		return
	}
	// Unsigned subtraction wraps modulo 2^64 and the conversion to int64
	// reinterprets the same bits as two's complement, so this one expression
	// gives the signed difference for both rising and falling inputs.
	a.Acc.Push(int64(v - prev))
}