forked from RussellLuo/slidingwindow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathslidingwindow.go
149 lines (118 loc) · 3.94 KB
/
slidingwindow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package slidingwindow
import (
"sync"
"time"
)
// Window represents a fixed-window.
type Window interface {
// Start returns the start boundary.
Start() time.Time
// Count returns the accumulated count.
Count() int64
// AddCount increments the accumulated count by n.
AddCount(n int64)
// Reset sets the state of the window with the given settings.
Reset(s time.Time, c int64)
// Sync tries to exchange data between the window and the central
// datastore at time now, to keep the window's count up-to-date.
Sync(now time.Time)
}
// StopFunc stops the window's sync behaviour.
type StopFunc func()
// NewWindow creates a new window, and returns a function to stop
// the possible sync behaviour within it.
type NewWindow func() (Window, StopFunc)
type Limiter struct {
size time.Duration
limit int64
mu sync.Mutex
curr Window
prev Window
}
// NewLimiter creates a new limiter, and returns a function to stop
// the possible sync behaviour within the current window.
func NewLimiter(size time.Duration, limit int64, newWindow NewWindow) (*Limiter, StopFunc) {
currWin, currStop := newWindow()
// The previous window is static (i.e. no add changes will happen within it),
// so we always create it as an instance of LocalWindow.
//
// In this way, the whole limiter, despite containing two windows, now only
// consumes at most one goroutine for the possible sync behaviour within
// the current window.
prevWin, _ := NewLocalWindow()
lim := &Limiter{
size: size,
limit: limit,
curr: currWin,
prev: prevWin,
}
return lim, currStop
}
// CurrentWindowCount returns the accumulated count of the curr window.
func (lim *Limiter) CurrentWindowCount() int64 {
return lim.curr.Count()
}
// PreviousWindowCount returns the accumulated count of the prev window.
func (lim *Limiter) PreviousWindowCount() int64 {
return lim.prev.Count()
}
// Size returns the time duration of one window size. Note that the size
// is defined to be read-only, if you need to change the size,
// create a new limiter with a new size instead.
func (lim *Limiter) Size() time.Duration {
return lim.size
}
// Limit returns the maximum events permitted to happen during one window size.
func (lim *Limiter) Limit() int64 {
lim.mu.Lock()
defer lim.mu.Unlock()
return lim.limit
}
// SetLimit sets a new Limit for the limiter.
func (lim *Limiter) SetLimit(newLimit int64) {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.limit = newLimit
}
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
func (lim *Limiter) AllowN(now time.Time, n int64) bool {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.advance(now)
elapsed := now.Sub(lim.curr.Start())
weight := float64(lim.size-elapsed) / float64(lim.size)
count := int64(weight*float64(lim.prev.Count())) + lim.curr.Count()
// Trigger the possible sync behaviour.
defer lim.curr.Sync(now)
if count+n > lim.limit {
return false
}
lim.curr.AddCount(n)
return true
}
// advance updates the current/previous windows resulting from the passage of time.
func (lim *Limiter) advance(now time.Time) {
// Calculate the start boundary of the expected current-window.
newCurrStart := now.Truncate(lim.size)
diffSize := newCurrStart.Sub(lim.curr.Start()) / lim.size
if diffSize >= 1 {
// The current-window is at least one-window-size behind the expected one.
newPrevCount := int64(0)
if diffSize == 1 {
// The new previous-window will overlap with the old current-window,
// so it inherits the count.
//
// Note that the count here may be not accurate, since it is only a
// SNAPSHOT of the current-window's count, which in itself tends to
// be inaccurate due to the asynchronous nature of the sync behaviour.
newPrevCount = lim.curr.Count()
}
lim.prev.Reset(newCurrStart.Add(-lim.size), newPrevCount)
// The new current-window always has zero count.
lim.curr.Reset(newCurrStart, 0)
}
}