Skip to content

Commit

Permalink
Merge pull request #4 from jwells131313/block_1
Browse files Browse the repository at this point in the history
The blocking implementation finished
  • Loading branch information
jwells131313 authored Jul 26, 2023
2 parents 863569f + 10fd4a6 commit 4a67af8
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea/*
coverage.out
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ build:
# go get -u golang.org/x/lint/golint
golint -set_exit_status ./...
go test -v ./...

coverage:
go test -coverprofile=coverage.out ./...
go tool cover -html=coverage.out
22 changes: 22 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

## [1.0.0] - 2023-07-26
### Changed
- Added a version that throttles full blocks
- Updated go version to 1.20
- Added Makefile
- Removed outdated werker build from README

## [0.1.0] - 2018-09-30
### Changed
- Fixed bugs

## [0.0.1] - 2018-09-19
### Changed
- Initial version
111 changes: 109 additions & 2 deletions rate/leakybucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ type Limiter interface {
// return 0 then the limiter is empty
Take() (uint64, time.Duration)

// Returns the limit this limiter will attempt to achieve in
// GetLimit Returns the limit this limiter will attempt to achieve in
// elements per second
GetLimit() uint64

// Sets a new limit in elements per second and returns the old limit.
// SetLimit Sets a new limit in elements per second and returns the old limit.
// Note that a limit of 0 is converted to a limit of 1
SetLimit(uint64) uint64

Expand All @@ -49,16 +49,37 @@ type limiterData struct {
limit uint64
limitAsFloat float64
boostedLimit float64
takeByBlock bool

currentBucketSize uint64
lastEvent *time.Time
replies *repliesList
blocks []block
nextBlockTime *time.Time
}

// Option is an option for New that allows the limiter to
// be configured
type Option func(l *limiterData)

// TakeByBlock if this is set then the elements MUST be returned
// in the exact sizes as given to the Add function. This is useful
// for cases where the incoming stream is a block-based protocol and
// the packets must not be split up. In this mode the Take method will
// always return the exact sequence of bytes given by Add in the same order
// and may return large blocking times to ensure that the total number of elements
// does not exceed the limit over time. However, in this mode it is
// possible for there to be spikes over the limit since the block itself
// could be larger than the limit (e.g., if my limit is 10 and the next block
// is 20 then when the system gives out 20 it will be over the limit for that
// time period but the next operation will return an appropriate wait duration
// to keep the total throughput at the limit)
func TakeByBlock() Option {
return func(l *limiterData) {
l.takeByBlock = true
}
}

// New creates a new Limiter with the limit in elements
// per second. If limit is 0 it will be set to 1
func New(limit uint64, opts ...Option) Limiter {
Expand All @@ -74,6 +95,7 @@ func New(limit uint64, opts ...Option) Limiter {
limitAsFloat: limitAsFloat,
boostedLimit: limitAsFloat * boostFactor,
replies: &repliesList{},
blocks: make([]block, 0),
}

for _, opt := range opts {
Expand All @@ -95,11 +117,20 @@ func (ld *limiterData) Add(chunkSize uint64) {
ld.lock.Lock()
defer ld.lock.Unlock()

if ld.takeByBlock {
ld.blocks = append(ld.blocks, block{chunkSize})
return
}

currentSize := ld.currentBucketSize
ld.currentBucketSize = currentSize + chunkSize
}

func (ld *limiterData) Take() (uint64, time.Duration) {
if ld.takeByBlock {
return ld.doTakeByBlock()
}

ld.lock.Lock()
defer ld.lock.Unlock()

Expand Down Expand Up @@ -167,6 +198,77 @@ func (ld *limiterData) Take() (uint64, time.Duration) {
return output, delay
}

func (ld *limiterData) doTakeByBlock() (uint64, time.Duration) {
ld.lock.Lock()
defer ld.lock.Unlock()

now := ld.clock.Now()

if ld.nextBlockTime != nil {
nbt := *ld.nextBlockTime

if now.Before(nbt) {
// user did not in fact wait long enough, return the difference
return 0, nbt.Sub(now)
}

// user waited enough time, just go on with the protocol
ld.nextBlockTime = nil
}

previousSecondSize := ld.replies.calculateAndCut(now)
if previousSecondSize <= ld.limit {
if len(ld.blocks) == 0 {
return 0, 0
}

block := ld.blocks[0]
ld.blocks = ld.blocks[1:]

blockSize := block.size

// not at limit for this second yet
ld.replies.add(blockSize, now)

totalThisSecond := blockSize + previousSecondSize
if totalThisSecond > ld.limit {
// we JUST pushed it over, need to set the future wait time
ld.setNextBlockTime(now, float64(totalThisSecond))
}

return blockSize, 0
}

overTime := ld.setNextBlockTime(now, float64(previousSecondSize))

return 0, overTime
}

func (ld *limiterData) setNextBlockTime(now time.Time, knownSize float64) time.Duration {
overTime := time.Duration(knownSize / (ld.limitAsFloat / float64(time.Second)))

oldestRecordTime := ld.replies.lastTime()
if oldestRecordTime == nil {
// does not seem possible to get into here
nextBlockTime := now.Add(overTime)
ld.nextBlockTime = &nextBlockTime

return overTime
}

durationServed := now.Sub(*oldestRecordTime)

overTime = overTime - durationServed
if overTime < 1 {
overTime = 1
}

nextBlockTime := now.Add(overTime)
ld.nextBlockTime = &nextBlockTime

return overTime
}

func (ld *limiterData) GetLimit() uint64 {
ld.lock.Lock()
defer ld.lock.Unlock()
Expand Down Expand Up @@ -226,6 +328,7 @@ func (rl *repliesList) lastTime() *time.Time {
return &rl.tail.replied
}

// calculateAndCut returns total amount sent in last second
func (rl *repliesList) calculateAndCut(now time.Time) uint64 {
if rl.head == nil {
return 0
Expand Down Expand Up @@ -266,6 +369,10 @@ type repliesEntry struct {
next *repliesEntry
}

type block struct {
size uint64
}

type defaultClock struct {
}

Expand Down
181 changes: 181 additions & 0 deletions rate/leakybucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,187 @@ func TestChangeLimit(t *testing.T) {
assert.Equal(t, 0, int(delay))
}

func TestBlockLimitBasic(t *testing.T) {
mc := &mockClock{}
mc.nextNow = time.Now()

limiter := New(100, WithClock(mc), TakeByBlock())

limiter.Add(200)
limiter.Add(10)

numTaken, waitDuration := limiter.Take()
assert.Equal(t, uint64(200), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(time.Second / 2) // add a half second and call again, should not be enough time

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, 1*time.Second+time.Second/2, waitDuration)

mc.nextNow = mc.nextNow.Add(waitDuration) // this should get us over the expected wait time

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(10), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(time.Second / 4) // another quarter second for veracity

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)
}

func TestBlockLimitMultipleTakesInOneSecond(t *testing.T) {
mc := &mockClock{}
mc.nextNow = time.Now()

limiter := New(100, WithClock(mc), TakeByBlock())

limiter.Add(10)
limiter.Add(10)
limiter.Add(10)
limiter.Add(71) // a total of 101
limiter.Add(10) // one past

numTaken, waitDuration := limiter.Take()
assert.Equal(t, uint64(10), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(10), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(10), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(71), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(1009999996), waitDuration) // 1.01 seconds minus 4 nanoseconds

mc.nextNow = mc.nextNow.Add(waitDuration)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(10), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(time.Second)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)
}

func TestBlockLimitMultipleTakesInterlevedAdds(t *testing.T) {
mc := &mockClock{}
mc.nextNow = time.Now()

limiter := New(100, WithClock(mc), TakeByBlock())

limiter.Add(10)

numTaken, waitDuration := limiter.Take()
assert.Equal(t, uint64(10), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

limiter.Add(10)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(10), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

limiter.Add(81)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(81), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(100)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(1009999898), waitDuration)

limiter.Add(100)

mc.nextNow = mc.nextNow.Add(time.Second)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(9999898), waitDuration)

mc.nextNow = mc.nextNow.Add(waitDuration)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(100), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)
}

func TestBlockLimitMultipleLargeInitialAdd(t *testing.T) {
mc := &mockClock{}
mc.nextNow = time.Now()

limiter := New(100, WithClock(mc), TakeByBlock())

limiter.Add(1000)

numTaken, waitDuration := limiter.Take()
assert.Equal(t, uint64(1000), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(time.Second)

limiter.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, 9*time.Second, waitDuration)

mc.nextNow = mc.nextNow.Add(waitDuration)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(1), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)

mc.nextNow = mc.nextNow.Add(1)

numTaken, waitDuration = limiter.Take()
assert.Equal(t, uint64(0), numTaken)
assert.Equal(t, time.Duration(0), waitDuration)
}

type mockClock struct {
nextNow time.Time
}
Expand Down

0 comments on commit 4a67af8

Please sign in to comment.