Skip to content

Commit

Permalink
feat: Add throttle (#241)
Browse files Browse the repository at this point in the history
Throttles `iterable` at a rate of `limit` per `interval` without discarding data. Useful for throttling rate limited APIs.
- `limit` can be greater than 0 but less than `Infinity`.
- `interval` can be greater than or equal to 0 but less than `Infinity`.
  • Loading branch information
richardscarrott authored Feb 6, 2022
1 parent c2b1d73 commit ea8d1ff
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 0 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Since this works with async iterators it requires node 10 or higher.
- [`reduce()`](#reduce)
- [`take()`](#take)
- [`tap()`](#tap)
- [`throttle()`](#throttle)
- [`time()`](#time)
- [`transform()`](#transform)
- [`writeToStream()`](#writetostream)
Expand Down Expand Up @@ -463,6 +464,26 @@ function tap<T>(func: (data: T) => any, iterable: AnyIterable<T>): AsyncIterable

Returns a new iterator that yields the data it consumes, passing the data through to a function. If you provide an async function, the iterator will wait for the promise to resolve before yielding the value. This is useful for logging, or processing information and passing it along.

### throttle
```ts
function throttle<T>(limit: number, interval: number, iterable: AnyIterable<T>): AsyncGenerator<T>
```

Throttles `iterable` at a rate of `limit` per `interval` without discarding data. Useful for throttling rate limited APIs.

`limit` can be greater than 0 but less than `Infinity`.
`interval` can be greater than or equal to 0 but less than `Infinity`.

```ts
import { throttle } from 'streaming-iterables'
import { getPokemon, trainMonster } from 'iterable-pokedex'
// load monsters at a maximum rate of 1 per second
for await (const monster of throttle(1, 1000, getPokemon())) {
await trainMonster(monster)
}
```

### time
```ts
function time<T>(config?: ITimeConfig, iterable: AsyncIterable<R>): AsyncIterableIterator<R>
Expand Down
1 change: 1 addition & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export { pipeline } from './pipeline'
export { reduce } from './reduce'
export { take, CurriedTakeResult } from './take'
export { tap } from './tap'
export { throttle } from './throttle'
export { time, TimeConfig, CurriedTimeResult } from './time'
export { transform } from './transform'
export { writeToStream, WritableStreamish } from './write-to-stream'
199 changes: 199 additions & 0 deletions lib/throttle-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import { assert } from 'chai'
import { throttle } from './throttle'
import { promiseImmediate } from './util-test'
import * as sinon from 'sinon'

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))

async function* asyncNumbers(max: number) {
let num = 1
while (num <= max) {
yield await promiseImmediate(num)
num++
}
}

function* numbers(max: number) {
let num = 1
while (num <= max) {
yield num
num++
}
}

describe('throttle', () => {
let clock: ReturnType<typeof sinon.useFakeTimers>

beforeEach(() => {
clock = sinon.useFakeTimers()
})

afterEach(() => {
clock.restore()
})

async function* withTimestamp<T>(iterable: AsyncIterable<T>) {
for await (const value of iterable) {
yield { value, timestamp: `${clock.now}ms` }
}
}

it('throws if `limit` is not a finite number', () => {
assert.throws(() => throttle('1' as any, 1000, numbers(5)), 'Expected `limit` to be a finite number')
assert.throws(() => throttle(-Infinity, 1000)(numbers(5)), 'Expected `limit` to be a finite number')
assert.throws(() => throttle(-Infinity, 1000)(numbers(5)), 'Expected `limit` to be a finite number')
})

it('throws if `interval` is not a finite number', () => {
assert.throws(() => throttle(1, '1000' as any, numbers(5)), 'Expected `interval` to be a finite number')
assert.throws(() => throttle(1, -Infinity, numbers(5)), 'Expected `interval` to be a finite number')
assert.throws(() => throttle(1, Infinity)(numbers(5)), 'Expected `interval` to be a finite number')
})

it('throws if limit is <= 0', () => {
assert.throws(() => throttle(0, 1000, numbers(5)), 'Expected `limit` to be greater than 0')
assert.throws(() => throttle(-1, 1000, numbers(5)), 'Expected `limit` to be greater than 0')
})

it('throttles sync iterators, 1 every 1s', async () => {
const src = withTimestamp(throttle(1, 1000, numbers(5)))
const promisedValues = new Promise(async resolve => {
const vals: any[] = []
for await (const value of src) {
vals.push(value)
}
resolve(vals)
})
clock.runAllAsync()
const values = await promisedValues
assert.deepEqual(values, [
{ value: 1, timestamp: '0ms' },
{ value: 2, timestamp: '1000ms' },
{ value: 3, timestamp: '2000ms' },
{ value: 4, timestamp: '3000ms' },
{ value: 5, timestamp: '4000ms' },
])
assert.equal((await src.next()).done, true)
})

it('throttles async iterators, 1 every 1s', async () => {
const src = withTimestamp(throttle(1, 1000, asyncNumbers(5)))
const promisedValues = new Promise(async resolve => {
const vals: any[] = []
for await (const value of src) {
vals.push(value)
}
resolve(vals)
})
clock.runAllAsync()
const values = await promisedValues
assert.deepEqual(values, [
{ value: 1, timestamp: '0ms' },
{ value: 2, timestamp: '1000ms' },
{ value: 3, timestamp: '2000ms' },
{ value: 4, timestamp: '3000ms' },
{ value: 5, timestamp: '4000ms' },
])
assert.equal((await src.next()).done, true)
})

it('throttles async iterators, 2 every 1s', async () => {
const src = withTimestamp(throttle(2, 1000, asyncNumbers(5)))
const promisedValues = new Promise(async resolve => {
const vals: any[] = []
for await (const value of src) {
vals.push(value)
}
resolve(vals)
})
clock.runAllAsync()
const values = await promisedValues
assert.deepEqual(values, [
{ value: 1, timestamp: '0ms' },
{ value: 2, timestamp: '0ms' },
{ value: 3, timestamp: '1000ms' },
{ value: 4, timestamp: '1000ms' },
{ value: 5, timestamp: '2000ms' },
])
assert.equal((await src.next()).done, true)
})

it('throttles async iterators, 4 every 6s', async () => {
const src = withTimestamp(throttle(4, 6000, asyncNumbers(9)))
const promisedValues = new Promise(async resolve => {
const vals: any[] = []
for await (const value of src) {
vals.push(value)
}
resolve(vals)
})
clock.runAllAsync()
const values = await promisedValues
assert.deepEqual(values, [
{ value: 1, timestamp: '0ms' },
{ value: 2, timestamp: '0ms' },
{ value: 3, timestamp: '0ms' },
{ value: 4, timestamp: '0ms' },
{ value: 5, timestamp: '6000ms' },
{ value: 6, timestamp: '6000ms' },
{ value: 7, timestamp: '6000ms' },
{ value: 8, timestamp: '6000ms' },
{ value: 9, timestamp: '12000ms' },
])
})

it('avoids over-throttling if the consumer is slower than the throttling config', async () => {
const src = withTimestamp(throttle(1, 500, asyncNumbers(7)))
const promisedValues = new Promise(async resolve => {
const vals: any[] = []
vals.push((await src.next()).value) // 1
await sleep(1000)
vals.push((await src.next()).value) // 2
await sleep(250)
vals.push((await src.next()).value) // 3
await sleep(3000)
vals.push((await src.next()).value) // 4
await sleep(100)
vals.push((await src.next()).value) // 5
await sleep(1000)
vals.push((await src.next()).value) // 6
await sleep(60000)
vals.push((await src.next()).value) // 7
resolve(vals)
})
clock.runAllAsync()
const values = await promisedValues
assert.deepEqual(values, [
{ value: 1, timestamp: '0ms' },
{ value: 2, timestamp: '1000ms' },
{ value: 3, timestamp: '1500ms' }, // throttled
{ value: 4, timestamp: '4500ms' },
{ value: 5, timestamp: '5000ms' }, // throttled
{ value: 6, timestamp: '6000ms' },
{ value: 7, timestamp: '66000ms' },
])
assert.equal((await src.next()).done, true)
})

it('is curryable', async () => {
const throttle3PerSecond = throttle(3, 1000)
const src = withTimestamp(throttle3PerSecond(asyncNumbers(5)))
const promisedValues = new Promise(async resolve => {
const vals: any[] = []
for await (const value of src) {
vals.push(value)
}
resolve(vals)
})
clock.runAllAsync()
const values = await promisedValues
assert.deepEqual(values, [
{ value: 1, timestamp: '0ms' },
{ value: 2, timestamp: '0ms' },
{ value: 3, timestamp: '0ms' },
{ value: 4, timestamp: '1000ms' },
{ value: 5, timestamp: '1000ms' },
])
assert.equal((await src.next()).done, true)
})
})
49 changes: 49 additions & 0 deletions lib/throttle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/// <reference lib="esnext.asynciterable" />
import { AnyIterable } from './types'

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))

function _throttle<T>(limit: number, interval: number, iterable: AnyIterable<T>) {
if (!Number.isFinite(limit)) {
throw new TypeError('Expected `limit` to be a finite number')
}
if (limit <= 0) {
throw new TypeError('Expected `limit` to be greater than 0')
}
if (!Number.isFinite(interval)) {
throw new TypeError('Expected `interval` to be a finite number')
}
return (async function* __throttle() {
let sent = 0
let time: number | undefined
for await (const val of iterable) {
if (sent < limit) {
if (typeof time === 'undefined') {
time = Date.now()
}
sent++
yield val
continue
}
// Only wait if the interval hasn't already passed while we were
// yielding the previous values.
const elapsedMs = Date.now() - time!
const waitFor = interval - elapsedMs
if (waitFor > 0) {
await sleep(waitFor)
}
time = Date.now()
sent = 1
yield val
}
})()
}

export function throttle<T>(limit: number, interval: number): (iterable: AnyIterable<T>) => AsyncGenerator<T>
export function throttle<T>(limit: number, interval: number, iterable: AnyIterable<T>): AsyncGenerator<T>
export function throttle<T>(limit: number, interval: number, iterable?: AnyIterable<T>) {
if (iterable === undefined) {
return (curriedIterable: AnyIterable<T>) => _throttle(limit, interval, curriedIterable)
}
return _throttle(limit, interval, iterable)
}

0 comments on commit ea8d1ff

Please sign in to comment.