diff --git a/src/intrusive_double_linked_list.rs b/src/intrusive_double_linked_list.rs
index f87f3e1..7b01644 100644
--- a/src/intrusive_double_linked_list.rs
+++ b/src/intrusive_double_linked_list.rs
@@ -81,6 +81,25 @@ impl<T> LinkedList<T> {
         }
     }
 
+    /// Adds a node at the back of the linked list.
+    ///
+    /// Safety: This function is only safe as long as `node` is guaranteed to
+    /// get removed from the list before it gets moved or dropped.
+    /// In addition to this, `node` may not be added to any other list before
+    /// it is removed from the current one.
+    pub unsafe fn add_back(&mut self, node: &mut ListNode<T>) {
+        node.next = None;
+        node.prev = self.tail;
+        match self.tail {
+            Some(mut tail) => tail.as_mut().next = Some(node.into()),
+            None => {}
+        };
+        self.tail = Some(node.into());
+        if self.head.is_none() {
+            self.head = Some(node.into());
+        }
+    }
+
     /// Returns a reference to the first node in the linked list
     /// The function is only safe as long as valid pointers are stored inside
     /// the linked list.
@@ -321,6 +340,71 @@ impl<T> LinkedList<T> {
             }
         }
     }
+
+    /// Iterate the list in reverse order, calling a callback on each list node
+    /// and deciding how to proceed based on the returned `ControlFlow`.
+    pub fn reverse_apply_while<F>(&mut self, mut func: F)
+    where
+        F: FnMut(&mut ListNode<T>) -> ControlFlow,
+    {
+        let mut current = self.tail;
+
+        while let Some(mut node) = current {
+            // Safety: We are exclusively using nodes that are already contained
+            // in the list so they will contain valid data. The nodes can also
+            // not be added to the list again during iteration, since the list
+            // is mutably borrowed.
+            unsafe {
+                let flow = func(node.as_mut());
+                match flow {
+                    ControlFlow::Continue => {
+                        current = node.as_mut().prev;
+                    }
+                    ControlFlow::Stop => return,
+                    ControlFlow::RemoveAndStop
+                    | ControlFlow::RemoveAndContinue => {
+                        let node = node.as_mut();
+                        match node.prev {
+                            Some(mut prev) => {
+                                prev.as_mut().next = node.next;
+                            }
+                            None => {
+                                self.head = node.next;
+                            }
+                        }
+                        match node.next {
+                            Some(mut next) => {
+                                next.as_mut().prev = node.prev;
+                            }
+                            None => {
+                                self.tail = node.prev;
+                            }
+                        }
+                        if let ControlFlow::RemoveAndStop = flow {
+                            return;
+                        } else {
+                            current = node.prev;
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// The outcome of a callback.
+pub enum ControlFlow {
+    /// Continue the iteration.
+    Continue,
+
+    /// Stop the iteration.
+    Stop,
+
+    /// Remove the current entry and stop the iteration.
+    RemoveAndStop,
+
+    /// Remove the current entry and continue the iteration.
+ RemoveAndContinue, } #[cfg(all(test, feature = "alloc"))] // Tests make use of Vec at the moment diff --git a/src/sync/mod.rs b/src/sync/mod.rs index b4a11bf..63b8789 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -41,3 +41,20 @@ pub use self::semaphore::{ Semaphore, SemaphoreAcquireFuture, SemaphoreReleaser, SharedSemaphore, SharedSemaphoreAcquireFuture, SharedSemaphoreReleaser, }; + +mod rwlock; + +pub use self::rwlock::{ + GenericRwLock, GenericRwLockReadFuture, GenericRwLockReadGuard, + GenericRwLockUpgradableReadFuture, GenericRwLockUpgradableReadGuard, + GenericRwLockWriteFuture, GenericRwLockWriteGuard, LocalRwLock, + LocalRwLockReadFuture, LocalRwLockReadGuard, + LocalRwLockUpgradableReadFuture, LocalRwLockUpgradableReadGuard, + LocalRwLockWriteFuture, LocalRwLockWriteGuard, +}; + +#[cfg(feature = "std")] +pub use self::rwlock::{ + RwLock, RwLockReadFuture, RwLockReadGuard, RwLockUpgradableReadFuture, + RwLockUpgradableReadGuard, RwLockWriteFuture, RwLockWriteGuard, +}; diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs new file mode 100644 index 0000000..138b3f2 --- /dev/null +++ b/src/sync/rwlock.rs @@ -0,0 +1,1457 @@ +//! An asynchronously awaitable mutex for synchronization between concurrently +//! executing futures. + +use crate::{ + intrusive_double_linked_list::{ControlFlow, LinkedList, ListNode}, + utils::update_waker_ref, + NoopLock, +}; +use core::{ + cell::UnsafeCell, + ops::{Deref, DerefMut}, + pin::Pin, +}; +use futures_core::{ + future::{FusedFuture, Future}, + task::{Context, Poll, Waker}, +}; +use lock_api::{Mutex as LockApiMutex, RawMutex}; + +/// Tracks how the future had interacted with the mutex +#[derive(Debug, PartialEq)] +enum PollState { + /// The task has never interacted with the mutex. + New, + + /// The task was added to the wait queue at the mutex. + Waiting, + + /// The task had previously waited on the mutex, but was notified + /// that the mutex was released in the meantime. + Notified, + + /// The task had been polled to completion. + Done, +} + +/// The kind of entry. +#[derive(Debug, PartialEq)] +enum EntryKind { + /// An exclusive write access. + Write, + + /// A shared read access. + Read, + + /// A upgradable shared read access. + /// + /// While only one `UpgradeRead` guard can be open at a time, it can be + /// open alongside any number of `Read` guards. + UpgradeRead, +} + +/// Tracks the MutexLockFuture waiting state. +/// Access to this struct is synchronized through the mutex in the Event. +#[derive(Debug)] +struct Entry { + /// The task handle of the waiting task + task: Option, + + /// Current polling state + state: PollState, + + /// The kind of entry. + kind: EntryKind, +} + +impl Entry { + /// Creates a new Entry + fn new(kind: EntryKind) -> Entry { + Entry { + task: None, + state: PollState::New, + kind, + } + } + + /// Update the state & wakeup if possible. + fn notify(&mut self) { + self.state = PollState::Notified; + + if let Some(waker) = self.task.take() { + waker.wake(); + } + } +} + +/// Keeps track of the hypothetical future state of the `MutexState` +/// if all the notified waiters were to acquire their respective +/// locks. +#[derive(Debug)] +struct FilterWaiters { + nb_waiting_reads: usize, + nb_waiting_upgrade_reads: usize, + nb_reads: usize, + has_upgrade_read: bool, + remove_notified: bool, +} + +impl FilterWaiters { + /// Wakeup as many waiters without contention. 
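// ---------------------------------------------------------------------------
// Editor's note (not part of the patch): a minimal, self-contained sketch of
// the `ControlFlow` contract that the `filter` callback defined just below
// relies on. It mirrors `reverse_apply_while` on a plain `Vec` instead of the
// intrusive list, purely to illustrate the Continue/Stop/Remove semantics;
// everything except the `ControlFlow` variants is invented for the example.

#[allow(dead_code)]
enum ControlFlow {
    Continue,
    Stop,
    RemoveAndStop,
    RemoveAndContinue,
}

/// Visits `items` from back to front, removing entries and/or stopping as the
/// callback requests - the same contract the intrusive list implements.
fn reverse_apply_while_vec<T>(
    items: &mut Vec<T>,
    mut func: impl FnMut(&mut T) -> ControlFlow,
) {
    let mut idx = items.len();
    while idx > 0 {
        idx -= 1;
        match func(&mut items[idx]) {
            ControlFlow::Continue => {}
            ControlFlow::Stop => return,
            ControlFlow::RemoveAndStop => {
                items.remove(idx);
                return;
            }
            ControlFlow::RemoveAndContinue => {
                items.remove(idx);
            }
        }
    }
}

fn main() {
    // "Notify" waiters from the back of the queue until a value larger than 3
    // is found, removing the ones that were notified along the way.
    let mut queue = vec![5, 1, 2, 3];
    reverse_apply_while_vec(&mut queue, |waiter| {
        if *waiter <= 3 {
            println!("notifying waiter {}", *waiter);
            ControlFlow::RemoveAndContinue
        } else {
            ControlFlow::Stop
        }
    });
    assert_eq!(queue, vec![5]);
}
// ---------------------------------------------------------------------------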
+ fn filter(&mut self, entry: &mut ListNode) -> ControlFlow { + match entry.kind { + EntryKind::Read => { + // Since a writer lock is exclusive, it will end the + // iteration, implying that there is not writer lock. + // Therefore we can notify every read lock waiter. + entry.notify(); + self.nb_reads += 1; + self.nb_waiting_reads -= 1; + + if self.nb_waiting_reads == 0 + && (self.has_upgrade_read + || self.nb_waiting_upgrade_reads == 0) + { + // This is the last reader. + if self.remove_notified { + ControlFlow::RemoveAndStop + } else { + ControlFlow::Stop + } + } else if self.remove_notified { + ControlFlow::RemoveAndContinue + } else { + ControlFlow::Continue + } + } + EntryKind::UpgradeRead => { + if self.has_upgrade_read { + if self.nb_waiting_reads > 0 { + ControlFlow::Continue + } else { + // This was the last reader. + ControlFlow::Stop + } + } else { + // Like with the `Read` variant, we know that the lock + // is currently shared. + entry.notify(); + self.has_upgrade_read = true; + self.nb_reads += 1; + self.nb_waiting_upgrade_reads -= 1; + if self.nb_waiting_reads == 0 { + // This was the last reader. + if self.remove_notified { + ControlFlow::RemoveAndStop + } else { + ControlFlow::Stop + } + } else if self.remove_notified { + ControlFlow::RemoveAndContinue + } else { + ControlFlow::Continue + } + } + } + EntryKind::Write => { + if self.nb_reads == 0 { + // We have the exclusive write lock so we end + // the iteration. + entry.notify(); + if self.remove_notified { + ControlFlow::RemoveAndStop + } else { + ControlFlow::Stop + } + } else if self.nb_waiting_reads > 0 + || (!self.has_upgrade_read + && self.nb_waiting_upgrade_reads > 0) + { + ControlFlow::Continue + } else { + // Iteration would be a waste of time. + ControlFlow::Stop + } + } + } + } +} + +/// Internal state of the `Mutex` +struct MutexState { + /// Whether this is a fair mutex, meaning the queue order + /// is respected. + is_fair: bool, + + /// The current number of open read lock guards. + nb_reads: usize, + + /// Whether a exclusive upgradable read guard is open. + has_upgrade_read: bool, + + /// Whether a exclusive writer guard is open. + has_write: bool, + + /// The waiter queue. + waiters: LinkedList, + + /// The number of write lock waiters in the queue. + nb_waiting_writes: usize, + + /// The number of read lock waiters in the queue. + nb_waiting_reads: usize, + + /// The number of upgradable_read lock waiters in the queue. + nb_waiting_upgrade_reads: usize, +} + +impl MutexState { + fn new(is_fair: bool) -> Self { + MutexState { + is_fair, + nb_reads: 0, + has_upgrade_read: false, + has_write: false, + waiters: LinkedList::new(), + nb_waiting_writes: 0, + nb_waiting_reads: 0, + nb_waiting_upgrade_reads: 0, + } + } + + /// Reduce the number of reads on the lock by one, waking + /// up waiters if needed. + /// + /// If the Mutex is not fair, removes the woken up node from + /// the wait queue + fn unlock_read(&mut self) { + self.nb_reads -= 1; + self.wakeup_waiters_if_needed(); + } + + /// Release the exclusive writer lock, waking up entries + /// if needed. + /// + /// If the Mutex is not fair, removes the woken up node from + /// the wait queue + fn unlock_write(&mut self) { + self.has_write = false; + debug_assert_eq!(self.nb_reads, 0); + + self.wakeup_waiters_if_needed() + } + + /// Release the upgradable reader lock, waking up entries + /// if needed. 
+ /// + /// If the Mutex is not fair, removes the woken up node from + /// the wait queue + fn unlock_upgrade_read(&mut self) { + self.has_upgrade_read = false; + self.nb_reads -= 1; + self.wakeup_waiters_if_needed(); + } + + /// Release the upgradable reader lock to upgrade + /// it into a writer lock. + fn unlock_upgrade_read_for_upgrade(&mut self) { + self.has_upgrade_read = false; + self.nb_reads -= 1; + // We don't wakeup anyone because the upgrade has priority. + } + + /// Returns the number of read waiters that could acquire + /// the read lock immediately. + fn nb_immediate_read_waiters(&self) -> usize { + if self.has_write { + 0 + } else if self.has_upgrade_read { + self.nb_waiting_reads + } else { + self.nb_waiting_reads + 1 + } + } + + /// Wakeup waiters from back to front. + /// + /// If the mutex is unfair, notified entries are removed from the queue. + fn wakeup_waiters_if_needed(&mut self) { + // We never need to wakeup waiters if + // - an exclusive write lock is held. + // - the queue is empty. + // - there are readers but no immediate read waiters. + if self.has_write + || self.waiters.is_empty() + || (self.nb_reads > 0 && self.nb_immediate_read_waiters() == 0) + { + return; + } + let mut filter = FilterWaiters { + nb_waiting_reads: self.nb_waiting_reads, + nb_waiting_upgrade_reads: self.nb_waiting_upgrade_reads, + nb_reads: self.nb_reads, + has_upgrade_read: self.has_upgrade_read, + remove_notified: !self.is_fair, + }; + self.waiters + .reverse_apply_while(|entry| filter.filter(entry)); + } + + /// Attempt To gain shared read access. + /// + /// Returns true if the access is obtained. + fn try_lock_read_sync(&mut self) -> bool { + // The lock can only be obtained synchronously if + // - has no write + // - the Semaphore is either not fair, or there are no waiting writes. + if !self.has_write && (!self.is_fair || self.nb_waiting_writes == 0) { + self.nb_reads += 1; + true + } else { + false + } + } + + /// Attempt To gain upgradable shared read access. + /// + /// Returns true if the access is obtained. + fn try_lock_upgrade_read_sync(&mut self) -> bool { + // The lock can only be obtained synchronously if + // - has no write + // - the Semaphore is either not fair, or there are no waiting writes + // or upgradable reads + if !self.has_write + && !self.has_upgrade_read + && (!self.is_fair + || (self.nb_waiting_writes == 0 + && self.nb_waiting_upgrade_reads == 0)) + { + self.nb_reads += 1; + self.has_upgrade_read = true; + true + } else { + false + } + } + + /// Attempt to gain exclusive write access. + /// + /// Returns true if the access is obtained. + fn try_lock_write_sync(&mut self) -> bool { + // The lock can only be obtained synchronously if + // - has no write + // - has no read + // - the Semaphore is either not fair, or there are no waiting writes. + if !self.has_write + && self.nb_reads == 0 + && (!self.is_fair + || (self.nb_waiting_writes == 0 + && self.nb_waiting_upgrade_reads == 0)) + { + self.has_write = true; + true + } else { + false + } + } + + /// Attempt to gain exclusive write access by upgrading a upgradable_read + /// lock. + /// + /// Returns true if the access is obtained. + fn try_upgrade_read_sync(&mut self) -> bool { + // The lock can only be obtained synchronously if + // - has 1 read (the caller) + debug_assert!(self.has_upgrade_read); + if self.nb_reads == 1 { + self.has_write = true; + self.nb_reads -= 1; + self.has_upgrade_read = false; + true + } else { + false + } + } + + /// Add a read lock waiter to the wait queue. 
+ /// + /// Safety: This function is only safe as long as `node` is guaranteed to + /// get removed from the list before it gets moved or dropped. + /// In addition to this `node` may not be added to another other list before + /// it is removed from the current one. + unsafe fn add_read_waiter(&mut self, wait_node: &mut ListNode) { + debug_assert_eq!(wait_node.kind, EntryKind::Read); + self.waiters.add_front(wait_node); + self.nb_waiting_reads += 1; + } + + /// Add a write lock waiter to the wait queue. + /// + /// If the write has priority, it will be added to the back instead + /// of the front, meaning it will wakeup first. + /// + /// Safety: This function is only safe as long as `node` is guaranteed to + /// get removed from the list before it gets moved or dropped. + /// In addition to this `node` may not be added to another other list before + /// it is removed from the current one. + unsafe fn add_write_waiter( + &mut self, + wait_node: &mut ListNode, + has_priority: bool, + ) { + debug_assert_eq!(wait_node.kind, EntryKind::Write); + if !has_priority { + self.waiters.add_front(wait_node); + } else { + self.waiters.add_back(wait_node); + } + self.nb_waiting_writes += 1; + } + + /// Add a upgradable_read lock waiter to the wait queue. + /// + /// Safety: This function is only safe as long as `node` is guaranteed to + /// get removed from the list before it gets moved or dropped. + /// In addition to this `node` may not be added to another other list before + /// it is removed from the current one. + unsafe fn add_upgrade_read_waiter( + &mut self, + wait_node: &mut ListNode, + ) { + debug_assert_eq!(wait_node.kind, EntryKind::UpgradeRead); + self.waiters.add_front(wait_node); + self.nb_waiting_upgrade_reads += 1; + } + + /// Tries to acquire the shared read access from a Entry. + /// + /// If it isn't available, the Entry gets added to the wait + /// queue at the Mutex, and will be signalled once ready. + /// This function is only safe as long as the `wait_node`s address is guaranteed + /// to be stable until it gets removed from the queue. + unsafe fn try_lock_read( + &mut self, + wait_node: &mut ListNode, + cx: &mut Context<'_>, + ) -> Poll<()> { + match wait_node.state { + PollState::New => { + // The fast path - the Mutex isn't locked by anyone else. + // If the mutex is fair, noone must be in the wait list before us. + if self.try_lock_read_sync() { + wait_node.state = PollState::Done; + Poll::Ready(()) + } else { + // Add the task to the wait queue + wait_node.task = Some(cx.waker().clone()); + wait_node.state = PollState::Waiting; + self.add_read_waiter(wait_node); + Poll::Pending + } + } + PollState::Waiting => { + // The RwLockReadFuture is already in the queue. + if self.is_fair { + // The task needs to wait until it gets notified in order to + // maintain the ordering. However the caller might have + // passed a different `Waker`. In this case we need to update it. + update_waker_ref(&mut wait_node.task, cx); + Poll::Pending + } else { + // For throughput improvement purposes, grab the lock immediately + // if it's available. + if !self.has_write { + self.nb_reads += 1; + wait_node.state = PollState::Done; + // Since this waiter has been registered before, it must + // get removed from the waiter list. + // Safety: Due to the state, we know that the node must be part + // of the waiter list + self.force_remove_read(wait_node); + Poll::Ready(()) + } else { + // The caller might have passed a different `Waker`. + // In this case we need to update it. 
+ update_waker_ref(&mut wait_node.task, cx); + Poll::Pending + } + } + } + PollState::Notified => { + // We had been woken by the mutex, since the mutex is available again. + // The mutex thereby removed us from the waiters list. + // Just try to lock again. If the mutex isn't available, + // we need to add it to the wait queue again. + if !self.has_write { + if self.is_fair { + // In a fair Mutex, the Entry is kept in the + // linked list and must be removed here + // Safety: Due to the state, we know that the node must be part + // of the waiter list + self.force_remove_read(wait_node); + } + self.nb_reads += 1; + wait_node.state = PollState::Done; + Poll::Ready(()) + } else { + // Fair mutexes should always be able to acquire the lock + // after they had been notified + debug_assert!(!self.is_fair); + // Add to queue + wait_node.task = Some(cx.waker().clone()); + wait_node.state = PollState::Waiting; + self.add_read_waiter(wait_node); + Poll::Pending + } + } + PollState::Done => { + // The future had been polled to completion before + panic!("polled Mutex after completion"); + } + } + } + + /// Tries to acquire an exclusive write access from a Entry. + /// + /// If it isn't available, the Entry gets added to the wait + /// queue at the Mutex, and will be signalled once ready. + /// This function is only safe as long as the `wait_node`s address is guaranteed + /// to be stable until it gets removed from the queue. + unsafe fn try_lock_write( + &mut self, + wait_node: &mut ListNode, + cx: &mut Context<'_>, + has_priority: bool, + ) -> Poll<()> { + match wait_node.state { + PollState::New => { + // The fast path - the Mutex isn't locked by anyone else. + // If the mutex is fair, noone must be in the wait list before us. + if self.try_lock_write_sync() { + wait_node.state = PollState::Done; + Poll::Ready(()) + } else { + // Add the task to the wait queue + wait_node.task = Some(cx.waker().clone()); + wait_node.state = PollState::Waiting; + self.add_write_waiter(wait_node, has_priority); + Poll::Pending + } + } + PollState::Waiting => { + // The RwLockReadFuture is already in the queue. + if self.is_fair { + // The task needs to wait until it gets notified in order to + // maintain the ordering. However the caller might have + // passed a different `Waker`. In this case we need to update it. + update_waker_ref(&mut wait_node.task, cx); + Poll::Pending + } else { + // For throughput improvement purposes, grab the lock immediately + // if it's available. + if !self.has_write && self.nb_reads == 0 { + self.has_write = true; + wait_node.state = PollState::Done; + // Since this waiter has been registered before, it must + // get removed from the waiter list. + // Safety: Due to the state, we know that the node must be part + // of the waiter list + self.force_remove_write(wait_node); + Poll::Ready(()) + } else { + // The caller might have passed a different `Waker`. + // In this case we need to update it. + update_waker_ref(&mut wait_node.task, cx); + Poll::Pending + } + } + } + PollState::Notified => { + // We had been woken by the mutex, since the mutex is available again. + // The mutex thereby removed us from the waiters list. + // Just try to lock again. If the mutex isn't available, + // we need to add it to the wait queue again. 
+ if !self.has_write && self.nb_reads == 0 { + if self.is_fair { + // In a fair Mutex, the Entry is kept in the + // linked list and must be removed here + // Safety: Due to the state, we know that the node must be part + // of the waiter list + self.force_remove_write(wait_node); + } + self.has_write = true; + wait_node.state = PollState::Done; + Poll::Ready(()) + } else { + // Fair mutexes should always be able to acquire the lock + // after they had been notified + debug_assert!(!self.is_fair); + // Add to queue + wait_node.task = Some(cx.waker().clone()); + wait_node.state = PollState::Waiting; + self.add_write_waiter(wait_node, has_priority); + Poll::Pending + } + } + PollState::Done => { + // The future had been polled to completion before + panic!("polled Mutex after completion"); + } + } + } + + /// Tries to acquire an upgradable shared read access from a Entry. + /// + /// If it isn't available, the Entry gets added to the wait + /// queue at the Mutex, and will be signalled once ready. + /// This function is only safe as long as the `wait_node`s address is guaranteed + /// to be stable until it gets removed from the queue. + unsafe fn try_lock_upgrade_read( + &mut self, + wait_node: &mut ListNode, + cx: &mut Context<'_>, + ) -> Poll<()> { + match wait_node.state { + PollState::New => { + // The fast path - the Mutex isn't locked by anyone else. + // If the mutex is fair, noone must be in the wait list before us. + if self.try_lock_upgrade_read_sync() { + wait_node.state = PollState::Done; + Poll::Ready(()) + } else { + // Add the task to the wait queue + wait_node.task = Some(cx.waker().clone()); + wait_node.state = PollState::Waiting; + self.add_upgrade_read_waiter(wait_node); + Poll::Pending + } + } + PollState::Waiting => { + // The RwLockReadFuture is already in the queue. + if self.is_fair { + // The task needs to wait until it gets notified in order to + // maintain the ordering. However the caller might have + // passed a different `Waker`. In this case we need to update it. + update_waker_ref(&mut wait_node.task, cx); + Poll::Pending + } else { + // For throughput improvement purposes, grab the lock immediately + // if it's available. + if !self.has_write && !self.has_upgrade_read { + self.has_upgrade_read = true; + self.nb_reads += 1; + wait_node.state = PollState::Done; + // Since this waiter has been registered before, it must + // get removed from the waiter list. + // Safety: Due to the state, we know that the node must be part + // of the waiter list + self.force_remove_upgrade_read(wait_node); + Poll::Ready(()) + } else { + // The caller might have passed a different `Waker`. + // In this case we need to update it. + update_waker_ref(&mut wait_node.task, cx); + Poll::Pending + } + } + } + PollState::Notified => { + // We had been woken by the mutex, since the mutex is available again. + // The mutex thereby removed us from the waiters list. + // Just try to lock again. If the mutex isn't available, + // we need to add it to the wait queue again. 
+ if !self.has_write && !self.has_upgrade_read { + if self.is_fair { + // In a fair Mutex, the Entry is kept in the + // linked list and must be removed here + // Safety: Due to the state, we know that the node must be part + // of the waiter list + self.force_remove_upgrade_read(wait_node); + } + self.has_upgrade_read = true; + self.nb_reads += 1; + wait_node.state = PollState::Done; + Poll::Ready(()) + } else { + // Fair mutexes should always be able to acquire the lock + // after they had been notified + debug_assert!(!self.is_fair); + // Add to queue + wait_node.task = Some(cx.waker().clone()); + wait_node.state = PollState::Waiting; + self.add_upgrade_read_waiter(wait_node); + Poll::Pending + } + } + PollState::Done => { + // The future had been polled to completion before + panic!("polled Mutex after completion"); + } + } + } + + /// Tries to remove a read waiter from the wait queue, and panics if the + /// waiter is no longer valid. + unsafe fn force_remove_read(&mut self, wait_node: &mut ListNode) { + if !self.waiters.remove(wait_node) { + // Panic if the address isn't found. This can only happen if the contract was + // violated, e.g. the Entry got moved after the initial poll. + panic!("Future could not be removed from wait queue"); + } + self.nb_waiting_reads -= 1; + } + + /// Tries to remove a write waiter from the wait queue, and panics if the + /// waiter is no longer valid. + unsafe fn force_remove_write(&mut self, wait_node: &mut ListNode) { + if !self.waiters.remove(wait_node) { + // Panic if the address isn't found. This can only happen if the contract was + // violated, e.g. the Entry got moved after the initial poll. + panic!("Future could not be removed from wait queue"); + } + self.nb_waiting_writes -= 1; + } + + /// Tries to remove a upgrade_read waiter from the wait queue, and panics if the + /// waiter is no longer valid. + unsafe fn force_remove_upgrade_read( + &mut self, + wait_node: &mut ListNode, + ) { + if !self.waiters.remove(wait_node) { + // Panic if the address isn't found. This can only happen if the contract was + // violated, e.g. the Entry got moved after the initial poll. + panic!("Future could not be removed from wait queue"); + } + self.nb_waiting_upgrade_reads -= 1; + } + + /// Removes the read lock waiter from the wait list. + /// + /// This function is only safe as long as the reference that is passed here + /// equals the reference/address under which the waiter was added. + /// The waiter must not have been moved in between. + /// + /// Returns the `Waker` of another task which might get ready to run due to + /// this. + fn remove_read(&mut self, wait_node: &mut ListNode) { + // MutexLockFuture only needs to get removed if it had been added to + // the wait queue of the Mutex. This has happened in the PollState::Waiting case. + // If the current waiter was notified, another waiter must get notified now. + match wait_node.state { + PollState::Notified => { + if self.is_fair { + // In a fair Mutex, the Entry is kept in the + // linked list and must be removed here + // Safety: Due to the state, we know that the node must be part + // of the waiter list + unsafe { self.force_remove_read(wait_node) }; + } + wait_node.state = PollState::Done; + // Since the task was notified but did not lock the Mutex, + // another task gets the chance to run. 
+ self.wakeup_waiters_if_needed(); + } + PollState::Waiting => { + // Remove the Entry from the linked list + // Safety: Due to the state, we know that the node must be part + // of the waiter list + unsafe { self.force_remove_read(wait_node) }; + wait_node.state = PollState::Done; + } + PollState::New | PollState::Done => (), + } + } + + /// Removes the write lock waiter from the wait list. + /// + /// This function is only safe as long as the reference that is passed here + /// equals the reference/address under which the waiter was added. + /// The waiter must not have been moved in between. + /// + /// Returns the `Waker` of another task which might get ready to run due to + /// this. + fn remove_write(&mut self, wait_node: &mut ListNode) { + // MutexLockFuture only needs to get removed if it had been added to + // the wait queue of the Mutex. This has happened in the PollState::Waiting case. + // If the current waiter was notified, another waiter must get notified now. + match wait_node.state { + PollState::Notified => { + if self.is_fair { + // In a fair Mutex, the Entry is kept in the + // linked list and must be removed here + // Safety: Due to the state, we know that the node must be part + // of the waiter list + unsafe { self.force_remove_write(wait_node) }; + } + wait_node.state = PollState::Done; + // Since the task was notified but did not lock the Mutex, + // another task gets the chance to run. + self.wakeup_waiters_if_needed(); + } + PollState::Waiting => { + // Remove the Entry from the linked list + // Safety: Due to the state, we know that the node must be part + // of the waiter list + unsafe { self.force_remove_write(wait_node) }; + wait_node.state = PollState::Done; + } + PollState::New | PollState::Done => (), + } + } + + /// Removes the upgrade_read lock waiter from the wait list. + /// + /// This function is only safe as long as the reference that is passed here + /// equals the reference/address under which the waiter was added. + /// The waiter must not have been moved in between. + /// + /// Returns the `Waker` of another task which might get ready to run due to + /// this. + fn remove_upgrade_read(&mut self, wait_node: &mut ListNode) { + // MutexLockFuture only needs to get removed if it had been added to + // the wait queue of the Mutex. This has happened in the PollState::Waiting case. + // If the current waiter was notified, another waiter must get notified now. + match wait_node.state { + PollState::Notified => { + if self.is_fair { + // In a fair Mutex, the Entry is kept in the + // linked list and must be removed here + // Safety: Due to the state, we know that the node must be part + // of the waiter list + unsafe { self.force_remove_upgrade_read(wait_node) }; + } + wait_node.state = PollState::Done; + // Since the task was notified but did not lock the Mutex, + // another task gets the chance to run. + self.wakeup_waiters_if_needed(); + } + PollState::Waiting => { + // Remove the Entry from the linked list + // Safety: Due to the state, we know that the node must be part + // of the waiter list + unsafe { self.force_remove_upgrade_read(wait_node) }; + wait_node.state = PollState::Done; + } + PollState::New | PollState::Done => (), + } + } +} + +/// An RAII guard returned by the `read` and `try_read` methods. +/// When this structure is dropped (falls out of scope), the shared +/// read access will be released. 
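// ---------------------------------------------------------------------------
// Editor's note (not part of the patch): a small usage sketch of the
// cancellation behaviour implemented by `remove_read` / `remove_write` /
// `remove_upgrade_read` above. Dropping a lock future that is still waiting
// unregisters its queue entry, and a notification it had already received is
// passed on to the next eligible waiter, so abandoned futures never wedge the
// lock. The `futures` crate helpers are only used here to drive the example.

use futures::executor::block_on;
use futures::future::FutureExt;
use futures_intrusive::sync::LocalRwLock;

fn main() {
    block_on(async {
        let lock = LocalRwLock::new(0u32, true);

        let read_guard = lock.read().await;

        // While the read guard is held a write attempt stays pending; polling
        // it once and then dropping it (which is what `now_or_never` does)
        // exercises the remove-from-wait-queue path.
        assert!(lock.write().now_or_never().is_none());

        drop(read_guard);

        // The lock is not wedged by the abandoned future: write access can
        // still be acquired afterwards.
        let mut guard = lock.write().await;
        *guard += 1;
        drop(guard);
        assert_eq!(*lock.read().await, 1);
    });
}
// ---------------------------------------------------------------------------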
+pub struct GenericRwLockReadGuard<'a, MutexType: RawMutex, T: 'a> { + /// The Mutex which is associated with this Guard + mutex: &'a GenericRwLock, +} + +impl core::fmt::Debug + for GenericRwLockReadGuard<'_, MutexType, T> +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("GenericRwLockReadGuard").finish() + } +} + +impl Drop for GenericRwLockReadGuard<'_, MutexType, T> { + fn drop(&mut self) { + // Release the guard. + self.mutex.state.lock().unlock_read(); + } +} + +impl Deref + for GenericRwLockReadGuard<'_, MutexType, T> +{ + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.mutex.value.get() } + } +} + +// Safety: GenericRwLockReadGuard may only be used across threads if the underlying +// type is Sync. +unsafe impl Sync + for GenericRwLockReadGuard<'_, MutexType, T> +{ +} + +/// A future which resolves when shared read access has been successfully acquired. +#[must_use = "futures do nothing unless polled"] +pub struct GenericRwLockReadFuture<'a, MutexType: RawMutex, T: 'a> { + /// The Mutex which should get locked trough this Future + mutex: Option<&'a GenericRwLock>, + /// Node for waiting at the mutex + wait_node: ListNode, +} + +// Safety: Futures can be sent between threads as long as the underlying +// mutex is thread-safe (Sync), which allows to poll/register/unregister from +// a different thread. +unsafe impl<'a, MutexType: RawMutex + Sync, T: 'a> Send + for GenericRwLockReadFuture<'a, MutexType, T> +{ +} + +impl<'a, MutexType: RawMutex, T: core::fmt::Debug> core::fmt::Debug + for GenericRwLockReadFuture<'a, MutexType, T> +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("GenericRwLockReadFuture").finish() + } +} + +impl<'a, MutexType: RawMutex, T> Future + for GenericRwLockReadFuture<'a, MutexType, T> +{ + type Output = GenericRwLockReadGuard<'a, MutexType, T>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safety: The next operations are safe, because Pin promises us that + // the address of the wait queue entry inside this future is stable, + // and we don't move any fields inside the future until it gets dropped. + let mut_self: &mut GenericRwLockReadFuture = + unsafe { Pin::get_unchecked_mut(self) }; + + let mutex = mut_self + .mutex + .expect("polled GenericRwLockReadFuture after completion"); + let mut mutex_state = mutex.state.lock(); + + let poll_res = + unsafe { mutex_state.try_lock_read(&mut mut_self.wait_node, cx) }; + + match poll_res { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => { + // The mutex was acquired + mut_self.mutex = None; + Poll::Ready(GenericRwLockReadGuard::<'a, MutexType, T> { + mutex, + }) + } + } + } +} + +impl<'a, MutexType: RawMutex, T> FusedFuture + for GenericRwLockReadFuture<'a, MutexType, T> +{ + fn is_terminated(&self) -> bool { + self.mutex.is_none() + } +} + +impl<'a, MutexType: RawMutex, T> Drop + for GenericRwLockReadFuture<'a, MutexType, T> +{ + fn drop(&mut self) { + // If this GenericRwLockReadFuture has been polled and it was added to the + // wait queue at the mutex, it must be removed before dropping. + // Otherwise the mutex would access invalid memory. + if let Some(mutex) = self.mutex { + let mut mutex_state = mutex.state.lock(); + mutex_state.remove_read(&mut self.wait_node) + } + } +} + +/// An RAII guard returned by the `write` and `try_write` methods. +/// When this structure is dropped (falls out of scope), the +/// exclusive write access will be released. 
+pub struct GenericRwLockWriteGuard<'a, MutexType: RawMutex, T: 'a> { + /// The Mutex which is associated with this Guard + mutex: &'a GenericRwLock, +} + +impl core::fmt::Debug + for GenericRwLockWriteGuard<'_, MutexType, T> +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("GenericRwLockWriteGuard").finish() + } +} + +impl Drop + for GenericRwLockWriteGuard<'_, MutexType, T> +{ + fn drop(&mut self) { + // Release the guard. + self.mutex.state.lock().unlock_write(); + } +} + +impl Deref + for GenericRwLockWriteGuard<'_, MutexType, T> +{ + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.mutex.value.get() } + } +} + +impl DerefMut + for GenericRwLockWriteGuard<'_, MutexType, T> +{ + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.mutex.value.get() } + } +} + +// Safety: GenericRwLockReadGuard may only be used across threads if the underlying +// type is Sync. +unsafe impl Sync + for GenericRwLockWriteGuard<'_, MutexType, T> +{ +} + +/// A future which resolves when exclusive write access has been successfully acquired. +#[must_use = "futures do nothing unless polled"] +pub struct GenericRwLockWriteFuture<'a, MutexType: RawMutex, T: 'a> { + /// The Mutex which should get locked trough this Future + mutex: Option<&'a GenericRwLock>, + /// Node for waiting at the mutex + wait_node: ListNode, + /// Whether this is a upgradable_read lock that is being upgraded, + /// thus has priority over the other writers. + has_priority: bool, +} + +// Safety: Futures can be sent between threads as long as the underlying +// mutex is thread-safe (Sync), which allows to poll/register/unregister from +// a different thread. +unsafe impl<'a, MutexType: RawMutex + Sync, T: 'a> Send + for GenericRwLockWriteFuture<'a, MutexType, T> +{ +} + +impl<'a, MutexType: RawMutex, T: core::fmt::Debug> core::fmt::Debug + for GenericRwLockWriteFuture<'a, MutexType, T> +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("GenericRwLockWriteFuture").finish() + } +} + +impl<'a, MutexType: RawMutex, T> Future + for GenericRwLockWriteFuture<'a, MutexType, T> +{ + type Output = GenericRwLockWriteGuard<'a, MutexType, T>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safety: The next operations are safe, because Pin promises us that + // the address of the wait queue entry inside this future is stable, + // and we don't move any fields inside the future until it gets dropped. + let mut_self: &mut GenericRwLockWriteFuture = + unsafe { Pin::get_unchecked_mut(self) }; + + let mutex = mut_self + .mutex + .expect("polled GenericRwLockWriteFuture after completion"); + let mut mutex_state = mutex.state.lock(); + + let poll_res = unsafe { + mutex_state.try_lock_write( + &mut mut_self.wait_node, + cx, + mut_self.has_priority, + ) + }; + + match poll_res { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => { + // The mutex was acquired + mut_self.mutex = None; + Poll::Ready(GenericRwLockWriteGuard::<'a, MutexType, T> { + mutex, + }) + } + } + } +} + +impl<'a, MutexType: RawMutex, T> FusedFuture + for GenericRwLockWriteFuture<'a, MutexType, T> +{ + fn is_terminated(&self) -> bool { + self.mutex.is_none() + } +} + +impl<'a, MutexType: RawMutex, T> Drop + for GenericRwLockWriteFuture<'a, MutexType, T> +{ + fn drop(&mut self) { + // If this GenericRwLockWriteFuture has been polled and it was added to the + // wait queue at the mutex, it must be removed before dropping. + // Otherwise the mutex would access invalid memory. 
+ if let Some(mutex) = self.mutex { + let mut mutex_state = mutex.state.lock(); + mutex_state.remove_write(&mut self.wait_node) + } + } +} + +/// An RAII guard returned by the `upgradable_read` and `try_upgradable_read` +/// methods. When this structure is dropped (falls out of scope), the +/// shared read access and exclusive upgrade_read access will be released. +pub struct GenericRwLockUpgradableReadGuard<'a, MutexType: RawMutex, T: 'a> { + /// The Mutex which is associated with this Guard + mutex: Option<&'a GenericRwLock>, +} + +impl<'a, MutexType: RawMutex, T> + GenericRwLockUpgradableReadGuard<'a, MutexType, T> +{ + /// Asynchrously upgrade the shared read lock into an exclusive write lock. + /// + /// Note that this future will have priority over all other competing + /// write futures. + pub fn upgrade(mut self) -> GenericRwLockWriteFuture<'a, MutexType, T> { + let mutex = self.mutex.take().unwrap(); + let mut state = mutex.state.lock(); + state.unlock_upgrade_read_for_upgrade(); + + GenericRwLockWriteFuture:: { + mutex: Some(mutex), + wait_node: ListNode::new(Entry::new(EntryKind::Write)), + has_priority: true, + } + } + + /// Attempt to atomically upgrade the shared read lock into an + /// exclusive write lock. + pub fn try_upgrade( + mut self, + ) -> Result, Self> { + let mutex = self.mutex.take().unwrap(); + let mut state = mutex.state.lock(); + if state.try_upgrade_read_sync() { + Ok(GenericRwLockWriteGuard { mutex }) + } else { + Err(Self { mutex: Some(mutex) }) + } + } +} + +impl core::fmt::Debug + for GenericRwLockUpgradableReadGuard<'_, MutexType, T> +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("GenericRwLockUpgradableReadGuard").finish() + } +} + +impl Drop + for GenericRwLockUpgradableReadGuard<'_, MutexType, T> +{ + fn drop(&mut self) { + if let Some(mutex) = self.mutex.take() { + // Release the guard. + mutex.state.lock().unlock_upgrade_read(); + } + } +} + +impl Deref + for GenericRwLockUpgradableReadGuard<'_, MutexType, T> +{ + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.mutex.as_ref().unwrap().value.get() } + } +} + +// Safety: GenericRwLockReadGuard may only be used across threads if the underlying +// type is Sync. +unsafe impl Sync + for GenericRwLockUpgradableReadGuard<'_, MutexType, T> +{ +} + +/// A future which resolves when upgrade_read lock access has been successfully acquired. +#[must_use = "futures do nothing unless polled"] +pub struct GenericRwLockUpgradableReadFuture<'a, MutexType: RawMutex, T: 'a> { + /// The Mutex which should get locked trough this Future + mutex: Option<&'a GenericRwLock>, + /// Node for waiting at the mutex + wait_node: ListNode, +} + +// Safety: Futures can be sent between threads as long as the underlying +// mutex is thread-safe (Sync), which allows to poll/register/unregister from +// a different thread. 
+unsafe impl<'a, MutexType: RawMutex + Sync, T: 'a> Send + for GenericRwLockUpgradableReadFuture<'a, MutexType, T> +{ +} + +impl<'a, MutexType: RawMutex, T: core::fmt::Debug> core::fmt::Debug + for GenericRwLockUpgradableReadFuture<'a, MutexType, T> +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("GenericRwLockUpgradableReadFuture").finish() + } +} + +impl<'a, MutexType: RawMutex, T> Future + for GenericRwLockUpgradableReadFuture<'a, MutexType, T> +{ + type Output = GenericRwLockUpgradableReadGuard<'a, MutexType, T>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Safety: The next operations are safe, because Pin promises us that + // the address of the wait queue entry inside the future is stable, + // and we don't move any fields inside the future until it gets dropped. + let mut_self: &mut GenericRwLockUpgradableReadFuture = + unsafe { Pin::get_unchecked_mut(self) }; + + let mutex = mut_self.mutex.expect( + "polled GenericRwLockUpgradableReadFuture after completion", + ); + let mut mutex_state = mutex.state.lock(); + + let poll_res = unsafe { + mutex_state.try_lock_upgrade_read(&mut mut_self.wait_node, cx) + }; + + match poll_res { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => { + // The mutex was acquired + mut_self.mutex = None; + Poll::Ready(GenericRwLockUpgradableReadGuard::< + 'a, + MutexType, + T, + > { + mutex: Some(mutex), + }) + } + } + } +} + +impl<'a, MutexType: RawMutex, T> FusedFuture + for GenericRwLockUpgradableReadFuture<'a, MutexType, T> +{ + fn is_terminated(&self) -> bool { + self.mutex.is_none() + } +} + +impl<'a, MutexType: RawMutex, T> Drop + for GenericRwLockUpgradableReadFuture<'a, MutexType, T> +{ + fn drop(&mut self) { + // If this future has been polled and it was added to the + // wait queue at the mutex, it must be removed before dropping. + // Otherwise the mutex would access invalid memory. + if let Some(mutex) = self.mutex { + let mut mutex_state = mutex.state.lock(); + mutex_state.remove_upgrade_read(&mut self.wait_node) + } + } +} + +/// A futures-aware mutex. +pub struct GenericRwLock { + value: UnsafeCell, + state: LockApiMutex, +} + +// It is safe to send mutexes between threads, as long as they are not used and +// thereby borrowed +unsafe impl Send + for GenericRwLock +{ +} +// The mutex is thread-safe as long as the utilized mutex is thread-safe +unsafe impl Sync + for GenericRwLock +{ +} + +impl core::fmt::Debug + for GenericRwLock +{ + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + f.debug_struct("Mutex") + .field("is_exclusive", &self.is_exclusive()) + .finish() + } +} + +impl GenericRwLock { + /// Creates a new futures-aware mutex. + /// + /// `is_fair` defines whether the `RwLock` should behave be fair regarding the + /// order of waiters. A fair `RwLock` will respect the priority queue + /// so that older waiters will be prioritized over newer waiters + /// when polling to acquire their lock. + pub fn new(value: T, is_fair: bool) -> GenericRwLock { + GenericRwLock:: { + value: UnsafeCell::new(value), + state: LockApiMutex::new(MutexState::new(is_fair)), + } + } + + /// Acquire shared read access asynchronously. + /// + /// This method returns a future that will resolve once the shared + /// read access has been successfully acquired. 
+ pub fn read(&self) -> GenericRwLockReadFuture<'_, MutexType, T> { + GenericRwLockReadFuture:: { + mutex: Some(&self), + wait_node: ListNode::new(Entry::new(EntryKind::Read)), + } + } + + /// Tries to acquire shared read access without waiting. + /// + /// If acquiring the mutex is successful, a [`GenericRwLockReadGuard`] + /// will be returned, which allows to access the contained data. + /// + /// Otherwise `None` will be returned. + pub fn try_read(&self) -> Option> { + if self.state.lock().try_lock_read_sync() { + Some(GenericRwLockReadGuard { mutex: self }) + } else { + None + } + } + + /// Acquire exclusive write access asynchronously. + /// + /// This method returns a future that will resolve once the exclusive + /// write access has been successfully acquired. + pub fn write(&self) -> GenericRwLockWriteFuture<'_, MutexType, T> { + GenericRwLockWriteFuture:: { + mutex: Some(&self), + wait_node: ListNode::new(Entry::new(EntryKind::Write)), + has_priority: false, + } + } + + /// Tries to acquire exclusive write access without waiting. + /// + /// If acquiring the mutex is successful, a [`GenericRwLockReadGuard`] + /// will be returned, which allows to access the contained data. + /// + /// Otherwise `None` will be returned. + pub fn try_write( + &self, + ) -> Option> { + if self.state.lock().try_lock_write_sync() { + Some(GenericRwLockWriteGuard { mutex: self }) + } else { + None + } + } + + /// Acquire upgradable shared read access asynchronously. + /// + /// This method returns a future that will resolve once the upgradable + /// shared read access has been successfully acquired. + pub fn upgradable_read( + &self, + ) -> GenericRwLockUpgradableReadFuture<'_, MutexType, T> { + GenericRwLockUpgradableReadFuture:: { + mutex: Some(&self), + wait_node: ListNode::new(Entry::new(EntryKind::UpgradeRead)), + } + } + + /// Tries to acquire exclusive write access without waiting. + /// + /// If acquiring the mutex is successful, a [`GenericRwLockReadGuard`] + /// will be returned, which allows to access the contained data. + /// + /// Otherwise `None` will be returned. + pub fn try_upgradable_read( + &self, + ) -> Option> { + if self.state.lock().try_lock_upgrade_read_sync() { + Some(GenericRwLockUpgradableReadGuard { mutex: Some(self) }) + } else { + None + } + } + + /// Returns whether the `RwLock` is locked in exclusive access. + pub fn is_exclusive(&self) -> bool { + self.state.lock().has_write + } + + /// Returns the number of shared read access guards currently held. + pub fn nb_readers(&self) -> usize { + self.state.lock().nb_reads + } +} + +// Export a non thread-safe version using NoopLock + +/// A [`GenericRwLock`] which is not thread-safe. +pub type LocalRwLock = GenericRwLock; + +/// A [`GenericRwLockReadGuard`] for [`LocalMutex`]. +pub type LocalRwLockReadGuard<'a, T> = GenericRwLockReadGuard<'a, NoopLock, T>; + +/// A [`GenericRwLockReadFuture`] for [`LocalMutex`]. +pub type LocalRwLockReadFuture<'a, T> = + GenericRwLockReadFuture<'a, NoopLock, T>; + +/// A [`GenericRwLockWriteGuard`] for [`LocalMutex`]. +pub type LocalRwLockWriteGuard<'a, T> = + GenericRwLockWriteGuard<'a, NoopLock, T>; + +/// A [`GenericRwLockWriteFuture`] for [`LocalMutex`]. +pub type LocalRwLockWriteFuture<'a, T> = + GenericRwLockWriteFuture<'a, NoopLock, T>; + +/// A [`GenericRwLockUpgradableReadGuard`] for [`LocalMutex`]. +pub type LocalRwLockUpgradableReadGuard<'a, T> = + GenericRwLockUpgradableReadGuard<'a, NoopLock, T>; + +/// A [`GenericRwLockUpgradableReadFuture`] for [`LocalMutex`]. 
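// ---------------------------------------------------------------------------
// Editor's note (not part of the patch): a minimal end-to-end sketch of the
// RwLock API introduced in this diff, using the `LocalRwLock` alias defined
// above and `futures::executor::block_on` purely as a driver for the example.

use futures::executor::block_on;
use futures_intrusive::sync::LocalRwLock;

fn main() {
    block_on(async {
        let lock = LocalRwLock::new(5, true);

        {
            // Any number of shared read guards can be held at the same time.
            let r1 = lock.read().await;
            let r2 = lock.read().await;
            assert_eq!((*r1, *r2), (5, 5));
            assert_eq!(lock.nb_readers(), 2);
        }

        {
            // An upgradable read coexists with plain readers and can be
            // upgraded into exclusive write access once it is the only reader
            // left; the upgrade takes priority over queued writers.
            let upgradable = lock.upgradable_read().await;
            let mut writer = upgradable.upgrade().await;
            *writer = 7;
        }

        // Exclusive access has been released again; readers see the update.
        assert!(!lock.is_exclusive());
        assert_eq!(*lock.read().await, 7);
    });
}
// ---------------------------------------------------------------------------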
+pub type LocalRwLockUpgradableReadFuture<'a, T> = + GenericRwLockUpgradableReadFuture<'a, NoopLock, T>; + +#[cfg(feature = "std")] +mod if_std { + use super::*; + + // Export a thread-safe version using parking_lot::RawMutex + + /// A [`GenericRwLock`] backed by [`parking_lot`]. + pub type RwLock = GenericRwLock; + + /// A [`GenericRwLockReadGuard`] for [`Mutex`]. + pub type RwLockReadGuard<'a, T> = + GenericRwLockReadGuard<'a, parking_lot::RawMutex, T>; + + /// A [`GenericRwLockReadFuture`] for [`Mutex`]. + pub type RwLockReadFuture<'a, T> = + GenericRwLockReadFuture<'a, parking_lot::RawMutex, T>; + + /// A [`GenericRwLockWriteGuard`] for [`Mutex`]. + pub type RwLockWriteGuard<'a, T> = + GenericRwLockWriteGuard<'a, parking_lot::RawMutex, T>; + + /// A [`GenericRwLockWriteFuture`] for [`Mutex`]. + pub type RwLockWriteFuture<'a, T> = + GenericRwLockWriteFuture<'a, parking_lot::RawMutex, T>; + + /// A [`GenericRwLockUpgradableReadGuard`] for [`Mutex`]. + pub type RwLockUpgradableReadGuard<'a, T> = + GenericRwLockUpgradableReadGuard<'a, parking_lot::RawMutex, T>; + + /// A [`GenericRwLockUpgradableReadFuture`] for [`Mutex`]. + pub type RwLockUpgradableReadFuture<'a, T> = + GenericRwLockUpgradableReadFuture<'a, parking_lot::RawMutex, T>; +} + +#[cfg(feature = "std")] +pub use self::if_std::*; diff --git a/tests/rwlock.rs b/tests/rwlock.rs new file mode 100644 index 0000000..fd400a8 --- /dev/null +++ b/tests/rwlock.rs @@ -0,0 +1,931 @@ +#![feature(collapse_debuginfo)] + +use futures::future::{FusedFuture, Future}; +use futures::task::{Context, Poll}; +use futures_intrusive::sync::LocalRwLock; +use futures_test::task::{new_count_waker, panic_waker}; +use pin_utils::pin_mut; + +// Allows backtrace to work properly inside macro expension. +#[collapse_debuginfo] +macro_rules! assert_eq_ { + ($left:expr, $right:expr $(,)?) => { + assert_eq!($left, $right); + }; +} + +#[collapse_debuginfo] +macro_rules! assert_ { + ($val:expr) => { + assert!($val) + }; +} + +#[collapse_debuginfo] +macro_rules! panic_ { + ($val:expr) => { + panic!($val) + }; +} + +#[collapse_debuginfo] +macro_rules! 
gen_rwlock_tests { + ($mod_name:ident, $rwlock_type:ident) => { + mod $mod_name { + use super::*; + + #[test] + fn uncontended_read() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + { + let mut guards = Vec::with_capacity(3); + for _ in 0..3 { + let fut = lock.read(); + pin_mut!(fut); + match fut.as_mut().poll(cx) { + Poll::Pending => { + panic_!("Expect lock to get locked") + } + Poll::Ready(mut guard) => { + assert_eq_!(false, lock.is_exclusive()); + assert_eq_!(5, *guard); + guards.push(guard); + assert_eq_!( + guards.len(), + lock.nb_readers() + ); + } + }; + assert_!(fut.as_mut().is_terminated()); + } + + assert_eq_!(3, lock.nb_readers()); + + drop(guards.pop().unwrap()); + assert_eq_!(2, lock.nb_readers()); + + drop(guards.pop().unwrap()); + assert_eq_!(1, lock.nb_readers()); + + drop(guards.pop().unwrap()); + assert_eq_!(0, lock.nb_readers()); + } + + { + let fut = lock.read(); + pin_mut!(fut); + match fut.as_mut().poll(cx) { + Poll::Pending => { + panic_!("Expect lock to get locked") + } + Poll::Ready(guard) => { + assert_eq_!(false, lock.is_exclusive()); + assert_eq_!(1, lock.nb_readers()); + assert_eq_!(5, *guard); + } + }; + } + + assert_eq_!(0, lock.nb_readers()); + } + } + + #[test] + fn uncontended_write() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + { + let fut = lock.write(); + pin_mut!(fut); + match fut.as_mut().poll(cx) { + Poll::Pending => { + panic_!("Expect lock to get locked") + } + Poll::Ready(mut guard) => { + assert_eq_!(true, lock.is_exclusive()); + assert_eq_!(5, *guard); + *guard = 12; + assert_eq_!(12, *guard); + } + }; + assert_!(fut.as_mut().is_terminated()); + } + assert_eq_!(false, lock.is_exclusive()); + + { + let fut = lock.write(); + pin_mut!(fut); + match fut.as_mut().poll(cx) { + Poll::Pending => { + panic_!("Expect lock to get locked") + } + Poll::Ready(guard) => { + assert_eq_!(true, lock.is_exclusive()); + assert_eq_!(12, *guard); + } + }; + } + + assert_eq_!(false, lock.is_exclusive()); + } + } + + #[test] + fn uncontended_upgradable_read() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + { + let fut = lock.upgradable_read(); + pin_mut!(fut); + let guard = match fut.as_mut().poll(cx) { + Poll::Pending => { + panic_!("Expect lock to get locked") + } + Poll::Ready(guard) => { + assert_eq_!(false, lock.is_exclusive()); + assert_eq_!(5, *guard); + guard + } + }; + assert_!(fut.as_mut().is_terminated()); + + let fut = guard.upgrade(); + pin_mut!(fut); + match fut.as_mut().poll(cx) { + Poll::Pending => { + panic_!("Expect lock to get locked") + } + Poll::Ready(mut guard) => { + assert_eq_!(true, lock.is_exclusive()); + assert_eq_!(5, *guard); + *guard = 12; + assert_eq_!(12, *guard); + } + }; + } + + assert_eq_!(false, lock.is_exclusive()); + + { + let fut = lock.upgradable_read(); + pin_mut!(fut); + match fut.as_mut().poll(cx) { + Poll::Pending => { + panic_!("Expect lock to get locked") + } + Poll::Ready(guard) => { + assert_eq_!(false, lock.is_exclusive()); + assert_eq_!(12, *guard); + } + }; + } + + assert_eq_!(false, lock.is_exclusive()); + } + } + + #[test] + fn contended_read() { + for 
is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + let guard = lock.try_write().unwrap(); + + { + assert_!(lock.try_read().is_none()); + let fut = lock.read(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_pending()); + assert_!(!fut.as_mut().is_terminated()); + } + + { + assert_!(lock.try_read().is_none()); + let fut = lock.read(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_pending()); + assert_!(!fut.as_mut().is_terminated()); + } + } + } + + #[test] + fn contended_write() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + let guard = lock.try_read().unwrap(); + + { + assert_!(lock.try_write().is_none()); + let fut = lock.write(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_pending()); + assert_!(!fut.as_mut().is_terminated()); + } + } + } + + #[test] + fn contended_upgrade_read() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + let guard = lock.try_upgradable_read().unwrap(); + { + assert_!(lock.try_upgradable_read().is_none()); + let fut = lock.upgradable_read(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_pending()); + assert_!(!fut.as_mut().is_terminated()); + } + drop(guard); + + let guard = lock.try_write().unwrap(); + { + assert_!(lock.try_upgradable_read().is_none()); + let fut = lock.upgradable_read(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_pending()); + assert_!(!fut.as_mut().is_terminated()); + } + } + } + + #[test] + fn dropping_reader_wakes_up_writer() { + for is_fair in &[true, false] { + let (waker, count) = new_count_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + let guard = lock.try_read().unwrap(); + + let mut fut = lock.write(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_pending()); + assert_!(!fut.as_mut().is_terminated()); + + assert_eq_!(count, 0); + + drop(guard); + assert_eq_!(count, 1); + + assert_!(fut.as_mut().poll(cx).is_ready()); + assert_!(fut.as_mut().is_terminated()); + } + } + + #[test] + fn dropping_writer_wakes_up_readers() { + for is_fair in &[true, false] { + let (waker, count) = new_count_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + let guard = lock.try_write().unwrap(); + + let fut1 = lock.read(); + pin_mut!(fut1); + assert_!(fut1.as_mut().poll(cx).is_pending()); + assert_!(!fut1.as_mut().is_terminated()); + + let fut2 = lock.upgradable_read(); + pin_mut!(fut2); + assert_!(fut2.as_mut().poll(cx).is_pending()); + assert_!(!fut2.as_mut().is_terminated()); + + assert_eq_!(count, 0); + + drop(guard); + assert_eq_!(count, 2); + + assert_!(fut1.as_mut().poll(cx).is_ready()); + assert_!(fut1.as_mut().is_terminated()); + + assert_!(fut2.as_mut().poll(cx).is_ready()); + assert_!(fut2.as_mut().is_terminated()); + } + } + + #[test] + #[should_panic] + fn poll_read_after_completion_should_panic() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + 
assert_eq_!(false, lock.is_exclusive()); + + { + let fut = lock.read(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_ready()); + assert_!(fut.as_mut().is_terminated()); + + let _ = fut.as_mut().poll(cx); + } + } + } + + #[test] + #[should_panic] + fn poll_write_after_completion_should_panic() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + { + let fut = lock.write(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_ready()); + assert_!(fut.as_mut().is_terminated()); + + let _ = fut.as_mut().poll(cx); + } + } + } + + #[test] + #[should_panic] + fn poll_upgradable_read_after_completion_should_panic() { + for is_fair in &[true, false] { + let waker = &panic_waker(); + let cx = &mut Context::from_waker(&waker); + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + { + let fut = lock.upgradable_read(); + pin_mut!(fut); + assert_!(fut.as_mut().poll(cx).is_ready()); + assert_!(fut.as_mut().is_terminated()); + + let _ = fut.as_mut().poll(cx); + } + } + } + + #[test] + fn dropping_guard_follows_queue_order_for_wakeup() { + for is_fair in &[true, false] { + let lock = $rwlock_type::new(5, *is_fair); + assert_eq_!(false, lock.is_exclusive()); + + let (waker, count) = new_count_waker(); + let cx = &mut Context::from_waker(&waker); + + let writer = lock.try_write().unwrap(); + + let read_fut1 = lock.read(); + pin_mut!(read_fut1); + assert_!(read_fut1.as_mut().poll(cx).is_pending()); + + let read_fut2 = lock.read(); + pin_mut!(read_fut2); + assert_!(read_fut2.as_mut().poll(cx).is_pending()); + + let write_fut1 = lock.write(); + pin_mut!(write_fut1); + assert_!(write_fut1.as_mut().poll(cx).is_pending()); + + let upgrade_read_fut1 = lock.upgradable_read(); + pin_mut!(upgrade_read_fut1); + assert_!(upgrade_read_fut1.as_mut().poll(cx).is_pending()); + + let write_fut2 = lock.write(); + pin_mut!(write_fut2); + assert_!(write_fut2.as_mut().poll(cx).is_pending()); + + let read_fut3 = lock.read(); + pin_mut!(read_fut3); + assert_!(read_fut3.as_mut().poll(cx).is_pending()); + + let upgrade_read_fut2 = lock.upgradable_read(); + pin_mut!(upgrade_read_fut2); + assert_!(upgrade_read_fut2.as_mut().poll(cx).is_pending()); + + assert_eq_!(count, 0); + + drop(writer); + + // Wakeup the three readers and 1 upgradable_read. 
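// (Editor's note: the three plain readers and the first upgradable read are
// notified here; the second upgradable read and both writers must keep
// waiting until all readers are gone, hence a wake-up count of 4.)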
+                    assert_eq!(count, 4);
+
+                    let guard1 = match read_fut1.as_mut().poll(cx) {
+                        Poll::Pending => panic!("busy rwlock"),
+                        Poll::Ready(guard) => guard,
+                    };
+                    let guard2 = match read_fut2.as_mut().poll(cx) {
+                        Poll::Pending => panic!("busy rwlock"),
+                        Poll::Ready(guard) => guard,
+                    };
+                    let guard3 = match upgrade_read_fut1.as_mut().poll(cx) {
+                        Poll::Pending => panic!("busy rwlock"),
+                        Poll::Ready(guard) => guard,
+                    };
+                    let guard4 = match read_fut3.as_mut().poll(cx) {
+                        Poll::Pending => panic!("busy rwlock"),
+                        Poll::Ready(guard) => guard,
+                    };
+                    assert!(upgrade_read_fut2.as_mut().poll(cx).is_pending());
+                    assert!(write_fut1.as_mut().poll(cx).is_pending());
+                    assert!(write_fut2.as_mut().poll(cx).is_pending());
+
+                    assert_eq!(count, 4);
+
+                    drop(guard1);
+                    assert_eq!(count, 4);
+
+                    // Wakeup the other upgradable_read.
+                    drop(guard3);
+                    assert_eq!(count, 5);
+
+                    let guard5 = match upgrade_read_fut2.as_mut().poll(cx) {
+                        Poll::Pending => panic!("busy rwlock"),
+                        Poll::Ready(guard) => guard,
+                    };
+                    assert!(write_fut1.as_mut().poll(cx).is_pending());
+                    assert!(write_fut2.as_mut().poll(cx).is_pending());
+
+                    drop(guard2);
+                    assert_eq!(count, 5);
+                    drop(guard5);
+                    assert_eq!(count, 5);
+                    // Now that we dropped all readers, wakeup one writer.
+                    drop(guard4);
+                    assert_eq!(count, 6);
+
+                    let guard6 = match write_fut1.as_mut().poll(cx) {
+                        Poll::Pending => panic!("busy rwlock"),
+                        Poll::Ready(guard) => guard,
+                    };
+                    assert!(write_fut2.as_mut().poll(cx).is_pending());
+
+                    // Wakeup the other writer.
+                    drop(guard6);
+                    assert_eq!(count, 7);
+
+                    assert!(write_fut2.as_mut().poll(cx).is_ready());
+                    assert_eq!(count, 7);
+                }
+            }
+
+            #[test]
+            fn cancel_wait_for_lock() {
+                for is_fair in &[true, false] {
+                    let (waker, count) = new_count_waker();
+                    let cx = &mut Context::from_waker(&waker);
+                    let lock = $rwlock_type::new(5, *is_fair);
+
+                    let mut guard1 = lock.try_write().unwrap();
+
+                    // Any further lock attempt must fail while the write
+                    // guard is held.
+                    let mut fut1 = Box::pin(lock.write());
+                    let mut fut2 = Box::pin(lock.read());
+                    let mut fut3 = Box::pin(lock.upgradable_read());
+                    let mut fut4 = Box::pin(lock.write());
+
+                    assert!(fut1.as_mut().poll(cx).is_pending());
+                    assert!(fut2.as_mut().poll(cx).is_pending());
+                    assert!(fut3.as_mut().poll(cx).is_pending());
+                    assert!(fut4.as_mut().poll(cx).is_pending());
+
+                    // Cancel a bunch of futures before the lock becomes available.
+                    drop(fut1);
+                    drop(fut2);
+                    drop(fut3);
+
+                    assert_eq!(count, 0);
+
+                    // Dropping the guard notifies the remaining write future.
+                    drop(guard1);
+                    assert_eq!(count, 1);
+
+                    // The lock should now be acquirable by the remaining future.
+                    assert!(fut4.as_mut().poll(cx).is_ready());
+                }
+            }
+
+            #[test]
+            fn unlock_next_when_notification_is_not_used() {
+                for is_fair in &[true, false] {
+                    let (waker, count) = new_count_waker();
+                    let cx = &mut Context::from_waker(&waker);
+                    let lock = $rwlock_type::new(5, *is_fair);
+
+                    // Acquire the write lock.
+                    let mut guard1 = lock.try_write().unwrap();
+
+                    // All further lock attempts must fail.
+                    let mut fut2 = Box::pin(lock.write());
+                    let mut fut3 = Box::pin(lock.write());
+                    let mut fut4 = Box::pin(lock.upgradable_read());
+                    let mut fut5 = Box::pin(lock.upgradable_read());
+
+                    assert!(fut2.as_mut().poll(cx).is_pending());
+                    assert!(fut3.as_mut().poll(cx).is_pending());
+                    assert!(fut4.as_mut().poll(cx).is_pending());
+                    assert!(fut5.as_mut().poll(cx).is_pending());
+
+                    assert_eq!(count, 0);
+
+                    // Unlock, notifying the next waiter in line.
+                    drop(guard1);
+                    assert_eq!(count, 1);
+
+                    // Ignore the notification by dropping the notified future.
+                    drop(fut2);
+                    assert_eq!(count, 2);
+
+                    // The notification is passed on, so fut3 can acquire the lock.
+                    let guard3 = match fut3.as_mut().poll(cx) {
+                        Poll::Pending => panic!("lock busy"),
+                        Poll::Ready(guard) => guard,
+                    };
+
+                    drop(guard3);
+                    assert_eq!(count, 3);
+                    drop(fut4);
+                    assert_eq!(count, 4);
+
+                    match fut5.as_mut().poll(cx) {
+                        Poll::Pending => panic!("lock busy"),
+                        Poll::Ready(guard) => (),
+                    };
+
+                    // We also test cancelling upgradable read locks.
+                    let guard = lock.try_write().unwrap();
+
+                    let mut fut6 = Box::pin(lock.upgradable_read());
+                    let mut fut7 = Box::pin(lock.upgradable_read());
+
+                    assert!(fut6.as_mut().poll(cx).is_pending());
+                    assert!(fut7.as_mut().poll(cx).is_pending());
+
+                    assert_eq!(count, 4);
+                    drop(guard);
+                    assert_eq!(count, 5);
+                    drop(fut6);
+                    assert_eq!(count, 6);
+
+                    assert!(fut7.as_mut().poll(cx).is_ready());
+                }
+            }
+
+            #[test]
+            fn new_waiters_on_unfair_lock_can_acquire_future_while_another_task_is_notified() {
+                let (waker, count) = new_count_waker();
+                let cx = &mut Context::from_waker(&waker);
+                let lock = $rwlock_type::new(5, false);
+
+                let mut guard1 = lock.try_write().unwrap();
+
+                // These lock attempts must fail while the write guard is held.
+                let mut fut2 = Box::pin(lock.write());
+                let mut fut3 = Box::pin(lock.upgradable_read());
+                let mut fut4 = Box::pin(lock.read());
+                let mut fut5 = Box::pin(lock.write());
+
+                assert!(fut2.as_mut().poll(cx).is_pending());
+                assert!(fut3.as_mut().poll(cx).is_pending());
+                assert!(fut4.as_mut().poll(cx).is_pending());
+
+                // Notify fut2.
+                drop(guard1);
+                assert_eq!(count, 1);
+
+                // Steal the lock.
+                let guard5 = match fut5.as_mut().poll(cx) {
+                    Poll::Pending => panic!("Expect mutex to get locked"),
+                    Poll::Ready(guard) => guard,
+                };
+                // This future must requeue.
+                assert!(fut2.as_mut().poll(cx).is_pending());
+
+                // ...but the other ones preserve their old position in the queue.
+                assert!(fut3.as_mut().poll(cx).is_pending());
+                assert!(fut4.as_mut().poll(cx).is_pending());
+
+                // When we drop guard5, the lock should signal that it's available
+                // for fut3 and fut4 since they have priority in the queue.
+                assert_eq!(count, 1);
+                drop(guard5);
+                assert_eq!(count, 3);
+
+                // We steal the lock again.
+                let guard2 = match fut2.as_mut().poll(cx) {
+                    Poll::Pending => panic!("Expect mutex to get locked"),
+                    Poll::Ready(guard) => guard,
+                };
+
+                // The two notified futures must register again.
+                assert!(fut3.as_mut().poll(cx).is_pending());
+                assert!(fut4.as_mut().poll(cx).is_pending());
+
+                // Now we notify the two futures.
+                drop(guard2);
+                assert_eq!(count, 5);
+
+                assert!(fut3.as_mut().poll(cx).is_ready());
+                assert!(fut4.as_mut().poll(cx).is_ready());
+            }
+
+            #[test]
+            fn new_waiters_on_fair_mutex_cant_acquire_future_while_another_task_is_notified() {
+                let (waker, count) = new_count_waker();
+                let cx = &mut Context::from_waker(&waker);
+                let lock = $rwlock_type::new(5, true);
+
+                // Acquire the write lock.
+                let mut guard1 = lock.try_write().unwrap();
+
+                let mut fut2 = Box::pin(lock.write());
+                let mut fut3 = Box::pin(lock.upgradable_read());
+                let mut fut4 = Box::pin(lock.read());
+                let mut fut5 = Box::pin(lock.write());
+
+                assert!(fut2.as_mut().poll(cx).is_pending());
+
+                // Notify fut2.
+                drop(guard1);
+                assert_eq!(count, 1);
+
+                // Try to steal the lock.
+                assert!(fut3.as_mut().poll(cx).is_pending());
+                assert!(fut4.as_mut().poll(cx).is_pending());
+                assert!(fut5.as_mut().poll(cx).is_pending());
+
+                // Take the lock.
+                assert!(fut2.as_mut().poll(cx).is_ready());
+
+                // Now fut3 & fut4 should have been signaled and be lockable.
+                assert_eq!(count, 3);
+
+                // Try to steal the lock.
+                assert!(fut5.as_mut().poll(cx).is_pending());
+
+                // Take the lock.
+                assert!(fut3.as_mut().poll(cx).is_ready());
+                assert!(fut4.as_mut().poll(cx).is_ready());
+
+                // Now fut5 should have been signaled and be lockable.
+                assert_eq!(count, 4);
+
+                assert!(fut5.as_mut().poll(cx).is_ready());
+            }
+
+            #[test]
+            fn waiters_on_fair_mutex_cant_acquire_future_while_another_task_is_notified() {
+                let (waker, count) = new_count_waker();
+                let cx = &mut Context::from_waker(&waker);
+                let lock = $rwlock_type::new(5, true);
+
+                // Acquire the write lock.
+                let mut guard1 = lock.try_write().unwrap();
+
+                let mut fut2 = Box::pin(lock.write());
+                let mut fut3 = Box::pin(lock.upgradable_read());
+                let mut fut4 = Box::pin(lock.read());
+                let mut fut5 = Box::pin(lock.write());
+
+                assert!(fut2.as_mut().poll(cx).is_pending());
+                assert!(fut3.as_mut().poll(cx).is_pending());
+                assert!(fut4.as_mut().poll(cx).is_pending());
+
+                // Notify fut2.
+                drop(guard1);
+                assert_eq!(count, 1);
+
+                // Try to steal the lock.
+                assert!(fut3.as_mut().poll(cx).is_pending());
+                assert!(fut4.as_mut().poll(cx).is_pending());
+                assert!(fut5.as_mut().poll(cx).is_pending());
+
+                // Take the lock.
+                assert!(fut2.as_mut().poll(cx).is_ready());
+
+                // Now fut3 & fut4 should have been signaled and be lockable.
+                assert_eq!(count, 3);
+
+                // Try to steal the lock.
+                assert!(fut5.as_mut().poll(cx).is_pending());
+
+                // Take the lock.
+                assert!(fut3.as_mut().poll(cx).is_ready());
+                assert!(fut4.as_mut().poll(cx).is_ready());
+
+                // Now fut5 should have been signaled and be lockable.
+                assert_eq!(count, 4);
+
+                // Take the lock.
+                assert!(fut5.as_mut().poll(cx).is_ready());
+            }
+
+            #[test]
+            fn poll_from_multiple_executors() {
+                for is_fair in &[true, false] {
+                    let (waker_1, count_1) = new_count_waker();
+                    let (waker_2, count_2) = new_count_waker();
+                    let lock = $rwlock_type::new(5, *is_fair);
+
+                    // Acquire the write lock.
+                    let mut guard1 = lock.try_write().unwrap();
+                    *guard1 = 27;
+
+                    let cx_1 = &mut Context::from_waker(&waker_1);
+                    let cx_2 = &mut Context::from_waker(&waker_2);
+
+                    // Wait for the read lock and poll using 2 different contexts.
+                    let fut1 = lock.read();
+                    pin_mut!(fut1);
+
+                    assert!(fut1.as_mut().poll(cx_1).is_pending());
+                    assert!(fut1.as_mut().poll(cx_2).is_pending());
+
+                    // Make sure the context was updated properly.
+                    drop(guard1);
+                    assert_eq!(count_1, 0);
+                    assert_eq!(count_2, 1);
+
+                    let guard2 = match fut1.as_mut().poll(cx_2) {
+                        Poll::Pending => panic!("busy lock"),
+                        Poll::Ready(guard) => guard,
+                    };
+                    assert!(fut1.as_mut().is_terminated());
+
+                    // Wait for the write lock and poll using 2 different contexts.
+                    let fut2 = lock.write();
+                    pin_mut!(fut2);
+                    assert!(fut2.as_mut().poll(cx_1).is_pending());
+                    assert!(fut2.as_mut().poll(cx_2).is_pending());
+
+                    // Make sure the context was updated properly.
+                    drop(guard2);
+                    assert_eq!(count_1, 0);
+                    assert_eq!(count_2, 2);
+
+                    let guard3 = match fut2.as_mut().poll(cx_2) {
+                        Poll::Pending => panic!("busy lock"),
+                        Poll::Ready(guard) => guard,
+                    };
+
+                    // Wait for the upgradable_read lock and poll using 2 different contexts.
+                    let fut3 = lock.upgradable_read();
+                    pin_mut!(fut3);
+                    assert!(fut3.as_mut().poll(cx_1).is_pending());
+                    assert!(fut3.as_mut().poll(cx_2).is_pending());
+
+                    // Make sure the context was updated properly.
+                    drop(guard3);
+                    assert_eq!(count_1, 0);
+                    assert_eq!(count_2, 3);
+
+                    assert!(fut3.as_mut().poll(cx_2).is_ready());
+                }
+            }
+
+            #[test]
+            fn upgrade_guard_has_priority_over_writer() {
+                for is_fair in &[true, false] {
+                    let (waker, count) = new_count_waker();
+                    let lock = $rwlock_type::new(5, *is_fair);
+
+                    // Acquire a read lock and the upgradable read lock.
+                    let mut guard1 = lock.try_read().unwrap();
+                    let mut guard2 = lock.try_upgradable_read().unwrap();
+
+                    let cx = &mut Context::from_waker(&waker);
+
+                    // Wait for the write lock.
+                    let fut3 = lock.write();
+                    pin_mut!(fut3);
+                    assert!(fut3.as_mut().poll(cx).is_pending());
+
+                    // We can't upgrade because of the other reader.
+                    let guard2 = guard2.try_upgrade().unwrap_err();
+
+                    // Add contention for the write lock.
+                    let fut4 = guard2.upgrade();
+                    pin_mut!(fut4);
+                    assert!(fut4.as_mut().poll(cx).is_pending());
+
+                    assert_eq!(count, 0);
+                    // This should wakeup the upgrading upgradable_read because
+                    // it has priority.
+                    drop(guard1);
+                    assert_eq!(count, 1);
+
+                    // We got the write lock first because of priority.
+                    assert!(fut4.as_mut().poll(cx).is_ready());
+                    assert_eq!(count, 2);
+
+                    // Make sure the lock is released properly.
+                    assert!(fut3.as_mut().poll(cx).is_ready());
+                }
+            }
+
+            #[test]
+            fn upgrade_guard_sync_contention() {
+                for is_fair in &[true, false] {
+                    let (waker, count) = new_count_waker();
+                    let lock = $rwlock_type::new(5, *is_fair);
+
+                    // Acquire a read lock and the upgradable read lock.
+                    let mut guard1 = lock.try_read().unwrap();
+                    let mut guard2 = lock.try_upgradable_read().unwrap();
+
+                    // We can't upgrade because of the other reader.
+                    let guard2 = guard2.try_upgrade().unwrap_err();
+                    drop(guard1);
+
+                    // Can't acquire a write lock because of the upgradable_read.
+                    assert!(lock.try_write().is_none());
+
+                    // Now we can upgrade.
+                    let _guard3 = guard2.try_upgrade().unwrap();
+                }
+            }
+        }
+    };
+}
+
+gen_rwlock_tests!(local_rwlock_tests, LocalRwLock);
+
+#[cfg(feature = "std")]
+mod if_std {
+    use super::*;
+    use futures::FutureExt;
+    use futures_intrusive::sync::RwLock;
+
+    gen_rwlock_tests!(rwlock_tests, RwLock);
+
+    fn is_send<T: Send>(_: &T) {}
+
+    fn is_send_value<T: Send>(_: T) {}
+
+    fn is_sync<T: Sync>(_: &T) {}
+
+    macro_rules! gen_future_send_test {
+        ($mod_name:ident, $fut_fn:ident) => {
+            mod $mod_name {
+                use super::*;
+
+                #[test]
+                fn futures_are_send() {
+                    let lock = RwLock::new(true, true);
+                    is_sync(&lock);
+                    {
+                        let fut = lock.$fut_fn();
+                        is_send(&fut);
+                        pin_mut!(fut);
+                        is_send(&fut);
+
+                        let waker = &panic_waker();
+                        let cx = &mut Context::from_waker(&waker);
+                        pin_mut!(fut);
+                        let res = fut.poll_unpin(cx);
+                        let guard = match res {
+                            Poll::Ready(v) => v,
+                            Poll::Pending => panic!("Expected to be ready"),
+                        };
+                        is_send(&guard);
+                        is_send_value(guard);
+                    }
+                    is_send_value(lock);
+                }
+            }
+        };
+    }
+
+    gen_future_send_test!(rwlock_read, read);
+    gen_future_send_test!(rwlock_write, write);
+    gen_future_send_test!(rwlock_upgradable_read, upgradable_read);
+}
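For reviewers, a minimal usage sketch of the API exercised by the tests above. It is not part of the patch; it assumes the `std`-feature `RwLock` re-export used in the `if_std` tests, `futures::executor::block_on` from the `futures` dev-dependency, that the guards deref to the protected value, and that `upgrade()` resolves to a write guard, as the tests imply:

use futures::executor::block_on;
use futures_intrusive::sync::RwLock;

fn main() {
    // The constructor takes the protected value and a fairness flag,
    // mirroring `$rwlock_type::new(5, *is_fair)` in the tests.
    let lock = RwLock::new(5, true);

    block_on(async {
        // Any number of shared readers can be active at the same time.
        let r1 = lock.read().await;
        let r2 = lock.read().await;
        assert_eq!(*r1 + *r2, 10);
        drop(r1);
        drop(r2);

        // A writer gets exclusive access once all readers are gone.
        {
            let mut w = lock.write().await;
            *w += 1;
        }

        // An upgradable read coexists with plain readers and can later be
        // upgraded into a write guard.
        let u = lock.upgradable_read().await;
        let mut w = u.upgrade().await;
        *w += 1;
        assert_eq!(*w, 7);
    });
}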