Source code

Revision control

Copy as Markdown

Other Tools

use crate::sync::broadcast;
use crate::sync::broadcast::error::RecvError::{Closed, Lagged};
use loom::future::block_on;
use loom::sync::Arc;
use loom::thread;
use tokio_test::{assert_err, assert_ok};
/// Two producer threads each push three messages through one shared `Sender`
/// while the model's main thread receives.
///
/// The channel is created with `broadcast::channel(2)` and six messages are
/// sent in total, so the receiver may be told that it lagged. Each
/// `Lagged(n)` is counted as `n` messages, which means the sum of received and
/// skipped messages must come out to exactly six once `recv` reports `Closed`.
#[test]
fn broadcast_send() {
    loom::model(|| {
        let (sender, mut receiver) = broadcast::channel(2);

        // Both producers use the very same `Sender`, shared through an `Arc`.
        let english = Arc::new(sender);
        let german = Arc::clone(&english);

        let english_thread = thread::spawn(move || {
            block_on(async {
                for word in ["one", "two", "three"] {
                    assert_ok!(english.send(word));
                }
            });
        });

        let german_thread = thread::spawn(move || {
            block_on(async {
                for word in ["eins", "zwei", "drei"] {
                    assert_ok!(german.send(word));
                }
            });
        });

        block_on(async {
            let mut seen = 0;
            loop {
                // A delivered message counts once; a lag report counts for
                // every message it says was skipped.
                let step = match receiver.recv().await {
                    Err(Closed) => break,
                    Err(Lagged(skipped)) => skipped as usize,
                    Ok(_) => 1,
                };
                seen += step;
            }
            assert_eq!(seen, 6);
        });

        assert_ok!(english_thread.join());
        assert_ok!(german_thread.join());
    });
}
/// One sender, two receivers on separate threads: each receiver must observe
/// "hello", then "world", then `Closed` after the sender has been dropped.
///
/// An `Arc` is used as the value in order to detect memory leaks.
#[test]
fn broadcast_two() {
    loom::model(|| {
        let (sender, mut first_rx) = broadcast::channel::<Arc<&'static str>>(16);
        let mut second_rx = sender.subscribe();

        let first_thread = thread::spawn(move || {
            block_on(async {
                // Both received values are kept alive until the end of the
                // block, exactly as long as the receiver itself is in use.
                let greeting = assert_ok!(first_rx.recv().await);
                assert_eq!(*greeting, "hello");

                let subject = assert_ok!(first_rx.recv().await);
                assert_eq!(*subject, "world");

                // Anything other than `Closed` here is a failure.
                let outcome = assert_err!(first_rx.recv().await);
                if !matches!(outcome, Closed) {
                    panic!();
                }
            });
        });

        let second_thread = thread::spawn(move || {
            block_on(async {
                let greeting = assert_ok!(second_rx.recv().await);
                assert_eq!(*greeting, "hello");

                let subject = assert_ok!(second_rx.recv().await);
                assert_eq!(*subject, "world");

                let outcome = assert_err!(second_rx.recv().await);
                if !matches!(outcome, Closed) {
                    panic!();
                }
            });
        });

        assert_ok!(sender.send(Arc::new("hello")));
        assert_ok!(sender.send(Arc::new("world")));
        // Dropping the sender is what the receivers' final `Closed` check
        // is waiting for.
        drop(sender);

        assert_ok!(first_thread.join());
        assert_ok!(second_thread.join());
    });
}
/// Sends three messages into a channel created with `broadcast::channel(2)`
/// while two receivers drain it on their own threads.
///
/// Each receiver adds one per delivered message and `n` per `Lagged(n)`, and
/// must arrive at a total of three by the time `recv` reports `Closed`.
#[test]
fn broadcast_wrap() {
    loom::model(|| {
        let (sender, mut first_rx) = broadcast::channel(2);
        let mut second_rx = sender.subscribe();

        let first_thread = thread::spawn(move || {
            block_on(async {
                let mut accounted = 0;
                loop {
                    let step = match first_rx.recv().await {
                        Err(Closed) => break,
                        Err(Lagged(missed)) => missed as usize,
                        Ok(_) => 1,
                    };
                    accounted += step;
                }
                assert_eq!(accounted, 3);
            });
        });

        let second_thread = thread::spawn(move || {
            block_on(async {
                let mut accounted = 0;
                loop {
                    let step = match second_rx.recv().await {
                        Err(Closed) => break,
                        Err(Lagged(missed)) => missed as usize,
                        Ok(_) => 1,
                    };
                    accounted += step;
                }
                assert_eq!(accounted, 3);
            });
        });

        for word in ["one", "two", "three"] {
            assert_ok!(sender.send(word));
        }
        drop(sender);

        assert_ok!(first_thread.join());
        assert_ok!(second_thread.join());
    });
}
/// One receiver reads every message while a second receiver is dropped on
/// another thread, concurrently with the sends.
///
/// The reading receiver must still get "one", "two", "three" in order,
/// followed by `Closed` once the sender is gone.
#[test]
fn drop_rx() {
    loom::model(|| {
        let (sender, mut reader) = broadcast::channel(16);
        let discarded = sender.subscribe();

        let reader_thread = thread::spawn(move || {
            block_on(async {
                for expected in ["one", "two", "three"] {
                    let got = assert_ok!(reader.recv().await);
                    assert_eq!(got, expected);
                }

                // Anything other than `Closed` here is a failure.
                let outcome = assert_err!(reader.recv().await);
                if !matches!(outcome, Closed) {
                    panic!();
                }
            });
        });

        // The second receiver is never read from; it is only dropped.
        let dropper_thread = thread::spawn(move || {
            drop(discarded);
        });

        for word in ["one", "two", "three"] {
            assert_ok!(sender.send(word));
        }
        drop(sender);

        assert_ok!(reader_thread.join());
        assert_ok!(dropper_thread.join());
    });
}
/// Floods a channel created with `broadcast::channel(1)` from two senders
/// while one receiver drains it and a second receiver is merely held.
///
/// Send results are deliberately ignored, and the draining loop stops at the
/// first `Err` returned by `recv`. The test only asserts that both threads
/// can be joined successfully.
#[test]
fn drop_multiple_rx_with_overflow() {
    loom::model(|| {
        // It is essential to have multiple senders and receivers in this test case.
        let (sender, mut draining_rx) = broadcast::channel(1);
        let _idle_rx = sender.subscribe();

        let _ = sender.send(());

        let flooding_sender = sender.clone();
        let flood_thread = thread::spawn(move || {
            block_on(async {
                for _ in 0..100 {
                    let _ = flooding_sender.send(());
                }
            });
        });

        let _ = sender.send(());

        let drain_thread = thread::spawn(move || {
            block_on(async { while draining_rx.recv().await.is_ok() {} });
        });

        assert_ok!(flood_thread.join());
        assert_ok!(drain_thread.join());
    });
}