//! Re-exports of the async_std runtime for use with arti.
//!
//! This crate helps define a slim API around our async runtime so that we
//! can easily swap it out.
//!
//! We'll probably want to support tokio as well in the future.

            
8
/// Types used for networking (async_std implementation)
9
mod net {
10
    use crate::traits;
11

            
12
    use async_std_crate::net::{TcpListener, TcpStream};
13
    use async_trait::async_trait;
14
    use futures::future::Future;
15
    use futures::stream::Stream;
16
    use std::io::Result as IoResult;
17
    use std::net::SocketAddr;
18
    use std::pin::Pin;
19
    use std::task::{Context, Poll};
20

            
21
    /// A `Stream` of incoming TCP streams.
22
    ///
23
    /// Differs from the output of [`TcpListener::incoming`] in that this
24
    /// struct is a real type, and that it returns a TCP stream and an address
25
    /// for each input.
26
    pub struct IncomingStreams {
27
        /// A state object, stored in an Option so we can take ownership of it
28
        /// while poll is being called.
29
        // TODO(nickm): I hate using this trick.  At some point in the
30
        // future, once Rust has nice support for async traits, maybe
31
        // we can refactor it.
32
        state: Option<IncomingStreamsState>,
33
    }
34
    /// The result type returned by [`take_and_poll`].
35
    ///
36
    /// It has to include the TcpListener, since take_and_poll() has
37
    /// ownership of the listener.
38
    type FResult = (IoResult<(TcpStream, SocketAddr)>, TcpListener);
39
    /// Helper to implement [`IncomingStreams`]
40
    ///
41
    /// This function calls [`TcpListener::accept`] while owning the
42
    /// listener.  Thus, it returns a future that itself owns the listener,
43
    /// and we don't have lifetime troubles.
44
6
    async fn take_and_poll(lis: TcpListener) -> FResult {
45
6
        let result = lis.accept().await;
46
6
        (result, lis)
47
6
    }
48
    /// The possible states for an [`IncomingStreams`].
49
    enum IncomingStreamsState {
50
        /// We're ready to call `accept` on the listener again.
51
        Ready(TcpListener),
52
        /// We've called `accept` on the listener, and we're waiting
53
        /// for a future to complete.
54
        Accepting(Pin<Box<dyn Future<Output = FResult>>>),
55
    }
56
    impl IncomingStreams {
57
        /// Create a new IncomingStreams from a TcpListener.
58
1
        pub fn from_listener(lis: TcpListener) -> IncomingStreams {
59
1
            IncomingStreams {
60
1
                state: Some(IncomingStreamsState::Ready(lis)),
61
1
            }
62
1
        }
63
    }
64
    impl Stream for IncomingStreams {
65
        type Item = IoResult<(TcpStream, SocketAddr)>;
66

            
67
7
        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
68
7
            use IncomingStreamsState as St;
69
7
            let state = self.state.take().expect("No valid state!");
70
7
            let mut future = match state {
71
6
                St::Ready(lis) => Box::pin(take_and_poll(lis)),
72
1
                St::Accepting(fut) => fut,
73
            };
74
7
            match future.as_mut().poll(cx) {
75
6
                Poll::Ready((val, lis)) => {
76
6
                    self.state = Some(St::Ready(lis));
77
6
                    Poll::Ready(Some(val))
78
                }
79
                Poll::Pending => {
80
1
                    self.state = Some(St::Accepting(future));
81
1
                    Poll::Pending
82
                }
83
            }
84
7
        }
85
    }
86
    #[async_trait]
87
    impl traits::TcpListener for TcpListener {
88
        type TcpStream = TcpStream;
89
        type Incoming = IncomingStreams;
90
1
        async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
91
1
            TcpListener::accept(self).await
92
2
        }
93
1
        fn incoming(self) -> IncomingStreams {
94
1
            IncomingStreams::from_listener(self)
95
1
        }
96
2
        fn local_addr(&self) -> IoResult<SocketAddr> {
97
2
            TcpListener::local_addr(self)
98
2
        }
99
    }
100

            
101
    #[async_trait]
102
    impl traits::TcpProvider for async_executors::AsyncStd {
103
        type TcpStream = TcpStream;
104
        type TcpListener = TcpListener;
105
9
        async fn connect(&self, addr: &SocketAddr) -> IoResult<Self::TcpStream> {
106
15
            TcpStream::connect(addr).await
107
18
        }
108
2
        async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::TcpListener> {
109
2
            TcpListener::bind(*addr).await
110
4
        }
111
    }
112
}
113

            
114
// ==============================
115

            
116
use futures::{Future, FutureExt};
117
use std::pin::Pin;
118
use std::time::Duration;
119

            
120
use crate::traits::*;
121

            
122
/// Create and return a new `async_std` runtime.
123
573
pub fn create_runtime() -> async_executors::AsyncStd {
124
573
    async_executors::AsyncStd::new()
125
573
}
126

            
127
impl SleepProvider for async_executors::AsyncStd {
128
    type SleepFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
129
76
    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
130
301
        Box::pin(async_io::Timer::after(duration).map(|_| ()))
131
76
    }
132
}
133

            
134
impl BlockOn for async_executors::AsyncStd {
135
107
    fn block_on<F: Future>(&self, f: F) -> F::Output {
136
107
        async_executors::AsyncStd::block_on(f)
137
107
    }
138
}