use core::sync::atomic::Ordering; use cfg_if::cfg_if; cfg_if! { if #[cfg(any( target_os = "linux", target_os = "android", //all(target_os = "emscripten", target_feature = "atomics"), //target_os = "freebsd", //target_os = "openbsd", //target_os = "dragonfly", //target_os = "fuchsia", ))] { mod linux; use linux::{Futex, Primitive, SmallFutex, SmallPrimitive}; } else if #[cfg(all(target_os = "windows", not(target_vendor = "win7")))] { mod windows; use windows::Futex; } } const SPIN_COUNT: u32 = 100; const UNLOCKED: SmallPrimitive = 0; const LOCKED: SmallPrimitive = 1; // locked, no other threads waiting const CONTENDED: SmallPrimitive = 2; // locked, and other threads waiting (contended) pub struct Mutex(SmallFutex); impl Mutex { #[inline] pub const fn new() -> Self { Self(Futex::new(UNLOCKED)) } /// Locks the mutex /// /// # Safety /// /// UB occurs if the mutex is already locked by the current thread and the /// `unsafe_lock` feature is enabled. #[inline] pub unsafe fn lock(&self) { if self .0 .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_err() { self.lock_contended(); } } /// If the mutex is unlocked, it is locked, and this function returns /// `true'. Otherwise, `false` is returned. #[inline] pub unsafe fn try_lock(&self) -> bool { self.0 .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_ok() } /// Unlocks the mutex /// /// # Safety /// /// UB occurs if the mutex is already unlocked or if it has been locked on /// a different thread. #[inline] pub unsafe fn unlock(&self) { if self.0.swap(UNLOCKED, Ordering::Release) == CONTENDED { // We only wake up one thread. When that thread locks the mutex, it // will mark the mutex as CONTENDED (see lock_contended above), // which makes sure that any other waiting threads will also be // woken up eventually. self.0.wake(); } } #[inline] pub unsafe fn is_locked(&self) -> bool { self.0.load(Ordering::Relaxed) == UNLOCKED } #[cold] fn lock_contended(&self) { // Spin first to speed things up if the lock is released quickly. let mut state = self.spin(); // If it's unlocked now, attempt to take the lock // without marking it as contended. if state == UNLOCKED { match self .0 .compare_exchange(UNLOCKED, LOCKED, Ordering::Acquire, Ordering::Relaxed) { Ok(_) => return, // Locked! Err(s) => state = s, } } loop { // Put the lock in contended state. // We avoid an unnecessary write if it as already set to CONTENDED, // to be friendlier for the caches. if state != CONTENDED && self.0.swap(CONTENDED, Ordering::Acquire) == UNLOCKED { // We changed it from UNLOCKED to CONTENDED, so we just successfully locked it. return; } // Wait for the futex to change state, assuming it is still CONTENDED. self.0.wait(CONTENDED); // Spin again after waking up. state = self.spin(); } } fn spin(&self) -> SmallPrimitive { let mut spin = SPIN_COUNT; loop { // We only use `load` (and not `swap` or `compare_exchange`) // while spinning, to be easier on the caches. let state = self.0.load(Ordering::Acquire); // We stop spinning when the mutex is UNLOCKED, // but also when it's CONTENDED. if state != LOCKED || spin == 0 { return state; } core::hint::spin_loop(); spin -= 1; } } } pub struct RwLock { // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag. // Bits 0.30: // 0: Unlocked // 1.=0x3FFF_FFFE: Locked by N readers // 0x3FFF_FFFF: Write locked // Bit 30: Readers are waiting on this futex. // Bit 31: Writers are waiting on the writer_notify futex. state: Futex, // The 'condition variable' to notify writers through. // Incremented on every signal. writer_notify: Futex, } const READ_LOCKED: Primitive = 1; const MASK: Primitive = (1 << 30) - 1; const WRITE_LOCKED: Primitive = MASK; const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED const MAX_READERS: Primitive = MASK - 1; const READERS_WAITING: Primitive = 1 << 30; const WRITERS_WAITING: Primitive = 1 << 31; #[inline] fn is_unlocked(state: Primitive) -> bool { state & MASK == 0 } #[inline] fn is_write_locked(state: Primitive) -> bool { state & MASK == WRITE_LOCKED } #[inline] fn has_readers_waiting(state: Primitive) -> bool { state & READERS_WAITING != 0 } #[inline] fn has_writers_waiting(state: Primitive) -> bool { state & WRITERS_WAITING != 0 } #[inline] fn is_read_lockable(state: Primitive) -> bool { // This also returns false if the counter could overflow if we tried to read lock it. // // We don't allow read-locking if there's readers waiting, even if the lock is unlocked // and there's no writers waiting. The only situation when this happens is after unlocking, // at which point the unlocking thread might be waking up writers, which have priority over readers. // The unlocking thread will clear the readers waiting bit and wake up readers, if necessary. state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state) } #[inline] fn is_read_lockable_after_wakeup(state: Primitive) -> bool { // We make a special case for checking if we can read-lock _after_ a reader thread that went to // sleep has been woken up by a call to `downgrade`. // // `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be // no readers waiting and the lock should be read-locked (not write-locked or unlocked). // // Note that we do not check if any writers are waiting. This is because a call to `downgrade` // implies that the caller wants other readers to read the value protected by the lock. If we // did not allow readers to acquire the lock before writers after a `downgrade`, then only the // original writer would be able to read the value, thus defeating the purpose of `downgrade`. state & MASK < MAX_READERS && !has_readers_waiting(state) && !is_write_locked(state) && !is_unlocked(state) } #[inline] fn has_reached_max_readers(state: Primitive) -> bool { state & MASK == MAX_READERS } impl RwLock { #[inline] pub const fn new() -> Self { Self { state: Futex::new(0), writer_notify: Futex::new(0), } } #[inline] pub fn try_read(&self) -> bool { self.state .fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| { is_read_lockable(s).then(|| s + READ_LOCKED) }) .is_ok() } #[inline] pub fn read(&self) { let state = self.state.load(Ordering::Relaxed); if !is_read_lockable(state) || self .state .compare_exchange_weak( state, state + READ_LOCKED, Ordering::Acquire, Ordering::Relaxed, ) .is_err() { self.read_contended(); } } /// # Safety /// /// The `RwLock` must be read-locked (N readers) in order to call this. #[inline] pub unsafe fn read_unlock(&self) { let state = self.state.fetch_sub(READ_LOCKED, Ordering::Release) - READ_LOCKED; // It's impossible for a reader to be waiting on a read-locked RwLock, // except if there is also a writer waiting. debug_assert!(!has_readers_waiting(state) || has_writers_waiting(state)); // Wake up a writer if we were the last reader and there's a writer waiting. if is_unlocked(state) && has_writers_waiting(state) { self.wake_writer_or_readers(state); } } #[cold] fn read_contended(&self) { let mut has_slept = false; let mut state = self.spin_read(); loop { // If we have just been woken up, first check for a `downgrade` call. // Otherwise, if we can read-lock it, lock it. if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) { match self.state.compare_exchange_weak( state, state + READ_LOCKED, Ordering::Acquire, Ordering::Relaxed, ) { Ok(_) => return, // Locked! Err(s) => { state = s; continue; } } } // Check for overflow. assert!( !has_reached_max_readers(state), "too many active read locks on RwLock" ); // Make sure the readers waiting bit is set before we go to sleep. if !has_readers_waiting(state) { if let Err(s) = self.state.compare_exchange( state, state | READERS_WAITING, Ordering::Relaxed, Ordering::Relaxed, ) { state = s; continue; } } // Wait for the state to change. self.state.wait(state | READERS_WAITING); has_slept = true; // Spin again after waking up. state = self.spin_read(); } } #[inline] pub fn try_write(&self) -> bool { self.state .fetch_update(Ordering::Acquire, Ordering::Relaxed, |s| { is_unlocked(s).then(|| s + WRITE_LOCKED) }) .is_ok() } #[inline] pub fn write(&self) { if self .state .compare_exchange_weak(0, WRITE_LOCKED, Ordering::Acquire, Ordering::Relaxed) .is_err() { self.write_contended(); } } /// # Safety /// /// The `RwLock` must be write-locked (single writer) in order to call this. #[inline] pub unsafe fn write_unlock(&self) { let state = self.state.fetch_sub(WRITE_LOCKED, Ordering::Release) - WRITE_LOCKED; debug_assert!(is_unlocked(state)); if has_writers_waiting(state) || has_readers_waiting(state) { self.wake_writer_or_readers(state); } } /// # Safety /// /// The `RwLock` must be write-locked (single writer) in order to call this. #[inline] pub unsafe fn downgrade(&self) { // Removes all write bits and adds a single read bit. let state = self.state.fetch_add(DOWNGRADE, Ordering::Release); debug_assert!( is_write_locked(state), "RwLock must be write locked to call `downgrade`" ); if has_readers_waiting(state) { // Since we had the exclusive lock, nobody else can unset this bit. self.state.fetch_sub(READERS_WAITING, Ordering::Relaxed); self.state.wake_all(); } } #[cold] fn write_contended(&self) { let mut state = self.spin_write(); let mut other_writers_waiting = 0; loop { // If it's unlocked, we try to lock it. if is_unlocked(state) { match self.state.compare_exchange_weak( state, state | WRITE_LOCKED | other_writers_waiting, Ordering::Acquire, Ordering::Relaxed, ) { Ok(_) => return, // Locked! Err(s) => { state = s; continue; } } } // Set the waiting bit indicating that we're waiting on it. if !has_writers_waiting(state) { if let Err(s) = self.state.compare_exchange( state, state | WRITERS_WAITING, Ordering::Relaxed, Ordering::Relaxed, ) { state = s; continue; } } // Other writers might be waiting now too, so we should make sure // we keep that bit on once we manage lock it. other_writers_waiting = WRITERS_WAITING; // Examine the notification counter before we check if `state` has changed, // to make sure we don't miss any notifications. let seq = self.writer_notify.load(Ordering::Acquire); // Don't go to sleep if the lock has become available, // or if the writers waiting bit is no longer set. state = self.state.load(Ordering::Relaxed); if is_unlocked(state) || !has_writers_waiting(state) { continue; } // Wait for the state to change. self.writer_notify.wait(seq); // Spin again after waking up. state = self.spin_write(); } } /// Wakes up waiting threads after unlocking. /// /// If both are waiting, this will wake up only one writer, but will fall /// back to waking up readers if there was no writer to wake up. #[cold] fn wake_writer_or_readers(&self, mut state: Primitive) { assert!(is_unlocked(state)); // The readers waiting bit might be turned on at any point now, // since readers will block when there's anything waiting. // Writers will just lock the lock though, regardless of the waiting bits, // so we don't have to worry about the writer waiting bit. // // If the lock gets locked in the meantime, we don't have to do // anything, because then the thread that locked the lock will take // care of waking up waiters when it unlocks. // If only writers are waiting, wake one of them up. if state == WRITERS_WAITING { match self .state .compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed) { Ok(_) => { self.wake_writer(); return; } Err(s) => { // Maybe some readers are now waiting too. So, continue to the next `if`. state = s; } } } // If both writers and readers are waiting, leave the readers waiting // and only wake up one writer. if state == READERS_WAITING + WRITERS_WAITING { if self .state .compare_exchange(state, READERS_WAITING, Ordering::Relaxed, Ordering::Relaxed) .is_err() { // The lock got locked. Not our problem anymore. return; } if self.wake_writer() { return; } // No writers were actually blocked on futex_wait, so we continue // to wake up readers instead, since we can't be sure if we notified a writer. state = READERS_WAITING; } // If readers are waiting, wake them all up. if state == READERS_WAITING && self .state .compare_exchange(state, 0, Ordering::Relaxed, Ordering::Relaxed) .is_ok() { self.state.wake_all(); } } /// This wakes one writer and returns true if we woke up a writer that was /// blocked on futex_wait. /// /// If this returns false, it might still be the case that we notified a /// writer that was about to go to sleep. fn wake_writer(&self) -> bool { self.writer_notify.fetch_add(1, Ordering::Release); self.writer_notify.wake() // Note that FreeBSD and DragonFlyBSD don't tell us whether they woke // up any threads or not, and always return `false` here. That still // results in correct behavior: it just means readers get woken up as // well in case both readers and writers were waiting. } /// Spin for a while, but stop directly at the given condition. #[inline] fn spin_until(&self, f: impl Fn(Primitive) -> bool) -> Primitive { let mut spin = SPIN_COUNT; // Chosen by fair dice roll. loop { let state = self.state.load(Ordering::Relaxed); if f(state) || spin == 0 { return state; } core::hint::spin_loop(); spin -= 1; } } #[inline] fn spin_write(&self) -> Primitive { // Stop spinning when it's unlocked or when there's waiting writers, to keep things somewhat fair. self.spin_until(|state| is_unlocked(state) || has_writers_waiting(state)) } #[inline] fn spin_read(&self) -> Primitive { // Stop spinning when it's unlocked or read locked, or when there's waiting threads. self.spin_until(|state| { !is_write_locked(state) || has_readers_waiting(state) || has_writers_waiting(state) }) } }