Source code

Revision control

Copy as Markdown

Other Tools

#![allow(clippy::needless_range_loop)]
#![warn(rust_2018_idioms)]
#![cfg(feature = "full")]
// Tests to run on both current-thread & multi-thread runtime variants.
macro_rules! rt_test {
($($t:tt)*) => {
mod current_thread_scheduler {
$($t)*
#[cfg(not(target_os="wasi"))]
const NUM_WORKERS: usize = 1;
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.into()
}
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
mod threaded_scheduler_4_threads {
$($t)*
const NUM_WORKERS: usize = 4;
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.unwrap()
.into()
}
}
#[cfg(not(tokio_wasi))] // Wasi doesn't support threads
mod threaded_scheduler_1_thread {
$($t)*
const NUM_WORKERS: usize = 1;
fn rt() -> Arc<Runtime> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap()
.into()
}
}
}
}
#[test]
fn send_sync_bound() {
use tokio::runtime::Runtime;
fn is_send<T: Send + Sync>() {}
is_send::<Runtime>();
}
rt_test! {
#[cfg(not(target_os="wasi"))]
use tokio::net::{TcpListener, TcpStream};
#[cfg(not(target_os="wasi"))]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::{task, time};
#[cfg(not(target_os="wasi"))]
use tokio_test::assert_err;
use tokio_test::assert_ok;
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
#[cfg(not(target_os="wasi"))]
use std::sync::mpsc;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(not(target_os="wasi"))]
use std::thread;
use std::time::{Duration, Instant};
#[test]
fn block_on_sync() {
let rt = rt();
let mut win = false;
rt.block_on(async {
win = true;
});
assert!(win);
}
#[cfg(not(target_os="wasi"))]
#[test]
fn block_on_async() {
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_one_bg() {
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send("ZOMG").unwrap();
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_one_join() {
let rt = rt();
let out = rt.block_on(async {
let (tx, rx) = oneshot::channel();
let handle = tokio::spawn(async move {
tx.send("ZOMG").unwrap();
"DONE"
});
let msg = assert_ok!(rx.await);
let out = assert_ok!(handle.await);
assert_eq!(out, "DONE");
msg
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_two() {
let rt = rt();
let out = rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
tokio::spawn(async move {
assert_ok!(tx1.send("ZOMG"));
});
tokio::spawn(async move {
let msg = assert_ok!(rx1.await);
assert_ok!(tx2.send(msg));
});
assert_ok!(rx2.await)
});
assert_eq!(out, "ZOMG");
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_many_from_block_on() {
use tokio::sync::mpsc;
const ITER: usize = 200;
let rt = rt();
let out = rt.block_on(async {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
let done_tx = done_tx.clone();
tokio::spawn(async move {
let msg = assert_ok!(rx.await);
assert_eq!(i, msg);
assert_ok!(done_tx.send(msg));
});
tx
})
.collect::<Vec<_>>();
drop(done_tx);
thread::spawn(move || {
for (i, tx) in txs.drain(..).enumerate() {
assert_ok!(tx.send(i));
}
});
let mut out = vec![];
while let Some(i) = done_rx.recv().await {
out.push(i);
}
out.sort_unstable();
out
});
assert_eq!(ITER, out.len());
for i in 0..ITER {
assert_eq!(i, out[i]);
}
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_many_from_task() {
use tokio::sync::mpsc;
const ITER: usize = 500;
let rt = rt();
let out = rt.block_on(async {
tokio::spawn(async move {
let (done_tx, mut done_rx) = mpsc::unbounded_channel();
let mut txs = (0..ITER)
.map(|i| {
let (tx, rx) = oneshot::channel();
let done_tx = done_tx.clone();
tokio::spawn(async move {
let msg = assert_ok!(rx.await);
assert_eq!(i, msg);
assert_ok!(done_tx.send(msg));
});
tx
})
.collect::<Vec<_>>();
drop(done_tx);
thread::spawn(move || {
for (i, tx) in txs.drain(..).enumerate() {
assert_ok!(tx.send(i));
}
});
let mut out = vec![];
while let Some(i) = done_rx.recv().await {
out.push(i);
}
out.sort_unstable();
out
}).await.unwrap()
});
assert_eq!(ITER, out.len());
for i in 0..ITER {
assert_eq!(i, out[i]);
}
}
#[test]
fn spawn_one_from_block_on_called_on_handle() {
let rt = rt();
let (tx, rx) = oneshot::channel();
#[allow(clippy::async_yields_async)]
let handle = rt.handle().block_on(async {
tokio::spawn(async move {
tx.send("ZOMG").unwrap();
"DONE"
})
});
let out = rt.block_on(async {
let msg = assert_ok!(rx.await);
let out = assert_ok!(handle.await);
assert_eq!(out, "DONE");
msg
});
assert_eq!(out, "ZOMG");
}
#[test]
fn spawn_await_chain() {
let rt = rt();
let out = rt.block_on(async {
assert_ok!(tokio::spawn(async {
assert_ok!(tokio::spawn(async {
"hello"
}).await)
}).await)
});
assert_eq!(out, "hello");
}
#[test]
fn outstanding_tasks_dropped() {
let rt = rt();
let cnt = Arc::new(());
rt.block_on(async {
let cnt = cnt.clone();
tokio::spawn(poll_fn(move |_| {
assert_eq!(2, Arc::strong_count(&cnt));
Poll::<()>::Pending
}));
});
assert_eq!(2, Arc::strong_count(&cnt));
drop(rt);
assert_eq!(1, Arc::strong_count(&cnt));
}
#[test]
#[should_panic]
fn nested_rt() {
let rt1 = rt();
let rt2 = rt();
rt1.block_on(async { rt2.block_on(async { "hello" }) });
}
#[test]
fn create_rt_in_block_on() {
let rt1 = rt();
let rt2 = rt1.block_on(async { rt() });
let out = rt2.block_on(async { "ZOMG" });
assert_eq!(out, "ZOMG");
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn complete_block_on_under_load() {
let rt = rt();
rt.block_on(async {
let (tx, rx) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn complete_task_under_load() {
let rt = rt();
rt.block_on(async {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
assert_ok!(tx1.send(()));
});
tokio::spawn(async move {
assert_ok!(rx1.await);
assert_ok!(tx2.send(()));
});
assert_ok!(rx2.await);
});
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_other_thread_idle() {
let rt = rt();
let handle = rt.clone();
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
handle.spawn(async move {
assert_ok!(tx.send(()));
});
});
rt.block_on(async move {
assert_ok!(rx.await);
});
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_other_thread_under_load() {
let rt = rt();
let handle = rt.clone();
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
handle.spawn(async move {
assert_ok!(tx.send(()));
});
});
rt.block_on(async move {
// Spin hard
tokio::spawn(async {
loop {
yield_once().await;
}
});
assert_ok!(rx.await);
});
}
#[test]
fn sleep_at_root() {
let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
time::sleep(dur).await;
});
assert!(now.elapsed() >= dur);
}
#[test]
fn sleep_in_spawn() {
let rt = rt();
let now = Instant::now();
let dur = Duration::from_millis(50);
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
time::sleep(dur).await;
assert_ok!(tx.send(()));
});
assert_ok!(rx.await);
});
assert!(now.elapsed() >= dur);
}
#[cfg(not(target_os="wasi"))] // Wasi does not support bind
#[test]
fn block_on_socket() {
let rt = rt();
rt.block_on(async move {
let (tx, rx) = oneshot::channel();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let _ = listener.accept().await;
tx.send(()).unwrap();
});
TcpStream::connect(&addr).await.unwrap();
rx.await.unwrap();
});
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_from_blocking() {
let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
tokio::spawn(async move { "hello" })
}).await);
assert_ok!(inner.await)
});
assert_eq!(out, "hello")
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn spawn_blocking_from_blocking() {
let rt = rt();
let out = rt.block_on(async move {
let inner = assert_ok!(tokio::task::spawn_blocking(|| {
tokio::task::spawn_blocking(|| "hello")
}).await);
assert_ok!(inner.await)
});
assert_eq!(out, "hello")
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn sleep_from_blocking() {
let rt = rt();
rt.block_on(async move {
assert_ok!(tokio::task::spawn_blocking(|| {
let now = std::time::Instant::now();
let dur = Duration::from_millis(1);
// use the futures' block_on fn to make sure we aren't setting
// any Tokio context
futures::executor::block_on(async {
tokio::time::sleep(dur).await;
});
assert!(now.elapsed() >= dur);
}).await);
});
}
#[cfg(not(target_os="wasi"))] // Wasi does not support bind
#[test]
fn socket_from_blocking() {
let rt = rt();
rt.block_on(async move {
let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(listener.local_addr());
let peer = tokio::task::spawn_blocking(move || {
// use the futures' block_on fn to make sure we aren't setting
// any Tokio context
futures::executor::block_on(async {
assert_ok!(TcpStream::connect(addr).await);
});
});
// Wait for the client to connect
let _ = assert_ok!(listener.accept().await);
assert_ok!(peer.await);
});
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn always_active_parker() {
// This test it to show that we will always have
// an active parker even if we call block_on concurrently
let rt = rt();
let rt2 = rt.clone();
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let jh1 = thread::spawn(move || {
rt.block_on(async move {
rx2.await.unwrap();
time::sleep(Duration::from_millis(5)).await;
tx1.send(()).unwrap();
});
});
let jh2 = thread::spawn(move || {
rt2.block_on(async move {
tx2.send(()).unwrap();
time::sleep(Duration::from_millis(5)).await;
rx1.await.unwrap();
time::sleep(Duration::from_millis(5)).await;
});
});
jh1.join().unwrap();
jh2.join().unwrap();
}
#[test]
// IOCP requires setting the "max thread" concurrency value. The sane,
// default, is to set this to the number of cores. Threads that poll I/O
// become associated with the IOCP handle. Once those threads sleep for any
// reason (mutex), they yield their ownership.
//
// This test hits an edge case on windows where more threads than cores are
// created, none of those threads ever yield due to being at capacity, so
// IOCP gets "starved".
//
// For now, this is a very edge case that is probably not a real production
// concern. There also isn't a great/obvious solution to take. For now, the
// test is disabled.
#[cfg(not(windows))]
#[cfg(not(target_os="wasi"))] // Wasi does not support bind or threads
fn io_driver_called_when_under_load() {
let rt = rt();
// Create a lot of constant load. The scheduler will always be busy.
for _ in 0..100 {
rt.spawn(async {
loop {
// Don't use Tokio's `yield_now()` to avoid special defer
// logic.
futures::future::poll_fn::<(), _>(|cx| {
cx.waker().wake_by_ref();
std::task::Poll::Pending
}).await;
}
});
}
// Do some I/O work
rt.block_on(async {
let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
let addr = assert_ok!(listener.local_addr());
let srv = tokio::spawn(async move {
let (mut stream, _) = assert_ok!(listener.accept().await);
assert_ok!(stream.write_all(b"hello world").await);
});
let cli = tokio::spawn(async move {
let mut stream = assert_ok!(TcpStream::connect(addr).await);
let mut dst = vec![0; 11];
assert_ok!(stream.read_exact(&mut dst).await);
assert_eq!(dst, b"hello world");
});
assert_ok!(srv.await);
assert_ok!(cli.await);
});
}
/// Tests that yielded tasks are not scheduled until **after** resource
/// drivers are polled.
///
/// The OS does not guarantee when I/O events are delivered, so there may be
/// more yields than anticipated. This makes the test slightly flaky. To
/// help avoid flakiness, we run the test 10 times and only fail it after
/// 10 failures in a row.
///
/// Note that if the test fails by panicking rather than by returning false,
/// then we fail it immediately. That kind of failure should not happen
/// spuriously.
#[test]
#[cfg(not(target_os="wasi"))]
fn yield_defers_until_park() {
for _ in 0..10 {
if yield_defers_until_park_inner() {
// test passed
return;
}
// Wait a bit and run the test again.
std::thread::sleep(std::time::Duration::from_secs(2));
}
panic!("yield_defers_until_park is failing consistently");
}
/// Implementation of `yield_defers_until_park` test. Returns `true` if the
/// test passed.
#[cfg(not(target_os="wasi"))]
fn yield_defers_until_park_inner() -> bool {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
use std::sync::Barrier;
let rt = rt();
let flag = Arc::new(AtomicBool::new(false));
let barrier = Arc::new(Barrier::new(NUM_WORKERS));
rt.block_on(async {
// Make sure other workers cannot steal tasks
#[allow(clippy::reversed_empty_ranges)]
for _ in 0..(NUM_WORKERS-1) {
let flag = flag.clone();
let barrier = barrier.clone();
tokio::spawn(async move {
barrier.wait();
while !flag.load(SeqCst) {
std::thread::sleep(std::time::Duration::from_millis(1));
}
});
}
barrier.wait();
let (fail_test, fail_test_recv) = oneshot::channel::<()>();
let jh = tokio::spawn(async move {
// Create a TCP litener
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::join!(
async {
// Done in a blocking manner intentionally.
let _socket = std::net::TcpStream::connect(addr).unwrap();
// Yield until connected
let mut cnt = 0;
while !flag.load(SeqCst){
tokio::task::yield_now().await;
cnt += 1;
if cnt >= 10 {
// yielded too many times; report failure and
// sleep forever so that the `fail_test` branch
// of the `select!` below triggers.
let _ = fail_test.send(());
futures::future::pending::<()>().await;
break;
}
}
},
async {
let _ = listener.accept().await.unwrap();
flag.store(true, SeqCst);
}
);
});
// Wait until the spawned task completes or fails. If no message is
// sent on `fail_test`, then the test succeeds. Otherwise, it fails.
let success = fail_test_recv.await.is_err();
if success {
// Check for panics in spawned task.
jh.abort();
jh.await.unwrap();
}
success
})
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn client_server_block_on() {
let rt = rt();
let (tx, rx) = mpsc::channel();
rt.block_on(async move { client_server(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
}
#[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads or panic recovery")]
#[test]
fn panic_in_task() {
let rt = rt();
let (tx, rx) = oneshot::channel();
struct Boom(Option<oneshot::Sender<()>>);
impl Future for Boom {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
panic!();
}
}
impl Drop for Boom {
fn drop(&mut self) {
assert!(std::thread::panicking());
self.0.take().unwrap().send(()).unwrap();
}
}
rt.spawn(Boom(Some(tx)));
assert_ok!(rt.block_on(rx));
}
#[test]
#[should_panic]
#[cfg_attr(tokio_wasi, ignore = "Wasi does not support panic recovery")]
fn panic_in_block_on() {
let rt = rt();
rt.block_on(async { panic!() });
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
async fn yield_once() {
let mut yielded = false;
poll_fn(|cx| {
if yielded {
Poll::Ready(())
} else {
yielded = true;
cx.waker().wake_by_ref();
Poll::Pending
}
})
.await
}
#[test]
fn enter_and_spawn() {
let rt = rt();
let handle = {
let _enter = rt.enter();
tokio::spawn(async {})
};
assert_ok!(rt.block_on(handle));
}
#[test]
fn eagerly_drops_futures_on_shutdown() {
use std::sync::mpsc;
struct Never {
drop_tx: mpsc::Sender<()>,
}
impl Future for Never {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}
impl Drop for Never {
fn drop(&mut self) {
self.drop_tx.send(()).unwrap();
}
}
let rt = rt();
let (drop_tx, drop_rx) = mpsc::channel();
let (run_tx, run_rx) = oneshot::channel();
rt.block_on(async move {
tokio::spawn(async move {
assert_ok!(run_tx.send(()));
Never { drop_tx }.await
});
assert_ok!(run_rx.await);
});
drop(rt);
assert_ok!(drop_rx.recv());
}
#[test]
fn wake_while_rt_is_dropping() {
use tokio::sync::Barrier;
use core::sync::atomic::{AtomicBool, Ordering};
let drop_triggered = Arc::new(AtomicBool::new(false));
let set_drop_triggered = drop_triggered.clone();
struct OnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for OnDrop<F> {
fn drop(&mut self) {
(self.0)()
}
}
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
let barrier = Arc::new(Barrier::new(4));
let barrier1 = barrier.clone();
let barrier2 = barrier.clone();
let barrier3 = barrier.clone();
let rt = rt();
rt.spawn(async move {
// Ensure a waker gets stored in oneshot 1.
let _ = tokio::join!(rx1, barrier1.wait());
tx3.send(()).unwrap();
});
rt.spawn(async move {
let h1 = tokio::runtime::Handle::current();
// When this task is dropped, we'll be "closing remotes".
// We spawn a new task that owns the `tx1`, to move its Drop
// out of here.
//
// Importantly, the oneshot 1 has a waker already stored, so
// the eventual drop here will try to re-schedule again.
let mut opt_tx1 = Some(tx1);
let _d = OnDrop(move || {
let tx1 = opt_tx1.take().unwrap();
h1.spawn(async move {
tx1.send(()).unwrap();
});
// Just a sanity check that this entire thing actually happened
set_drop_triggered.store(true, Ordering::Relaxed);
});
let _ = tokio::join!(rx2, barrier2.wait());
});
rt.spawn(async move {
let _ = tokio::join!(rx3, barrier3.wait());
// We'll never get here, but once task 3 drops, this will
// force task 2 to re-schedule since it's waiting on oneshot 2.
tx2.send(()).unwrap();
});
// Wait until every oneshot channel has been polled.
rt.block_on(barrier.wait());
// Drop the rt
drop(rt);
// Make sure that the spawn actually happened
assert!(drop_triggered.load(Ordering::Relaxed));
}
#[cfg(not(target_os="wasi"))] // Wasi doesn't support UDP or bind()
#[test]
fn io_notify_while_shutting_down() {
use tokio::net::UdpSocket;
use std::sync::Arc;
for _ in 1..10 {
let runtime = rt();
runtime.block_on(async {
let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = socket.local_addr().unwrap();
let send_half = Arc::new(socket);
let recv_half = send_half.clone();
tokio::spawn(async move {
let mut buf = [0];
loop {
recv_half.recv_from(&mut buf).await.unwrap();
std::thread::sleep(Duration::from_millis(2));
}
});
tokio::spawn(async move {
let buf = [0];
loop {
send_half.send_to(&buf, &addr).await.unwrap();
tokio::time::sleep(Duration::from_millis(1)).await;
}
});
tokio::time::sleep(Duration::from_millis(5)).await;
});
}
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn shutdown_timeout() {
let (tx, rx) = oneshot::channel();
let runtime = rt();
runtime.block_on(async move {
task::spawn_blocking(move || {
tx.send(()).unwrap();
thread::sleep(Duration::from_secs(10_000));
});
rx.await.unwrap();
});
Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_millis(100));
}
#[cfg(not(target_os="wasi"))] // Wasi does not support threads
#[test]
fn shutdown_timeout_0() {
let runtime = rt();
runtime.block_on(async move {
task::spawn_blocking(move || {
thread::sleep(Duration::from_secs(10_000));
});
});
let now = Instant::now();
Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_nanos(0));
assert!(now.elapsed().as_secs() < 1);
}
#[test]
fn shutdown_wakeup_time() {
let runtime = rt();
runtime.block_on(async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
});
Arc::try_unwrap(runtime).unwrap().shutdown_timeout(Duration::from_secs(10_000));
}
// This test is currently ignored on Windows because of a
// rust-lang issue in thread local storage destructors.
#[test]
#[cfg(not(windows))]
#[cfg_attr(tokio_wasi, ignore = "Wasi does not support threads")]
fn runtime_in_thread_local() {
use std::cell::RefCell;
use std::thread;
thread_local!(
static R: RefCell<Option<Runtime>> = RefCell::new(None);
);
thread::spawn(|| {
R.with(|cell| {
let rt = rt();
let rt = Arc::try_unwrap(rt).unwrap();
*cell.borrow_mut() = Some(rt);
});
let _rt = rt();
}).join().unwrap();
}
#[cfg(not(target_os="wasi"))] // Wasi does not support bind
async fn client_server(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
tokio::spawn(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
}
#[cfg(not(tokio_wasi))] // Wasi does not support bind
#[test]
fn local_set_block_on_socket() {
let rt = rt();
let local = task::LocalSet::new();
local.block_on(&rt, async move {
let (tx, rx) = oneshot::channel();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
task::spawn_local(async move {
let _ = listener.accept().await;
tx.send(()).unwrap();
});
TcpStream::connect(&addr).await.unwrap();
rx.await.unwrap();
});
}
#[cfg(not(tokio_wasi))] // Wasi does not support bind
#[test]
fn local_set_client_server_block_on() {
let rt = rt();
let (tx, rx) = mpsc::channel();
let local = task::LocalSet::new();
local.block_on(&rt, async move { client_server_local(tx).await });
assert_ok!(rx.try_recv());
assert_err!(rx.try_recv());
}
#[cfg(not(tokio_wasi))] // Wasi does not support bind
async fn client_server_local(tx: mpsc::Sender<()>) {
let server = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
// Get the assigned address
let addr = assert_ok!(server.local_addr());
// Spawn the server
task::spawn_local(async move {
// Accept a socket
let (mut socket, _) = server.accept().await.unwrap();
// Write some data
socket.write_all(b"hello").await.unwrap();
});
let mut client = TcpStream::connect(&addr).await.unwrap();
let mut buf = vec![];
client.read_to_end(&mut buf).await.unwrap();
assert_eq!(buf, b"hello");
tx.send(()).unwrap();
}
#[test]
fn coop() {
use std::task::Poll::Ready;
use tokio::sync::mpsc;
let rt = rt();
rt.block_on(async {
let (send, mut recv) = mpsc::unbounded_channel();
// Send a bunch of messages.
for _ in 0..1_000 {
send.send(()).unwrap();
}
poll_fn(|cx| {
// At least one response should return pending.
for _ in 0..1_000 {
if recv.poll_recv(cx).is_pending() {
return Ready(());
}
}
panic!("did not yield");
}).await;
});
}
#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;
use tokio::sync::mpsc;
let rt = rt();
rt.block_on(async {
let (send, mut recv) = mpsc::unbounded_channel();
// Send a bunch of messages.
for _ in 0..1_000 {
send.send(()).unwrap();
}
tokio::task::unconstrained(poll_fn(|cx| {
// All the responses should be ready.
for _ in 0..1_000 {
assert_eq!(recv.poll_recv(cx), Poll::Ready(Some(())));
}
Ready(())
})).await;
});
}
#[cfg(tokio_unstable)]
#[test]
fn coop_consume_budget() {
let rt = rt();
rt.block_on(async {
poll_fn(|cx| {
let counter = Arc::new(std::sync::Mutex::new(0));
let counter_clone = Arc::clone(&counter);
let mut worker = Box::pin(async move {
// Consume the budget until a yield happens
for _ in 0..1000 {
*counter.lock().unwrap() += 1;
task::consume_budget().await
}
});
// Assert that the worker was yielded and it didn't manage
// to finish the whole work (assuming the total budget of 128)
assert!(Pin::new(&mut worker).poll(cx).is_pending());
assert!(*counter_clone.lock().unwrap() < 1000);
std::task::Poll::Ready(())
}).await;
});
}
// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
fn ping_pong_saturation() {
use std::sync::atomic::{Ordering, AtomicBool};
use tokio::sync::mpsc;
const NUM: usize = 100;
let rt = rt();
let running = Arc::new(AtomicBool::new(true));
rt.block_on(async {
let (spawned_tx, mut spawned_rx) = mpsc::unbounded_channel();
let mut tasks = vec![];
// Spawn a bunch of tasks that ping-pong between each other to
// saturate the runtime.
for _ in 0..NUM {
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx2, mut rx2) = mpsc::unbounded_channel();
let spawned_tx = spawned_tx.clone();
let running = running.clone();
tasks.push(task::spawn(async move {
spawned_tx.send(()).unwrap();
while running.load(Ordering::Relaxed) {
tx1.send(()).unwrap();
rx2.recv().await.unwrap();
}
// Close the channel and wait for the other task to exit.
drop(tx1);
assert!(rx2.recv().await.is_none());
}));
tasks.push(task::spawn(async move {
while rx1.recv().await.is_some() {
tx2.send(()).unwrap();
}
}));
}
for _ in 0..NUM {
spawned_rx.recv().await.unwrap();
}
// spawn another task and wait for it to complete
let handle = task::spawn(async {
for _ in 0..5 {
// Yielding forces it back into the local queue.
task::yield_now().await;
}
});
handle.await.unwrap();
running.store(false, Ordering::Relaxed);
for t in tasks {
t.await.unwrap();
}
});
}
#[test]
#[cfg(not(target_os="wasi"))]
fn shutdown_concurrent_spawn() {
const NUM_TASKS: usize = 10_000;
for _ in 0..5 {
let (tx, rx) = std::sync::mpsc::channel();
let rt = rt();
let mut txs = vec![];
for _ in 0..NUM_TASKS {
let (tx, rx) = tokio::sync::oneshot::channel();
txs.push(tx);
rt.spawn(async move {
rx.await.unwrap();
});
}
// Prime the tasks
rt.block_on(async { tokio::task::yield_now().await });
let th = std::thread::spawn(move || {
tx.send(()).unwrap();
for tx in txs.drain(..) {
let _ = tx.send(());
}
});
rx.recv().unwrap();
drop(rt);
th.join().unwrap();
}
}
}