// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::spinwait::SpinWait;
use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
use core::{
    cell::Cell,
    mem, ptr,
    sync::atomic::{fence, AtomicUsize, Ordering},
};

struct ThreadData {
    parker: ThreadParker,

    // Linked list of threads in the queue. The queue is split into two parts:
    // the processed part and the unprocessed part. When new nodes are added to
    // the list, they only have the next pointer set, and queue_tail is null.
    //
    // Nodes are processed with the queue lock held, which consists of setting
    // the prev pointer for each node and setting the queue_tail pointer on the
    // first processed node of the list.
    //
    // This setup allows nodes to be added to the queue without a lock, while
    // still allowing O(1) removal of nodes from the processed part of the list.
    // The only cost is the O(n) processing, but this only needs to be done
    // once for each node, and therefore isn't too expensive.
    queue_tail: Cell<*const ThreadData>,
    prev: Cell<*const ThreadData>,
    next: Cell<*const ThreadData>,
}

impl ThreadData {
    #[inline]
    fn new() -> ThreadData {
        assert!(mem::align_of::<ThreadData>() > !QUEUE_MASK);
        ThreadData {
            parker: ThreadParker::new(),
            queue_tail: Cell::new(ptr::null()),
            prev: Cell::new(ptr::null()),
            next: Cell::new(ptr::null()),
        }
    }
}

// Invokes the given closure with a reference to the current thread's `ThreadData`.
#[inline]
fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
    let mut thread_data_ptr = ptr::null();
    // If ThreadData is expensive to construct, then we want to use a cached
    // version in thread-local storage if possible.
    if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
        thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
        if let Ok(tls_thread_data) = THREAD_DATA.try_with(|x| x as *const ThreadData) {
            thread_data_ptr = tls_thread_data;
        }
    }
    // Otherwise just create a ThreadData on the stack
    let mut thread_data_storage = None;
    if thread_data_ptr.is_null() {
        thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
    }

    f(unsafe { &*thread_data_ptr })
}
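
// Layout of the lock state word: bit 0 is the lock itself, bit 1 locks the
// wait queue, and the remaining bits hold a pointer to the head of the queue.
// The alignment assertion in ThreadData::new guarantees that the pointer's
// low two bits are zero, so it can share the word with the two flag bits.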
const LOCKED_BIT: usize = 1;
const QUEUE_LOCKED_BIT: usize = 2;
const QUEUE_MASK: usize = !3;

// Word-sized lock that is used to implement the parking_lot API. Since this
// can't use parking_lot, it instead manages its own queue of waiting threads.
pub struct WordLock {
    state: AtomicUsize,
}

impl WordLock {
    /// Returns a new, unlocked `WordLock`.
    pub const fn new() -> Self {
        WordLock {
            state: AtomicUsize::new(0),
        }
    }

    #[inline]
    pub fn lock(&self) {
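        // Fast path: the lock is free and the queue is empty, so a single CAS
        // from 0 to LOCKED_BIT acquires it.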
        if self
            .state
            .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.lock_slow();
    }

    /// Must not be called on an already unlocked `WordLock`!
    #[inline]
    pub unsafe fn unlock(&self) {
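        // Fast path: drop the lock bit with Release ordering. If no thread is
        // queued, or another thread already holds the queue lock (and will
        // therefore handle the wakeup), there is nothing more to do.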
        let state = self.state.fetch_sub(LOCKED_BIT, Ordering::Release);
        if state.is_queue_locked() || state.queue_head().is_null() {
            return;
        }
        self.unlock_slow();
    }

    #[cold]
    fn lock_slow(&self) {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Grab the lock if it isn't locked, even if there is a queue on it
            if !state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state | LOCKED_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }
                continue;
            }

            // If there is no queue, try spinning a few times
            if state.queue_head().is_null() && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Get our thread data and prepare it for parking
            state = with_thread_data(|thread_data| {
                // The pthread implementation is still unsafe, so we need to surround `prepare_park`
                // with `unsafe {}`.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.prepare_park();
                }

                // Add our thread to the front of the queue
                let queue_head = state.queue_head();
                if queue_head.is_null() {
                    thread_data.queue_tail.set(thread_data);
                    thread_data.prev.set(ptr::null());
                } else {
                    thread_data.queue_tail.set(ptr::null());
                    thread_data.prev.set(ptr::null());
                    thread_data.next.set(queue_head);
                }
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state.with_queue_head(thread_data),
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                ) {
                    return x;
                }

                // Sleep until we are woken up by an unlock
                // Ignoring unused unsafe, since it's only a few platforms where this is unsafe.
                #[allow(unused_unsafe)]
                unsafe {
                    thread_data.parker.park();
                }

                // Loop back and try locking again
                spinwait.reset();
                self.state.load(Ordering::Relaxed)
            });
        }
    }

    #[cold]
    fn unlock_slow(&self) {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // We just unlocked the WordLock. Just check if there is a thread
            // to wake up. If the queue is locked then another thread is already
            // taking care of waking up a thread.
            if state.is_queue_locked() || state.queue_head().is_null() {
                return;
            }

            // Try to grab the queue lock
            match self.state.compare_exchange_weak(
                state,
                state | QUEUE_LOCKED_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => state = x,
            }
        }

        // Now we have the queue lock and the queue is non-empty
        'outer: loop {
            // First, we need to fill in the prev pointers for any newly added
            // threads. We do this until we reach a node that we previously
            // processed, which has a non-null queue_tail pointer.
            let queue_head = state.queue_head();
            let mut queue_tail;
            let mut current = queue_head;
            loop {
                queue_tail = unsafe { (*current).queue_tail.get() };
                if !queue_tail.is_null() {
                    break;
                }
                unsafe {
                    let next = (*current).next.get();
                    (*next).prev.set(current);
                    current = next;
                }
            }

            // Set queue_tail on the queue head to indicate that the whole list
            // has prev pointers set correctly.
            unsafe {
                (*queue_head).queue_tail.set(queue_tail);
            }

            // If the WordLock is locked, then there is no point waking up a
            // thread now. Instead we let the next unlocker take care of waking
            // up a thread.
            if state.is_locked() {
                match self.state.compare_exchange_weak(
                    state,
                    state & !QUEUE_LOCKED_BIT,
                    Ordering::Release,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return,
                    Err(x) => state = x,
                }

                // Need an acquire fence before reading the new queue
                fence_acquire(&self.state);
                continue;
            }

            // Remove the last thread from the queue and unlock the queue
            let new_tail = unsafe { (*queue_tail).prev.get() };
            if new_tail.is_null() {
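                // The queue contains a single thread, so removing it empties
                // the queue: clear the queue head bits and QUEUE_LOCKED_BIT in
                // one CAS, preserving only the lock bit.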
                loop {
                    match self.state.compare_exchange_weak(
                        state,
                        state & LOCKED_BIT,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(x) => state = x,
                    }

                    // If the compare_exchange failed because a new thread was
                    // added to the queue then we need to re-scan the queue to
                    // find the previous element.
                    if state.queue_head().is_null() {
                        continue;
                    } else {
                        // Need an acquire fence before reading the new queue
                        fence_acquire(&self.state);
                        continue 'outer;
                    }
                }
            } else {
                unsafe {
                    (*queue_head).queue_tail.set(new_tail);
                }
                self.state.fetch_and(!QUEUE_LOCKED_BIT, Ordering::Release);
            }

            // Finally, wake up the thread we removed from the queue. Note that
            // we don't need to worry about any races here since the thread is
            // guaranteed to be sleeping right now and we are the only one who
            // can wake it up.
            unsafe {
                (*queue_tail).parker.unpark_lock().unpark();
            }
            break;
        }
    }
}

// Thread-Sanitizer only has partial fence support, so when running under it, we
// try and avoid false positives by using a discarded acquire load instead.
#[inline]
fn fence_acquire(a: &AtomicUsize) {
    if cfg!(tsan_enabled) {
        let _ = a.load(Ordering::Acquire);
    } else {
        fence(Ordering::Acquire);
    }
}
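
// Helper trait providing named accessors for the bit fields packed into the
// lock state word.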
trait LockState {
    fn is_locked(self) -> bool;
    fn is_queue_locked(self) -> bool;
    fn queue_head(self) -> *const ThreadData;
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self;
}

impl LockState for usize {
    #[inline]
    fn is_locked(self) -> bool {
        self & LOCKED_BIT != 0
    }

    #[inline]
    fn is_queue_locked(self) -> bool {
        self & QUEUE_LOCKED_BIT != 0
    }

    #[inline]
    fn queue_head(self) -> *const ThreadData {
        (self & QUEUE_MASK) as *const ThreadData
    }

    #[inline]
    fn with_queue_head(self, thread_data: *const ThreadData) -> Self {
        (self & !QUEUE_MASK) | thread_data as *const _ as usize
    }
}
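
// A minimal usage sketch (an illustrative addition, not part of the upstream
// file): it shows the intended lock/unlock protocol, where `unlock` is unsafe
// because the caller must currently hold the lock. The module and test names
// are hypothetical.
#[cfg(test)]
mod usage_sketch {
    use super::WordLock;
    use core::sync::atomic::{AtomicUsize, Ordering};
    use std::thread;

    #[test]
    fn serializes_critical_sections() {
        static LOCK: WordLock = WordLock::new();
        static COUNTER: AtomicUsize = AtomicUsize::new(0);

        let handles: Vec<_> = (0..4)
            .map(|_| {
                thread::spawn(|| {
                    for _ in 0..1_000 {
                        LOCK.lock();
                        // A non-atomic read-modify-write; it is only correct
                        // because LOCK serializes access to COUNTER.
                        let v = COUNTER.load(Ordering::Relaxed);
                        COUNTER.store(v + 1, Ordering::Relaxed);
                        // Safety: this thread holds the lock.
                        unsafe { LOCK.unlock() };
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(COUNTER.load(Ordering::Relaxed), 4_000);
    }
}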