//! Unit tests for the batch semaphore (`crate::sync::batch_semaphore::Semaphore`).

use crate::sync::batch_semaphore::Semaphore;
use tokio_test::*;
// Permit limit taken from the public `Semaphore`. The tests in this file expect
// `Semaphore::new` to accept exactly this value and to panic for one more.
const MAX_PERMITS: usize = crate::sync::Semaphore::MAX_PERMITS;
#[cfg(tokio_wasm_not_wasi)]
use wasm_bindgen_test::wasm_bindgen_test as test;
/// Acquiring one permit from an uncontended semaphore completes on the first poll.
#[test]
fn poll_acquire_one_available() {
    const INITIAL: usize = 100;

    let sem = Semaphore::new(INITIAL);
    assert_eq!(sem.available_permits(), INITIAL);

    // Nothing else holds or waits for permits, so the first poll must be ready.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), INITIAL - 1);
}
/// Back-to-back five-permit acquisitions complete immediately while permits remain.
#[test]
fn poll_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // Each batch is granted on its first poll and lowers the count by five.
    for &remaining in &[95_usize, 90] {
        assert_ready_ok!(task::spawn(sem.acquire(5)).poll());
        assert_eq!(sem.available_permits(), remaining);
    }
}
/// `try_acquire(1)` succeeds repeatedly while permits are free, taking one each time.
#[test]
fn try_acquire_one_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // Two consecutive single-permit grabs: 100 -> 99 -> 98.
    for &remaining in &[99_usize, 98] {
        assert_ok!(sem.try_acquire(1));
        assert_eq!(sem.available_permits(), remaining);
    }
}
/// `try_acquire(5)` succeeds repeatedly while permits are free, taking five each time.
#[test]
fn try_acquire_many_available() {
    let sem = Semaphore::new(100);
    assert_eq!(sem.available_permits(), 100);

    // Two consecutive five-permit grabs: 100 -> 95 -> 90.
    for &remaining in &[95_usize, 90] {
        assert_ok!(sem.try_acquire(5));
        assert_eq!(sem.available_permits(), remaining);
    }
}
/// A request made against an exhausted semaphore waits and is served by the next release.
#[test]
fn poll_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Drain the only permit.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), 0);

    // A second request has nothing to take, so it must stay pending.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());
    assert_eq!(sem.available_permits(), 0);

    // The released permit is expected to go to the waiter rather than back to
    // the pool: the available count stays at zero while the waiter is woken
    // and then completes.
    sem.release(1);
    assert_eq!(sem.available_permits(), 0);
    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
    assert_eq!(sem.available_permits(), 0);

    // With nobody waiting, a release shows up in the available count.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
}
/// Multi-permit waiters are served in the order they queued, each one only
/// being woken once its full request can be satisfied.
#[test]
fn poll_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // Take one of the five permits up front.
    assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    assert_eq!(sem.available_permits(), 4);

    // First waiter asks for five: it cannot complete, and the available count
    // is expected to drop to zero while it waits.
    let mut wants_five = task::spawn(sem.acquire(5));
    assert_pending!(wants_five.poll());
    assert_eq!(sem.available_permits(), 0);

    // Second waiter asks for three and queues up as well.
    let mut wants_three = task::spawn(sem.acquire(3));
    assert_pending!(wants_three.poll());
    assert_eq!(sem.available_permits(), 0);

    // One released permit is enough to complete the first waiter.
    sem.release(1);
    assert_eq!(sem.available_permits(), 0);
    assert!(wants_five.is_woken());
    assert_ready_ok!(wants_five.poll());

    // The second waiter has not been notified yet.
    assert!(!wants_three.is_woken());
    assert_eq!(sem.available_permits(), 0);

    // A single permit is not enough for a request of three.
    sem.release(1);
    assert!(!wants_three.is_woken());
    assert_eq!(sem.available_permits(), 0);

    // Two more permits bring the total to three, which wakes and completes it.
    sem.release(2);
    assert!(wants_three.is_woken());
    assert_ready_ok!(wants_three.poll());
}
/// `try_acquire(1)` fails on an exhausted semaphore and works again after a release.
#[test]
fn try_acquire_one_unavailable() {
    let sem = Semaphore::new(1);

    // Take the only permit; a further attempt must fail.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 0);
    assert_err!(sem.try_acquire(1));

    // Handing the permit back makes it obtainable again.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
    assert_ok!(sem.try_acquire(1));

    // Final release restores the single permit.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
}
/// `try_acquire(5)` fails while fewer than five permits are free and succeeds
/// once the missing permit has been released.
#[test]
fn try_acquire_many_unavailable() {
    let sem = Semaphore::new(5);

    // With one permit taken, only four remain: a request for five must fail.
    assert_ok!(sem.try_acquire(1));
    assert_eq!(sem.available_permits(), 4);
    assert_err!(sem.try_acquire(5));

    // Release the missing permit; all five can now be taken at once.
    sem.release(1);
    assert_eq!(sem.available_permits(), 5);
    assert_ok!(sem.try_acquire(5));

    // Permits released one at a time show up one at a time.
    sem.release(1);
    assert_eq!(sem.available_permits(), 1);
    sem.release(1);
    assert_eq!(sem.available_permits(), 2);
}
/// A semaphore created with zero permits serves a waiter once a permit is released.
#[test]
fn poll_acquire_one_zero_permits() {
    let sem = Semaphore::new(0);
    assert_eq!(sem.available_permits(), 0);

    // There is nothing to hand out yet, so the request has to wait.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());

    // Releasing a permit wakes the waiter and lets it complete.
    sem.release(1);
    assert!(waiter.is_woken());
    assert_ready_ok!(waiter.poll());
}
/// Constructing a semaphore with exactly `MAX_PERMITS` permits is accepted.
#[test]
fn max_permits_doesnt_panic() {
    // Only construction is exercised; the semaphore is discarded right away.
    let _ = Semaphore::new(MAX_PERMITS);
}
/// Constructing a semaphore with more than `MAX_PERMITS` permits must panic.
#[test]
#[should_panic]
#[cfg(not(tokio_wasm))] // wasm currently doesn't support unwinding
fn validates_max_permits() {
    let over_limit = MAX_PERMITS + 1;
    let _ = Semaphore::new(over_limit);
}
/// After `close`, every acquire fails immediately and takes no permits.
#[test]
fn close_semaphore_prevents_acquire() {
    let sem = Semaphore::new(5);
    sem.close();
    assert_eq!(5, sem.available_permits());

    // Repeated attempts keep failing and leave the permit count untouched.
    for _ in 0..2 {
        assert_ready_err!(task::spawn(sem.acquire(1)).poll());
        assert_eq!(5, sem.available_permits());
    }
}
/// Closing the semaphore wakes a pending waiter, which then resolves to an error.
#[test]
fn close_semaphore_notifies_permit1() {
    let sem = Semaphore::new(0);

    // No permits exist, so the request is parked.
    let mut waiter = task::spawn(sem.acquire(1));
    assert_pending!(waiter.poll());

    // Closing must notify the parked request and fail it on the next poll.
    sem.close();
    assert!(waiter.is_woken());
    assert_ready_err!(waiter.poll());
}
/// Closing wakes every queued waiter with an error. Later releases still raise
/// the permit count, but acquiring keeps failing.
#[test]
fn close_semaphore_notifies_permit2() {
    let sem = Semaphore::new(2);

    // Use up both permits.
    for _ in 0..2 {
        assert_ready_ok!(task::spawn(sem.acquire(1)).poll());
    }

    // Two further requests have to queue.
    let mut waiter_a = task::spawn(sem.acquire(1));
    let mut waiter_b = task::spawn(sem.acquire(1));
    assert_pending!(waiter_a.poll());
    assert_pending!(waiter_b.poll());

    // Closing notifies both waiters, and both resolve to an error.
    sem.close();
    assert!(waiter_a.is_woken());
    assert!(waiter_b.is_woken());
    assert_ready_err!(waiter_a.poll());
    assert_ready_err!(waiter_b.poll());
    assert_eq!(0, sem.available_permits());

    // Releasing after close is still counted, yet acquiring remains an error.
    sem.release(1);
    assert_eq!(1, sem.available_permits());
    assert_ready_err!(task::spawn(sem.acquire(1)).poll());

    sem.release(1);
    assert_eq!(2, sem.available_permits());
}
/// Dropping a pending multi-permit acquire returns the permits it had reserved.
#[test]
fn cancel_acquire_releases_permits() {
    let sem = Semaphore::new(10);
    sem.try_acquire(4).expect("uncontended try_acquire succeeds");
    assert_eq!(6, sem.available_permits());

    // Asking for 8 with only 6 free: the request stays pending and the
    // available count is expected to drop to zero in the meantime.
    let mut pending = task::spawn(sem.acquire(8));
    assert_pending!(pending.poll());
    assert_eq!(0, sem.available_permits());

    // Cancelling the request makes those 6 permits available again.
    drop(pending);
    assert_eq!(6, sem.available_permits());
    assert_ok!(sem.try_acquire(6));
}
/// Polls a pending `acquire` twice with wakers that may own a permit of the
/// same semaphore; the second poll must not deadlock.
#[test]
fn release_permits_at_drop() {
    use crate::sync::semaphore::*;
    use futures::task::ArcWake;
    use std::future::Future;
    use std::sync::Arc;

    let sem = Arc::new(Semaphore::new(1));

    // Waker payload that owns a permit (when one could be taken) purely so the
    // permit is handed back when the waker is dropped. The field is never read,
    // so `dead_code` is allowed explicitly to keep the build warning-free.
    struct ReleaseOnDrop(#[allow(dead_code)] Option<OwnedSemaphorePermit>);

    impl ArcWake for ReleaseOnDrop {
        // Waking is intentionally a no-op; only the drop side effect matters.
        fn wake_by_ref(_arc_self: &Arc<Self>) {}
    }

    let mut fut = Box::pin(async {
        let _permit = sem.acquire().await.unwrap();
    });

    // Second iteration shouldn't deadlock.
    // NOTE(review): presumably the second poll replaces the waker registered by
    // the first one, and dropping that waker releases its permit back into the
    // semaphore that is currently being polled -- confirm against the semaphore
    // implementation.
    for _ in 0..=1 {
        let waker = futures::task::waker(Arc::new(ReleaseOnDrop(
            sem.clone().try_acquire_owned().ok(),
        )));
        let mut cx = std::task::Context::from_waker(&waker);
        assert!(fut.as_mut().poll(&mut cx).is_pending());
    }
}