Skip to content

Commit

Permalink
improve Sender part of trigger
Browse files Browse the repository at this point in the history
(only trigger once)
  • Loading branch information
glendc committed Sep 4, 2023
1 parent 58cc740 commit 7211f3e
Showing 1 changed file with 21 additions and 8 deletions.
29 changes: 21 additions & 8 deletions src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use pin_project_lite::pin_project;
use slab::Slab;

type WakerList = Arc<Mutex<Slab<Waker>>>;
type WakerList = Arc<Mutex<Slab<Option<Waker>>>>;
type TriggerState = Arc<AtomicBool>;

#[derive(Debug, Clone)]
Expand All @@ -34,7 +34,7 @@ impl Subscriber {
return SubscriberState::Triggered;
}

let waker = cx.waker().clone();
let waker = Some(cx.waker().clone());

SubscriberState::Waiting(if let Some(key) = key {
tracing::trace!("trigger::Subscriber: updating waker for key: {}", key);
Expand Down Expand Up @@ -125,8 +125,8 @@ impl Future for Receiver {

#[derive(Debug, Clone)]
pub struct Sender {
wakers: WakerList,
state: TriggerState,
wakers: WakerList,
}

impl Sender {
Expand All @@ -135,11 +135,24 @@ impl Sender {
}

pub fn trigger(&self) {
let wakers = self.wakers.lock().unwrap();
self.state.store(true, std::sync::atomic::Ordering::SeqCst);
for (key, waker) in wakers.iter() {
tracing::trace!("trigger::Sender: wake up waker with key: {}", key);
waker.wake_by_ref();
if self.state.swap(true, std::sync::atomic::Ordering::SeqCst) {
return;
}

let mut wakers = self.wakers.lock().unwrap();
for (key, waker) in wakers.iter_mut() {
match waker.take() {
Some(waker) => {
tracing::trace!("trigger::Sender: wake up waker with key: {}", key);
waker.wake();
}
None => {
tracing::trace!(
"trigger::Sender: nop: waker already triggered with key: {}",
key
);
}
}
}
}
}
Expand Down

0 comments on commit 7211f3e

Please sign in to comment.