Source code

Revision control

Copy as Markdown

Other Tools

//! Interprocess Communication pipes
//!
//! A pipe is a section of shared memory that processes use for communication.
//! The process that creates a pipe is the _pipe server_. A process that connects
//! to a pipe is a _pipe client_. One process writes information to the pipe, then
//! the other process reads the information from the pipe. This overview
//! describes how to create, manage, and use pipes.
//!
//! There are two types of pipes: [anonymous pipes](#fn.anonymous.html) and
//! [named pipes](#fn.named.html). Anonymous pipes require less overhead than
//! named pipes, but offer limited services.
//!
//! # Anonymous pipes
//!
//! An anonymous pipe is an unnamed, one-way pipe that typically transfers data
//! between a parent process and a child process. Anonymous pipes are always
//! local; they cannot be used for communication over a network.
//!
//! # Named pipes
//!
//! A *named pipe* is a named, one-way or duplex pipe for communication between
//! the pipe server and one or more pipe clients. All instances of a named pipe
//! share the same pipe name, but each instance has its own buffers and handles,
//! and provides a separate conduit for client/server communication. The use of
//! instances enables multiple pipe clients to use the same named pipe
//! simultaneously.
//!
//! Any process can access named pipes, subject to security checks, making named
//! pipes an easy form of communication between related or unrelated processes.
//!
//! Any process can act as both a server and a client, making peer-to-peer
//! communication possible. As used here, the term pipe server refers to a
//! process that creates a named pipe, and the term pipe client refers to a
//! process that connects to an instance of a named pipe.
//!
//! Named pipes can be used to provide communication between processes on the
//! same computer or between processes on different computers across a network.
//! If the server service is running, all named pipes are accessible remotely. If
//! you intend to use a named pipe locally only, deny access to NT
//! AUTHORITY\\NETWORK or switch to local RPC.
//!
//! # References
//!
use std::cell::RefCell;
use std::ffi::OsStr;
use std::fs::{File, OpenOptions};
use std::io;
use std::io::prelude::*;
use std::os::windows::ffi::*;
use std::os::windows::io::*;
use std::time::Duration;
use crate::handle::Handle;
use crate::overlapped::Overlapped;
use winapi::shared::minwindef::*;
use winapi::shared::ntdef::HANDLE;
use winapi::shared::winerror::*;
use winapi::um::fileapi::*;
use winapi::um::handleapi::*;
use winapi::um::ioapiset::*;
use winapi::um::minwinbase::*;
use winapi::um::namedpipeapi::*;
use winapi::um::winbase::*;
/// Readable half of an anonymous pipe.
#[derive(Debug)]
pub struct AnonRead(Handle);
/// Writable half of an anonymous pipe.
#[derive(Debug)]
pub struct AnonWrite(Handle);
/// A named pipe that can accept connections.
#[derive(Debug)]
pub struct NamedPipe(Handle);
/// A builder structure for creating a new named pipe.
#[derive(Debug)]
pub struct NamedPipeBuilder {
name: Vec<u16>,
dwOpenMode: DWORD,
dwPipeMode: DWORD,
nMaxInstances: DWORD,
nOutBufferSize: DWORD,
nInBufferSize: DWORD,
nDefaultTimeOut: DWORD,
}
/// Creates a new anonymous in-memory pipe, returning the read/write ends of the
/// pipe.
///
/// The buffer size for this pipe may also be specified, but the system will
/// normally use this as a suggestion and it's not guaranteed that the buffer
/// will be precisely this size.
pub fn anonymous(buffer_size: u32) -> io::Result<(AnonRead, AnonWrite)> {
let mut read = 0 as HANDLE;
let mut write = 0 as HANDLE;
crate::cvt(unsafe { CreatePipe(&mut read, &mut write, 0 as *mut _, buffer_size) })?;
Ok((AnonRead(Handle::new(read)), AnonWrite(Handle::new(write))))
}
impl Read for AnonRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
impl<'a> Read for &'a AnonRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.read(buf)
}
}
impl AsRawHandle for AnonRead {
fn as_raw_handle(&self) -> HANDLE {
self.0.raw()
}
}
impl FromRawHandle for AnonRead {
unsafe fn from_raw_handle(handle: HANDLE) -> AnonRead {
AnonRead(Handle::new(handle))
}
}
impl IntoRawHandle for AnonRead {
fn into_raw_handle(self) -> HANDLE {
self.0.into_raw()
}
}
impl Write for AnonWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl<'a> Write for &'a AnonWrite {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl AsRawHandle for AnonWrite {
fn as_raw_handle(&self) -> HANDLE {
self.0.raw()
}
}
impl FromRawHandle for AnonWrite {
unsafe fn from_raw_handle(handle: HANDLE) -> AnonWrite {
AnonWrite(Handle::new(handle))
}
}
impl IntoRawHandle for AnonWrite {
fn into_raw_handle(self) -> HANDLE {
self.0.into_raw()
}
}
/// A convenience function to connect to a named pipe.
///
/// This function will block the calling process until it can connect to the
/// pipe server specified by `addr`. This will use `NamedPipe::wait` internally
/// to block until it can connect.
pub fn connect<A: AsRef<OsStr>>(addr: A) -> io::Result<File> {
_connect(addr.as_ref())
}
fn _connect(addr: &OsStr) -> io::Result<File> {
let mut r = OpenOptions::new();
let mut w = OpenOptions::new();
let mut rw = OpenOptions::new();
r.read(true);
w.write(true);
rw.read(true).write(true);
loop {
let res = rw
.open(addr)
.or_else(|_| r.open(addr))
.or_else(|_| w.open(addr));
match res {
Ok(f) => return Ok(f),
Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => {}
Err(e) => return Err(e),
}
NamedPipe::wait(addr, Some(Duration::new(20, 0)))?;
}
}
impl NamedPipe {
/// Creates a new initial named pipe.
///
/// This function is equivalent to:
///
/// ```
/// use miow::pipe::NamedPipeBuilder;
///
/// # let addr = "foo";
/// NamedPipeBuilder::new(addr)
/// .first(true)
/// .inbound(true)
/// .outbound(true)
/// .out_buffer_size(65536)
/// .in_buffer_size(65536)
/// .create();
/// ```
pub fn new<A: AsRef<OsStr>>(addr: A) -> io::Result<NamedPipe> {
NamedPipeBuilder::new(addr).create()
}
/// Waits until either a time-out interval elapses or an instance of the
/// specified named pipe is available for connection.
///
/// If this function succeeds the process can create a `File` to connect to
/// the named pipe.
pub fn wait<A: AsRef<OsStr>>(addr: A, timeout: Option<Duration>) -> io::Result<()> {
NamedPipe::_wait(addr.as_ref(), timeout)
}
fn _wait(addr: &OsStr, timeout: Option<Duration>) -> io::Result<()> {
let addr = addr.encode_wide().chain(Some(0)).collect::<Vec<_>>();
let timeout = crate::dur2ms(timeout);
crate::cvt(unsafe { WaitNamedPipeW(addr.as_ptr(), timeout) }).map(|_| ())
}
/// Connects this named pipe to a client, blocking until one becomes
/// available.
///
/// This function will call the `ConnectNamedPipe` function to await for a
/// client to connect. This can be called immediately after the pipe is
/// created, or after it has been disconnected from a previous client.
pub fn connect(&self) -> io::Result<()> {
match crate::cvt(unsafe { ConnectNamedPipe(self.0.raw(), 0 as *mut _) }) {
Ok(_) => Ok(()),
Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(()),
Err(e) => Err(e),
}
}
/// Issue a connection request with the specified overlapped operation.
///
/// This function will issue a request to connect a client to this server,
/// returning immediately after starting the overlapped operation.
///
/// If this function immediately succeeds then `Ok(true)` is returned. If
/// the overlapped operation is enqueued and pending, then `Ok(false)` is
/// returned. Otherwise an error is returned indicating what went wrong.
///
/// # Unsafety
///
/// This function is unsafe because the kernel requires that the
/// `overlapped` pointer is valid until the end of the I/O operation. The
/// kernel also requires that `overlapped` is unique for this I/O operation
/// and is not in use for any other I/O.
///
/// To safely use this function callers must ensure that this pointer is
/// valid until the I/O operation is completed, typically via completion
/// ports and waiting to receive the completion notification on the port.
pub unsafe fn connect_overlapped(&self, overlapped: *mut OVERLAPPED) -> io::Result<bool> {
match crate::cvt(ConnectNamedPipe(self.0.raw(), overlapped)) {
Ok(_) => Ok(true),
Err(ref e) if e.raw_os_error() == Some(ERROR_PIPE_CONNECTED as i32) => Ok(true),
Err(ref e) if e.raw_os_error() == Some(ERROR_IO_PENDING as i32) => Ok(false),
Err(ref e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => Ok(true),
Err(e) => Err(e),
}
}
/// Disconnects this named pipe from any connected client.
pub fn disconnect(&self) -> io::Result<()> {
crate::cvt(unsafe { DisconnectNamedPipe(self.0.raw()) }).map(|_| ())
}
/// Issues an overlapped read operation to occur on this pipe.
///
/// This function will issue an asynchronous read to occur in an overlapped
/// fashion, returning immediately. The `buf` provided will be filled in
/// with data and the request is tracked by the `overlapped` function
/// provided.
///
/// If the operation succeeds immediately, `Ok(Some(n))` is returned where
/// `n` is the number of bytes read. If an asynchronous operation is
/// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
/// it is returned.
///
/// When this operation completes (or if it completes immediately), another
/// mechanism must be used to learn how many bytes were transferred (such as
/// looking at the filed in the IOCP status message).
///
/// # Unsafety
///
/// This function is unsafe because the kernel requires that the `buf` and
/// `overlapped` pointers to be valid until the end of the I/O operation.
/// The kernel also requires that `overlapped` is unique for this I/O
/// operation and is not in use for any other I/O.
///
/// To safely use this function callers must ensure that the pointers are
/// valid until the I/O operation is completed, typically via completion
/// ports and waiting to receive the completion notification on the port.
pub unsafe fn read_overlapped(
&self,
buf: &mut [u8],
overlapped: *mut OVERLAPPED,
) -> io::Result<Option<usize>> {
self.0.read_overlapped(buf, overlapped)
}
/// Issues an overlapped write operation to occur on this pipe.
///
/// This function will issue an asynchronous write to occur in an overlapped
/// fashion, returning immediately. The `buf` provided will be filled in
/// with data and the request is tracked by the `overlapped` function
/// provided.
///
/// If the operation succeeds immediately, `Ok(Some(n))` is returned where
/// `n` is the number of bytes written. If an asynchronous operation is
/// enqueued, then `Ok(None)` is returned. Otherwise if an error occurred
/// it is returned.
///
/// When this operation completes (or if it completes immediately), another
/// mechanism must be used to learn how many bytes were transferred (such as
/// looking at the filed in the IOCP status message).
///
/// # Unsafety
///
/// This function is unsafe because the kernel requires that the `buf` and
/// `overlapped` pointers to be valid until the end of the I/O operation.
/// The kernel also requires that `overlapped` is unique for this I/O
/// operation and is not in use for any other I/O.
///
/// To safely use this function callers must ensure that the pointers are
/// valid until the I/O operation is completed, typically via completion
/// ports and waiting to receive the completion notification on the port.
pub unsafe fn write_overlapped(
&self,
buf: &[u8],
overlapped: *mut OVERLAPPED,
) -> io::Result<Option<usize>> {
self.0.write_overlapped(buf, overlapped)
}
/// Calls the `GetOverlappedResult` function to get the result of an
/// overlapped operation for this handle.
///
/// This function takes the `OVERLAPPED` argument which must have been used
/// to initiate an overlapped I/O operation, and returns either the
/// successful number of bytes transferred during the operation or an error
/// if one occurred.
///
/// # Unsafety
///
/// This function is unsafe as `overlapped` must have previously been used
/// to execute an operation for this handle, and it must also be a valid
/// pointer to an `Overlapped` instance.
///
/// # Panics
///
/// This function will panic
pub unsafe fn result(&self, overlapped: *mut OVERLAPPED) -> io::Result<usize> {
let mut transferred = 0;
let r = GetOverlappedResult(self.0.raw(), overlapped, &mut transferred, FALSE);
if r == 0 {
Err(io::Error::last_os_error())
} else {
Ok(transferred as usize)
}
}
}
thread_local! {
static NAMED_PIPE_OVERLAPPED: RefCell<Option<Overlapped>> = RefCell::new(None);
}
/// Call a function with a threadlocal `Overlapped`. The function `f` should be
/// sure that the event is reset, either manually or by a thread being released.
fn with_threadlocal_overlapped<F>(f: F) -> io::Result<usize>
where
F: FnOnce(&Overlapped) -> io::Result<usize>,
{
NAMED_PIPE_OVERLAPPED.with(|overlapped| {
let mut mborrow = overlapped.borrow_mut();
if let None = *mborrow {
let op = Overlapped::initialize_with_autoreset_event()?;
*mborrow = Some(op);
}
f(mborrow.as_ref().unwrap())
})
}
impl Read for NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0
.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
}
impl<'a> Read for &'a NamedPipe {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0
.read_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
}
impl Write for NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0
.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
fn flush(&mut self) -> io::Result<()> {
<&NamedPipe as Write>::flush(&mut &*self)
}
}
impl<'a> Write for &'a NamedPipe {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
// This is necessary because the pipe is opened with `FILE_FLAG_OVERLAPPED`.
with_threadlocal_overlapped(|overlapped| unsafe {
self.0
.write_overlapped_wait(buf, overlapped.raw() as *mut OVERLAPPED)
})
}
fn flush(&mut self) -> io::Result<()> {
crate::cvt(unsafe { FlushFileBuffers(self.0.raw()) }).map(|_| ())
}
}
impl AsRawHandle for NamedPipe {
fn as_raw_handle(&self) -> HANDLE {
self.0.raw()
}
}
impl FromRawHandle for NamedPipe {
unsafe fn from_raw_handle(handle: HANDLE) -> NamedPipe {
NamedPipe(Handle::new(handle))
}
}
impl IntoRawHandle for NamedPipe {
fn into_raw_handle(self) -> HANDLE {
self.0.into_raw()
}
}
fn flag(slot: &mut DWORD, on: bool, val: DWORD) {
if on {
*slot |= val;
} else {
*slot &= !val;
}
}
impl NamedPipeBuilder {
/// Creates a new named pipe builder with the default settings.
pub fn new<A: AsRef<OsStr>>(addr: A) -> NamedPipeBuilder {
NamedPipeBuilder {
name: addr.as_ref().encode_wide().chain(Some(0)).collect(),
dwOpenMode: PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
dwPipeMode: PIPE_TYPE_BYTE,
nMaxInstances: PIPE_UNLIMITED_INSTANCES,
nOutBufferSize: 65536,
nInBufferSize: 65536,
nDefaultTimeOut: 0,
}
}
/// Indicates whether data is allowed to flow from the client to the server.
pub fn inbound(&mut self, allowed: bool) -> &mut Self {
flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_INBOUND);
self
}
/// Indicates whether data is allowed to flow from the server to the client.
pub fn outbound(&mut self, allowed: bool) -> &mut Self {
flag(&mut self.dwOpenMode, allowed, PIPE_ACCESS_OUTBOUND);
self
}
/// Indicates that this pipe must be the first instance.
///
/// If set to true, then creation will fail if there's already an instance
/// elsewhere.
pub fn first(&mut self, first: bool) -> &mut Self {
flag(&mut self.dwOpenMode, first, FILE_FLAG_FIRST_PIPE_INSTANCE);
self
}
/// Indicates whether this server can accept remote clients or not.
pub fn accept_remote(&mut self, accept: bool) -> &mut Self {
flag(&mut self.dwPipeMode, !accept, PIPE_REJECT_REMOTE_CLIENTS);
self
}
/// Specifies the maximum number of instances of the server pipe that are
/// allowed.
///
/// The first instance of a pipe can specify this value. A value of 255
/// indicates that there is no limit to the number of instances.
pub fn max_instances(&mut self, instances: u8) -> &mut Self {
self.nMaxInstances = instances as DWORD;
self
}
/// Specifies the number of bytes to reserver for the output buffer
pub fn out_buffer_size(&mut self, buffer: u32) -> &mut Self {
self.nOutBufferSize = buffer as DWORD;
self
}
/// Specifies the number of bytes to reserver for the input buffer
pub fn in_buffer_size(&mut self, buffer: u32) -> &mut Self {
self.nInBufferSize = buffer as DWORD;
self
}
/// Using the options in this builder, attempt to create a new named pipe.
///
/// This function will call the `CreateNamedPipe` function and return the
/// result.
pub fn create(&mut self) -> io::Result<NamedPipe> {
unsafe { self.with_security_attributes(::std::ptr::null_mut()) }
}
/// Using the options in the builder and the provided security attributes, attempt to create a
/// new named pipe. This function has to be called with a valid pointer to a
/// `SECURITY_ATTRIBUTES` struct that will stay valid for the lifetime of this function or a
/// null pointer.
///
/// This function will call the `CreateNamedPipe` function and return the
/// result.
pub unsafe fn with_security_attributes(
&mut self,
attrs: *mut SECURITY_ATTRIBUTES,
) -> io::Result<NamedPipe> {
let h = CreateNamedPipeW(
self.name.as_ptr(),
self.dwOpenMode,
self.dwPipeMode,
self.nMaxInstances,
self.nOutBufferSize,
self.nInBufferSize,
self.nDefaultTimeOut,
attrs,
);
if h == INVALID_HANDLE_VALUE {
Err(io::Error::last_os_error())
} else {
Ok(NamedPipe(Handle::new(h)))
}
}
}
#[cfg(test)]
mod tests {
use std::fs::{File, OpenOptions};
use std::io::prelude::*;
use std::sync::mpsc::channel;
use std::thread;
use std::time::Duration;
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use super::{anonymous, NamedPipe, NamedPipeBuilder};
use crate::iocp::CompletionPort;
use crate::Overlapped;
fn name() -> String {
let name = thread_rng()
.sample_iter(Alphanumeric)
.take(30)
.map(char::from)
.collect::<String>();
format!(r"\\.\pipe\{}", name)
}
#[test]
fn anon() {
let (mut read, mut write) = t!(anonymous(256));
assert_eq!(t!(write.write(&[1, 2, 3])), 3);
let mut b = [0; 10];
assert_eq!(t!(read.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
}
#[test]
fn named_not_first() {
let name = name();
let _a = t!(NamedPipe::new(&name));
assert!(NamedPipe::new(&name).is_err());
t!(NamedPipeBuilder::new(&name).first(false).create());
}
#[test]
fn named_connect() {
let name = name();
let a = t!(NamedPipe::new(&name));
let t = thread::spawn(move || {
t!(File::open(name));
});
t!(a.connect());
t!(a.disconnect());
t!(t.join());
}
#[test]
fn named_wait() {
let name = name();
let a = t!(NamedPipe::new(&name));
let (tx, rx) = channel();
let t = thread::spawn(move || {
t!(NamedPipe::wait(&name, None));
t!(File::open(&name));
assert!(NamedPipe::wait(&name, Some(Duration::from_millis(1))).is_err());
t!(tx.send(()));
});
t!(a.connect());
t!(rx.recv());
t!(a.disconnect());
t!(t.join());
}
#[test]
fn named_connect_overlapped() {
let name = name();
let a = t!(NamedPipe::new(&name));
let t = thread::spawn(move || {
t!(File::open(name));
});
let cp = t!(CompletionPort::new(1));
t!(cp.add_handle(2, &a));
let over = Overlapped::zero();
unsafe {
t!(a.connect_overlapped(over.raw()));
}
let status = t!(cp.get(None));
assert_eq!(status.bytes_transferred(), 0);
assert_eq!(status.token(), 2);
assert_eq!(status.overlapped(), over.raw());
t!(t.join());
}
#[test]
fn named_read_write() {
let name = name();
let mut a = t!(NamedPipe::new(&name));
let t = thread::spawn(move || {
let mut f = t!(OpenOptions::new().read(true).write(true).open(name));
t!(f.write_all(&[1, 2, 3]));
let mut b = [0; 10];
assert_eq!(t!(f.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
});
t!(a.connect());
let mut b = [0; 10];
assert_eq!(t!(a.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
t!(a.write_all(&[1, 2, 3]));
t!(a.flush());
t!(a.disconnect());
t!(t.join());
}
#[test]
fn named_read_write_multi() {
for _ in 0..5 {
named_read_write()
}
}
#[test]
fn named_read_write_multi_same_thread() {
let name1 = name();
let mut a1 = t!(NamedPipe::new(&name1));
let name2 = name();
let mut a2 = t!(NamedPipe::new(&name2));
let t = thread::spawn(move || {
let mut f = t!(OpenOptions::new().read(true).write(true).open(name1));
t!(f.write_all(&[1, 2, 3]));
let mut b = [0; 10];
assert_eq!(t!(f.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
let mut f = t!(OpenOptions::new().read(true).write(true).open(name2));
t!(f.write_all(&[1, 2, 3]));
let mut b = [0; 10];
assert_eq!(t!(f.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
});
t!(a1.connect());
let mut b = [0; 10];
assert_eq!(t!(a1.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
t!(a1.write_all(&[1, 2, 3]));
t!(a1.flush());
t!(a1.disconnect());
t!(a2.connect());
let mut b = [0; 10];
assert_eq!(t!(a2.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3]);
t!(a2.write_all(&[1, 2, 3]));
t!(a2.flush());
t!(a2.disconnect());
t!(t.join());
}
#[test]
fn named_read_overlapped() {
let name = name();
let a = t!(NamedPipe::new(&name));
let t = thread::spawn(move || {
let mut f = t!(File::create(name));
t!(f.write_all(&[1, 2, 3]));
});
let cp = t!(CompletionPort::new(1));
t!(cp.add_handle(3, &a));
t!(a.connect());
let mut b = [0; 10];
let over = Overlapped::zero();
unsafe {
t!(a.read_overlapped(&mut b, over.raw()));
}
let status = t!(cp.get(None));
assert_eq!(status.bytes_transferred(), 3);
assert_eq!(status.token(), 3);
assert_eq!(status.overlapped(), over.raw());
assert_eq!(&b[..3], &[1, 2, 3]);
t!(t.join());
}
#[test]
fn named_write_overlapped() {
let name = name();
let a = t!(NamedPipe::new(&name));
let t = thread::spawn(move || {
let mut f = t!(super::connect(name));
let mut b = [0; 10];
assert_eq!(t!(f.read(&mut b)), 3);
assert_eq!(&b[..3], &[1, 2, 3])
});
let cp = t!(CompletionPort::new(1));
t!(cp.add_handle(3, &a));
t!(a.connect());
let over = Overlapped::zero();
unsafe {
t!(a.write_overlapped(&[1, 2, 3], over.raw()));
}
let status = t!(cp.get(None));
assert_eq!(status.bytes_transferred(), 3);
assert_eq!(status.token(), 3);
assert_eq!(status.overlapped(), over.raw());
t!(t.join());
}
}