Source code

Revision control

Copy as Markdown

Other Tools

#![warn(rust_2018_idioms)]
#![cfg(all(target_os = "freebsd", feature = "net"))]
use mio_aio::{AioCb, AioFsyncMode, LioCb};
use std::{
future::Future,
mem,
os::unix::io::{AsRawFd, RawFd},
pin::Pin,
task::{Context, Poll},
};
use tempfile::tempfile;
use tokio::io::bsd::{Aio, AioSource};
use tokio_test::assert_pending;
mod aio {
use super::*;
/// Newtype that lets a `mio_aio::AioCb` (itself a `mio::event::Source`) be
/// used wherever an `AioSource` is required.
struct WrappedAioCb<'a>(AioCb<'a>);

impl AioSource for WrappedAioCb<'_> {
    /// Forwards `kq` and `token` unchanged to the wrapped control block.
    fn register(&mut self, kq: RawFd, token: usize) {
        let WrappedAioCb(inner) = self;
        inner.register_raw(kq, token)
    }

    /// Forwards the deregistration request to the wrapped control block.
    fn deregister(&mut self) {
        let WrappedAioCb(inner) = self;
        inner.deregister_raw()
    }
}
/// Bare-bones AIO-based future: resolves once the wrapped `Aio` reports
/// readiness, yielding the outcome of `aio_return` on its control block.
struct FsyncFut(Aio<WrappedAioCb<'static>>);

impl Future for FsyncFut {
    type Output = std::io::Result<()>;

    /// Returns `Pending` until the `Aio` is ready, then maps the result of
    /// `aio_return` to `Ok(())` or an `std::io::Error`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.poll_ready(cx) {
            Poll::Ready(Ok(_event)) => {
                // Readiness could be cleared here, but that would be
                // pointless: the Aio is about to be dropped anyway.
                let cb = &mut (*self.0).0;
                let outcome: std::io::Result<()> =
                    cb.aio_return().map(|_| ()).map_err(|e| e.into());
                Poll::Ready(outcome)
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// Low-level AIO source.
///
/// Bypasses mio_aio and Nix and fills in the `aio_sigevent` of a raw
/// `libc::aiocb` by hand, to demonstrate how the kevent registration
/// actually works under the hood.
struct LlSource(Pin<Box<libc::aiocb>>);

impl AioSource for LlSource {
    /// Stores a `SIGEV_KEVENT` notification request in the control block,
    /// carrying `kq` in `sigev_signo` and `token` in `sigev_value`.
    fn register(&mut self, kq: RawFd, token: usize) {
        // SAFETY: relies on all-zero bytes being a valid `libc::sigevent`,
        // as expected for a plain C struct.
        let mut event: libc::sigevent = unsafe { mem::zeroed() };
        event.sigev_value = libc::sigval {
            sival_ptr: token as *mut libc::c_void,
        };
        event.sigev_signo = kq;
        event.sigev_notify = libc::SIGEV_KEVENT;
        self.0.aio_sigevent = event;
    }

    /// Wipes the notification request by zeroing `aio_sigevent`.
    fn deregister(&mut self) {
        // SAFETY: same zero-validity assumption as in `register`.
        self.0.aio_sigevent = unsafe { mem::zeroed() };
    }
}
/// Future that resolves once the `Aio` wrapping an `LlSource` is ready and
/// `aio_return` on the raw control block has reported status 0.
struct LlFut(Aio<LlSource>);

impl Future for LlFut {
    type Output = std::io::Result<()>;

    /// Propagates `Pending` and readiness errors; on readiness, collects the
    /// operation's status and panics if it is not 0.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.0.poll_ready(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Ready(Ok(_event)) => {}
        }
        // SAFETY: the pinned aiocb is only handed to `aio_return` by pointer;
        // it is never moved out of its box.
        let cb: &mut libc::aiocb = unsafe { self.0 .0.as_mut().get_unchecked_mut() };
        // SAFETY: `cb` points at the live control block owned by this future.
        let status = unsafe { libc::aio_return(cb) };
        assert_eq!(0, status);
        Poll::Ready(Ok(()))
    }
}
/// A minimal `AioSource` that can be used for more than one operation.
///
/// mio_aio normally assumes that each AioCb will be consumed on completion.
/// This somewhat contrived example shows how an Aio object can be reused
/// anyway: the registration parameters are remembered here and re-applied
/// to whichever control block is current when `fsync` is called.
struct ReusableFsyncSource {
    /// Control block for the current operation; swapped out by `reset`.
    aiocb: Pin<Box<AioCb<'static>>>,
    /// Descriptor last passed to `register` (0 initially and after `deregister`).
    fd: RawFd,
    /// Token last passed to `register` (0 initially).
    token: usize,
}

impl ReusableFsyncSource {
    /// Wraps `aiocb` with zeroed registration parameters.
    fn new(aiocb: AioCb<'static>) -> Self {
        Self {
            fd: 0,
            token: 0,
            aiocb: Box::pin(aiocb),
        }
    }

    /// Registers the current control block with the remembered parameters
    /// and submits an `O_SYNC` fsync on it, panicking if submission fails.
    fn fsync(&mut self) {
        let (kq, token) = (self.fd, self.token);
        self.aiocb.register_raw(kq, token);
        self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
    }

    /// Replaces the current control block with a fresh one.
    fn reset(&mut self, aiocb: AioCb<'static>) {
        self.aiocb = Box::pin(aiocb);
    }
}

impl AioSource for ReusableFsyncSource {
    /// Only records the parameters; they are applied later, in `fsync`.
    fn register(&mut self, kq: RawFd, token: usize) {
        self.token = token;
        self.fd = kq;
    }

    /// Forgets the recorded descriptor.
    fn deregister(&mut self) {
        self.fd = 0;
    }
}
/// Future that borrows a reusable `Aio` for a single fsync operation.
struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);

