Source code

Revision control

Copy as Markdown

Other Tools

#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
use futures::{
future::{pending, ready},
FutureExt,
};
use tokio::runtime;
use tokio::sync::{mpsc, oneshot};
use tokio::task::{self, LocalSet};
use tokio::time;
#[cfg(not(tokio_wasi))]
use std::cell::Cell;
use std::sync::atomic::AtomicBool;
#[cfg(not(tokio_wasi))]
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
#[cfg(not(tokio_wasi))]
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;
#[tokio::test(flavor = "current_thread")]
async fn local_current_thread_scheduler() {
LocalSet::new()
.run_until(async {
task::spawn_local(async {}).await.unwrap();
})
.await;
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
})
.await
.unwrap();
})
.await;
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_threadpool() {
thread_local! {
static ON_LOCAL_THREAD: Cell<bool> = Cell::new(false);
}
ON_LOCAL_THREAD.with(|cell| cell.set(true));
let local = LocalSet::new();
local.spawn_local(async move {
assert!(ON_LOCAL_THREAD.with(|cell| cell.get()));
});
local.await;
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_future_timers() {
static RAN1: AtomicBool = AtomicBool::new(false);
static RAN2: AtomicBool = AtomicBool::new(false);
let local = LocalSet::new();
local.spawn_local(async move {
time::sleep(Duration::from_millis(5)).await;
RAN1.store(true, Ordering::SeqCst);
});
local.spawn_local(async move {
time::sleep(Duration::from_millis(10)).await;
RAN2.store(true, Ordering::SeqCst);
});
local.await;
assert!(RAN1.load(Ordering::SeqCst));
assert!(RAN2.load(Ordering::SeqCst));
}
#[tokio::test]
async fn localset_future_drives_all_local_futs() {
static RAN1: AtomicBool = AtomicBool::new(false);
static RAN2: AtomicBool = AtomicBool::new(false);
static RAN3: AtomicBool = AtomicBool::new(false);
let local = LocalSet::new();
local.spawn_local(async move {
task::spawn_local(async {
task::yield_now().await;
RAN3.store(true, Ordering::SeqCst);
});
task::yield_now().await;
RAN1.store(true, Ordering::SeqCst);
});
local.spawn_local(async move {
task::yield_now().await;
RAN2.store(true, Ordering::SeqCst);
});
local.await;
assert!(RAN1.load(Ordering::SeqCst));
assert!(RAN2.load(Ordering::SeqCst));
assert!(RAN3.load(Ordering::SeqCst));
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_timer() {
// This test ensures that runtime services like the timer are properly
// set for the local task set.
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let join = task::spawn_local(async move {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
time::sleep(Duration::from_millis(10)).await;
assert!(ON_RT_THREAD.with(|cell| cell.get()));
});
join.await.unwrap();
})
.await;
}
#[test]
fn enter_guard_spawn() {
let local = LocalSet::new();
let _guard = local.enter();
// Run the local task set.
let join = task::spawn_local(async { true });
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
local.block_on(&rt, async move {
assert!(join.await.unwrap());
});
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[test]
// This will panic, since the thread that calls `block_on` cannot use
// in-place blocking inside of `block_on`.
#[should_panic]
fn local_threadpool_blocking_in_place() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
LocalSet::new().block_on(&rt, async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let join = task::spawn_local(async move {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::block_in_place(|| {});
assert!(ON_RT_THREAD.with(|cell| cell.get()));
});
join.await.unwrap();
});
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn local_threadpool_blocking_run() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let join = task::spawn_local(async move {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_blocking(|| {
assert!(
!ON_RT_THREAD.with(|cell| cell.get()),
"blocking must not run on the local task set's thread"
);
})
.await
.unwrap();
assert!(ON_RT_THREAD.with(|cell| cell.get()));
});
join.await.unwrap();
})
.await;
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn all_spawns_are_local() {
use futures::future;
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
let handles = (0..128)
.map(|_| {
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
})
})
.collect::<Vec<_>>();
for joined in future::join_all(handles).await {
joined.unwrap();
}
})
.await;
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn nested_spawn_is_local() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
LocalSet::new()
.run_until(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
task::spawn_local(async {
assert!(ON_RT_THREAD.with(|cell| cell.get()));
})
.await
.unwrap();
})
.await
.unwrap();
})
.await
.unwrap();
})
.await
.unwrap();
})
.await;
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[test]
fn join_local_future_elsewhere() {
thread_local! {
static ON_RT_THREAD: Cell<bool> = Cell::new(false);
}
ON_RT_THREAD.with(|cell| cell.set(true));
let rt = runtime::Runtime::new().unwrap();
let local = LocalSet::new();
local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
let join = task::spawn_local(async move {
assert!(
ON_RT_THREAD.with(|cell| cell.get()),
"local task must run on local thread, no matter where it is awaited"
);
rx.await.unwrap();
"hello world"
});
let join2 = task::spawn(async move {
assert!(
!ON_RT_THREAD.with(|cell| cell.get()),
"spawned task should be on a worker"
);
tx.send(()).expect("task shouldn't have ended yet");
join.await.expect("task should complete successfully");
});
join2.await.unwrap()
});
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
#[tokio::test(flavor = "multi_thread")]
async fn localset_in_thread_local() {
thread_local! {
static LOCAL_SET: LocalSet = LocalSet::new();
}
// holds runtime thread until end of main fn.
let (_tx, rx) = oneshot::channel::<()>();
let handle = tokio::runtime::Handle::current();
std::thread::spawn(move || {
LOCAL_SET.with(|local_set| {
handle.block_on(local_set.run_until(async move {
let _ = rx.await;
}))
});
});
}
#[test]
fn drop_cancels_tasks() {
use std::rc::Rc;
// This test reproduces issue #1842
let rt = rt();
let rc1 = Rc::new(());
let rc2 = rc1.clone();
let (started_tx, started_rx) = oneshot::channel();
let local = LocalSet::new();
local.spawn_local(async move {
// Move this in
let _rc2 = rc2;
started_tx.send(()).unwrap();
futures::future::pending::<()>().await;
});
local.block_on(&rt, async {
started_rx.await.unwrap();
});
drop(local);
drop(rt);
assert_eq!(1, Rc::strong_count(&rc1));
}
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
use std::sync::mpsc::RecvTimeoutError;
let (done_tx, done_rx) = std::sync::mpsc::channel();
let thread = std::thread::spawn(move || {
f();
// Send a message on the channel so that the test thread can
// determine if we have entered an infinite loop:
done_tx.send(()).unwrap();
});
// Since the failure mode of this test is an infinite loop, rather than
// something we can easily make assertions about, we'll run it in a
// thread. When the test thread finishes, it will send a message on a
// channel to this thread. We'll wait for that message with a fairly
// generous timeout, and if we don't receive it, we assume the test
// thread has hung.
//
// Note that it should definitely complete in under a minute, but just
// in case CI is slow, we'll give it a long timeout.
match done_rx.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => panic!(
"test did not complete within {:?} seconds, \
we have (probably) entered an infinite loop!",
timeout,
),
// Did the test thread panic? We'll find out for sure when we `join`
// with it.
Err(RecvTimeoutError::Disconnected) => {}
// Test completed successfully!
Ok(()) => {}
}
thread.join().expect("test thread should not panic!")
}
#[cfg_attr(tokio_wasi, ignore = "`unwrap()` in `with_timeout()` panics on Wasi")]
#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
with_timeout(Duration::from_secs(60), || {
let (tx, mut rx) = mpsc::channel::<()>(1024);
let rt = rt();
let local = LocalSet::new();
local.spawn_local(async move { while rx.recv().await.is_some() {} });
local.block_on(&rt, async {
time::sleep(Duration::from_millis(1)).await;
});
drop(tx);
// This enters an infinite loop if the remote notified tasks are not
// properly cancelled.
drop(local);
});
}
#[cfg_attr(
tokio_wasi,
ignore = "FIXME: `task::spawn_local().await.unwrap()` panics on Wasi"
)]
#[test]
fn local_tasks_wake_join_all() {
// This test reproduces issue #2460.
with_timeout(Duration::from_secs(60), || {
use futures::future::join_all;
use tokio::task::LocalSet;
let rt = rt();
let set = LocalSet::new();
let mut handles = Vec::new();
for _ in 1..=128 {
handles.push(set.spawn_local(async move {
tokio::task::spawn_local(async move {}).await.unwrap();
}));
}
rt.block_on(set.run_until(join_all(handles)));
});
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[test]
fn local_tasks_are_polled_after_tick() {
// This test depends on timing, so we run it up to five times.
for _ in 0..4 {
let res = std::panic::catch_unwind(local_tasks_are_polled_after_tick_inner);
if res.is_ok() {
// success
return;
}
}
// Test failed 4 times. Try one more time without catching panics. If it
// fails again, the test fails.
local_tasks_are_polled_after_tick_inner();
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support panic recovery
#[tokio::main(flavor = "current_thread")]
async fn local_tasks_are_polled_after_tick_inner() {
// Reproduces issues #1899 and #1900
static RX1: AtomicUsize = AtomicUsize::new(0);
static RX2: AtomicUsize = AtomicUsize::new(0);
const EXPECTED: usize = 500;
RX1.store(0, SeqCst);
RX2.store(0, SeqCst);
let (tx, mut rx) = mpsc::unbounded_channel();
let local = LocalSet::new();
local
.run_until(async {
let task2 = task::spawn(async move {
// Wait a bit
time::sleep(Duration::from_millis(10)).await;
let mut oneshots = Vec::with_capacity(EXPECTED);
// Send values
for _ in 0..EXPECTED {
let (oneshot_tx, oneshot_rx) = oneshot::channel();
oneshots.push(oneshot_tx);
tx.send(oneshot_rx).unwrap();
}
time::sleep(Duration::from_millis(10)).await;
for tx in oneshots.drain(..) {
tx.send(()).unwrap();
}
loop {
time::sleep(Duration::from_millis(20)).await;
let rx1 = RX1.load(SeqCst);
let rx2 = RX2.load(SeqCst);
if rx1 == EXPECTED && rx2 == EXPECTED {
break;
}
}
});
while let Some(oneshot) = rx.recv().await {
RX1.fetch_add(1, SeqCst);
task::spawn_local(async move {
oneshot.await.unwrap();
RX2.fetch_add(1, SeqCst);
});
}
task2.await.unwrap();
})
.await;
}
#[tokio::test]
async fn acquire_mutex_in_drop() {
use futures::future::pending;
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let local = LocalSet::new();
local.spawn_local(async move {
let _ = rx2.await;
unreachable!();
});
local.spawn_local(async move {
let _ = rx1.await;
tx2.send(()).unwrap();
unreachable!();
});
// Spawn a task that will never notify
local.spawn_local(async move {
pending::<()>().await;
tx1.send(()).unwrap();
});
// Tick the loop
local
.run_until(async {
task::yield_now().await;
})
.await;
// Drop the LocalSet
drop(local);
}
#[tokio::test]
async fn spawn_wakes_localset() {
let local = LocalSet::new();
futures::select! {
_ = local.run_until(pending::<()>()).fuse() => unreachable!(),
ret = async { local.spawn_local(ready(())).await.unwrap()}.fuse() => ret
}
}
#[test]
fn store_local_set_in_thread_local_with_runtime() {
use tokio::runtime::Runtime;
thread_local! {
static CURRENT: RtAndLocalSet = RtAndLocalSet::new();
}
struct RtAndLocalSet {
rt: Runtime,
local: LocalSet,
}
impl RtAndLocalSet {
fn new() -> RtAndLocalSet {
RtAndLocalSet {
rt: tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap(),
local: LocalSet::new(),
}
}
async fn inner_method(&self) {
self.local
.run_until(async move {
tokio::task::spawn_local(async {});
})
.await
}
fn method(&self) {
self.rt.block_on(self.inner_method());
}
}
CURRENT.with(|f| {
f.method();
});
}
#[cfg(tokio_unstable)]
mod unstable {
use tokio::runtime::UnhandledPanic;
use tokio::task::LocalSet;
#[tokio::test]
#[should_panic(
expected = "a spawned task panicked and the LocalSet is configured to shutdown on unhandled panic"
)]
async fn shutdown_on_panic() {
LocalSet::new()
.unhandled_panic(UnhandledPanic::ShutdownRuntime)
.run_until(async {
tokio::task::spawn_local(async {
panic!("boom");
});
futures::future::pending::<()>().await;
})
.await;
}
// This test compares that, when the task driving `run_until` has already
// consumed budget, the `run_until` future has less budget than a "spawned"
// task.
//
// "Budget" is a fuzzy metric as the Tokio runtime is able to change values
// internally. This is why the test uses indirection to test this.
#[tokio::test]
async fn run_until_does_not_get_own_budget() {
// Consume some budget
tokio::task::consume_budget().await;
LocalSet::new()
.run_until(async {
let spawned = tokio::spawn(async {
let mut spawned_n = 0;
{
let mut spawned = tokio_test::task::spawn(async {
loop {
spawned_n += 1;
tokio::task::consume_budget().await;
}
});
// Poll once
assert!(!spawned.poll().is_ready());
}
spawned_n
});
let mut run_until_n = 0;
{
let mut run_until = tokio_test::task::spawn(async {
loop {
run_until_n += 1;
tokio::task::consume_budget().await;
}
});
// Poll once
assert!(!run_until.poll().is_ready());
}
let spawned_n = spawned.await.unwrap();
assert_ne!(spawned_n, 0);
assert_ne!(run_until_n, 0);
assert!(spawned_n > run_until_n);
})
.await
}
}
fn rt() -> runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}