diff --git a/src/trigger.rs b/src/trigger.rs index b62998c..c968cbc 100644 --- a/src/trigger.rs +++ b/src/trigger.rs @@ -8,7 +8,7 @@ use std::{ use pin_project_lite::pin_project; use slab::Slab; -type WakerList = Arc>>; +type WakerList = Arc>>>; type TriggerState = Arc; #[derive(Debug, Clone)] @@ -34,7 +34,7 @@ impl Subscriber { return SubscriberState::Triggered; } - let waker = cx.waker().clone(); + let waker = Some(cx.waker().clone()); SubscriberState::Waiting(if let Some(key) = key { tracing::trace!("trigger::Subscriber: updating waker for key: {}", key); @@ -125,8 +125,8 @@ impl Future for Receiver { #[derive(Debug, Clone)] pub struct Sender { - wakers: WakerList, state: TriggerState, + wakers: WakerList, } impl Sender { @@ -135,11 +135,24 @@ impl Sender { } pub fn trigger(&self) { - let wakers = self.wakers.lock().unwrap(); - self.state.store(true, std::sync::atomic::Ordering::SeqCst); - for (key, waker) in wakers.iter() { - tracing::trace!("trigger::Sender: wake up waker with key: {}", key); - waker.wake_by_ref(); + if self.state.swap(true, std::sync::atomic::Ordering::SeqCst) { + return; + } + + let mut wakers = self.wakers.lock().unwrap(); + for (key, waker) in wakers.iter_mut() { + match waker.take() { + Some(waker) => { + tracing::trace!("trigger::Sender: wake up waker with key: {}", key); + waker.wake(); + } + None => { + tracing::trace!( + "trigger::Sender: nop: waker already triggered with key: {}", + key + ); + } + } } } }