impl Future for ReusableFsyncFut<'_> {
    type Output = std::io::Result<()>;

    /// Waits for readiness, clears it, and reports the outcome of
    /// `aio_return` on the source's current control block.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = match self.0.poll_ready(cx) {
            Poll::Ready(Ok(event)) => event,
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => return Poll::Pending,
        };
        // The Aio outlives this future and will be reused, so its readiness
        // has to be cleared here. As a consequence this future must not be
        // polled again once it has returned Ready, which is how most
        // futures behave anyway.
        self.0.clear_ready(event);
        let outcome: std::io::Result<()> = (*self.0)
            .aiocb
            .aio_return()
            .map(|_| ())
            .map_err(|e| e.into());
        Poll::Ready(outcome)
    }
}
/// Submits an fsync on a temporary file through mio_aio and awaits its
/// completion via `FsyncFut`.
#[tokio::test]
async fn fsync() {
    let file = tempfile().unwrap();
    let source = WrappedAioCb(AioCb::from_fd(file.as_raw_fd(), 0));
    let mut aio = Aio::new_for_aio(source).unwrap();
    // The operation is submitted only after the source is wrapped in an Aio.
    (*aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
    FsyncFut(aio).await.unwrap();
}
/// Same as the `fsync` test, but drives a raw `libc::aiocb` through
/// `LlSource` / `LlFut` instead of going through mio_aio.
#[tokio::test]
async fn ll_fsync() {
    let file = tempfile().unwrap();
    // SAFETY: relies on all-zero bytes being a valid `libc::aiocb`, as
    // expected for a plain C struct.
    let mut raw_cb: libc::aiocb = unsafe { mem::zeroed() };
    raw_cb.aio_fildes = file.as_raw_fd();
    let mut aio = Aio::new_for_aio(LlSource(Box::pin(raw_cb))).unwrap();
    // SAFETY: the pinned control block is only passed by pointer to
    // `aio_fsync`; it is never moved out of its box.
    let status = unsafe {
        let cb = (*aio).0.as_mut().get_unchecked_mut();
        libc::aio_fsync(libc::O_SYNC, cb)
    };
    assert_eq!(0, status);
    LlFut(aio).await.unwrap();
}
/// A suitably crafted future type can reuse an Aio object
///
/// Runs two fsync operations back to back through the same `Aio`, checking
/// in between that no stale readiness is left over from the first one.
#[tokio::test]
async fn reuse() {
    let file = tempfile().unwrap();
    let raw_fd = file.as_raw_fd();

    // First operation.
    let first_cb = AioCb::from_fd(raw_fd, 0);
    let mut aio = Aio::new_for_aio(ReusableFsyncSource::new(first_cb)).unwrap();
    aio.fsync();
    ReusableFsyncFut(&mut aio).await.unwrap();

    // Second operation on the very same Aio, with a fresh control block.
    aio.reset(AioCb::from_fd(raw_fd, 0));
    let mut noop_cx = Context::from_waker(futures::task::noop_waker_ref());
    // Nothing has been submitted yet, so the Aio must not report readiness.
    assert_pending!(aio.poll_ready(&mut noop_cx));
    aio.fsync();
    ReusableFsyncFut(&mut aio).await.unwrap();
}
}
mod lio {
use super::*;
/// Newtype that lets a `mio_aio::LioCb` be used wherever an `AioSource` is
/// required.
struct WrappedLioCb<'a>(LioCb<'a>);

