1
//! Code for exporting events from the channel manager.
2
#![allow(dead_code, unreachable_pub)]
3

            
4
use educe::Educe;
5
use futures::{Stream, StreamExt};
6
use postage::watch;
7
use std::{
8
    fmt,
9
    time::{Duration, Instant},
10
};
11
use tor_basic_utils::skip_fmt;
12

            
13
/// The status of our connection to the internet.
///
/// Both fields are tri-state: `Some(true)` means recent success,
/// `Some(false)` means we have definitely been failing, and `None` means we
/// haven't succeeded yet but it's too early to call that a problem.
#[derive(Default, Debug, Clone)]
pub struct ConnStatus {
    /// Have we been able to make TCP connections?
    ///
    /// True if we've been able to make outgoing connections recently.
    /// False if we've definitely been failing.
    /// None if we haven't succeeded yet, but it's too early to say if
    /// that's a problem.
    online: Option<bool>,
    /// Have we been able to successfully negotiate full Tor handshakes?
    ///
    /// True if we've been able to complete a full Tor handshake recently.
    /// False if we've definitely been failing.
    /// None if we haven't succeeded yet, but it's too early to say if
    /// that's a problem.
    tls_works: Option<bool>,
}
31

            
32
/// A problem detected while connecting to the Tor network.
33
2
#[derive(Debug, Clone, Eq, PartialEq, derive_more::Display)]
34
#[non_exhaustive]
35
pub enum ConnBlockage {
36
    #[display(fmt = "unable to connect to the internet")]
37
    /// We haven't been able to make successful TCP connections.
38
    NoTcp,
39
    /// We've made TCP connections, but our TLS connections either failed, or
40
    /// got hit by an attempted man-in-the-middle attack.
41
    #[display(fmt = "our internet connection seems to be filtered")]
42
    NoHandshake,
43
}
44

            
45
impl ConnStatus {
46
    /// Return true if this status is equal to `other`.
47
    ///
48
    /// Note:(This would just be a PartialEq implementation, but I'm not sure I
49
    /// want to expose that PartialEq for this struct.)
50
20
    fn eq(&self, other: &ConnStatus) -> bool {
51
20
        self.online == other.online && self.tls_works == other.tls_works
52
20
    }
53

            
54
    /// Return true if this status indicates that we can successfully open Tor channels.
55
    pub fn usable(&self) -> bool {
56
        self.online == Some(true) && self.tls_works == Some(true)
57
    }
58

            
59
    /// Return a float representing "how bootstrapped" we are with respect to
60
    /// connecting to the Tor network, where 0 is "not at all" and 1 is
61
    /// "successful".
62
    ///
63
    /// Callers _should not_ depend on the specific meaning of any particular
64
    /// fraction; we may change these fractions in the future.
65
    pub fn frac(&self) -> f32 {
66
5
        match self {
67
            Self {
68
                online: Some(true),
69
                tls_works: Some(true),
70
2
            } => 1.0,
71
            Self {
72
                online: Some(true), ..
73
2
            } => 0.5,
74
3
            _ => 0.0,
75
        }
76
7
    }
77

            
78
    /// Return the cause of why we aren't able to connect to the Tor network,
79
    /// if we think we're stuck.
80
    pub fn blockage(&self) -> Option<ConnBlockage> {
81
5
        match self {
82
            Self {
83
                online: Some(false),
84
                ..
85
2
            } => Some(ConnBlockage::NoTcp),
86
            Self {
87
                tls_works: Some(false),
88
                ..
89
2
            } => Some(ConnBlockage::NoHandshake),
90
3
            _ => None,
91
        }
92
7
    }
93
}
94

            
95
impl fmt::Display for ConnStatus {
96
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97
1
        match self {
98
1
            ConnStatus { online: None, .. } => write!(f, "connecting to the internet"),
99
            ConnStatus {
100
                online: Some(false),
101
                ..
102
1
            } => write!(f, "unable to connect to the internet"),
103
            ConnStatus {
104
                tls_works: None, ..
105
1
            } => write!(f, "handshaking with Tor relays"),
106
            ConnStatus {
107
                tls_works: Some(false),
108
                ..
109
1
            } => write!(f, "unable to handshake with Tor relays"),
110
            ConnStatus {
111
                online: Some(true),
112
                tls_works: Some(true),
113
1
            } => write!(f, "connecting successfully"),
114
        }
115
5
    }
