1
//! Implement a concrete type to build channels.
2

            
3
use std::io;
4
use std::net::SocketAddr;
5
use std::sync::{Arc, Mutex};
6

            
7
use crate::{event::ChanMgrEventSender, Error};
8

            
9
use std::time::Duration;
10
use tor_error::{bad_api_usage, internal};
11
use tor_linkspec::{ChanTarget, OwnedChanTarget};
12
use tor_llcrypto::pk;
13
use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider};
14

            
15
use async_trait::async_trait;
16
use futures::stream::FuturesUnordered;
17
use futures::task::SpawnExt;
18
use futures::StreamExt;
19
use futures::{FutureExt, TryFutureExt};
20

            
21
/// How long to wait before starting each additional parallel connection
/// attempt to the same relay.
///
/// Attempt number `i` is delayed by `i * CONNECTION_DELAY`, so the first
/// attempt starts immediately.
///
/// This is a `const` rather than a `static`: the value is a plain immutable
/// `Copy` duration, and nothing needs it to have a fixed address.
const CONNECTION_DELAY: Duration = Duration::from_millis(150);
23

            
24
/// TLS-based channel builder.
25
///
26
/// This is a separate type so that we can keep our channel management
27
/// code network-agnostic.
28
pub(crate) struct ChanBuilder<R: Runtime> {
29
    /// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
30
    runtime: R,
31
    /// Used to update our bootstrap reporting status.
32
    event_sender: Mutex<ChanMgrEventSender>,
33
    /// Object to build TLS connections.
34
    tls_connector: <R as TlsProvider<R::TcpStream>>::Connector,
35
}
36

            
37
impl<R: Runtime> ChanBuilder<R> {
38
    /// Construct a new ChanBuilder.
39
19
    pub(crate) fn new(runtime: R, event_sender: ChanMgrEventSender) -> Self {
40
19
        let tls_connector = runtime.tls_connector();
41
19
        ChanBuilder {
42
19
            runtime,
43
19
            event_sender: Mutex::new(event_sender),
44
19
            tls_connector,
45
19
        }
46
19
    }
47
}
48

            
49
#[async_trait]
50
impl<R: Runtime> crate::mgr::ChannelFactory for ChanBuilder<R> {
51
    type Channel = tor_proto::channel::Channel;
52
    type BuildSpec = OwnedChanTarget;
53

            
54
1
    async fn build_channel(&self, target: &Self::BuildSpec) -> crate::Result<Self::Channel> {
55
        use tor_rtcompat::SleepProviderExt;
56

            
57
        // TODO: make this an option.  And make a better value.
58
1
        let five_seconds = std::time::Duration::new(5, 0);
59
1

            
60
1
        self.runtime
61
3
            .timeout(five_seconds, self.build_channel_notimeout(target))
62
3
            .await?
63
2
    }
64
}
65

            
66
/// Connect to one of the addresses in `addrs` by running connections in parallel until one works.
67
///
68
/// This implements a basic version of RFC 8305 "happy eyeballs".
69
1
async fn connect_to_one<R: Runtime>(
70
1
    rt: &R,
71
1
    addrs: &[SocketAddr],
72
1
) -> crate::Result<(<R as TcpProvider>::TcpStream, SocketAddr)> {
73
1
    // We need *some* addresses to connect to.
74
1
    if addrs.is_empty() {
75
        return Err(Error::UnusableTarget(bad_api_usage!(
76
            "No addresses for chosen relay"
77
        )));
78
1
    }
79
1

            
80
1
    // Turn each address into a future that waits (i * CONNECTION_DELAY), then
81
1
    // attempts to connect to the address using the runtime (where i is the
82
1
    // array index). Shove all of these into a `FuturesUnordered`, polling them
83
1
    // simultaneously and returning the results in completion order.
84
1
    //
85
1
    // This is basically the concurrent-connection stuff from RFC 8305, ish.
86
1
    // TODO(eta): sort the addresses first?
87
1
    let mut connections = addrs
88
1
        .iter()
89
1
        .enumerate()
90
1
        .map(|(i, a)| {
91
1
            let delay = rt.sleep(CONNECTION_DELAY * i as u32);
92
1
            delay.then(move |_| {
93
1
                tracing::info!("Connecting to {}", a);
94
1
                rt.connect(a)
95
1
                    .map_ok(move |stream| (stream, *a))
96
1
                    .map_err(move |e| (e, *a))
97
1
            })
98
1
        })
99
1
        .collect::<FuturesUnordered<_>>();
100
1

            
101
1
    let mut ret = None;
102
1
    let mut errors = vec![];
103

            
104
1
    while let Some(result) = connections.next().await {
105
1
        match result {
106
1
            Ok(s) => {
107
1
                // We got a stream (and address).
108
1
                ret = Some(s);
109
1
                break;
110
            }
111
            Err((e, a)) => {
112
                // We got a failure on one of the streams. Store the error.
113
                // TODO(eta): ideally we'd start the next connection attempt immediately.
114
                tracing::warn!("Connection to {} failed: {}", a, e);
115
                errors.push((e, a));
116
            }
117
        }
118
    }
119

            
120
    // Ensure we don't continue trying to make connections.
121
1
    drop(connections);
122
1

            
123
1
    ret.ok_or_else(|| Error::ChannelBuild {
124
        addresses: errors.into_iter().map(|(e, a)| (a, Arc::new(e))).collect(),
125
1
    })
126
1
}
127

            
128
impl<R: Runtime> ChanBuilder<R> {
129
    /// As build_channel, but don't include a timeout.
130
1
    async fn build_channel_notimeout(
131
1
        &self,
132
1
        target: &OwnedChanTarget,
133
1
    ) -> crate::Result<tor_proto::channel::Channel> {
134
1
        use tor_proto::channel::ChannelBuilder;
135
1
        use tor_rtcompat::tls::CertifiedConn;
136
1

            
137
1
        // 1. Negotiate the TLS connection.
138
1
        {
139
1
            self.event_sender
140
1
                .lock()
141
1
                .expect("Lock poisoned")
142
1
                .record_attempt();
143
1
        }
144

            
145
1
        let (stream, addr) = connect_to_one(&self.runtime, target.addrs()).await?;
146

            
147
2
        let map_ioe = |action: &'static str| {
148
            move |ioe: io::Error| Error::Io {
149
                action,
150
                peer: addr,
151
                source: ioe.into(),
152
            }
153
2
        };
154

            
155
1
        {
156
1
            self.event_sender
157
1
                .lock()
158
1
                .expect("Lock poisoned")
159
1
                .record_tcp_success();
160
1
        }
161

            
162
        // TODO: add a random hostname here if it will be used for SNI?
163
1
        let tls = self
164
1
            .tls_connector
165
1
            .negotiate_unvalidated(stream, "ignored")
166
            .await
167
1
            .map_err(map_ioe("TLS negotiation"))?;
168

            
169
1
        let peer_cert = tls
170
1
            .peer_certificate()
171
1
            .map_err(map_ioe("TLS certs"))?
172
1
            .ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
173

            
174
1
        {
175
1
            self.event_sender
176
1
                .lock()
177
1
                .expect("Lock poisoned")
178
1
                .record_tls_finished();
179
1
        }
180
1

            
181
1
        // 2. Set up the channel.
182
1
        let mut builder = ChannelBuilder::new();
183
1
        builder.set_declared_addr(addr);
184
3
        let chan = builder.launch(tls).connect().await?;
185
1
        let now = self.runtime.wallclock();
186
1
        let chan = chan.check(target, &peer_cert, Some(now))?;
187
1
        let (chan, reactor) = chan.finish().await?;
188

            
189
1
        {
190
1
            self.event_sender
191
1
                .lock()
192
1
                .expect("Lock poisoned")
193
1
                .record_handshake_done();
194
1
        }
195
1

            
196
1
        // 3. Launch a task to run the channel reactor.
197
1
        self.runtime
198
1
            .spawn(async {
199
1
                let _ = reactor.run().await;
200
1
            })
201
1
            .map_err(|e| Error::from_spawn("channel reactor", e))?;
202
1
        Ok(chan)
203
1
    }
