Name Description Size
barrier.rs 7130
batch_semaphore.rs

# Implementation Details

The semaphore is implemented using an intrusive linked list of waiters. An atomic counter tracks the number of available permits. If the semaphore does not contain the required number of permits, the task attempting to acquire permits places its waker at the end of a queue. When new permits are made available (such as by releasing an initial acquisition), they are assigned to the task at the front of the queue, waking that task if its requested number of permits is met.

Because waiters are enqueued at the back of the linked list and dequeued from the front, the semaphore is fair. Tasks trying to acquire large numbers of permits at a time will always be woken eventually, even if many other tasks are acquiring smaller numbers of permits. This means that in a use-case like tokio's read-write lock, writers will not be starved by readers.

25941
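The queueing rule described for batch_semaphore.rs can be sketched with a toy, single-threaded model. This is only an illustration under stated assumptions, not tokio's code: `FairSemaphore`, `Waiter`, and their methods are hypothetical names, a `VecDeque` stands in for the intrusive linked list, a plain `usize` stands in for the atomic counter, and "waking" is modelled by returning the ids of the satisfied waiters.

```rust
use std::collections::VecDeque;

/// One queued request (hypothetical type, for illustration only).
struct Waiter {
    id: u32,
    needed: usize, // permits this waiter is still missing
}

/// Toy model of a fair batch semaphore; not tokio's implementation.
struct FairSemaphore {
    permits: usize,            // permits currently available
    waiters: VecDeque<Waiter>, // front = oldest waiter
}

impl FairSemaphore {
    fn new(permits: usize) -> Self {
        Self { permits, waiters: VecDeque::new() }
    }

    /// Returns true if `n` permits were acquired immediately.
    /// Otherwise takes what is available and queues at the back for the rest.
    fn acquire(&mut self, id: u32, n: usize) -> bool {
        if self.permits >= n {
            self.permits -= n;
            return true;
        }
        let needed = n - self.permits;
        self.permits = 0;
        self.waiters.push_back(Waiter { id, needed });
        false
    }

    /// Hands released permits to the front waiter first.
    /// Returns the ids of the waiters whose requests were fully met.
    fn release(&mut self, mut n: usize) -> Vec<u32> {
        let mut woken = Vec::new();
        while n > 0 {
            let front = match self.waiters.front_mut() {
                Some(w) => w,
                None => {
                    // Nobody is waiting: permits go back to the counter.
                    self.permits += n;
                    break;
                }
            };
            if front.needed <= n {
                n -= front.needed;
                woken.push(front.id);
                self.waiters.pop_front();
            } else {
                front.needed -= n;
                n = 0;
            }
        }
        woken
    }
}

fn main() {
    let mut sem = FairSemaphore::new(3);
    assert!(sem.acquire(0, 2)); // task 0 holds 2 permits
    assert!(!sem.acquire(1, 3)); // large request queues first
    assert!(!sem.acquire(2, 1)); // small request queues behind it
    // The large request at the front is served before the small one.
    assert_eq!(sem.release(2), vec![1]);
    assert_eq!(sem.release(3), vec![2]);
    println!("available permits: {}", sem.permits);
}
```

Because released permits are handed to the front of the queue before they reach the counter, the small request from task 2 cannot overtake the large request from task 1 in this model.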
broadcast.rs A multi-producer, multi-consumer broadcast queue. Each sent value is seen by all consumers.

A [`Sender`] is used to broadcast values to **all** connected [`Receiver`] values. [`Sender`] handles are clone-able, allowing concurrent send and receive actions. [`Sender`] and [`Receiver`] are both `Send` and `Sync` as long as `T` is `Send`.

When a value is sent, **all** [`Receiver`] handles are notified and will receive the value. The value is stored once inside the channel and cloned on demand for each receiver. Once all receivers have received a clone of the value, the value is released from the channel.

A channel is created by calling [`channel`], specifying the maximum number of messages the channel can retain at any given time.

New [`Receiver`] handles are created by calling [`Sender::subscribe`]. The returned [`Receiver`] will receive values sent **after** the call to `subscribe`.

This channel is also suitable for the single-producer multi-consumer use-case, where a single sender broadcasts values to many receivers.

## Lagging

As sent messages must be retained until **all** [`Receiver`] handles receive a clone, broadcast channels are susceptible to the "slow receiver" problem. In this case, all but one receiver are able to receive values at the rate they are sent. Because one receiver is stalled, the channel starts to fill up.

This broadcast channel implementation handles this case by setting a hard upper bound on the number of values the channel may retain at any given time. This upper bound is passed to the [`channel`] function as an argument.

If a value is sent when the channel is at capacity, the oldest value currently held by the channel is released. This frees up space for the new value. Any receiver that has not yet seen the released value will return [`RecvError::Lagged`] the next time [`recv`] is called.

Once [`RecvError::Lagged`] is returned, the lagging receiver's position is updated to the oldest value contained by the channel. The next call to [`recv`] will return this value.

This behavior enables a receiver to detect when it has lagged so far behind that data has been dropped. The caller may decide how to respond to this: either by aborting its task or by tolerating lost messages and resuming consumption of the channel.

## Closing

When **all** [`Sender`] handles have been dropped, no new values may be sent. At this point, the channel is "closed". Once a receiver has received all values retained by the channel, the next call to [`recv`] will return with [`RecvError::Closed`].

When a [`Receiver`] handle is dropped, any messages not read by the receiver will be marked as read. If this receiver was the only one not to have read that message, the message will be dropped at this point.

[`Sender`]: crate::sync::broadcast::Sender
[`Sender::subscribe`]: crate::sync::broadcast::Sender::subscribe
[`Receiver`]: crate::sync::broadcast::Receiver
[`channel`]: crate::sync::broadcast::channel
[`RecvError::Lagged`]: crate::sync::broadcast::error::RecvError::Lagged
[`RecvError::Closed`]: crate::sync::broadcast::error::RecvError::Closed
[`recv`]: crate::sync::broadcast::Receiver::recv

# Examples

Basic usage

```
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx1) = broadcast::channel(16);
    let mut rx2 = tx.subscribe();

    tokio::spawn(async move {
        assert_eq!(rx1.recv().await.unwrap(), 10);
        assert_eq!(rx1.recv().await.unwrap(), 20);
    });

    tokio::spawn(async move {
        assert_eq!(rx2.recv().await.unwrap(), 10);
        assert_eq!(rx2.recv().await.unwrap(), 20);
    });

    tx.send(10).unwrap();
    tx.send(20).unwrap();
}
```

Handling lag

```
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(2);

    tx.send(10).unwrap();
    tx.send(20).unwrap();
    tx.send(30).unwrap();

    // The receiver lagged behind
    assert!(rx.recv().await.is_err());

    // At this point, we can abort or continue with lost messages

    assert_eq!(20, rx.recv().await.unwrap());
    assert_eq!(30, rx.recv().await.unwrap());
}
```

43170
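The lagging rule described for broadcast.rs can be modelled with sequence numbers over a bounded buffer. The following is a minimal std-only sketch, assuming a synchronous, single-threaded setting. `ToyChannel`, `ToyRecvError`, and the cursor-based `try_recv` are hypothetical names for illustration and are not tokio's API. Unlike the real channel, this toy version never releases a value early once every receiver has read it.

```rust
use std::collections::VecDeque;

/// Hypothetical error type for the toy model.
#[derive(Debug, PartialEq)]
enum ToyRecvError {
    Lagged(u64), // number of values the receiver missed
    Empty,       // nothing new to read yet
}

/// Toy bounded broadcast buffer; not tokio's implementation.
struct ToyChannel<T> {
    capacity: usize,
    buf: VecDeque<T>, // retained values, oldest first
    head: u64,        // sequence number of the oldest retained value
}

impl<T: Clone> ToyChannel<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, buf: VecDeque::new(), head: 0 }
    }

    /// A new receiver is just a cursor; it only sees values sent afterwards.
    fn subscribe(&self) -> u64 {
        self.head + self.buf.len() as u64
    }

    /// At capacity, the oldest retained value is released to make room.
    fn send(&mut self, value: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.head += 1;
        }
        self.buf.push_back(value);
    }

    /// Reads the value at `cursor`, or reports lag and jumps to the oldest value.
    fn try_recv(&self, cursor: &mut u64) -> Result<T, ToyRecvError> {
        if *cursor < self.head {
            let missed = self.head - *cursor;
            *cursor = self.head;
            return Err(ToyRecvError::Lagged(missed));
        }
        let idx = (*cursor - self.head) as usize;
        match self.buf.get(idx) {
            Some(v) => {
                *cursor += 1;
                Ok(v.clone())
            }
            None => Err(ToyRecvError::Empty),
        }
    }
}

fn main() {
    let mut ch = ToyChannel::new(2);
    let mut rx = ch.subscribe();
    ch.send(10);
    ch.send(20);
    ch.send(30); // releases 10, which `rx` has not read yet

    assert_eq!(ch.try_recv(&mut rx), Err(ToyRecvError::Lagged(1)));
    assert_eq!(ch.try_recv(&mut rx), Ok(20));
    assert_eq!(ch.try_recv(&mut rx), Ok(30));
    println!("receiver caught up at sequence {}", rx);
}
```

The model reproduces the sequence from the "Handling lag" example: one error reporting the missed value, then the two values still retained.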
mod.rs 17130
mpsc
mutex.rs 45115
notify.rs 42669
once_cell.rs 15757
oneshot.rs A one-shot channel is used for sending a single message between asynchronous tasks. The [`channel`] function is used to create a [`Sender`] and [`Receiver`] handle pair that form the channel.

The `Sender` handle is used by the producer to send the value. The `Receiver` handle is used by the consumer to receive the value.

Each handle can be used on separate tasks.

Since the `send` method is not async, it can be used anywhere. This includes sending between two runtimes, and using it from non-async code.

If the [`Receiver`] is closed before receiving a message which has already been sent, the message will remain in the channel until the receiver is dropped, at which point the message will be dropped immediately.

# Examples

```
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    tokio::spawn(async move {
        if let Err(_) = tx.send(3) {
            println!("the receiver dropped");
        }
    });

    match rx.await {
        Ok(v) => println!("got = {:?}", v),
        Err(_) => println!("the sender dropped"),
    }
}
```

If the sender is dropped without sending, the receiver will fail with [`error::RecvError`]:

```
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u32>();

    tokio::spawn(async move {
        drop(tx);
    });

    match rx.await {
        Ok(_) => panic!("This doesn't happen"),
        Err(_) => println!("the sender dropped"),
    }
}
```

To use a oneshot channel in a `tokio::select!` loop, add `&mut` in front of the channel.

```
use tokio::sync::oneshot;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
# async fn _doc() {}
# #[tokio::main(flavor = "current_thread", start_paused = true)]
async fn main() {
    let (send, mut recv) = oneshot::channel();
    let mut interval = interval(Duration::from_millis(100));

    # let handle =
    tokio::spawn(async move {
        sleep(Duration::from_secs(1)).await;
        send.send("shut down").unwrap();
    });

    loop {
        tokio::select! {
            _ = interval.tick() => println!("Another 100ms"),
            msg = &mut recv => {
                println!("Got message: {}", msg.unwrap());
                break;
            }
        }
    }

    # handle.await.unwrap();
}
```

To use a `Sender` from a destructor, put it in an [`Option`] and call [`Option::take`].

```
use tokio::sync::oneshot;

struct SendOnDrop {
    sender: Option<oneshot::Sender<&'static str>>,
}

impl Drop for SendOnDrop {
    fn drop(&mut self) {
        if let Some(sender) = self.sender.take() {
            // Using `let _ =` to ignore send errors.
            let _ = sender.send("I got dropped!");
        }
    }
}

#[tokio::main]
# async fn _doc() {}
# #[tokio::main(flavor = "current_thread")]
async fn main() {
    let (send, recv) = oneshot::channel();

    let send_on_drop = SendOnDrop { sender: Some(send) };
    drop(send_on_drop);

    assert_eq!(recv.await, Ok("I got dropped!"));
}
```

42481
rwlock
rwlock.rs 37410
semaphore.rs 22812
task
tests
watch.rs A single-producer, multi-consumer channel that only retains the *last* sent value.

This channel is useful for watching for changes to a value from multiple points in the code base, for example, changes to configuration values.

# Usage

[`channel`] returns a [`Sender`] / [`Receiver`] pair. These are the producer and consumer halves of the channel. The channel is created with an initial value. The **latest** value stored in the channel is accessed with [`Receiver::borrow()`]. Awaiting [`Receiver::changed()`] waits for a new value to be sent by the [`Sender`] half.

# Examples

```
use tokio::sync::watch;

# async fn dox() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = watch::channel("hello");

    tokio::spawn(async move {
        while rx.changed().await.is_ok() {
            println!("received = {:?}", *rx.borrow());
        }
    });

    tx.send("world")?;
# Ok(())
# }
```

# Closing

[`Sender::is_closed`] and [`Sender::closed`] allow the producer to detect when all [`Receiver`] handles have been dropped. This indicates that there is no further interest in the values being produced and work can be stopped.

The value in the channel will not be dropped until the sender and all receivers have been dropped.

# Thread safety

Both [`Sender`] and [`Receiver`] are thread safe. They can be moved to other threads and can be used in a concurrent environment. Clones of [`Receiver`] handles may be moved to separate threads and also used concurrently.

[`Sender`]: crate::sync::watch::Sender
[`Receiver`]: crate::sync::watch::Receiver
[`Receiver::changed()`]: crate::sync::watch::Receiver::changed
[`Receiver::borrow()`]: crate::sync::watch::Receiver::borrow
[`channel`]: crate::sync::watch::channel
[`Sender::is_closed`]: crate::sync::watch::Sender::is_closed
[`Sender::closed`]: crate::sync::watch::Sender::closed

41366
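The "only the last value is retained" behaviour described for watch.rs can be illustrated with a small std-only model. This is a sketch under assumptions rather than tokio's design: `toy_channel`, `ToySender`, and `ToyReceiver` are hypothetical names, the version counter used for change detection is an assumption of this sketch, and `Rc<RefCell<..>>` keeps it single-threaded, whereas the real handles are thread safe.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// State shared by both halves: one value slot plus a change counter.
struct Shared<T> {
    value: T,
    version: u64, // bumped on every send (assumption of this sketch)
}

struct ToySender<T> {
    shared: Rc<RefCell<Shared<T>>>,
}

struct ToyReceiver<T> {
    shared: Rc<RefCell<Shared<T>>>,
    seen: u64, // last version this receiver observed
}

/// Creates the pair with an initial value, which counts as already seen.
fn toy_channel<T>(init: T) -> (ToySender<T>, ToyReceiver<T>) {
    let shared = Rc::new(RefCell::new(Shared { value: init, version: 0 }));
    (
        ToySender { shared: Rc::clone(&shared) },
        ToyReceiver { shared, seen: 0 },
    )
}

impl<T> ToySender<T> {
    /// Overwrites the stored value; earlier unread values are lost.
    fn send(&self, value: T) {
        let mut s = self.shared.borrow_mut();
        s.value = value;
        s.version += 1;
    }

    /// True once no receiver holds the shared state any more.
    fn is_closed(&self) -> bool {
        Rc::strong_count(&self.shared) == 1
    }
}

impl<T: Clone> ToyReceiver<T> {
    /// True if a value was sent since this receiver last looked.
    fn has_changed(&self) -> bool {
        self.shared.borrow().version != self.seen
    }

    /// Returns a clone of the latest value and marks it as seen.
    fn latest(&mut self) -> T {
        let s = self.shared.borrow();
        self.seen = s.version;
        s.value.clone()
    }
}

fn main() {
    let (tx, mut rx) = toy_channel("hello");
    assert!(!rx.has_changed());

    tx.send("intermediate");
    tx.send("world"); // replaces "intermediate" before anyone read it

    assert!(rx.has_changed());
    assert_eq!(rx.latest(), "world");
    assert!(!rx.has_changed());

    assert!(!tx.is_closed());
    drop(rx);
    assert!(tx.is_closed()); // no receivers left, the producer can stop
    println!("all receivers dropped");
}
```

The receiver never observes `"intermediate"`: a slow consumer of a watch-style channel skips straight to the most recent value instead of working through a backlog.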