Source code

Revision control

Copy as Markdown

Other Tools

#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
//! # Implementation Details.
//!
//! The semaphore is implemented using an intrusive linked list of waiters. An
//! atomic counter tracks the number of available permits. If the semaphore does
//! not contain the required number of permits, the task attempting to acquire
//! permits places its waker at the end of a queue. When new permits are made
//! available (such as by releasing an initial acquisition), they are assigned
//! to the task at the front of the queue, waking that task if its requested
//! number of permits is met.
//!
//! Because waiters are enqueued at the back of the linked list and dequeued
//! from the front, the semaphore is fair. Tasks trying to acquire large numbers
//! of permits at a time will always be woken eventually, even if many other
//! tasks are acquiring smaller numbers of permits. This means that in a
//! use-case like tokio's read-write lock, writers will not be starved by
//! readers.
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::linked_list::{self, LinkedList};
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;
use crate::util::WakeList;
use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::*;
use std::task::Poll::*;
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt};
/// An asynchronous counting semaphore which permits waiting on multiple permits at once.
pub(crate) struct Semaphore {
waiters: Mutex<Waitlist>,
/// The current number of available permits in the semaphore.
permits: AtomicUsize,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span,
}
struct Waitlist {
queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
closed: bool,
}
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq, Eq)]
pub enum TryAcquireError {
/// The semaphore has been [closed] and cannot issue new permits.
///
/// [closed]: crate::sync::Semaphore::close
Closed,
/// The semaphore has no available permits.
NoPermits,
}
/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub struct AcquireError(());
pub(crate) struct Acquire<'a> {
node: Waiter,
semaphore: &'a Semaphore,
num_permits: u32,
queued: bool,
}
/// An entry in the wait queue.
struct Waiter {
/// The current state of the waiter.
///
/// This is either the number of remaining permits required by
/// the waiter, or a flag indicating that the waiter is not yet queued.
state: AtomicUsize,
/// The waker to notify the task awaiting permits.
///
/// # Safety
///
/// This may only be accessed while the wait queue is locked.
waker: UnsafeCell<Option<Waker>>,
/// Intrusive linked-list pointers.
///
/// # Safety
///
/// This may only be accessed while the wait queue is locked.
///
/// TODO: Ideally, we would be able to use loom to enforce that
/// this isn't accessed concurrently. However, it is difficult to
/// use a `UnsafeCell` here, since the `Link` trait requires _returning_
/// references to `Pointers`, and `UnsafeCell` requires that checked access
/// take place inside a closure. We should consider changing `Pointers` to
/// use `UnsafeCell` internally.
pointers: linked_list::Pointers<Waiter>,
#[cfg(all(tokio_unstable, feature = "tracing"))]
ctx: trace::AsyncOpTracingCtx,
/// Should not be `Unpin`.
_p: PhantomPinned,
}
generate_addr_of_methods! {
impl<> Waiter {
unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
&self.pointers
}
}
}
impl Semaphore {
/// The maximum number of permits which a semaphore can hold.
///
/// Note that this reserves three bits of flags in the permit counter, but
/// we only actually use one of them. However, the previous semaphore
/// implementation used three bits, so we will continue to reserve them to
/// avoid a breaking change if additional flags need to be added in the
/// future.
pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
const CLOSED: usize = 1;
// The least-significant bit in the number of permits is reserved to use
// as a flag indicating that the semaphore has been closed. Consequently
// PERMIT_SHIFT is used to leave that bit for that purpose.
const PERMIT_SHIFT: usize = 1;
/// Creates a new semaphore with the initial number of permits
///
/// Maximum number of permits on 32-bit platforms is `1<<29`.
pub(crate) fn new(permits: usize) -> Self {
assert!(
permits <= Self::MAX_PERMITS,
"a semaphore may not have more than MAX_PERMITS permits ({})",
Self::MAX_PERMITS
);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = {
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Semaphore",
kind = "Sync",
is_internal = true
);
resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
permits = permits,
permits.op = "override",
)
});
resource_span
};
Self {
permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
waiters: Mutex::new(Waitlist {
queue: LinkedList::new(),
closed: false,
}),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
}
/// Creates a new semaphore with the initial number of permits.
///
/// Maximum number of permits on 32-bit platforms is `1<<29`.
///
/// If the specified number of permits exceeds the maximum permit amount
/// Then the value will get clamped to the maximum number of permits.
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
pub(crate) const fn const_new(mut permits: usize) -> Self {
// NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925
// currently we just clamp the permit count when it exceeds the max
permits &= Self::MAX_PERMITS;
Self {
permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
waiters: Mutex::const_new(Waitlist {
queue: LinkedList::new(),
closed: false,
}),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: tracing::Span::none(),
}
}
/// Returns the current number of available permits.
pub(crate) fn available_permits(&self) -> usize {
self.permits.load(Acquire) >> Self::PERMIT_SHIFT
}
/// Adds `added` new permits to the semaphore.
///
/// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
pub(crate) fn release(&self, added: usize) {
if added == 0 {
return;
}
// Assign permits to the wait queue
self.add_permits_locked(added, self.waiters.lock());
}
/// Closes the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters.
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock();
// If the semaphore's permits counter has enough permits for an
// unqueued waiter to acquire all the permits it needs immediately,
// it won't touch the wait list. Therefore, we have to set a bit on
// the permit counter as well. However, we must do this while
// holding the lock --- otherwise, if we set the bit and then wait
// to acquire the lock we'll enter an inconsistent state where the
// permit counter is closed, but the wait list is not.
self.permits.fetch_or(Self::CLOSED, Release);
waiters.closed = true;
while let Some(mut waiter) = waiters.queue.pop_back() {
let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
if let Some(waker) = waker {
waker.wake();
}
}
}
/// Returns true if the semaphore is closed.
pub(crate) fn is_closed(&self) -> bool {
self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
}
pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
assert!(
num_permits as usize <= Self::MAX_PERMITS,
"a semaphore may not have more than MAX_PERMITS permits ({})",
Self::MAX_PERMITS
);
let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
let mut curr = self.permits.load(Acquire);
loop {
// Has the semaphore closed?
if curr & Self::CLOSED == Self::CLOSED {
return Err(TryAcquireError::Closed);
}
// Are there enough permits remaining?
if curr < num_permits {
return Err(TryAcquireError::NoPermits);
}
let next = curr - num_permits;
match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
Ok(_) => {
// TODO: Instrument once issue has been solved
return Ok(());
}
Err(actual) => curr = actual,
}
}
}
pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
Acquire::new(self, num_permits)
}
/// Release `rem` permits to the semaphore's wait list, starting from the
/// end of the queue.
///
/// If `rem` exceeds the number of permits needed by the wait list, the
/// remainder are assigned back to the semaphore.
fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
let mut wakers = WakeList::new();
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
'inner: while wakers.can_push() {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
Some(waiter) => {
if !waiter.assign_permits(&mut rem) {
break 'inner;
}
}
None => {
is_empty = true;
// If we assigned permits to all the waiters in the queue, and there are
// still permits left over, assign them back to the semaphore.
break 'inner;
}
};
let mut waiter = waiters.queue.pop_back().unwrap();
if let Some(waker) =
unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
{
wakers.push(waker);
}
}
if rem > 0 && is_empty {
let permits = rem;
assert!(
permits <= Self::MAX_PERMITS,
"cannot add more than MAX_PERMITS permits ({})",
Self::MAX_PERMITS
);
let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
let prev = prev >> Self::PERMIT_SHIFT;
assert!(
prev + permits <= Self::MAX_PERMITS,
"number of added permits ({}) would overflow MAX_PERMITS ({})",
rem,
Self::MAX_PERMITS
);
// add remaining permits back
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
permits = rem,
permits.op = "add",
)
});
rem = 0;
}
drop(waiters); // release the lock
wakers.wake_all();
}
assert_eq!(rem, 0);
}
fn poll_acquire(
&self,
cx: &mut Context<'_>,
num_permits: u32,
node: Pin<&mut Waiter>,
queued: bool,
) -> Poll<Result<(), AcquireError>> {
let mut acquired = 0;
let needed = if queued {
node.state.load(Acquire) << Self::PERMIT_SHIFT
} else {
(num_permits as usize) << Self::PERMIT_SHIFT
};
let mut lock = None;
// First, try to take the requested number of permits from the
// semaphore.
let mut curr = self.permits.load(Acquire);
let mut waiters = loop {
// Has the semaphore closed?
if curr & Self::CLOSED > 0 {
return Ready(Err(AcquireError::closed()));
}
let mut remaining = 0;
let total = curr
.checked_add(acquired)
.expect("number of permits must not overflow");
let (next, acq) = if total >= needed {
let next = curr - (needed - acquired);
(next, needed >> Self::PERMIT_SHIFT)
} else {
remaining = (needed - acquired) - curr;
(0, curr >> Self::PERMIT_SHIFT)
};
if remaining > 0 && lock.is_none() {
// No permits were immediately available, so this permit will
// (probably) need to wait. We'll need to acquire a lock on the
// wait queue before continuing. We need to do this _before_ the
// CAS that sets the new value of the semaphore's `permits`
// counter. Otherwise, if we subtract the permits and then
// acquire the lock, we might miss additional permits being
// added while waiting for the lock.
lock = Some(self.waiters.lock());
}
match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
Ok(_) => {
acquired += acq;
if remaining == 0 {
if !queued {
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
permits = acquired,
permits.op = "sub",
);
tracing::trace!(
target: "runtime::resource::async_op::state_update",
permits_obtained = acquired,
permits.op = "add",
)
});
return Ready(Ok(()));
} else if lock.is_none() {
break self.waiters.lock();
}
}
break lock.expect("lock must be acquired before waiting");
}
Err(actual) => curr = actual,
}
};
if waiters.closed {
return Ready(Err(AcquireError::closed()));
}
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
permits = acquired,
permits.op = "sub",
)
});
if node.assign_permits(&mut acquired) {
self.add_permits_locked(acquired, waiters);
return Ready(Ok(()));
}
assert_eq!(acquired, 0);
let mut old_waker = None;
// Otherwise, register the waker & enqueue the node.
node.waker.with_mut(|waker| {
// Safety: the wait list is locked, so we may modify the waker.
let waker = unsafe { &mut *waker };
// Do we need to register the new waker?
if waker
.as_ref()
.map(|waker| !waker.will_wake(cx.waker()))
.unwrap_or(true)
{
old_waker = std::mem::replace(waker, Some(cx.waker().clone()));
}
});
// If the waiter is not already in the wait queue, enqueue it.
if !queued {
let node = unsafe {
let node = Pin::into_inner_unchecked(node) as *mut _;
NonNull::new_unchecked(node)
};
waiters.queue.push_front(node);
}
drop(waiters);
drop(old_waker);
Pending
}
}
impl fmt::Debug for Semaphore {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Semaphore")
.field("permits", &self.available_permits())
.finish()
}
}
impl Waiter {
fn new(
num_permits: u32,
#[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx,
) -> Self {
Waiter {
waker: UnsafeCell::new(None),
state: AtomicUsize::new(num_permits as usize),
pointers: linked_list::Pointers::new(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
ctx,
_p: PhantomPinned,
}
}
/// Assign permits to the waiter.
///
/// Returns `true` if the waiter should be removed from the queue
fn assign_permits(&self, n: &mut usize) -> bool {
let mut curr = self.state.load(Acquire);
loop {
let assign = cmp::min(curr, *n);
let next = curr - assign;
match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
Ok(_) => {
*n -= assign;
#[cfg(all(tokio_unstable, feature = "tracing"))]
self.ctx.async_op_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::async_op::state_update",
permits_obtained = assign,
permits.op = "add",
);
});
return next == 0;
}
Err(actual) => curr = actual,
}
}
}
}
impl Future for Acquire<'_> {
type Output = Result<(), AcquireError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _resource_span = self.node.ctx.resource_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _async_op_span = self.node.ctx.async_op_span.clone().entered();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered();
let (node, semaphore, needed, queued) = self.project();
// First, ensure the current task has enough budget to proceed.
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
"poll_acquire",
crate::runtime::coop::poll_proceed(cx),
));
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
Pending => {
*queued = true;
Pending
}
Ready(r) => {
coop.made_progress();
r?;
*queued = false;
Ready(Ok(()))
}
};
#[cfg(all(tokio_unstable, feature = "tracing"))]
return trace_poll_op!("poll_acquire", result);
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
return result;
}
}
impl<'a> Acquire<'a> {
fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
#[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
return Self {
node: Waiter::new(num_permits),
semaphore,
num_permits,
queued: false,
};
#[cfg(all(tokio_unstable, feature = "tracing"))]
return semaphore.resource_span.in_scope(|| {
let async_op_span =
tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new");
let async_op_poll_span = async_op_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::async_op::state_update",
permits_requested = num_permits,
permits.op = "override",
);
tracing::trace!(
target: "runtime::resource::async_op::state_update",
permits_obtained = 0usize,
permits.op = "override",
);
tracing::trace_span!("runtime.resource.async_op.poll")
});
let ctx = trace::AsyncOpTracingCtx {
async_op_span,
async_op_poll_span,
resource_span: semaphore.resource_span.clone(),
};
Self {
node: Waiter::new(num_permits, ctx),
semaphore,
num_permits,
queued: false,
}
});
}
fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
fn is_unpin<T: Unpin>() {}
unsafe {
// Safety: all fields other than `node` are `Unpin`
is_unpin::<&Semaphore>();
is_unpin::<&mut bool>();
is_unpin::<u32>();
let this = self.get_unchecked_mut();
(
Pin::new_unchecked(&mut this.node),
this.semaphore,
this.num_permits,
&mut this.queued,
)
}
}
}
impl Drop for Acquire<'_> {
fn drop(&mut self) {
// If the future is completed, there is no node in the wait list, so we
// can skip acquiring the lock.
if !self.queued {
return;
}
// This is where we ensure safety. The future is being dropped,
// which means we must ensure that the waiter entry is no longer stored
// in the linked list.
let mut waiters = self.semaphore.waiters.lock();
// remove the entry from the list
let node = NonNull::from(&mut self.node);
// Safety: we have locked the wait list.
unsafe { waiters.queue.remove(node) };
let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
if acquired_permits > 0 {
self.semaphore.add_permits_locked(acquired_permits, waiters);
}
}
}
// Safety: the `Acquire` future is not `Sync` automatically because it contains
// a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the
// `UnsafeCell` is only accessed when the future is borrowed mutably (either in
// `poll` or in `drop`). Therefore, it is safe (although not particularly
// _useful_) for the future to be borrowed immutably across threads.
unsafe impl Sync for Acquire<'_> {}
// ===== impl AcquireError ====
impl AcquireError {
fn closed() -> AcquireError {
AcquireError(())
}
}
impl fmt::Display for AcquireError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "semaphore closed")
}
}
impl std::error::Error for AcquireError {}
// ===== impl TryAcquireError =====
impl TryAcquireError {
/// Returns `true` if the error was caused by a closed semaphore.
#[allow(dead_code)] // may be used later!
pub(crate) fn is_closed(&self) -> bool {
matches!(self, TryAcquireError::Closed)
}
/// Returns `true` if the error was caused by calling `try_acquire` on a
/// semaphore with no available permits.
#[allow(dead_code)] // may be used later!
pub(crate) fn is_no_permits(&self) -> bool {
matches!(self, TryAcquireError::NoPermits)
}
}
impl fmt::Display for TryAcquireError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TryAcquireError::Closed => write!(fmt, "semaphore closed"),
TryAcquireError::NoPermits => write!(fmt, "no permits available"),
}
}
}
impl std::error::Error for TryAcquireError {}
/// # Safety
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
type Handle = NonNull<Waiter>;
type Target = Waiter;
fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
*handle
}
unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
ptr
}
unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
Waiter::addr_of_pointers(target)
}
}