1
//! Re-exports of the tokio runtime for use with arti.
2
//!
3
//! This crate helps define a slim API around our async runtime so that we
4
//! can easily swap it out.
5

            
6
/// Types used for networking (tokio implementation)
7
pub(crate) mod net {
8
    use crate::traits;
9
    use async_trait::async_trait;
10

            
11
    pub(crate) use tokio_crate::net::{
12
        TcpListener as TokioTcpListener, TcpStream as TokioTcpStream,
13
    };
14

            
15
    use futures::io::{AsyncRead, AsyncWrite};
16
    use tokio_util::compat::{Compat, TokioAsyncReadCompatExt as _};
17

            
18
    use std::io::Result as IoResult;
19
    use std::net::SocketAddr;
20
    use std::pin::Pin;
21
    use std::task::{Context, Poll};
22

            
23
    /// Wrapper for Tokio's TcpStream that implements the standard
24
    /// AsyncRead and AsyncWrite.
25
    pub struct TcpStream {
26
        /// Underlying tokio_util::compat::Compat wrapper.
27
        s: Compat<TokioTcpStream>,
28
    }
29
    impl From<TokioTcpStream> for TcpStream {
30
31
        fn from(s: TokioTcpStream) -> TcpStream {
31
31
            let s = s.compat();
32
31
            TcpStream { s }
33
31
        }
34
    }
35
    impl AsyncRead for TcpStream {
36
82
        fn poll_read(
37
82
            mut self: Pin<&mut Self>,
38
82
            cx: &mut Context<'_>,
39
82
            buf: &mut [u8],
40
82
        ) -> Poll<IoResult<usize>> {
41
82
            Pin::new(&mut self.s).poll_read(cx, buf)
42
82
        }
43
    }
44
    impl AsyncWrite for TcpStream {
45
27
        fn poll_write(
46
27
            mut self: Pin<&mut Self>,
47
27
            cx: &mut Context<'_>,
48
27
            buf: &[u8],
49
27
        ) -> Poll<IoResult<usize>> {
50
27
            Pin::new(&mut self.s).poll_write(cx, buf)
51
27
        }
52
23
        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
53
23
            Pin::new(&mut self.s).poll_flush(cx)
54
23
        }
55
3
        fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
56
3
            Pin::new(&mut self.s).poll_close(cx)
57
3
        }
58
    }
59

            
60
    /// Wrap a Tokio TcpListener to behave as a futures::io::TcpListener.
61
    pub struct TcpListener {
62
        /// The underlying listener.
63
        pub(super) lis: TokioTcpListener,
64
    }
65

            
66
    /// Asynchronous stream that yields incoming connections from a
67
    /// TcpListener.
68
    ///
69
    /// This is analogous to async_std::net::Incoming.
70
    pub struct IncomingTcpStreams {
71
        /// Reference to the underlying listener.
72
        pub(super) lis: TokioTcpListener,
73
    }
74

            
75
    impl futures::stream::Stream for IncomingTcpStreams {
76
        type Item = IoResult<(TcpStream, SocketAddr)>;
77

            
78
        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79
14
            match self.lis.poll_accept(cx) {
80
12
                Poll::Ready(Ok((s, a))) => Poll::Ready(Some(Ok((s.into(), a)))),
81
                Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
82
2
                Poll::Pending => Poll::Pending,
83
            }
84
14
        }
85
    }
86
    #[async_trait]
87
    impl traits::TcpListener for TcpListener {
88
        type TcpStream = TcpStream;
89
        type Incoming = IncomingTcpStreams;
90
2
        async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
91
2
            let (stream, addr) = self.lis.accept().await?;
92
2
            Ok((stream.into(), addr))
93
4
        }
94
2
        fn incoming(self) -> Self::Incoming {
95
2
            IncomingTcpStreams { lis: self.lis }
96
2
        }
97
4
        fn local_addr(&self) -> IoResult<SocketAddr> {
98
4
            self.lis.local_addr()
99
4
        }
100
    }
101
}
102

            
103
// ==============================
104

            
105
use crate::traits::*;
106
use async_trait::async_trait;
107
use futures::Future;
108
use std::io::Result as IoResult;
109
use std::time::Duration;
110

            
111
impl SleepProvider for TokioRuntimeHandle {
112
    type SleepFuture = tokio_crate::time::Sleep;
113
782
    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
114
782
        tokio_crate::time::sleep(duration)
115
782
    }
