Skip to content

Commit

Permalink
clean up twilight-http integration
Browse files Browse the repository at this point in the history
  • Loading branch information
vilgotf committed Feb 23, 2025
1 parent abe5924 commit 30cb12f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 51 deletions.
4 changes: 2 additions & 2 deletions twilight-http/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2999,9 +2999,9 @@ impl Client {
.flatten();

Ok(if let Some(ratelimiter) = &self.ratelimiter {
let rx = ratelimiter.acquire(ratelimit_path);
let permit_future = ratelimiter.acquire(ratelimit_path);

ResponseFuture::ratelimit(invalid_token, inner, self.timeout, rx)
ResponseFuture::ratelimit(invalid_token, inner, self.timeout, permit_future)
} else {
ResponseFuture::new(Box::pin(time::timeout(self.timeout, inner)), invalid_token)
})
Expand Down
102 changes: 53 additions & 49 deletions twilight-http/src/response/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,43 @@ use twilight_http_ratelimiting::{Permit, PermitFuture, RateLimitHeaders};

type Output<T> = Result<Response<T>, Error>;

/// Parse ratelimit headers from a map of headers.
///
/// # Errors
///
/// Errors if a required header is missing or if a header value is of an
/// invalid type.
fn parse_ratelimit_headers(
headers: &HeaderMap,
) -> Result<Option<RateLimitHeaders>, Box<dyn std::error::Error>> {
let bucket = headers.get(RateLimitHeaders::BUCKET);
let limit = headers.get(RateLimitHeaders::LIMIT);
let remaining = headers.get(RateLimitHeaders::REMAINING);
let reset_after = headers.get(RateLimitHeaders::RESET_AFTER);

if bucket.is_none() && limit.is_none() && remaining.is_none() && reset_after.is_none() {
return Ok(None);
}

let bucket = bucket.ok_or("missing bucket header")?.as_bytes().to_vec();
let limit = limit.ok_or("missing limit header")?.to_str()?.parse()?;
let remaining = remaining
.ok_or("missing remaining header")?
.to_str()?
.parse()?;
let reset_after = reset_after
.ok_or("missing reset-after header")?
.to_str()?
.parse()?;

Ok(Some(RateLimitHeaders {
bucket,
limit,
remaining,
reset_at: Instant::now() + Duration::from_secs_f32(reset_after),
}))
}

enum InnerPoll<T> {
Advance(ResponseFutureStage),
Pending(ResponseFutureStage),
Expand Down Expand Up @@ -74,49 +111,11 @@ impl Failed {
struct InFlight {
future: Pin<Box<Timeout<HyperResponseFuture>>>,
invalid_token: Option<Arc<AtomicBool>>,
tx: Option<Permit>,
permit: Option<Permit>,
}

impl InFlight {
fn poll<T>(mut self, cx: &mut Context<'_>) -> InnerPoll<T> {
fn headers(
headers: &HeaderMap,
on_err: impl Fn(&dyn std::error::Error),
) -> Option<RateLimitHeaders> {
let bucket = headers.get(RateLimitHeaders::BUCKET)?.as_bytes().to_vec();
let limit = headers
.get(RateLimitHeaders::LIMIT)?
.to_str()
.inspect_err(|e| on_err(e))
.ok()?
.parse()
.inspect_err(|e| on_err(e))
.ok()?;
let remaining = headers
.get(RateLimitHeaders::REMAINING)?
.to_str()
.inspect_err(|e| on_err(e))
.ok()?
.parse()
.inspect_err(|e| on_err(e))
.ok()?;
let reset = headers
.get(RateLimitHeaders::RESET_AFTER)?
.to_str()
.inspect_err(|e| on_err(e))
.ok()?
.parse()
.inspect_err(|e| on_err(e))
.ok()?;

Some(RateLimitHeaders {
bucket,
limit,
remaining,
reset_at: Instant::now() + Duration::from_secs_f32(reset),
})
}

let resp = match Pin::new(&mut self.future).poll(cx) {
Poll::Ready(Ok(Ok(resp))) => resp,
Poll::Ready(Ok(Err(source))) => {
Expand All @@ -143,10 +142,15 @@ impl InFlight {
}
}

if let Some(tx) = self.tx {
tx.complete(headers(resp.headers(), |e| {
tracing::warn!("header parsing failed: {e}; {resp:?}");
}));
if let Some(permit) = self.permit {
match parse_ratelimit_headers(resp.headers()) {
Ok(v) => permit.complete(v),
Err(source) => {
tracing::warn!("header parsing failed: {source}; {resp:?}");

permit.complete(None);
}
}
}

let status = resp.status();
Expand Down Expand Up @@ -196,12 +200,12 @@ struct RatelimitQueue {
response_future: HyperResponseFuture,
timeout: Duration,
pre_flight_check: Option<Box<dyn FnOnce() -> bool + Send + 'static>>,
rx: PermitFuture,
permit_future: PermitFuture,
}

impl RatelimitQueue {
fn poll<T>(mut self, cx: &mut Context<'_>) -> InnerPoll<T> {
let Poll::Ready(tx) = Pin::new(&mut self.rx).poll(cx) else {
let Poll::Ready(permit) = Pin::new(&mut self.permit_future).poll(cx) else {
return InnerPoll::Pending(ResponseFutureStage::RatelimitQueue(self));
};

Expand All @@ -217,7 +221,7 @@ impl RatelimitQueue {
InnerPoll::Advance(ResponseFutureStage::InFlight(InFlight {
future: Box::pin(time::timeout(self.timeout, self.response_future)),
invalid_token: self.invalid_token,
tx: Some(tx),
permit: Some(permit),
}))
}
}
Expand Down Expand Up @@ -287,7 +291,7 @@ impl<T> ResponseFuture<T> {
stage: ResponseFutureStage::InFlight(InFlight {
future,
invalid_token,
tx: None,
permit: None,
}),
}
}
Expand Down Expand Up @@ -369,7 +373,7 @@ impl<T> ResponseFuture<T> {
invalid_token: Option<Arc<AtomicBool>>,
response_future: HyperResponseFuture,
timeout: Duration,
wait_for_sender: PermitFuture,
permit_future: PermitFuture,
) -> Self {
Self {
phantom: PhantomData,
Expand All @@ -378,7 +382,7 @@ impl<T> ResponseFuture<T> {
response_future,
timeout,
pre_flight_check: None,
rx: wait_for_sender,
permit_future,
}),
}
}
Expand Down

0 comments on commit 30cb12f

Please sign in to comment.