Source code

Revision control

Copy as Markdown

Other Tools

//! An SPSC broadcast channel.
//!
//! - The value can only be a `usize`.
//! - The consumer is only notified if the value is different.
//! - The value `0` is reserved for closed.
use futures_util::task::AtomicWaker;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::task;
type Value = usize;
pub(crate) const CLOSED: usize = 0;
pub(crate) fn channel(initial: Value) -> (Sender, Receiver) {
debug_assert!(
initial != CLOSED,
"watch::channel initial state of 0 is reserved"
);
let shared = Arc::new(Shared {
value: AtomicUsize::new(initial),
waker: AtomicWaker::new(),
});
(
Sender {
shared: shared.clone(),
},
Receiver { shared },
)
}
pub(crate) struct Sender {
shared: Arc<Shared>,
}
pub(crate) struct Receiver {
shared: Arc<Shared>,
}
struct Shared {
value: AtomicUsize,
waker: AtomicWaker,
}
impl Sender {
pub(crate) fn send(&mut self, value: Value) {
if self.shared.value.swap(value, Ordering::SeqCst) != value {
self.shared.waker.wake();
}
}
}
impl Drop for Sender {
fn drop(&mut self) {
self.send(CLOSED);
}
}
impl Receiver {
pub(crate) fn load(&mut self, cx: &mut task::Context<'_>) -> Value {
self.shared.waker.register(cx.waker());
self.shared.value.load(Ordering::SeqCst)
}
pub(crate) fn peek(&self) -> Value {
self.shared.value.load(Ordering::Relaxed)
}
}