DXR is a code search and navigation tool aimed at making sense of large projects. It supports full-text and regex searches as well as structural queries.

Mercurial (6863f516ba38)

VCS Links

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
#![cfg(feature = "unstable-futures")]

// This test is the same as `global.rs`, but ported to futures 0.2

extern crate futures;
extern crate futures2;
extern crate tokio;
extern crate tokio_io;
extern crate env_logger;

use std::{io, thread};
use std::sync::Arc;

use futures2::prelude::*;
use futures2::executor::block_on;
use futures2::task;

use tokio::net::{TcpStream, TcpListener};
use tokio::runtime::Runtime;

// Unwraps a `Result`: evaluates to the `Ok` payload, or panics with the
// stringified source expression followed by the error's `Debug` output.
macro_rules! t {
    ($e:expr) => {
        match $e {
            Ok(value) => value,
            Err(err) => panic!("{} failed with {:?}", stringify!($e), err),
        }
    };
}

// Runs ten OS threads concurrently; every thread sets up its own loopback
// listener, connects to it, and checks that both ends of the resulting
// connection agree on each other's addresses.
#[test]
fn hammer() {
    // One connect/accept round trip, resolved with `block_on` on the calling
    // thread. Bindings that appear inside `t!(..)` keep their names because the
    // macro embeds the stringified expression in its panic message.
    fn connect_pair() {
        let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
        let addr = t!(srv.local_addr());

        let mine = TcpStream::connect(&addr);
        // `next()` yields a pair in both the success and the error case; only
        // the first element of each pair is kept.
        let theirs = srv
            .incoming()
            .next()
            .map(|(accepted, _rest)| accepted.unwrap())
            .map_err(|(err, _rest)| err);
        let (mine, theirs) = t!(block_on(mine.join(theirs)));

        assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
        assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));
    }

    let _ = env_logger::init();

    let workers: Vec<_> = (0..10).map(|_| thread::spawn(connect_pair)).collect();
    for worker in workers {
        worker.join().unwrap();
    }
}

// Newtype handles, each wrapping an `Arc<TcpStream>`: `Rd` is the handle that
// implements `AsyncRead`, `Wr` the one that implements `AsyncWrite`.
struct Rd(Arc<TcpStream>);
struct Wr(Arc<TcpStream>);

// Reads are forwarded to the `AsyncRead` impl of `&TcpStream`, so only a
// shared reference to the stream inside the `Arc` is needed.
impl AsyncRead for Rd {
    fn poll_read(&mut self, cx: &mut task::Context, dst: &mut [u8]) -> Poll<usize, io::Error> {
        let mut stream: &TcpStream = &self.0;
        <&TcpStream as AsyncRead>::poll_read(&mut stream, cx, dst)
    }
}

impl AsyncWrite for Wr {
    fn poll_write(&mut self, cx: &mut task::Context, src: &[u8]) -> Poll<usize, io::Error> {
        <&TcpStream>::poll_write(&mut &*self.0, cx, src)
    }

    fn poll_flush(&mut self, _cx: &mut task::Context) -> Poll<(), io::Error> {
        Ok(().into())
    }

    fn poll_close(&mut self, _cx: &mut task::Context) -> Poll<(), io::Error> {
        Ok(().into())
    }
}

// Accepts `N` loopback connections on a tokio `Runtime` and makes `N` outgoing
// connections to the same listener. Every socket, on both sides, is handed to
// `split`, which spawns one reading task and one writing task for it.
#[test]
fn hammer_split() {
    const N: usize = 100;

    // Shares one socket between two spawned tasks: one reads into a one-byte
    // buffer, the other writes the single byte `b"1"`. An I/O error panics
    // inside the corresponding task.
    fn split(socket: TcpStream) {
        let shared = Arc::new(socket);
        let reader = Rd(Arc::clone(&shared));
        let writer = Wr(shared);

        let read_task = reader
            .read(vec![0; 1])
            .map(|_| ())
            .map_err(|e| panic!("read error = {:?}", e));
        let write_task = writer
            .write_all(b"1")
            .map(|_| ())
            .map_err(|e| panic!("write error = {:?}", e));

        tokio::spawn2(read_task);
        tokio::spawn2(write_task);
    }

    let _ = env_logger::init();

    // `srv` keeps its name: `t!` embeds the stringified expression in its
    // panic message.
    let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
    let addr = t!(srv.local_addr());

    let mut rt = Runtime::new().unwrap();

    // Accepting side: stop after `N` connections, splitting each one.
    let accept_all = srv
        .incoming()
        .map_err(|e| panic!("accept error = {:?}", e))
        .take(N as u64)
        .for_each(|accepted| {
            split(accepted);
            Ok(())
        })
        .map(|_| ());
    rt.spawn2(accept_all);

    // Connecting side: `N` independent connections, each split once established.
    for _ in 0..N {
        let client = TcpStream::connect(&addr)
            .map_err(|e| panic!("connect error = {:?}", e))
            .map(split);
        rt.spawn2(client);
    }

    // Block the test thread on the runtime's shutdown-on-idle future.
    let idle = rt.shutdown_on_idle();
    futures::Future::wait(idle).unwrap();
}