impl AioSource for WrappedLioCb<'_> {
    /// Forwards `kq` and `token` unchanged to the wrapped list control block.
    fn register(&mut self, kq: RawFd, token: usize) {
        let WrappedLioCb(inner) = self;
        inner.register_raw(kq, token)
    }

    /// Forwards the deregistration request to the wrapped list control block.
    fn deregister(&mut self) {
        let WrappedLioCb(inner) = self;
        inner.deregister_raw()
    }
}
/// A very crude lio_listio-based Future
struct LioFut(Option<Aio<WrappedLioCb<'static>>>);
impl Future for LioFut {
type Output = std::io::Result<Vec<isize>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
match poll_result {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Ok(_ev)) => {
// At this point, we could clear readiness. But there's no
// point, since we're about to drop the Aio.
let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
});
Poll::Ready(Ok(r))
}
}
}
}
/// Minimal example demonstrating reuse of an Aio object with lio
/// readiness. mio_aio::LioCb actually does something similar under the
/// hood.
struct ReusableLioSource {
    /// Current list control block; `None` once its results have been taken.
    liocb: Option<LioCb<'static>>,
    /// Descriptor last passed to `register` (0 initially and after `deregister`).
    fd: RawFd,
    /// Token last passed to `register` (0 initially).
    token: usize,
}

impl ReusableLioSource {
    /// Wraps `liocb` with zeroed registration parameters.
    fn new(liocb: LioCb<'static>) -> Self {
        Self {
            fd: 0,
            token: 0,
            liocb: Some(liocb),
        }
    }

    /// Installs a fresh list control block for the next operation.
    fn reset(&mut self, liocb: LioCb<'static>) {
        self.liocb = Some(liocb);
    }

    /// Registers the current control block with the remembered parameters
    /// and submits it. Panics if there is no control block or if submission
    /// fails.
    fn submit(&mut self) {
        let liocb = self.liocb.as_mut().unwrap();
        liocb.register_raw(self.fd, self.token);
        liocb.submit().unwrap();
    }
}

impl AioSource for ReusableLioSource {
    /// Only records the parameters; they are applied later, in `submit`.
    fn register(&mut self, kq: RawFd, token: usize) {
        self.token = token;
        self.fd = kq;
    }

    /// Forgets the recorded descriptor.
    fn deregister(&mut self) {
        self.fd = 0;
    }
}
/// Future that borrows a reusable `Aio` for a single lio_listio operation.
struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);

impl Future for ReusableLioFut<'_> {
    type Output = std::io::Result<Vec<isize>>;

    /// Waits for readiness, clears it, then takes the source's control block
    /// and returns the result of every list element. Panics if the control
    /// block is missing or if any element's result is an error.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let event = match self.0.poll_ready(cx) {
            Poll::Ready(Ok(event)) => event,
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => return Poll::Pending,
        };
        // The Aio outlives this future and will be reused, so its readiness
        // has to be cleared here. As a consequence this future must not be
        // polled again once it has returned Ready, which is how most
        // futures behave anyway.
        self.0.clear_ready(event);
        let liocb = (*self.0).liocb.take().unwrap();
        let results = liocb
            .into_results(|iter| iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>());
        Poll::Ready(Ok(results))
    }
}
/// An lio_listio operation with one write element
///
/// Checks that exactly one result comes back and that it equals the length
/// of the buffer that was written.
#[tokio::test]
async fn onewrite() {
    const WBUF: &[u8] = b"abcdef";
    let file = tempfile().unwrap();

    let liocb = mio_aio::LioCbBuilder::with_capacity(1)
        .emplace_slice(
            file.as_raw_fd(),
            0,
            &WBUF[..],
            0,
            mio_aio::LioOpcode::LIO_WRITE,
        )
        .finish();
    let mut aio = Aio::new_for_lio(WrappedLioCb(liocb)).unwrap();

    // Send the operation to the kernel
    (*aio).0.submit().unwrap();

    let v = LioFut(Some(aio)).await.unwrap();
    assert_eq!(v.len(), 1);
    assert_eq!(v[0] as usize, WBUF.len());
}
/// A suitably crafted future type can reuse an Aio object
#[tokio::test]
async fn reuse() {
const WBUF: &[u8] = b"abcdef";
let f = tempfile().unwrap();
let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
builder0 = builder0.emplace_slice(
f.as_raw_fd(),
0,
&WBUF[..],
0,
mio_aio::LioOpcode::LIO_WRITE,
);
let liocb0 = builder0.finish();
let source = ReusableLioSource::new(liocb0);
let mut poll_aio = Aio::new_for_aio(source).unwrap();
poll_aio.submit();
let fut0 = ReusableLioFut(&mut poll_aio);
let v = fut0.await.unwrap();
assert_eq!(v.len(), 1);
assert_eq!(v[0] as usize, WBUF.len());
// Now reuse the same Aio
let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
builder1 = builder1.emplace_slice(
f.as_raw_fd(),
0,
&WBUF[..],
0,
mio_aio::LioOpcode::LIO_WRITE,
);
let liocb1 = builder1.finish();
poll_aio.reset(liocb1);
let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
assert_pending!(poll_aio.poll_ready(&mut ctx));
poll_aio.submit();
let fut1 = ReusableLioFut(&mut poll_aio);
let v = fut1.await.unwrap();
assert_eq!(v.len(), 1);
assert_eq!(v[0] as usize, WBUF.len());
}
}