perf(gateway): optimize CommandRatelimiter size (#2395)

Switches from storing absolute timestamps to timestamps relative to
`delay`, in milliseconds (tokio's timer resolution). This reduces the
item size from 12 bytes to just 2 (60,000 ms fits in a `u16`) and adds
a negligible amount of bookkeeping work: rebasing the timestamps when
`delay` elapses. Building on this, the first relative timestamp is made
implicit by whether `delay` has elapsed (instead of being stored as
zero).
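
A rough sketch of the encoding and rebasing (illustrative only, not the
PR's actual code; `base` stands in for `delay`'s deadline, `offsets` for
the queue, and `encode`/`decode`/`rebase` are hypothetical helpers):

```rust
use std::time::{Duration, Instant};

/// Store a release time as a millisecond offset from a shared base
/// deadline: 60,000 ms fits comfortably in a `u16` (max 65,535).
fn encode(base: Instant, release: Instant) -> u16 {
    (release - base).as_millis() as u16
}

fn decode(base: Instant, offset: u16) -> Instant {
    base + Duration::from_millis(u64::from(offset))
}

/// Rebase: when the shared deadline moves forward (the timer elapsed),
/// drop released offsets and shift the rest relative to the new base.
fn rebase(offsets: &mut Vec<u16>, old_base: Instant, new_base: Instant) {
    let shift = (new_base - old_base).as_millis() as u16;
    offsets.retain(|&offset| offset > shift);
    for offset in offsets {
        *offset -= shift;
    }
}

fn main() {
    let base = Instant::now();
    let release = base + Duration::from_secs(60);
    let offset = encode(base, release);
    assert_eq!(std::mem::size_of_val(&offset), 2); // vs. 12 for an absolute timestamp
    assert_eq!(decode(base, offset), release);

    let mut offsets = vec![offset];
    rebase(&mut offsets, base, base + Duration::from_secs(1));
    assert_eq!(offsets, [59_000]);
}
```

In the actual implementation the new deadline is the first still-acquired
timestamp itself, which is why that first relative timestamp never needs
to be stored.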

Additionally, this PR fixes the following minor bugs:

* always store the waker when returning `Poll::Pending` (always call `Sleep::poll`)
* `slice::partition_point` may return any matching element, but we
depend upon the first being returned (it should be impossible to acquire
two permits at the same time in practice, since we send a gateway
command between each acquisition)
* `Vec::with_capacity` is allowed to allocate more than the requested
amount, whereas we require the capacity to be exact (the current
implementation happens to be exact, however); a sketch of the workaround
follows this list
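
A minimal sketch of that workaround (paraphrasing the `VecDeque` setup
visible in the diff below; `exact_capacity` is a hypothetical helper):

```rust
use std::collections::VecDeque;

/// Build a queue whose capacity is exactly `capacity`, not merely at
/// least `capacity`.
fn exact_capacity(capacity: usize) -> VecDeque<u16> {
    let mut queue = VecDeque::with_capacity(capacity);
    // The ratelimiter treats `len == capacity` as "full", so an
    // over-allocation here would change its behavior.
    if queue.capacity() != capacity {
        queue.resize(capacity, 0);
        // `into_boxed_slice` shrinks the allocation to `len`, so
        // `into_vec` yields a Vec with len == capacity; converting
        // back to a `VecDeque` reuses the allocation.
        let vec = Vec::from(queue).into_boxed_slice().into_vec();
        queue = VecDeque::from(vec);
        queue.clear();
    }
    queue
}

fn main() {
    let queue = exact_capacity(109); // arbitrary capacity
    assert_eq!(queue.capacity(), 109);
}
```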

TODO:
- [x] Update doc comments
- [x] ~~Explain algorithm more in code comments, the logic is complex~~
split logic into self-explanatory methods
- [x] add more tests for new edge cases.

---------

Co-authored-by: Erk <[email protected]>
vilgotf and Erk- authored Dec 27, 2024
1 parent 093eca2 commit 7c05997
Showing 2 changed files with 111 additions and 83 deletions.
187 changes: 108 additions & 79 deletions twilight-gateway/src/ratelimiter.rs
@@ -7,120 +7,152 @@
 //! [`CommandRatelimiter`] is implemented as a sliding window log. This is the
 //! only ratelimit algorithm that supports burst requests and guarantees that
 //! the (t - [`PERIOD`], t] window is never exceeded. See
-//! <https://hechao.li/2018/06/25/Rate-Limiter-Part1> for an overview of it and
+//! <https://hechao.li/posts/Rate-Limiter-Part1/> for an overview of it and
 //! other alternative algorithms.
+#![allow(clippy::cast_possible_truncation)]
 
 use std::{
+    collections::VecDeque,
     future::Future,
     pin::Pin,
     task::{ready, Context, Poll},
 };
-use tokio::time::{sleep_until, Duration, Instant, Sleep};
-
-/// Number of commands allowed in a [`PERIOD`].
-const COMMANDS_PER_PERIOD: u8 = 120;
+use tokio::time::{Duration, Instant, Sleep};
 
-/// Gateway ratelimiter period duration.
+/// Duration until an acquired permit is released.
 const PERIOD: Duration = Duration::from_secs(60);
 
+/// Number of permits per [`PERIOD`].
+const PERMITS: u8 = 120;
+
 /// Ratelimiter for sending commands over the gateway to Discord.
 #[derive(Debug)]
 pub struct CommandRatelimiter {
-    /// Future that completes the next time the ratelimiter allows a permit.
+    /// Future that completes when the next permit is released.
+    ///
+    /// Counts as an acquired permit if pending.
     delay: Pin<Box<Sleep>>,
-    /// Ordered queue of instants when a permit elapses.
-    instants: Vec<Instant>,
+    /// Ordered queue of timestamps relative to [`Self::delay`] in milliseconds
+    /// when permits release.
+    queue: VecDeque<u16>,
 }
 
 impl CommandRatelimiter {
     /// Create a new ratelimiter with some capacity reserved for heartbeating.
     pub(crate) fn new(heartbeat_interval: Duration) -> Self {
-        let allotted = nonreserved_commands_per_reset(heartbeat_interval);
-
-        let now = Instant::now();
-        let mut delay = Box::pin(sleep_until(now));
-
-        // Hack to register the timer.
-        delay.as_mut().reset(now);
+        let capacity = usize::from(nonreserved_commands_per_reset(heartbeat_interval)) - 1;
+
+        let mut queue = VecDeque::with_capacity(capacity);
+        if queue.capacity() != capacity {
+            queue.resize(capacity, 0);
+            // `into_boxed_slice().into_vec()` guarantees len == capacity.
+            let vec = Vec::from(queue).into_boxed_slice().into_vec();
+            // This is guaranteed to not allocate.
+            queue = VecDeque::from(vec);
+            queue.clear();
+        }
 
         Self {
-            delay,
-            instants: Vec::with_capacity(allotted.into()),
+            delay: Box::pin(tokio::time::sleep_until(Instant::now())),
+            queue,
         }
     }
 
     /// Number of available permits.
-    #[allow(clippy::cast_possible_truncation)]
     pub fn available(&self) -> u8 {
         let now = Instant::now();
-        let elapsed_permits = self.instants.partition_point(|&elapsed| elapsed <= now);
-        let used_permits = self.instants.len() - elapsed_permits;
+        let acquired = if now >= self.delay.deadline() {
+            self.next_acquired_position(now)
+                .map_or(0, |released_count| self.queue.len() - released_count)
+        } else {
+            self.queue.len() + 1
+        };
 
-        self.max() - used_permits as u8
+        self.max() - acquired as u8
     }
 
     /// Maximum number of available permits.
-    #[allow(clippy::cast_possible_truncation)]
    pub fn max(&self) -> u8 {
-        self.instants.capacity() as u8
+        self.queue.capacity() as u8 + 1
    }
 
     /// Duration until the next permit is available.
     pub fn next_available(&self) -> Duration {
-        self.instants.first().map_or(Duration::ZERO, |elapsed| {
-            elapsed.saturating_duration_since(Instant::now())
-        })
+        self.delay
+            .deadline()
+            .saturating_duration_since(Instant::now())
     }
 
-    /// Polls for a permit.
+    /// Attempts to acquire a permit.
     ///
-    /// # Return value
-    ///
-    /// The function returns:
+    /// # Returns
     ///
-    /// * `Poll::Pending` if the ratelimiter is full
-    /// * `Poll::Ready` if a permit was granted.
+    /// * `Poll::Pending` if no permit is available
+    /// * `Poll::Ready` if a permit is acquired.
     pub(crate) fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<()> {
-        let now = ready!(self.poll_ready(cx)).unwrap_or_else(Instant::now);
-        self.instants.push(now + PERIOD);
+        ready!(self.poll_available(cx));
+
+        let now = Instant::now();
+        if now >= self.delay.deadline() {
+            if let Some(new_deadline_idx) = self.next_acquired_position(now) {
+                self.rebase(new_deadline_idx);
+            } else {
+                self.queue.clear();
+                self.delay.as_mut().reset(now + PERIOD);
+
+                return Poll::Ready(());
+            }
+        }
+
+        let releases = (now + PERIOD) - self.delay.deadline();
+        debug_assert_ne!(self.queue.capacity(), self.queue.len());
+        self.queue.push_back(releases.as_millis() as u16);
+
+        if self.queue.len() == self.queue.capacity() {
+            tracing::debug!(duration = ?(self.delay.deadline() - now), "ratelimited");
+        }
 
         Poll::Ready(())
     }
 
-    /// Polls for readiness.
+    /// Checks whether a permit is available.
     ///
-    /// # Return value
-    ///
-    /// The function returns:
+    /// # Returns
     ///
-    /// * `Poll::Pending` if the ratelimiter is full
-    /// * `Poll::Ready<Option(now)>` if the ratelimiter has spare capacity.
-    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Option<Instant>> {
-        if self.instants.len() != self.instants.capacity() {
-            return Poll::Ready(None);
+    /// * `Poll::Pending` if no permit is available
+    /// * `Poll::Ready` if a permit is available.
+    pub(crate) fn poll_available(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        if self.queue.len() < self.queue.capacity() {
+            return Poll::Ready(());
        }
 
-        if !self.delay.is_elapsed() {
-            return Poll::Pending;
-        }
+        self.delay.as_mut().poll(cx)
+    }
 
-        let new_deadline = self.instants[0];
-        let now = Instant::now();
-        if new_deadline > now {
-            tracing::debug!(duration = ?(new_deadline - now), "ratelimited");
-            self.delay.as_mut().reset(new_deadline);
-            _ = self.delay.as_mut().poll(cx);
+    /// Searches for the first acquired timestamp, returning its index.
+    ///
+    /// If every timestamp is released, it returns `None`.
+    fn next_acquired_position(&self, now: Instant) -> Option<usize> {
+        self.queue
+            .iter()
+            .map(|&m| self.delay.deadline() + Duration::from_millis(m.into()))
+            .position(|deadline| deadline > now)
+    }
 
-            Poll::Pending
-        } else {
-            let elapsed_permits = self.instants.partition_point(|&elapsed| elapsed <= now);
-            let used_permits = self.instants.len() - elapsed_permits;
+    /// Resets to a new deadline and updates acquired permits' relative timestamp.
+    fn rebase(&mut self, new_deadline_idx: usize) {
+        let duration = Duration::from_millis(self.queue[new_deadline_idx].into());
+        let new_deadline = self.delay.deadline() + duration;
 
-            self.instants.rotate_right(used_permits);
-            self.instants.truncate(used_permits);
+        self.queue.drain(..=new_deadline_idx);
 
-            Poll::Ready(Some(now))
+        for timestamp in &mut self.queue {
+            let deadline = self.delay.deadline() + Duration::from_millis((*timestamp).into());
+            *timestamp = (deadline - new_deadline).as_millis() as u16;
        }
+
+        self.delay.as_mut().reset(new_deadline);
    }
 }

@@ -135,20 +167,20 @@ impl CommandRatelimiter {
 fn nonreserved_commands_per_reset(heartbeat_interval: Duration) -> u8 {
     /// Guard against faulty gateways specifying low heartbeat intervals by
     /// maximally reserving this many heartbeats per [`PERIOD`].
-    const MAX_NONRESERVED_COMMANDS_PER_PERIOD: u8 = COMMANDS_PER_PERIOD - 10;
+    const MAX_NONRESERVED_COMMANDS_PER_PERIOD: u8 = PERMITS - 10;
 
     // Calculate the amount of heartbeats per heartbeat interval.
     let heartbeats_per_reset = PERIOD.as_secs_f32() / heartbeat_interval.as_secs_f32();
 
     // Round up to be on the safe side.
-    #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
+    #[allow(clippy::cast_sign_loss)]
     let heartbeats_per_reset = heartbeats_per_reset.ceil() as u8;
 
     // Reserve an extra heartbeat just in case.
     let heartbeats_per_reset = heartbeats_per_reset.saturating_add(1);
 
     // Subtract the reserved heartbeats from the total available events.
-    let nonreserved_commands_per_reset = COMMANDS_PER_PERIOD.saturating_sub(heartbeats_per_reset);
+    let nonreserved_commands_per_reset = PERMITS.saturating_sub(heartbeats_per_reset);
 
     // Take the larger value between this and the guard value.
     nonreserved_commands_per_reset.max(MAX_NONRESERVED_COMMANDS_PER_PERIOD)
@@ -158,7 +190,7 @@ fn nonreserved_commands_per_reset(heartbeat_interval: Duration) -> u8 {
 mod tests {
     use super::{nonreserved_commands_per_reset, CommandRatelimiter, PERIOD};
     use static_assertions::assert_impl_all;
-    use std::{fmt::Debug, future::poll_fn, task::Poll, time::Duration};
+    use std::{fmt::Debug, future::poll_fn, time::Duration};
     use tokio::time;
 
     assert_impl_all!(CommandRatelimiter: Debug, Send, Sync);
@@ -245,25 +277,22 @@ mod tests {
     }
 
     #[tokio::test(start_paused = true)]
-    async fn spurious_poll() {
+    async fn rebase() {
         let mut ratelimiter = CommandRatelimiter::new(HEARTBEAT_INTERVAL);
 
-        for _ in 0..ratelimiter.max() {
+        for _ in 0..5 {
+            time::advance(Duration::from_millis(20)).await;
             poll_fn(|cx| ratelimiter.poll_acquire(cx)).await;
         }
-        assert_eq!(ratelimiter.available(), 0);
+        assert_eq!(ratelimiter.available(), ratelimiter.max() - 5);
 
-        // Spuriously poll after registering the waker but before the timer has
-        // fired.
-        poll_fn(|cx| {
-            if ratelimiter.poll_ready(cx).is_ready() {
-                return Poll::Ready(());
-            };
-            let deadline = ratelimiter.delay.deadline();
-            assert!(ratelimiter.poll_ready(cx).is_pending());
-            assert_eq!(deadline, ratelimiter.delay.deadline(), "deadline was reset");
-            Poll::Pending
-        })
-        .await;
+        time::advance(PERIOD - Duration::from_millis(80)).await;
+        assert_eq!(ratelimiter.available(), ratelimiter.max() - 4);
+
+        for _ in 0..4 {
+            poll_fn(|cx| ratelimiter.poll_acquire(cx)).await;
+            time::advance(Duration::from_millis(20)).await;
+            assert_eq!(ratelimiter.available(), ratelimiter.max() - 4);
+        }
     }
 }
7 changes: 3 additions & 4 deletions twilight-gateway/src/shard.rs
@@ -618,10 +618,9 @@ impl<Q: Queue> Shard<Q> {
                 continue;
             }
 
-            let not_ratelimited = self
-                .ratelimiter
-                .as_mut()
-                .map_or(true, |ratelimiter| ratelimiter.poll_ready(cx).is_ready());
+            let not_ratelimited = self.ratelimiter.as_mut().map_or(true, |ratelimiter| {
+                ratelimiter.poll_available(cx).is_ready()
+            });
 
             if not_ratelimited {
                 if let Some(Poll::Ready(canceled)) = self