204
}
205

            
206
impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
207
    type Ident = pk::ed25519::Ed25519Identity;
208
1
    fn ident(&self) -> &Self::Ident {
209
1
        self.peer_ed25519_id()
210
1
    }
211
1
    fn is_usable(&self) -> bool {
212
1
        !self.is_closing()
213
1
    }
214
    fn duration_unused(&self) -> Option<Duration> {
215
        self.duration_unused()
216
    }
217
}
218

            
219
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::{
        mgr::{AbstractChannel, ChannelFactory},
        Result,
    };
    use pk::ed25519::Ed25519Identity;
    use pk::rsa::RsaIdentity;
    use std::net::SocketAddr;
    use std::time::{Duration, SystemTime};
    use tor_proto::channel::Channel;
    use tor_rtcompat::{test_with_one_runtime, TcpListener};
    use tor_rtmock::{io::LocalStream, net::MockNetwork, MockSleepRuntime};

    // Check that the builder can produce a working channel.  The test runs
    // on a mock network: a listener is bound to the relay's address, the
    // client's clock is moved to a fixed time, and the relay side replies
    // using the canned data in `crate::testing::msgs`.
    #[test]
    fn build_ok() -> Result<()> {
        use crate::testing::msgs;

        // Fixed parameters for the fake relay and the fake client.
        let orport: SocketAddr = msgs::ADDR.parse().unwrap();
        let ed: Ed25519Identity = msgs::ED_ID.into();
        let rsa: RsaIdentity = msgs::RSA_ID.into();
        let client_addr = "192.0.2.17".parse().unwrap();
        let tls_cert = msgs::X509_CERT.into();
        let target = OwnedChanTarget::new(vec![orport], ed, rsa);
        let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);

        test_with_one_runtime!(|rt| async move {
            // A fake internet for both sides to talk over.
            let network = MockNetwork::new();

            // Client side: a runtime with its own address on the fake
            // network, wrapped so that its clock can be controlled.
            let client_net_rt = network
                .builder()
                .add_address(client_addr)
                .runtime(rt.clone());
            let client_rt = MockSleepRuntime::new(client_net_rt);

            // Relay side: a runtime that owns the relay's address.
            let relay_rt = network
                .builder()
                .add_address(orport.ip())
                .runtime(rt.clone());

            // Open a fake TLS listener, ready to take the client's connection.
            let listener = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();

            // Move the client's clock to the chosen timestamp.
            client_rt.jump_to(now);

            // The builder under test.
            let (event_tx, _event_rx) = crate::event::channel();
            let builder = ChanBuilder::new(client_rt, event_tx);

            let (client_result, relay_result): (Result<Channel>, Result<LocalStream>) = futures::join!(
                // Client: build a channel to the target.
                async { builder.build_channel(&target).await },
                // Relay: accept the connection and answer the channel request.
                async {
                    let (mut conn, peer) = listener.accept().await.expect("accept failed");
                    assert_eq!(client_addr, peer.ip());
                    crate::testing::answer_channel_req(&mut conn)
                        .await
                        .expect("answer failed");
                    Ok(conn)
                }
            );

            let chan = client_result.unwrap();
            assert_eq!(chan.ident(), &ed);
            assert!(chan.is_usable());
            relay_result.unwrap();
            Ok(())
        })
    }

    // TODO: Write tests for timeout logic, once there is smarter logic.
}