116
}
117

            
118
#[async_trait]
119
impl crate::traits::TcpProvider for TokioRuntimeHandle {
120
    type TcpStream = net::TcpStream;
121
    type TcpListener = net::TcpListener;
122

            
123
17
    async fn connect(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpStream> {
124
17
        let s = net::TokioTcpStream::connect(addr).await?;
125
17
        Ok(s.into())
126
34
    }
127
4
    async fn listen(&self, addr: &std::net::SocketAddr) -> IoResult<Self::TcpListener> {
128
4
        let lis = net::TokioTcpListener::bind(*addr).await?;
129
4
        Ok(net::TcpListener { lis })
130
8
    }
131
}
132

            
133
/// Create and return a new Tokio multithreaded runtime.
134
2415
pub(crate) fn create_runtime() -> IoResult<TokioRuntimeHandle> {
135
2415
    let mut builder = async_executors::TokioTpBuilder::new();
136
2415
    builder.tokio_builder().enable_all();
137
2415
    let owned = builder.build()?;
138
2415
    Ok(owned.into())
139
2415
}
140

            
141
/// Wrapper around a Handle to a tokio runtime.
142
///
143
/// Ideally, this type would go away, and we would just use
144
/// `tokio::runtime::Handle` directly.  Unfortunately, we can't implement
145
/// `futures::Spawn` on it ourselves because of Rust's orphan rules, so we need
146
/// to define a new type here.
147
///
148
/// # Limitations
149
///
150
/// Note that Arti requires that the runtime should have working implementations
151
/// for Tokio's time, net, and io facilities, but we have no good way to check
152
/// that when creating this object.
153
4816
#[derive(Clone, Debug)]
154
pub struct TokioRuntimeHandle {
155
    /// If present, the tokio executor that we've created (and which we own).
156
    ///
157
    /// We never access this directly; only through `handle`.  We keep it here
158
    /// so that our Runtime types can be agnostic about whether they own the
159
    /// executor.
160
    owned: Option<async_executors::TokioTp>,
161
    /// The underlying Handle.
162
    handle: tokio_crate::runtime::Handle,
163
}
164

            
165
impl TokioRuntimeHandle {
166
    /// Wrap a tokio runtime handle into a format that Arti can use.
167
    ///
168
    /// # Limitations
169
    ///
170
    /// Note that Arti requires that the runtime should have working
171
    /// implementations for Tokio's time, net, and io facilities, but we have
172
    /// no good way to check that when creating this object.
173
2
    pub(crate) fn new(handle: tokio_crate::runtime::Handle) -> Self {
174
2
        handle.into()
175
2
    }
176

            
177
    /// Return true if this handle owns the executor that it points to.
178
    pub fn is_owned(&self) -> bool {
179
        self.owned.is_some()
180
    }
181
}
182

            
183
impl From<tokio_crate::runtime::Handle> for TokioRuntimeHandle {
184
2
    fn from(handle: tokio_crate::runtime::Handle) -> Self {
185
2
        Self {
186
2
            owned: None,
187
2
            handle,
188
2
        }
189
2
    }
190
}
191

            
192
impl From<async_executors::TokioTp> for TokioRuntimeHandle {
193
2415
    fn from(owner: async_executors::TokioTp) -> TokioRuntimeHandle {
194
2415
        let handle = owner.block_on(async { tokio_crate::runtime::Handle::current() });
195
2415
        Self {
196
2415
            owned: Some(owner),
197
2415
            handle,
198
2415
        }
199
2415
    }
200
}
201

            
202
impl BlockOn for TokioRuntimeHandle {
203
155
    fn block_on<F: Future>(&self, f: F) -> F::Output {
204
155
        self.handle.block_on(f)
205
155
    }
206
}
207

            
208
impl futures::task::Spawn for TokioRuntimeHandle {
209
3150
    fn spawn_obj(
210
3150
        &self,
211
3150
        future: futures::task::FutureObj<'static, ()>,
212
3150
    ) -> Result<(), futures::task::SpawnError> {
213
3150
        let join_handle = self.handle.spawn(future);
214
3150
        drop(join_handle); // this makes the task detached.
215
3150
        Ok(())
216
3150
    }
217
}