116
}
117

            
118
/// A stream of [`ConnStatus`] events describing changes in our connected-ness.
119
///
120
/// This stream is lossy; a reader might not see some events on the stream, if
121
/// they are produced faster than the reader can consume.  In that case, the
122
/// reader will see more recent updates, and miss older ones.
123
///
124
/// Note that the bootstrap status is not monotonic: we might become less
125
/// bootstrapped than we were before.  (For example, the internet could go
126
/// down.)
127
2
#[derive(Clone, Educe)]
128
#[educe(Debug)]
129
pub struct ConnStatusEvents {
130
    /// The receiver that implements this stream.
131
    ///
132
    /// (We wrap it in a new type here so that we can replace the implementation
133
    /// later on if we need to.)
134
    #[educe(Debug(method = "skip_fmt"))]
135
    inner: watch::Receiver<ConnStatus>,
136
}
137

            
138
impl Stream for ConnStatusEvents {
139
    type Item = ConnStatus;
140
60
    fn poll_next(
141
60
        mut self: std::pin::Pin<&mut Self>,
142
60
        cx: &mut std::task::Context<'_>,
143
60
    ) -> std::task::Poll<Option<Self::Item>> {
144
60
        self.inner.poll_next_unpin(cx)
145
60
    }
146
}
147

            
148
/// Crate-internal view of "how connected are we to the internet?"
///
/// This is a more complex and costly structure than ConnStatus, so we track
/// this here, and only expose the minimum via ConnStatus over a
/// `postage::watch`.  Later, we might want to expose more of this information.
//
// TODO: Eventually we should add some ability to reset our bootstrap status, if
// our connections start failing.
#[derive(Debug, Clone)]
struct ChanMgrStatus {
    /// When did we first get initialized?
    startup: Instant,

    /// Since we started, how many channels have we tried to build?
    n_attempts: usize,

    /// When (if ever) have we made a TCP connection to (what we hoped was) a
    /// Tor relay?
    ///
    /// If we don't reach this point, we're probably not on the internet.
    ///
    /// If we get no further than this, we're probably having our TCP
    /// connections captured or replaced.
    last_tcp_success: Option<Instant>,

    /// When (if ever) have we successfully finished a TLS handshake to (what we
    /// hoped was) a Tor relay?
    ///
    /// If we get no further than this, we might be facing a TLS MITM attack.
    //
    // TODO: We don't actually use this information yet: our output doesn't
    // distinguish filtering where TLS succeeds but gets MITM'd from filtering
    // where TLS fails.
    last_tls_success: Option<Instant>,

