Source code

Revision control

Copy as Markdown

Other Tools

use std::fmt;
use std::future::Future;
use std::panic;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::runtime::task::AbortHandle;
use crate::runtime::Builder;
use crate::sync::oneshot;
use crate::task::JoinHandle;
use futures::future::FutureExt;
// Enums for each option in the combinations being tested
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiRuntime {
CurrentThread,
Multi1,
Multi2,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiLocalSet {
Yes,
No,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiTask {
PanicOnRun,
PanicOnDrop,
PanicOnRunAndDrop,
NoPanic,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiOutput {
PanicOnDrop,
NoPanic,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinInterest {
Polled,
NotPolled,
}
#[allow(clippy::enum_variant_names)] // we aren't using glob imports
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiJoinHandle {
DropImmediately = 1,
DropFirstPoll = 2,
DropAfterNoConsume = 3,
DropAfterConsume = 4,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiAbort {
NotAborted = 0,
AbortedImmediately = 1,
AbortedFirstPoll = 2,
AbortedAfterFinish = 3,
AbortedAfterConsumeOutput = 4,
}
#[derive(Copy, Clone, Debug, PartialEq)]
enum CombiAbortSource {
JoinHandle,
AbortHandle,
}
#[test]
fn test_combinations() {
let mut rt = &[
CombiRuntime::CurrentThread,
CombiRuntime::Multi1,
CombiRuntime::Multi2,
][..];
if cfg!(miri) {
rt = &[CombiRuntime::CurrentThread];
}
let ls = [CombiLocalSet::Yes, CombiLocalSet::No];
let task = [
CombiTask::NoPanic,
CombiTask::PanicOnRun,
CombiTask::PanicOnDrop,
CombiTask::PanicOnRunAndDrop,
];
let output = [CombiOutput::NoPanic, CombiOutput::PanicOnDrop];
let ji = [CombiJoinInterest::Polled, CombiJoinInterest::NotPolled];
let jh = [
CombiJoinHandle::DropImmediately,
CombiJoinHandle::DropFirstPoll,
CombiJoinHandle::DropAfterNoConsume,
CombiJoinHandle::DropAfterConsume,
];
let abort = [
CombiAbort::NotAborted,
CombiAbort::AbortedImmediately,
CombiAbort::AbortedFirstPoll,
CombiAbort::AbortedAfterFinish,
CombiAbort::AbortedAfterConsumeOutput,
];
let ah = [
None,
Some(CombiJoinHandle::DropImmediately),
Some(CombiJoinHandle::DropFirstPoll),
Some(CombiJoinHandle::DropAfterNoConsume),
Some(CombiJoinHandle::DropAfterConsume),
];
for rt in rt.iter().copied() {
for ls in ls.iter().copied() {
for task in task.iter().copied() {
for output in output.iter().copied() {
for ji in ji.iter().copied() {
for jh in jh.iter().copied() {
for abort in abort.iter().copied() {
// abort via join handle --- abort handles
// may be dropped at any point
for ah in ah.iter().copied() {
test_combination(
rt,
ls,
task,
output,
ji,
jh,
ah,
abort,
CombiAbortSource::JoinHandle,
);
}
// if aborting via AbortHandle, it will
// never be dropped.
test_combination(
rt,
ls,
task,
output,
ji,
jh,
None,
abort,
CombiAbortSource::AbortHandle,
);
}
}
}
}
}
}
}
}
fn is_debug<T: fmt::Debug>(_: &T) {}
#[allow(clippy::too_many_arguments)]
fn test_combination(
rt: CombiRuntime,
ls: CombiLocalSet,
task: CombiTask,
output: CombiOutput,
ji: CombiJoinInterest,
jh: CombiJoinHandle,
ah: Option<CombiJoinHandle>,
abort: CombiAbort,
abort_src: CombiAbortSource,
) {
match (abort_src, ah) {
(CombiAbortSource::JoinHandle, _) if (jh as usize) < (abort as usize) => {
// join handle dropped prior to abort
return;
}
(CombiAbortSource::AbortHandle, Some(_)) => {
// abort handle dropped, we can't abort through the
// abort handle
return;
}
_ => {}
}
if (task == CombiTask::PanicOnDrop) && (output == CombiOutput::PanicOnDrop) {
// this causes double panic
return;
}
if (task == CombiTask::PanicOnRunAndDrop) && (abort != CombiAbort::AbortedImmediately) {
// this causes double panic
return;
}
is_debug(&rt);
is_debug(&ls);
is_debug(&task);
is_debug(&output);
is_debug(&ji);
is_debug(&jh);
is_debug(&ah);
is_debug(&abort);
is_debug(&abort_src);
// A runtime optionally with a LocalSet
struct Rt {
rt: crate::runtime::Runtime,
ls: Option<crate::task::LocalSet>,
}
impl Rt {
fn new(rt: CombiRuntime, ls: CombiLocalSet) -> Self {
let rt = match rt {
CombiRuntime::CurrentThread => Builder::new_current_thread().build().unwrap(),
CombiRuntime::Multi1 => Builder::new_multi_thread()
.worker_threads(1)
.build()
.unwrap(),
CombiRuntime::Multi2 => Builder::new_multi_thread()
.worker_threads(2)
.build()
.unwrap(),
};
let ls = match ls {
CombiLocalSet::Yes => Some(crate::task::LocalSet::new()),
CombiLocalSet::No => None,
};
Self { rt, ls }
}
fn block_on<T>(&self, task: T) -> T::Output
where
T: Future,
{
match &self.ls {
Some(ls) => ls.block_on(&self.rt, task),
None => self.rt.block_on(task),
}
}
fn spawn<T>(&self, task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
match &self.ls {
Some(ls) => ls.spawn_local(task),
None => self.rt.spawn(task),
}
}
}
// The type used for the output of the future
struct Output {
panic_on_drop: bool,
on_drop: Option<oneshot::Sender<()>>,
}
impl Output {
fn disarm(&mut self) {
self.panic_on_drop = false;
}
}
impl Drop for Output {
fn drop(&mut self) {
let _ = self.on_drop.take().unwrap().send(());
if self.panic_on_drop {
panic!("Panicking in Output");
}
}
}
// A wrapper around the future that is spawned
struct FutWrapper<F> {
inner: F,
on_drop: Option<oneshot::Sender<()>>,
panic_on_drop: bool,
}
impl<F: Future> Future for FutWrapper<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
unsafe {
let me = Pin::into_inner_unchecked(self);
let inner = Pin::new_unchecked(&mut me.inner);
inner.poll(cx)
}
}
}
impl<F> Drop for FutWrapper<F> {
fn drop(&mut self) {
let _: Result<(), ()> = self.on_drop.take().unwrap().send(());
if self.panic_on_drop {
panic!("Panicking in FutWrapper");
}
}
}
// The channels passed to the task
struct Signals {
on_first_poll: Option<oneshot::Sender<()>>,
wait_complete: Option<oneshot::Receiver<()>>,
on_output_drop: Option<oneshot::Sender<()>>,
}
// The task we will spawn
async fn my_task(mut signal: Signals, task: CombiTask, out: CombiOutput) -> Output {
// Signal that we have been polled once
let _ = signal.on_first_poll.take().unwrap().send(());
// Wait for a signal, then complete the future
let _ = signal.wait_complete.take().unwrap().await;
// If the task gets past wait_complete without yielding, then aborts
// may not be caught without this yield_now.
crate::task::yield_now().await;
if task == CombiTask::PanicOnRun || task == CombiTask::PanicOnRunAndDrop {
panic!("Panicking in my_task on {:?}", std::thread::current().id());
}
Output {
panic_on_drop: out == CombiOutput::PanicOnDrop,
on_drop: signal.on_output_drop.take(),
}
}
let rt = Rt::new(rt, ls);
let (on_first_poll, wait_first_poll) = oneshot::channel();
let (on_complete, wait_complete) = oneshot::channel();
let (on_future_drop, wait_future_drop) = oneshot::channel();
let (on_output_drop, wait_output_drop) = oneshot::channel();
let signal = Signals {
on_first_poll: Some(on_first_poll),
wait_complete: Some(wait_complete),
on_output_drop: Some(on_output_drop),
};
// === Spawn task ===
let mut handle = Some(rt.spawn(FutWrapper {
inner: my_task(signal, task, output),
on_drop: Some(on_future_drop),
panic_on_drop: task == CombiTask::PanicOnDrop || task == CombiTask::PanicOnRunAndDrop,
}));
// Keep track of whether the task has been killed with an abort
let mut aborted = false;
// If we want to poll the JoinHandle, do it now
if ji == CombiJoinInterest::Polled {
assert!(
handle.as_mut().unwrap().now_or_never().is_none(),
"Polling handle succeeded"
);
}
// If we are either aborting the task via an abort handle, or dropping via
// an abort handle, do that now.
let mut abort_handle = if ah.is_some() || abort_src == CombiAbortSource::AbortHandle {
handle.as_ref().map(JoinHandle::abort_handle)
} else {
None
};
let do_abort = |abort_handle: &mut Option<AbortHandle>,
join_handle: Option<&mut JoinHandle<_>>| {
match abort_src {
CombiAbortSource::AbortHandle => abort_handle.take().unwrap().abort(),
CombiAbortSource::JoinHandle => join_handle.unwrap().abort(),
}
};
if abort == CombiAbort::AbortedImmediately {
do_abort(&mut abort_handle, handle.as_mut());
aborted = true;
}
if jh == CombiJoinHandle::DropImmediately {
drop(handle.take().unwrap());
}
// === Wait for first poll ===
let got_polled = rt.block_on(wait_first_poll).is_ok();
if !got_polled {
// it's possible that we are aborted but still got polled
assert!(
aborted,
"Task completed without ever being polled but was not aborted."
);
}
if abort == CombiAbort::AbortedFirstPoll {
do_abort(&mut abort_handle, handle.as_mut());
aborted = true;
}
if jh == CombiJoinHandle::DropFirstPoll {
drop(handle.take().unwrap());
}
if ah == Some(CombiJoinHandle::DropFirstPoll) {
drop(abort_handle.take().unwrap());
}
// Signal the future that it can return now
let _ = on_complete.send(());
// === Wait for future to be dropped ===
assert!(
rt.block_on(wait_future_drop).is_ok(),
"The future should always be dropped."
);
if abort == CombiAbort::AbortedAfterFinish {
// Don't set aborted to true here as the task already finished
do_abort(&mut abort_handle, handle.as_mut());
}
if jh == CombiJoinHandle::DropAfterNoConsume {
if ah == Some(CombiJoinHandle::DropAfterNoConsume) {
drop(handle.take().unwrap());
// The runtime will usually have dropped every ref-count at this point,
// in which case dropping the AbortHandle drops the output.
//
// (But it might race and still hold a ref-count)
let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
drop(abort_handle.take().unwrap());
}));
if panic.is_err() {
assert!(
(output == CombiOutput::PanicOnDrop)
&& (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
&& !aborted,
"Dropping AbortHandle shouldn't panic here"
);
}
} else {
// The runtime will usually have dropped every ref-count at this point,
// in which case dropping the JoinHandle drops the output.
//
// (But it might race and still hold a ref-count)
let panic = panic::catch_unwind(panic::AssertUnwindSafe(|| {
drop(handle.take().unwrap());
}));
if panic.is_err() {
assert!(
(output == CombiOutput::PanicOnDrop)
&& (!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop))
&& !aborted,
"Dropping JoinHandle shouldn't panic here"
);
}
}
}
// Check whether we drop after consuming the output
if jh == CombiJoinHandle::DropAfterConsume {
// Using as_mut() to not immediately drop the handle
let result = rt.block_on(handle.as_mut().unwrap());
match result {
Ok(mut output) => {
// Don't panic here.
output.disarm();
assert!(!aborted, "Task was aborted but returned output");
}
Err(err) if err.is_cancelled() => assert!(aborted, "Cancelled output but not aborted"),
Err(err) if err.is_panic() => {
assert!(
(task == CombiTask::PanicOnRun)
|| (task == CombiTask::PanicOnDrop)
|| (task == CombiTask::PanicOnRunAndDrop)
|| (output == CombiOutput::PanicOnDrop),
"Panic but nothing should panic"
);
}
_ => unreachable!(),
}
let mut handle = handle.take().unwrap();
if abort == CombiAbort::AbortedAfterConsumeOutput {
do_abort(&mut abort_handle, Some(&mut handle));
}
drop(handle);
if ah == Some(CombiJoinHandle::DropAfterConsume) {
drop(abort_handle.take());
}
}
// The output should have been dropped now. Check whether the output
// object was created at all.
let output_created = rt.block_on(wait_output_drop).is_ok();
assert_eq!(
output_created,
(!matches!(task, CombiTask::PanicOnRun | CombiTask::PanicOnRunAndDrop)) && !aborted,
"Creation of output object"
);
}