Source code

Revision control

Copy as Markdown

Other Tools

#![warn(rust_2018_idioms)]
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use tokio_stream::StreamExt;
/// produces at most `remaining` zeros, that returns error.
/// each time it reads at most 31 byte.
struct Reader {
remaining: usize,
}
impl AsyncRead for Reader {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let this = Pin::into_inner(self);
assert_ne!(buf.remaining(), 0);
if this.remaining > 0 {
let n = std::cmp::min(this.remaining, buf.remaining());
let n = std::cmp::min(n, 31);
for x in &mut buf.initialize_unfilled_to(n)[..n] {
*x = 0;
}
buf.advance(n);
this.remaining -= n;
Poll::Ready(Ok(()))
} else {
Poll::Ready(Err(std::io::Error::from_raw_os_error(22)))
}
}
}
#[tokio::test]
async fn correct_behavior_on_errors() {
let reader = Reader { remaining: 8000 };
let mut stream = tokio_util::io::ReaderStream::new(reader);
let mut zeros_received = 0;
let mut had_error = false;
loop {
let item = stream.next().await.unwrap();
println!("{:?}", item);
match item {
Ok(bytes) => {
let bytes = &*bytes;
for byte in bytes {
assert_eq!(*byte, 0);
zeros_received += 1;
}
}
Err(_) => {
assert!(!had_error);
had_error = true;
break;
}
}
}
assert!(had_error);
assert_eq!(zeros_received, 8000);
assert!(stream.next().await.is_none());
}