    /// When (if ever) have we successfully finished the inner Tor handshake
    /// with a relay?
    ///
    /// If we get to this point, we can successfully talk to something that
    /// holds the private key that it's supposed to.
    last_chan_success: Option<Instant>,
}
190

            
191
impl ChanMgrStatus {
192
    /// Construct a new ChanMgr status.
193
    ///
194
    /// It will be built as having been initialized at the time `now`.
195
273
    fn new_at(now: Instant) -> ChanMgrStatus {
196
273
        ChanMgrStatus {
197
273
            startup: now,
198
273
            n_attempts: 0,
199
273
            last_tcp_success: None,
200
273
            last_tls_success: None,
201
273
            last_chan_success: None,
202
273
        }
203
273
    }
204

            
205
    /// Return a [`ConnStatus`] for the current state, at time `now`.
206
    ///
207
    /// (The time is necessary because a lack of success doesn't indicate a
208
    /// problem until enough time has passed.)
209
15
    fn conn_status_at(&self, now: Instant) -> ConnStatus {
210
        /// How long do we need to be online before we'll acknowledge failure?
211
        const MIN_DURATION: Duration = Duration::from_secs(60);
212
        /// How many attempts do we need to launch before we'll acknowledge failure?
213
        const MIN_ATTEMPTS: usize = 6;
214

            
215
        // If set, it's too early to determine failure.
216
15
        let early = now < self.startup + MIN_DURATION || self.n_attempts < MIN_ATTEMPTS;
217

            
218
15
        let online = match (self.last_tcp_success.is_some(), early) {
219
9
            (true, _) => Some(true),
220
5
            (_, true) => None,
221
1
            (false, false) => Some(false),
222
        };
223

            
224
15
        let tls_works = match (self.last_chan_success.is_some(), early) {
225
3
            (true, _) => Some(true),
226
10
            (_, true) => None,
227
2
            (false, false) => Some(false),
228
        };
229

            
230
15
        ConnStatus { online, tls_works }
231
15
    }
232

            
233
    /// Note that an attempt to connect has been started.
234
12
    fn record_attempt(&mut self) {
235
12
        self.n_attempts += 1;
236
12
    }
237

            
238
    /// Note that we've successfully done a TCP handshake with an alleged relay.
239
3
    fn record_tcp_success(&mut self, now: Instant) {
240
3
        self.last_tcp_success = Some(now);
241
3
    }
242

            
243
    /// Note that we've completed a TLS handshake with an alleged relay.
244
    ///
245
    /// (Its identity won't be verified till the next step.)
246
2
    fn record_tls_finished(&mut self, now: Instant) {
247
2
        self.last_tls_success = Some(now);
248
2
    }
249

            
250
    /// Note that we've completed a Tor handshake with a relay.
251
    ///
252
    /// (This includes performing the TLS handshake, and verifying that the
253
    /// relay was indeed the one that we wanted to reach.)
254
3
    fn record_handshake_done(&mut self, now: Instant) {
255
3
        self.last_chan_success = Some(now);
256
3
    }
257
}
258

            
259
/// Object that manages information about a `ChanMgr`'s status, and sends
260
/// information about connectivity changes over an asynchronous channel
261
pub(crate) struct ChanMgrEventSender {
262
    /// The last ConnStatus that we sent over the channel.
263
    last_conn_status: ConnStatus,
264
    /// The unsummarized status information from the ChanMgr.
265
    mgr_status: ChanMgrStatus,
266
    /// The channel that we use for sending ConnStatus information.
267
    sender: watch::Sender<ConnStatus>,
268
}
269

            
270
impl ChanMgrEventSender {
271
    /// If the status has changed as of `now`, tell any listeners.
272
    ///
273
    /// (This takes a time because we need to know how much time has elapsed
274
    /// without successful attempts.)
275
    ///
276
    /// # Limitations
277
    ///
278
    /// We are dependent on calls to `record_attempt()` and similar methods to
279
    /// actually invoke this function; if they were never called, we'd never
280
    /// notice that we had gone too long without building connections.  That's
281
    /// okay for now, though, since any Tor client will immediately start
282
    /// building circuits, which will launch connection attempts until one
283
    /// succeeds or the client gives up entirely.  
284
8
    fn push_at(&mut self, now: Instant) {
285
8
        let status = self.mgr_status.conn_status_at(now);
286
8
        if !status.eq(&self.last_conn_status) {
287
4
            self.last_conn_status = status.clone();
288
4
            let mut b = self.sender.borrow_mut();
289
4
            *b = status;
290
4
        }
291
8
    }
292

            
293
    /// Note that an attempt to connect has been started.
294
2
    pub(crate) fn record_attempt(&mut self) {
295
2
        self.mgr_status.record_attempt();
296
2
        self.push_at(Instant::now());
297
2
    }
298

            
299
    /// Note that we've successfully done a TCP handshake with an alleged relay.
300
2
    pub(crate) fn record_tcp_success(&mut self) {
301
2
        let now = Instant::now();
302
2
        self.mgr_status.record_tcp_success(now);
303
2
        self.push_at(now);
304
2
    }
305

            
306
    /// Note that we've completed a TLS handshake with an alleged relay.
307
    ///
308
    /// (Its identity won't be verified till the next step.)
309
2
    pub(crate) fn record_tls_finished(&mut self) {
310
2
        let now = Instant::now();
311
2
        self.mgr_status.record_tls_finished(now);
312
2
        self.push_at(now);
313
2
    }
314

            
315
    /// Note that we've completed a Tor handshake with a relay.
316
    ///
317
    /// (This includes performing the TLS handshake, and verifying that the
318
    /// relay was indeed the one that we wanted to reach.)
319
2
    pub(crate) fn record_handshake_done(&mut self) {
320
2
        let now = Instant::now();
321
2
        self.mgr_status.record_handshake_done(now);
322
2
        self.push_at(now);
323
2
    }
324
}
325

            
326
/// Create a new channel for sending connectivity status events to other crates.
327
242
pub(crate) fn channel() -> (ChanMgrEventSender, ConnStatusEvents) {
328
242
    let (sender, receiver) = watch::channel();
329
242
    let receiver = ConnStatusEvents { inner: receiver };
330
242
    let sender = ChanMgrEventSender {
331
242
        last_conn_status: ConnStatus::default(),
332
242
        mgr_status: ChanMgrStatus::new_at(Instant::now()),
333
242
        sender,
334
242
    };
335
242
    (sender, receiver)
336
242
}
337

            
338
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::cognitive_complexity)]
mod test {
    use super::*;
    use float_eq::assert_float_eq;

