From 7427822bb0b113b34cf15a03028337314d4093cf Mon Sep 17 00:00:00 2001 From: David Sherret Date: Thu, 4 Jul 2024 19:45:52 -0400 Subject: [PATCH 1/5] fix: make waker in `Shared` `Send` and `Sync` --- Cargo.toml | 3 ++- src/future.rs | 29 ++++++++++++++++------------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 869ad2e..d4992a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deno_unsync" -version = "0.3.7" +version = "0.3.8" edition = "2021" authors = ["the Deno authors"] license = "MIT" @@ -9,6 +9,7 @@ description = "A collection of adapters to make working with Tokio single-thread readme = "README.md" [dependencies] +parking_lot = "0.12.3" tokio = { version = "1", features = ["rt"] } [dev-dependencies] diff --git a/src/future.rs b/src/future.rs index 55488a7..ef23609 100644 --- a/src/future.rs +++ b/src/future.rs @@ -4,10 +4,12 @@ use std::cell::RefCell; use std::future::Future; use std::pin::Pin; use std::rc::Rc; +use std::sync::Arc; use std::task::Context; use std::task::RawWaker; use std::task::RawWakerVTable; use std::task::Waker; +use parking_lot::Mutex; use crate::Flag; @@ -57,7 +59,7 @@ where struct SharedLocalInner { data: RefCell>, - child_waker_state: Rc, + child_waker_state: Arc, } impl std::fmt::Debug for SharedLocalInner @@ -103,7 +105,7 @@ where data: RefCell::new(SharedLocalData { future_or_output: FutureOrOutput::Future(future), }), - child_waker_state: Rc::new(ChildWakerState { + child_waker_state: Arc::new(ChildWakerState { can_poll: Flag::raised(), wakers: Default::default(), }), @@ -154,20 +156,20 @@ where } #[derive(Debug, Default)] -struct WakerStore(RefCell>); +struct WakerStore(Mutex>); impl WakerStore { pub fn take_all(&self) -> Vec { - let mut wakers = self.0.borrow_mut(); + let mut wakers = self.0.lock(); std::mem::take(&mut *wakers) } pub fn clone_all(&self) -> Vec { - self.0.borrow().clone() + self.0.lock().clone() } pub fn push(&self, waker: Waker) { - self.0.borrow_mut().push(waker); + self.0.lock().push(waker); } } @@ -177,9 +179,10 @@ struct ChildWakerState { wakers: WakerStore, } -fn create_child_waker(state: Rc) -> Waker { +// Wakers must implement Send + Sync +fn create_child_waker(state: Arc) -> Waker { let raw_waker = RawWaker::new( - Rc::into_raw(state) as *const (), + Arc::into_raw(state) as *const (), &RawWakerVTable::new( clone_waker, wake_waker, @@ -191,7 +194,7 @@ fn create_child_waker(state: Rc) -> Waker { } unsafe fn clone_waker(data: *const ()) -> RawWaker { - Rc::increment_strong_count(data as *const ChildWakerState); + Arc::increment_strong_count(data as *const ChildWakerState); RawWaker::new( data, &RawWakerVTable::new( @@ -204,7 +207,7 @@ unsafe fn clone_waker(data: *const ()) -> RawWaker { } unsafe fn wake_waker(data: *const ()) { - let state = Rc::from_raw(data as *const ChildWakerState); + let state = Arc::from_raw(data as *const ChildWakerState); state.can_poll.raise(); let wakers = state.wakers.take_all(); drop(state); @@ -215,10 +218,10 @@ unsafe fn wake_waker(data: *const ()) { } unsafe fn wake_by_ref_waker(data: *const ()) { - let state = Rc::from_raw(data as *const ChildWakerState); + let state = Arc::from_raw(data as *const ChildWakerState); state.can_poll.raise(); let wakers = state.wakers.clone_all(); - let _ = Rc::into_raw(state); // keep it alive + let _ = Arc::into_raw(state); // keep it alive for waker in wakers { waker.wake_by_ref(); @@ -226,7 +229,7 @@ unsafe fn wake_by_ref_waker(data: *const ()) { } unsafe fn drop_waker(data: *const ()) { - Rc::decrement_strong_count(data as *const ChildWakerState); + Arc::decrement_strong_count(data as *const ChildWakerState); } #[cfg(test)] From 755df96332a0540254cf53c127067c93515b47d8 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Thu, 4 Jul 2024 19:47:38 -0400 Subject: [PATCH 2/5] format --- src/future.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/future.rs b/src/future.rs index ef23609..1bfd4a5 100644 --- a/src/future.rs +++ b/src/future.rs @@ -1,5 +1,6 @@ // Copyright 2018-2024 the Deno authors. MIT license. +use parking_lot::Mutex; use std::cell::RefCell; use std::future::Future; use std::pin::Pin; @@ -9,7 +10,6 @@ use std::task::Context; use std::task::RawWaker; use std::task::RawWakerVTable; use std::task::Waker; -use parking_lot::Mutex; use crate::Flag; From f5fb45bc39e39186f6e9bff83c455836238cbc2c Mon Sep 17 00:00:00 2001 From: David Sherret Date: Thu, 4 Jul 2024 19:57:21 -0400 Subject: [PATCH 3/5] fix clippy --- Cargo.toml | 2 +- src/flag.rs | 46 ++++++++++++++++++++++++++++++++++++++++++++++ src/future.rs | 6 +++--- src/lib.rs | 1 + 4 files changed, 51 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d4992a7..c343d27 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deno_unsync" -version = "0.3.8" +version = "0.3.9" edition = "2021" authors = ["the Deno authors"] license = "MIT" diff --git a/src/flag.rs b/src/flag.rs index 293eac1..5cad34f 100644 --- a/src/flag.rs +++ b/src/flag.rs @@ -1,6 +1,8 @@ // Copyright 2018-2024 the Deno authors. MIT license. use std::cell::Cell; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; /// A flag with interior mutability that can be raised or lowered. /// Useful for indicating if an event has occurred. @@ -29,6 +31,33 @@ impl Flag { } } +/// Simplifies the use of an atomic boolean as a flag. +#[derive(Debug, Default)] +pub struct AtomicFlag(AtomicBool); + +impl AtomicFlag { + /// Creates a new flag that's raised. + pub fn raised() -> AtomicFlag { + Self(AtomicBool::new(true)) + } + + /// Raises the flag returning if the raise was successful. + pub fn raise(&self) -> bool { + !self.0.swap(true, Ordering::SeqCst) + } + + /// Lowers the flag returning if the lower was successful. + pub fn lower(&self) -> bool { + self.0.swap(false, Ordering::SeqCst) + } + + /// Gets if the flag is raised. + pub fn is_raised(&self) -> bool { + self.0.load(Ordering::SeqCst) + } + +} + #[cfg(test)] mod test { use super::*; @@ -46,4 +75,21 @@ mod test { assert!(!flag.lower()); assert!(!flag.is_raised()); } + + #[test] + fn atomic_flag_raises_lowers() { + let flag = AtomicFlag::default(); + assert!(!flag.is_raised()); // false by default + assert!(flag.raise()); + assert!(flag.is_raised()); + assert!(!flag.raise()); + assert!(flag.is_raised()); + assert!(flag.lower()); + assert!(flag.raise()); + assert!(flag.lower()); + assert!(!flag.lower()); + let flag = AtomicFlag::raised(); + assert!(flag.is_raised()); + assert!(flag.lower()); + } } diff --git a/src/future.rs b/src/future.rs index 1bfd4a5..82cbc7b 100644 --- a/src/future.rs +++ b/src/future.rs @@ -11,7 +11,7 @@ use std::task::RawWaker; use std::task::RawWakerVTable; use std::task::Waker; -use crate::Flag; +use crate::AtomicFlag; impl LocalFutureExt for T where T: Future {} @@ -106,7 +106,7 @@ where future_or_output: FutureOrOutput::Future(future), }), child_waker_state: Arc::new(ChildWakerState { - can_poll: Flag::raised(), + can_poll: AtomicFlag::raised(), wakers: Default::default(), }), })) @@ -175,7 +175,7 @@ impl WakerStore { #[derive(Debug)] struct ChildWakerState { - can_poll: Flag, + can_poll: AtomicFlag, wakers: WakerStore, } diff --git a/src/lib.rs b/src/lib.rs index 0297f28..f5eeecb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ mod task_queue; mod waker; pub use flag::Flag; +pub use flag::AtomicFlag; pub use joinset::JoinSet; pub use split::split_io; pub use split::IOReadHalf; From 84393a91605c0a1163c2ffa11db20ee3d91fc59d Mon Sep 17 00:00:00 2001 From: David Sherret Date: Thu, 4 Jul 2024 19:58:39 -0400 Subject: [PATCH 4/5] fix --- src/flag.rs | 1 - src/lib.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/flag.rs b/src/flag.rs index 5cad34f..76e0e74 100644 --- a/src/flag.rs +++ b/src/flag.rs @@ -55,7 +55,6 @@ impl AtomicFlag { pub fn is_raised(&self) -> bool { self.0.load(Ordering::SeqCst) } - } #[cfg(test)] diff --git a/src/lib.rs b/src/lib.rs index f5eeecb..e1a6ef3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,8 +9,8 @@ mod task; mod task_queue; mod waker; -pub use flag::Flag; pub use flag::AtomicFlag; +pub use flag::Flag; pub use joinset::JoinSet; pub use split::split_io; pub use split::IOReadHalf; From efa0e1af76a65cb64d8b3a625f792b366687ad91 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Thu, 4 Jul 2024 20:03:39 -0400 Subject: [PATCH 5/5] Update to suggestion to impl Waker --- Cargo.toml | 2 +- src/future.rs | 65 ++++++++++++--------------------------------------- 2 files changed, 16 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c343d27..3baf1c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deno_unsync" -version = "0.3.9" +version = "0.3.10" edition = "2021" authors = ["the Deno authors"] license = "MIT" diff --git a/src/future.rs b/src/future.rs index 82cbc7b..d264508 100644 --- a/src/future.rs +++ b/src/future.rs @@ -7,8 +7,7 @@ use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use std::task::Context; -use std::task::RawWaker; -use std::task::RawWakerVTable; +use std::task::Wake; use std::task::Waker; use crate::AtomicFlag; @@ -130,8 +129,7 @@ where FutureOrOutput::Future(fut) => { self.0.child_waker_state.wakers.push(cx.waker().clone()); if self.0.child_waker_state.can_poll.lower() { - let child_waker = - create_child_waker(self.0.child_waker_state.clone()); + let child_waker = Waker::from(self.0.child_waker_state.clone()); let mut child_cx = Context::from_waker(&child_waker); let fut = unsafe { Pin::new_unchecked(fut) }; match fut.poll(&mut child_cx) { @@ -179,59 +177,26 @@ struct ChildWakerState { wakers: WakerStore, } -// Wakers must implement Send + Sync -fn create_child_waker(state: Arc) -> Waker { - let raw_waker = RawWaker::new( - Arc::into_raw(state) as *const (), - &RawWakerVTable::new( - clone_waker, - wake_waker, - wake_by_ref_waker, - drop_waker, - ), - ); - unsafe { Waker::from_raw(raw_waker) } -} - -unsafe fn clone_waker(data: *const ()) -> RawWaker { - Arc::increment_strong_count(data as *const ChildWakerState); - RawWaker::new( - data, - &RawWakerVTable::new( - clone_waker, - wake_waker, - wake_by_ref_waker, - drop_waker, - ), - ) -} +impl Wake for ChildWakerState { + fn wake(self: Arc) { + self.can_poll.raise(); + let wakers = self.wakers.take_all(); -unsafe fn wake_waker(data: *const ()) { - let state = Arc::from_raw(data as *const ChildWakerState); - state.can_poll.raise(); - let wakers = state.wakers.take_all(); - drop(state); - - for waker in wakers { - waker.wake(); + for waker in wakers { + waker.wake(); + } } -} -unsafe fn wake_by_ref_waker(data: *const ()) { - let state = Arc::from_raw(data as *const ChildWakerState); - state.can_poll.raise(); - let wakers = state.wakers.clone_all(); - let _ = Arc::into_raw(state); // keep it alive + fn wake_by_ref(self: &Arc) { + self.can_poll.raise(); + let wakers = self.wakers.clone_all(); - for waker in wakers { - waker.wake_by_ref(); + for waker in wakers { + waker.wake_by_ref(); + } } } -unsafe fn drop_waker(data: *const ()) { - Arc::decrement_strong_count(data as *const ChildWakerState); -} - #[cfg(test)] mod test { use std::sync::Arc;