DXR is a code search and navigation tool aimed at making sense of large projects. It supports full-text and regex searches as well as structural queries.

Mercurial (d1ac49b9eb3e)

VCS Links

Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
extern crate futures;
extern crate tokio;
extern crate tokio_io;
extern crate env_logger;

use std::{io, thread};
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;

use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
use tokio::runtime::Runtime;

/// Unwraps a `Result`, yielding the `Ok` payload.
///
/// On `Err`, panics with a message made of the stringified input expression
/// followed by the `Debug` rendering of the error.
macro_rules! t {
    ($e:expr) => {
        match $e {
            Ok(value) => value,
            Err(err) => panic!("{} failed with {:?}", stringify!($e), err),
        }
    };
}

/// Spawns ten OS threads; each one binds a listener on an ephemeral loopback
/// port, connects to it, accepts the connection, and checks that the two
/// ends report mirrored local/peer addresses.
#[test]
fn hammer_old() {
    /// One connect/accept round trip, driven to completion with `wait()`.
    fn connect_and_verify() {
        let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
        let addr = t!(srv.local_addr());

        // Client-side connect future and server-side "first accepted
        // connection" future, joined so both are resolved together.
        let mine = TcpStream::connect(&addr);
        let theirs = srv
            .incoming()
            .into_future()
            .map(|(accepted, _rest)| accepted.unwrap())
            .map_err(|(err, _rest)| err);
        let (mine, theirs) = t!(mine.join(theirs).wait());

        assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
        assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));
    }

    let _ = env_logger::try_init();

    let workers: Vec<thread::JoinHandle<()>> = (0..10)
        .map(|_| thread::spawn(connect_and_verify))
        .collect();

    // Propagate any panic raised inside a worker thread.
    for worker in workers {
        worker.join().unwrap();
    }
}

/// Newtype over a reference-counted `TcpStream`; this file implements
/// `io::Read` and `tokio_io::AsyncRead` for it.
struct Rd(Arc<TcpStream>);
/// Newtype over a reference-counted `TcpStream`; this file implements
/// `io::Write` and `tokio_io::AsyncWrite` for it.
struct Wr(Arc<TcpStream>);

impl io::Read for Rd {
    /// Reads into `dst` by delegating to the `Read` implementation on
    /// `&TcpStream`, so only a shared reference to the socket is required.
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        let mut stream: &TcpStream = &self.0;
        <&TcpStream as io::Read>::read(&mut stream, dst)
    }
}

/// Opts `Rd` into `AsyncRead`; no trait items are overridden.
impl tokio_io::AsyncRead for Rd {}

impl io::Write for Wr {
    /// Writes `src` by delegating to the `Write` implementation on
    /// `&TcpStream`, so only a shared reference to the socket is required.
    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
        let mut stream: &TcpStream = &self.0;
        <&TcpStream as io::Write>::write(&mut stream, src)
    }

    /// Reports success without touching the underlying socket.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl tokio_io::AsyncWrite for Wr {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        Ok(().into())
    }
}

/// Repeatedly opens `N` loopback connections on a fresh runtime and, for
/// every socket on either side of each connection, runs an independent
/// one-byte read task and one-byte write task.
///
/// Each completed task bumps a shared counter once, so a full iteration must
/// end at `N * 4`: two sockets per connection, two tasks per socket.
#[test]
fn hammer_split() {
    use tokio_io::io;

    const N: usize = 100;
    const ITER: usize = 10;

    /// Shares `socket` between a reader task and a writer task, spawning both
    /// onto the current executor. Each task increments `cnt` on success and
    /// panics on an I/O error.
    fn split(socket: TcpStream, cnt: Arc<AtomicUsize>) {
        let shared = Arc::new(socket);
        let read_half = Rd(Arc::clone(&shared));
        let write_half = Wr(shared);

        let read_cnt = Arc::clone(&cnt);
        let write_cnt = cnt;

        let reader = io::read(read_half, vec![0u8; 1])
            .map(move |_| {
                read_cnt.fetch_add(1, Relaxed);
            })
            .map_err(|e| panic!("read error = {:?}", e));

        let writer = io::write_all(write_half, b"1")
            .map(move |_| {
                write_cnt.fetch_add(1, Relaxed);
            })
            .map_err(|e| panic!("write error = {:?}", e));

        tokio::spawn(reader);
        tokio::spawn(writer);
    }

    let _ = env_logger::try_init();

    for _ in 0..ITER {
        let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap()));
        let addr = t!(srv.local_addr());

        let cnt = Arc::new(AtomicUsize::new(0));

        let mut rt = Runtime::new().unwrap();

        // Server side: accept exactly N connections and split each one.
        let accept_cnt = Arc::clone(&cnt);
        let acceptor = srv
            .incoming()
            .map_err(|e| panic!("accept error = {:?}", e))
            .take(N as u64)
            .for_each(move |socket| {
                split(socket, Arc::clone(&accept_cnt));
                Ok(())
            });
        rt.spawn(acceptor);

        // Client side: N independent connects, each split once established.
        for _ in 0..N {
            let connect_cnt = Arc::clone(&cnt);
            let connector = TcpStream::connect(&addr)
                .map_err(|e| panic!("connect error = {:?}", e))
                .map(move |socket| split(socket, connect_cnt));
            rt.spawn(connector);
        }

        // Block until every spawned task has finished before checking the
        // total.
        rt.shutdown_on_idle().wait().unwrap();
        assert_eq!(N * 4, cnt.load(Relaxed));
    }
}