    /// Tolerance for float comparison.
    const TOL: f32 = 0.00001;

    /// Check the display text, fraction, equality and blockage of each
    /// hand-built `ConnStatus` state.
    #[test]
    fn status_basics() {
        let s1 = ConnStatus::default();
        assert_eq!(s1.to_string(), "connecting to the internet");
        assert_float_eq!(s1.frac(), 0.0, abs <= TOL);
        assert!(s1.eq(&s1));
        assert!(s1.blockage().is_none());

        let s2 = ConnStatus {
            online: Some(false),
            tls_works: None,
        };
        assert_eq!(s2.to_string(), "unable to connect to the internet");
        assert_float_eq!(s2.frac(), 0.0, abs <= TOL);
        assert!(s2.eq(&s2));
        assert!(!s2.eq(&s1));
        assert_eq!(s2.blockage(), Some(ConnBlockage::NoTcp));
        assert_eq!(
            s2.blockage().unwrap().to_string(),
            "unable to connect to the internet"
        );

        let s3 = ConnStatus {
            online: Some(true),
            tls_works: None,
        };
        assert_eq!(s3.to_string(), "handshaking with Tor relays");
        assert_float_eq!(s3.frac(), 0.5, abs <= TOL);
        assert_eq!(s3.blockage(), None);
        assert!(!s3.eq(&s1));

        let s4 = ConnStatus {
            online: Some(true),
            tls_works: Some(false),
        };
        assert_eq!(s4.to_string(), "unable to handshake with Tor relays");
        assert_float_eq!(s4.frac(), 0.5, abs <= TOL);
        assert_eq!(s4.blockage(), Some(ConnBlockage::NoHandshake));
        assert_eq!(
            s4.blockage().unwrap().to_string(),
            "our internet connection seems to be filtered"
        );
        assert!(!s4.eq(&s1));
        assert!(!s4.eq(&s2));
        assert!(!s4.eq(&s3));
        assert!(s4.eq(&s4));

        let s5 = ConnStatus {
            online: Some(true),
            tls_works: Some(true),
        };
        assert_eq!(s5.to_string(), "connecting successfully");
        assert_float_eq!(s5.frac(), 1.0, abs <= TOL);
        assert!(s5.blockage().is_none());
        assert!(s5.eq(&s5));
        assert!(!s5.eq(&s4));
    }

    /// Check how `ChanMgrStatus` summarizes attempts, elapsed time and
    /// recorded successes into a `ConnStatus`.
    #[test]
    fn derive_status() {
        let start = Instant::now();
        let sec = Duration::from_secs(1);
        let hour = Duration::from_secs(3600);

        let mut ms = ChanMgrStatus::new_at(start);

        // when we start, we're unable to reach any conclusions.
        let s0 = ms.conn_status_at(start);
        assert!(s0.online.is_none());
        assert!(s0.tls_works.is_none());

        // Time won't let us make conclusions either, unless there have been
        // attempts.
        let s = ms.conn_status_at(start + hour);
        assert!(s.eq(&s0));

        // But if there have been attempts, _and_ time has passed, we notice
        // failure.
        for _ in 0..10 {
            ms.record_attempt();
        }
        // (Not immediately...)
        let s = ms.conn_status_at(start);
        assert!(s.eq(&s0));
        // (... but after a while.)
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(false));
        assert_eq!(s.tls_works, Some(false));

        // If TCP has succeeded, we should notice that.
        ms.record_tcp_success(start + sec);
        let s = ms.conn_status_at(start + sec * 2);
        assert_eq!(s.online, Some(true));
        assert!(s.tls_works.is_none());
        let s = ms.conn_status_at(start + hour);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.tls_works, Some(false));

        // If the handshake succeeded, we can notice that too.
        ms.record_handshake_done(start + sec * 2);
        let s = ms.conn_status_at(start + sec * 3);
        assert_eq!(s.online, Some(true));
        assert_eq!(s.tls_works, Some(true));
    }

    /// Check that events recorded on the sender become visible through the
    /// receiver.
    #[test]
    fn sender() {
        let (mut snd, rcv) = channel();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 0.0, abs <= TOL);
        }

        snd.record_attempt();
        snd.record_tcp_success();
        snd.record_tls_finished();
        snd.record_handshake_done();

        {
            let s = rcv.inner.borrow().clone();
            assert_float_eq!(s.frac(), 1.0, abs <= TOL);
        }
    }
}