//! `tor-guardmgr`: guard node selection for Tor network clients.
//!
//! # Overview
//!
//! This crate is part of
//! [Arti](https://gitlab.torproject.org/tpo/core/arti/), a project to
//! implement [Tor](https://www.torproject.org/) in Rust.
//!
//! "Guard nodes" are a mechanism that Tor clients use to limit the
//! impact of hostile relays. Approximately: each client chooses a
//! small set of relays to use as its "guards".  Later, when the
//! client picks its paths through the network, rather than choosing a
//! different first hop randomly for every path, it chooses the best
//! "guard" as the first hop.
//!
//! This crate provides [`GuardMgr`], an object that manages a set of
//! guard nodes, and helps the `tor-circmgr` crate know when to use
//! them.
//!
//! Guard nodes are persistent across multiple process invocations.
//!
//! Most Arti users won't need to use this crate directly.
//!
//! # Motivation
//!
//! What's the point?  By restricting their first hops to a small set,
//! clients increase their odds against traffic-correlation attacks.
//! Since we assume that an adversary who controls both ends of a
//! circuit can correlate its traffic, a client that chooses many
//! circuits with random entry points will eventually pick an
//! attacker-controlled circuit, with probability approaching 1 over
//! time.  If entry nodes are restricted to a small set, however, then
//! the client has a chance of never picking an attacker-controlled
//! circuit.
//!
//! (The actual argument is a little more complicated here, and it
//! relies on the assumption that, since the attacker knows
//! statistics, exposing _any_ of your traffic is nearly as bad as
//! exposing _all_ of your traffic.)
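//!
//! As a hedged illustration (the numbers below are arbitrary, and not
//! part of this crate's API): if an adversary controls a fraction `c`
//! of entry bandwidth, then a client that picks a fresh random entry
//! for each of `n` circuits is compromised at least once with
//! probability `1 - (1 - c)^n`, which tends to 1 as `n` grows.
//!
//! ```rust
//! // With 5% hostile entry bandwidth and 1000 independent entry choices,
//! // the chance of at least one attacker-controlled circuit is near 1:
//! let c: f64 = 0.05;
//! let p_compromised = 1.0 - (1.0 - c).powi(1000);
//! assert!(p_compromised > 0.99);
//! ```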
//!
//! # Complications
//!
//! The real algorithm for selecting and using guards can get more
//! complicated because of a variety of factors.
//!
//! - In reality, we can't just "pick a few guards at random" and use
//!   them forever: relays can appear and disappear, relays can go
//!   offline and come back online, and so on.  What's more, keeping
//!   guards for too long can make targeted attacks against those
//!   guards more attractive.
//!
//! - Further, we may have particular restrictions on where we can
//!   connect. (For example, we might be restricted to ports 80 and
//!   443, but only when we're on a commuter train's wifi network.)
//!
//! - We need to resist attacks from local networks that block all but a
//!   small set of guard relays, to force us to choose those.
//!
//! - We need to give good, reliable performance while using the
//!   guards that we prefer.
//!
//! These needs complicate our API somewhat.  Instead of simply asking
//! the `GuardMgr` for a guard, the circuit-management code needs to
//! be able to tell the `GuardMgr` that a given guard has failed (or
//! succeeded), and that it needs a different guard in the future (or
//! not).
//!
//! Further, the `GuardMgr` code needs to be able to hand out
//! _provisional guards_, in effect saying "You can try building a
//! circuit with this guard, but please don't actually _use_ that
//! circuit unless I tell you it's safe."
//!
//! For details on the exact algorithm, see `guard-spec.txt` (link
//! below) and comments and internal documentation in this crate.
//!
//! # Limitations
//!
//! * Only one guard selection is currently supported: we don't allow a
//!   "filtered" or a "bridges" selection.
//!
//! * Our circuit blocking algorithm is simplified from the one that Tor uses.
//!   See comments in `GuardSet::circ_usability_status` for more information.
//!   See also [proposal 337](https://gitlab.torproject.org/tpo/core/torspec/-/blob/main/proposals/337-simpler-guard-usability.md).
//!
//! # References
//!
//! Guard nodes were first proposed (as "helper nodes") in "Defending
//! Anonymous Communications Against Passive Logging Attacks" by
//! Matthew Wright, Micah Adler, Brian N. Levine, and Clay Shields in
//! the Proceedings of the 2003 IEEE Symposium on Security and
//! Privacy.  (See <https://www.freehaven.net/anonbib/#wright03>)
//!
//! Tor's current guard selection algorithm is described in Tor's
//! [`guard-spec.txt`](https://gitlab.torproject.org/tpo/core/torspec/-/raw/main/guard-spec.txt)
//! document.
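//!
//! # Example
//!
//! A rough sketch of how circuit-management code might drive this API.
//! (Hedged: `guardmgr`, `usage`, and `netdir` are assumed to already
//! exist, and error handling and the exact reporting calls are
//! approximate rather than authoritative.)
//!
//! ```ignore
//! let (guard, monitor, usable) = guardmgr.select_guard(usage, Some(&netdir))?;
//! // Try to build a circuit through `guard`, then report the outcome
//! // so that the guard's status can be updated:
//! monitor.report(GuardStatus::Success);
//! // Even after success, only route traffic over the circuit once the
//! // `usable` future has yielded true.
//! ```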
#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]

// Glossary:
//     Primary guard: one of a small number of high-priority guards that we
//         prefer to use first for all circuits.
//     Sample: the larger set of candidate guards, chosen from the network
//         directory, from which primary guards are drawn.
//     confirmed: a guard is "confirmed" once we have successfully used it.
//     filtered: a guard is "filtered" if it is permitted by the currently
//         configured [`GuardFilter`].

use educe::Educe;
use futures::channel::mpsc;
use futures::task::{SpawnError, SpawnExt};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::convert::{TryFrom, TryInto};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime};
use tracing::{debug, info, trace, warn};

use tor_error::{ErrorKind, HasKind};
use tor_llcrypto::pk;
use tor_netdir::{params::NetParameters, NetDir, Relay};
use tor_persist::{DynStorageHandle, StateMgr};
use tor_rtcompat::Runtime;

mod daemon;
mod filter;
mod guard;
mod pending;
mod sample;
mod util;

pub use filter::GuardFilter;
pub use pending::{GuardMonitor, GuardStatus, GuardUsable};
pub use sample::PickGuardError;

use pending::{PendingRequest, RequestId};
use sample::GuardSet;

/// A "guard manager" that selects and remembers a persistent set of
/// guard nodes.
///
#[derive(Clone)]
pub struct GuardMgr<R: Runtime> {
    /// An asynchronous runtime object.
    ///
    /// GuardMgr uses this runtime for timing, timeouts, and spawning
    /// tasks.
    runtime: R,

    /// Internal state for the guard manager.
    inner: Arc<Mutex<GuardMgrInner>>,
}

/// Helper type that holds the data used by a [`GuardMgr`].
///
/// This would just be a [`GuardMgr`], except that it needs to sit inside
/// a `Mutex` and get accessed by daemon tasks.
struct GuardMgrInner {
    /// The last time when we marked all of our primary guards as retriable.
    ///
    /// We keep track of this time so that we can rate-limit
    /// these attempts.
    last_primary_retry_time: Instant,

    /// Persistent guard manager state.
    ///
    /// This object remembers one or more persistent sets of guards that we can
    /// use, along with their relative priorities and statuses.
    guards: GuardSets,

    /// Configuration values derived from the consensus parameters.
    ///
    /// This is updated whenever the consensus parameters change.
    params: GuardParams,

    /// An mpsc channel, used to tell the task running in
    /// [`daemon::report_status_events`] about a new event to monitor.
    ///
    /// This uses an `UnboundedSender` so that we don't have to await
    /// while sending the message, which in turn allows the GuardMgr
    /// API to be simpler.  The risk, however, is that there's no
    /// backpressure in the event that the task running
    /// [`daemon::report_status_events`] fails to read from this
    /// channel.
    ctrl: mpsc::UnboundedSender<daemon::Msg>,

    /// Information about guards that we've given out, but where we have
    /// not yet heard whether the guard was successful.
    ///
    /// Upon learning whether the guard was successful, the pending
    /// requests in this map may be either moved to `waiting`, or
    /// discarded.
    ///
    /// There can be multiple pending requests corresponding to the
    /// same guard.
    pending: HashMap<RequestId, PendingRequest>,

    /// A list of pending requests for which we have heard that the
    /// guard was successful, but we have not yet decided whether the
    /// circuit may be used.
    ///
    /// There can be multiple waiting requests corresponding to the
    /// same guard.
    waiting: Vec<PendingRequest>,

    /// Location in which to store persistent state.
    storage: DynStorageHandle<GuardSets>,
}

/// Persistent state for a guard manager, as serialized to disk.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct GuardSets {
    /// The default set of guards to use.
    ///
    /// Right now, this is the _only_ `GuardSet` for each `GuardMgr`, but we
    /// expect that to change: our algorithm specifies that there can
    /// be multiple named guard sets, and we can swap between them
    /// depending on the user's selected [`GuardFilter`].
    default: GuardSet,

    /// Unrecognized fields, including (possibly) other guard sets.
    #[serde(flatten)]
    remaining: HashMap<String, tor_persist::JsonValue>,
}

/// The key (filename) we use for storing our persistent guard state in the
/// `StateMgr`.
///
/// We used to store this in a different format in a filename called
/// "default_guards" (before Arti 0.1.0).
const STORAGE_KEY: &str = "guards";

impl<R: Runtime> GuardMgr<R> {
    /// Create a new "empty" guard manager and launch its background tasks.
    ///
    /// It won't be able to hand out any guards until
    /// [`GuardMgr::update_network`] has been called.
    pub fn new<S>(runtime: R, state_mgr: S) -> Result<Self, GuardMgrError>
    where
        S: StateMgr + Send + Sync + 'static,
    {
        let (ctrl, rcv) = mpsc::unbounded();
        let storage: DynStorageHandle<GuardSets> = state_mgr.create_handle(STORAGE_KEY);
        // TODO(nickm): We should do something about the old state in
        // `default_guards`.  Probably it would be best to delete it.  We could
        // try to migrate it instead, but that's beyond the stability guarantee
        // that we're getting at this stage of our (pre-0.1) development.
        let state = storage.load()?.unwrap_or_default();
        let inner = Arc::new(Mutex::new(GuardMgrInner {
            guards: state,
            last_primary_retry_time: runtime.now(),
            params: GuardParams::default(),
            ctrl,
            pending: HashMap::new(),
            waiting: Vec::new(),
            storage,
        }));
        {
            let weak_inner = Arc::downgrade(&inner);
            let rt_clone = runtime.clone();
            runtime
                .spawn(daemon::report_status_events(rt_clone, weak_inner, rcv))
                .map_err(|e| GuardMgrError::from_spawn("guard status event reporter", e))?;
        }
        {
            let rt_clone = runtime.clone();
            let weak_inner = Arc::downgrade(&inner);
            runtime
                .spawn(daemon::run_periodic(rt_clone, weak_inner))
                .map_err(|e| GuardMgrError::from_spawn("periodic guard updater", e))?;
        }
        Ok(GuardMgr { runtime, inner })
    }

    /// Flush our current guard state to the state manager, if there
    /// is any unsaved state.
    pub fn store_persistent_state(&self) -> Result<(), GuardMgrError> {
        let inner = self.inner.lock().expect("Poisoned lock");
        trace!("Flushing guard state to disk.");
        inner.storage.store(&inner.guards)?;
        Ok(())
    }

    /// Reload state from the state manager.
    ///
    /// We only call this method if we _don't_ have the lock on the state
    /// files.  If we have the lock, we only want to save.
    pub fn reload_persistent_state(&self) -> Result<(), GuardMgrError> {
        let mut inner = self.inner.lock().expect("Poisoned lock");
        if let Some(new_guards) = inner.storage.load()? {
            let now = self.runtime.wallclock();
            inner.replace_guards_with(new_guards, now);
        }
        Ok(())
    }

    /// Switch from having an unowned persistent state to having an owned one.
    ///
    /// Requires that we hold the lock on the state files.
    pub fn upgrade_to_owned_persistent_state(&self) -> Result<(), GuardMgrError> {
        let mut inner = self.inner.lock().expect("Poisoned lock");
        debug_assert!(inner.storage.can_store());
        let new_guards = inner.storage.load()?.unwrap_or_default();
        let now = self.runtime.wallclock();
        inner.replace_guards_with(new_guards, now);
        Ok(())
    }

    /// Return true if `netdir` has enough information to safely become our new netdir.
    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
        let mut inner = self.inner.lock().expect("Poisoned lock");
        inner
            .guards
            .active_guards_mut()
            .missing_primary_microdescriptors(netdir)
            == 0
    }

    /// Update the state of this [`GuardMgr`] based on a new or modified
    /// [`NetDir`] object.
    ///
    /// This method can add new guards, or notice that existing guards
    /// have become unusable.  It needs a `NetDir` so it can identify
    /// potential candidate guards.
    ///
    /// Call this method whenever the `NetDir` changes.
    pub fn update_network(&self, netdir: &NetDir) {
        trace!("Updating guard state from network directory");
        let now = self.runtime.wallclock();

        let mut inner = self.inner.lock().expect("Poisoned lock");
        inner.update(now, Some(netdir));
    }

    /// Replace the current [`GuardFilter`] used by this `GuardMgr`.
    ///
    /// (Since there is only one kind of filter right now, there's no
    /// real reason to call this function, but at least it should work.)
    pub fn set_filter(&self, filter: GuardFilter, netdir: &NetDir) {
        // First we have to see how much of the possible guard space
        // this new filter allows.  (We don't use this info yet, but we will
        // once we have nontrivial filters.)
        let n_guards = netdir.relays().filter(|r| r.is_flagged_guard()).count();
        let n_permitted = netdir
            .relays()
            .filter(|r| r.is_flagged_guard() && filter.permits(r))
            .count();
        let frac_permitted = if n_guards > 0 {
            n_permitted as f64 / (n_guards as f64)
        } else {
            1.0
        };

        let now = self.runtime.wallclock();
        let mut inner = self.inner.lock().expect("Poisoned lock");

        let restrictive_filter = frac_permitted < inner.params.filter_threshold;

        // TODO: Once we support nontrivial filters, we might have to
        // swap out "active_guards" depending on which set it is.

        if frac_permitted < inner.params.extreme_threshold {
            warn!(
                "The number of guards permitted is smaller than the guard param minimum of {}%.",
                inner.params.extreme_threshold * 100.0,
            );
        }

        info!(
            ?filter,
            restrictive = restrictive_filter,
            "Guard filter replaced."
        );

        inner
            .guards
            .active_guards_mut()
            .set_filter(filter, restrictive_filter);
        inner.update(now, Some(netdir));
    }

    /// Select a guard for a given [`GuardUsage`].
    ///
    /// On success, we return a [`GuardId`] object to identify which
    /// guard we have picked, a [`GuardMonitor`] object that the
    /// caller can use to report whether its attempt to use the guard
    /// succeeded or failed, and a [`GuardUsable`] future that the
    /// caller can use to decide whether a circuit built through the
    /// guard is actually safe to use.
    ///
    /// That last point is important: It's okay to build a circuit
    /// through the guard returned by this function, but you can't
    /// actually use it for traffic unless the [`GuardUsable`] future
    /// yields "true".
    ///
    /// # Limitations
    ///
    /// This function will never return a guard that isn't listed in
    /// the [`NetDir`] most recently passed to [`GuardMgr::update_network`].
    /// That's _usually_ what you'd want, but when we're trying to
    /// bootstrap we might want to use _all_ guards as possible
    /// directory caches.  That's not implemented yet. (See ticket
    /// [#220](https://gitlab.torproject.org/tpo/core/arti/-/issues/220)).
    ///
    /// This function only looks at the netdir when all of the known
    /// guards are down; to force an update, use [`GuardMgr::update_network`].
    pub fn select_guard(
        &self,
        usage: GuardUsage,
        netdir: Option<&NetDir>,
    ) -> Result<(Guard, GuardMonitor, GuardUsable), PickGuardError> {
        let now = self.runtime.now();
        let wallclock = self.runtime.wallclock();

        let mut inner = self.inner.lock().expect("Poisoned lock");

        // (I am not 100% sure that we need to consider_all_retries here, but
        // it should _probably_ not hurt.)
        inner.guards.active_guards_mut().consider_all_retries(now);

        let (origin, guard_id) = inner.select_guard_with_retries(&usage, netdir, wallclock)?;
        let guard = inner
            .guards
            .active_guards()
            .get(&guard_id)
            .expect("Selected guard that wasn't in our sample!?")
            .get_external_rep();

        trace!(?guard_id, ?usage, "Guard selected");

        let (usable, usable_sender) = if origin.is_primary() {
            (GuardUsable::new_primary(), None)
        } else {
            let (u, snd) = GuardUsable::new_uncertain();
            (u, Some(snd))
        };
        let request_id = pending::RequestId::next();
        let ctrl = inner.ctrl.clone();
        let monitor = GuardMonitor::new(request_id, ctrl);

        // Note that the network can be down even if all the primary guards
        // are not yet marked as unreachable.  But according to guard-spec we
        // don't want to acknowledge the net as down before that point, since
        // we don't mark all the primary guards as retriable unless
        // we've been forced to non-primary guards.
        let net_has_been_down =
            if let Some(duration) = tor_proto::time_since_last_incoming_traffic() {
                inner
                    .guards
                    .active_guards_mut()
                    .all_primary_guards_are_unreachable()
                    && duration >= inner.params.internet_down_timeout
            } else {
                // TODO: Is this the correct behavior in this case?
                false
            };

        let pending_request =
            pending::PendingRequest::new(guard_id.clone(), usage, usable_sender, net_has_been_down);
        inner.pending.insert(request_id, pending_request);

        inner
            .guards
            .active_guards_mut()
            .record_attempt(&guard_id, now);

        Ok((guard, monitor, usable))
    }

    /// Ensure that the message queue is flushed before proceeding to
    /// the next step.  Used for testing.
    #[cfg(test)]
    async fn flush_msg_queue(&self) {
        let (snd, rcv) = futures::channel::oneshot::channel();
        let pingmsg = daemon::Msg::Ping(snd);
        {
            let inner = self.inner.lock().expect("Poisoned lock");
            inner
                .ctrl
                .unbounded_send(pingmsg)
                .expect("Guard observer task exited prematurely.");
        }
        let _ = rcv.await;
    }
}

impl GuardSets {
    /// Return a reference to the currently active set of guards.
    ///
    /// (That's easy enough for now, since there is never more than one set of
    /// guards.  But eventually that will change, as we add support for more
    /// complex filter types, and for bridge relays. Those will use separate
    /// `GuardSet` instances, and this accessor will choose the right one.)
    fn active_guards(&self) -> &GuardSet {
        &self.default
    }

    /// Return a mutable reference to the currently active set of guards.
    fn active_guards_mut(&mut self) -> &mut GuardSet {
        &mut self.default
    }

    /// Update all non-persistent state for the guards in this object with the
    /// state in `other`.
    fn copy_status_from(&mut self, other: &GuardSets) {
        self.default.copy_status_from(&other.default);
    }
}

impl GuardMgrInner {
    /// Update the status of all guards in the active set, based on
    /// the passage of time and (optionally) a network directory.
    ///
    /// We can expire guards based on the time alone; we can only
    /// add guards or change their status with a NetDir.
    fn update(&mut self, now: SystemTime, netdir: Option<&NetDir>) {
        // Set the parameters.
        if let Some(netdir) = netdir {
            match GuardParams::try_from(netdir.params()) {
                Ok(params) => self.params = params,
                Err(e) => warn!("Unusable guard parameters from consensus: {}", e),
            }
        }

        // Then expire guards.  Do that early, in case we need more.
        self.guards
            .active_guards_mut()
            .expire_old_guards(&self.params, now);

        if let Some(netdir) = netdir {
            if self
                .guards
                .active_guards_mut()
                .missing_primary_microdescriptors(netdir)
                > 0
            {
                // We are missing primary guard descriptors, so we shouldn't update our guard
                // status.
                return;
            }
            self.guards
                .active_guards_mut()
                .update_status_from_netdir(netdir);
            loop {
                let added_any = self.guards.active_guards_mut().extend_sample_as_needed(
                    now,
                    &self.params,
                    netdir,
                );
                if !added_any {
                    break;
                }
            }
        }

        self.guards
            .active_guards_mut()
            .select_primary_guards(&self.params);
    }

    /// Replace the active guard state with `new_guards`, preserving
    /// non-persistent state for any guards that are retained.
    fn replace_guards_with(&mut self, mut new_guards: GuardSets, now: SystemTime) {
        new_guards.copy_status_from(&self.guards);
        self.guards = new_guards;
        self.update(now, None);
    }

    /// Mark all of our primary guards as retriable, if we haven't done
    /// so since long enough before `now`.
    ///
    /// We want to call this function whenever a guard attempt succeeds,
    /// if the internet seemed to be down when the guard attempt was
    /// first launched.
    fn maybe_retry_primary_guards(&mut self, now: Instant) {
        // We don't actually want to mark our primary guards as
        // retriable more than once per internet_down_timeout: after
        // the first time, we would just be noticing the same "coming
        // back online" event more than once.
        let interval = self.params.internet_down_timeout;
        if self.last_primary_retry_time + interval <= now {
            debug!("Successfully reached a guard after a while off the internet; marking all primary guards retriable.");
            self.guards
                .active_guards_mut()
                .mark_primary_guards_retriable();
            self.last_primary_retry_time = now;
        }
    }

    /// Called when the circuit manager reports (via [`GuardMonitor`]) that
    /// a guard succeeded or failed.
    ///
    /// Changes the guard's status as appropriate, and updates the pending
    /// request as needed.
    pub(crate) fn handle_msg(
        &mut self,
        request_id: RequestId,
        status: GuardStatus,
        runtime: &impl tor_rtcompat::SleepProvider,
    ) {
        if let Some(mut pending) = self.pending.remove(&request_id) {
            // If there was a pending request matching this RequestId, great!
            let guard_id = pending.guard_id();
            trace!(?guard_id, ?status, "Received report of guard status");
            match status {
                GuardStatus::Success => {
                    // If we had gone too long without any net activity when we
                    // gave out this guard, and now we're seeing a circuit
                    // succeed, tell the primary guards that they might be
                    // retriable.
                    if pending.net_has_been_down() {
                        self.maybe_retry_primary_guards(runtime.now());
                    }

                    // The guard succeeded.  Tell the GuardSet.
                    self.guards.active_guards_mut().record_success(
                        guard_id,
                        &self.params,
                        runtime.wallclock(),
                    );
                    // Either tell the request whether the guard is
                    // usable, or schedule it as a "waiting" request.
                    if let Some(usable) = self.guard_usability_status(&pending, runtime.now()) {
                        trace!(?guard_id, usable, "Known usability status");
                        pending.reply(usable);
                    } else {
                        // This is the one case where we can't use the
                        // guard yet.
                        trace!(?guard_id, "Not able to answer right now");
                        pending.mark_waiting(runtime.now());
                        self.waiting.push(pending);
                    }
                }
                GuardStatus::Failure => {
                    self.guards
                        .active_guards_mut()
                        .record_failure(guard_id, runtime.now());
                    pending.reply(false);
                }
                GuardStatus::AttemptAbandoned => {
                    self.guards
                        .active_guards_mut()
                        .record_attempt_abandoned(guard_id);
                    pending.reply(false);
                }
                GuardStatus::Indeterminate => {
                    self.guards
                        .active_guards_mut()
                        .record_indeterminate_result(guard_id);
                    pending.reply(false);
                }
            };
        } else {
            warn!(
                "Got a status {:?} for a request {:?} that wasn't pending",
                status, request_id
            );
        }

        // We might need to update the primary guards based on changes in the
        // status of guards above.
        self.guards
            .active_guards_mut()
            .select_primary_guards(&self.params);

        // Some waiting requests may just have become ready (usable or
        // not); we need to give them the information they're waiting
        // for.
        self.expire_and_answer_pending_requests(runtime.now());
    }

    /// If the circuit built because of a given [`PendingRequest`] may
    /// now be used (or discarded), return `Some(true)` or
    /// `Some(false)` respectively.
    ///
    /// Return None if we can't yet give an answer about whether such
    /// a circuit is usable.
    fn guard_usability_status(&self, pending: &PendingRequest, now: Instant) -> Option<bool> {
        self.guards.active_guards().circ_usability_status(
            pending.guard_id(),
            pending.usage(),
            &self.params,
            now,
        )
    }

    /// For requests that have been "waiting" for an answer for too long,
    /// expire them and tell the circuit manager that their circuits
    /// are unusable.
    fn expire_and_answer_pending_requests(&mut self, now: Instant) {
        // TODO: Use Vec::drain_filter or Vec::retain_mut when/if it's stable.
        use retain_mut::RetainMut;

        // A bit ugly: we use a separate Vec here to avoid borrowing issues,
        // and put it back when we're done.
        let mut waiting = Vec::new();
        std::mem::swap(&mut waiting, &mut self.waiting);

        RetainMut::retain_mut(&mut waiting, |pending| {
            let expired = pending
                .waiting_since()
                .and_then(|w| now.checked_duration_since(w))
                .map(|d| d >= self.params.np_idle_timeout)
                == Some(true);
            if expired {
                trace!(?pending, "Pending request expired");
                pending.reply(false);
                return false;
            }

            // TODO-SPEC: guard_usability_status isn't what the spec says.  It
            // says instead that we should look at _circuit_ status, saying:
            //  "   Definition: In the algorithm above, C2 "blocks" C1 if:
            // * C2 obeys all the restrictions that C1 had to obey, AND
            // * C2 has higher priority than C1, AND
            // * Either C2 is <complete>, or C2 is <waiting_for_better_guard>,
            // or C2 has been <usable_if_no_better_guard> for no more than
            // {NONPRIMARY_GUARD_CONNECT_TIMEOUT} seconds."
            //
            // See comments in sample::GuardSet::circ_usability_status.

            if let Some(answer) = self.guard_usability_status(pending, now) {
748
                trace!(?pending, answer, "Pending request now ready");
749
                pending.reply(answer);
750
                return false;
751
            }
752
            true
753
3778
        });
754
3778

            
755
3778
        // Put the waiting list back.
756
3778
        std::mem::swap(&mut waiting, &mut self.waiting);
757
3778
    }

    /// Run any periodic events that update guard status, and return a
    /// duration after which periodic events should next be run.
    pub(crate) fn run_periodic_events(&mut self, wallclock: SystemTime, now: Instant) -> Duration {
        self.update(wallclock, None);
        self.expire_and_answer_pending_requests(now);
        Duration::from_secs(1) // TODO: Too aggressive.
    }

    /// Try to select a guard, expanding the sample or marking guards retriable
    /// if the first attempts fail.
    fn select_guard_with_retries(
        &mut self,
        usage: &GuardUsage,
        netdir: Option<&NetDir>,
        now: SystemTime,
    ) -> Result<(sample::ListKind, GuardId), PickGuardError> {
        // Try to find a guard.
        if let Ok(s) = self.guards.active_guards().pick_guard(usage, &self.params) {
            return Ok(s);
        }

        // That didn't work. If we have a netdir, expand the sample and try again.
        if let Some(dir) = netdir {
            trace!("No guards available, trying to extend the sample.");
            self.update(now, Some(dir));
            if self
                .guards
                .active_guards_mut()
                .extend_sample_as_needed(now, &self.params, dir)
            {
                self.guards
                    .active_guards_mut()
                    .select_primary_guards(&self.params);
                if let Ok(s) = self.guards.active_guards().pick_guard(usage, &self.params) {
                    return Ok(s);
                }
            }
        }

        // That didn't work either. Mark everybody as potentially retriable.
        info!("All guards seem down. Marking them retriable and trying again.");
        self.guards.active_guards_mut().mark_all_guards_retriable();
        self.guards.active_guards().pick_guard(usage, &self.params)
    }
}

/// A set of parameters, derived from the consensus document, controlling
/// the behavior of a guard manager.
#[derive(Debug, Clone)]
#[cfg_attr(test, derive(PartialEq))]
struct GuardParams {
    /// How long should a sampled, un-confirmed guard be kept in the sample before it expires?
    lifetime_unconfirmed: Duration,
    /// How long should a confirmed guard be kept in the sample before
    /// it expires?
    lifetime_confirmed: Duration,
    /// How long may a guard be unlisted before we remove it from the sample?
    lifetime_unlisted: Duration,
    /// Largest number of guards we're willing to add to the sample.
    max_sample_size: usize,
    /// Largest fraction of the network's guard bandwidth that we're
    /// willing to add to the sample.
    max_sample_bw_fraction: f64,
    /// Smallest number of guards that we're willing to have in the
    /// sample, after applying a [`GuardFilter`].
    min_filtered_sample_size: usize,
    /// How many guards are considered "primary"?
    n_primary: usize,
    /// When making a regular circuit, how many primary guards should we
    /// be willing to try?
    data_parallelism: usize,
    /// When making a one-hop directory circuit, how many primary
    /// guards should we be willing to try?
    dir_parallelism: usize,
    /// For how long does a pending attempt to connect to a guard
    /// block an attempt to use a less-favored non-primary guard?
    np_connect_timeout: Duration,
    /// How long do we allow a circuit to a successful but unfavored
    /// non-primary guard to sit around before deciding not to use it?
    np_idle_timeout: Duration,
    /// After how much time without successful activity does a
    /// successful circuit indicate that we should retry our primary
    /// guards?
    internet_down_timeout: Duration,
    /// What fraction of the guards can be filtered out before we
    /// decide that our filter is "meaningfully restrictive"?
    ///
    /// (Not fully implemented yet.)
    filter_threshold: f64,
    /// What fraction of the guards can be filtered out before we
    /// decide that our filter is "very restrictive"?
    extreme_threshold: f64,
}

impl Default for GuardParams {
    fn default() -> Self {
        let one_day = Duration::from_secs(86400);
        GuardParams {
            lifetime_unconfirmed: one_day * 120,
            lifetime_confirmed: one_day * 60,
            lifetime_unlisted: one_day * 20,
            max_sample_size: 60,
            max_sample_bw_fraction: 0.2,
            min_filtered_sample_size: 20,
            n_primary: 3,
            data_parallelism: 1,
            dir_parallelism: 3,
            np_connect_timeout: Duration::from_secs(15),
            np_idle_timeout: Duration::from_secs(600),
            internet_down_timeout: Duration::from_secs(600),
            filter_threshold: 0.2,
            extreme_threshold: 0.01,
        }
    }
}

impl TryFrom<&NetParameters> for GuardParams {
    type Error = tor_units::Error;
    fn try_from(p: &NetParameters) -> Result<GuardParams, Self::Error> {
        Ok(GuardParams {
            lifetime_unconfirmed: p.guard_lifetime_unconfirmed.try_into()?,
            lifetime_confirmed: p.guard_lifetime_confirmed.try_into()?,
            lifetime_unlisted: p.guard_remove_unlisted_after.try_into()?,
            max_sample_size: p.guard_max_sample_size.try_into()?,
            max_sample_bw_fraction: p.guard_max_sample_threshold.as_fraction(),
            min_filtered_sample_size: p.guard_filtered_min_sample_size.try_into()?,
            n_primary: p.guard_n_primary.try_into()?,
            data_parallelism: p.guard_use_parallelism.try_into()?,
            dir_parallelism: p.guard_dir_use_parallelism.try_into()?,
            np_connect_timeout: p.guard_nonprimary_connect_timeout.try_into()?,
            np_idle_timeout: p.guard_nonprimary_idle_timeout.try_into()?,
            internet_down_timeout: p.guard_internet_likely_down.try_into()?,
            filter_threshold: p.guard_meaningful_restriction.as_fraction(),
            extreme_threshold: p.guard_extreme_restriction.as_fraction(),
        })
    }
}

/// A unique cryptographic identifier for a selected guard.
///
/// (This is implemented internally using both of the guard's Ed25519
/// and RSA identities.)
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
pub struct GuardId {
    /// Ed25519 identity key for a guard.
    ed25519: pk::ed25519::Ed25519Identity,
    /// RSA identity fingerprint for a guard.
    rsa: pk::rsa::RsaIdentity,
}

impl GuardId {
    /// Return a new, manually constructed `GuardId`.
    fn new(ed25519: pk::ed25519::Ed25519Identity, rsa: pk::rsa::RsaIdentity) -> Self {
        Self { ed25519, rsa }
    }

    /// Extract a `GuardId` from a `Relay` object.
    pub(crate) fn from_relay(relay: &tor_netdir::Relay<'_>) -> Self {
        Self::new(*relay.id(), *relay.rsa_id())
    }

    /// Return the relay in `netdir` that corresponds to this ID, if there
    /// is one.
    pub fn get_relay<'a>(&self, netdir: &'a NetDir) -> Option<Relay<'a>> {
        netdir.by_id_pair(&self.ed25519, &self.rsa)
    }
}

/// Representation of a guard, as returned by [`GuardMgr::select_guard()`].
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Guard {
    /// The guard's identities.
    id: GuardId,
    /// The addresses at which the guard can be contacted.
    orports: Vec<SocketAddr>,
}

impl Guard {
    /// Return the identities of this guard.
    pub fn id(&self) -> &GuardId {
        &self.id
    }
    /// Look up this guard in `netdir`.
    pub fn get_relay<'a>(&self, netdir: &'a NetDir) -> Option<Relay<'a>> {
        self.id().get_relay(netdir)
    }
}

// This is somewhat redundant with the implementation in crate::guard::Guard.
impl tor_linkspec::ChanTarget for Guard {
    fn addrs(&self) -> &[SocketAddr] {
        &self.orports[..]
    }
    fn ed_identity(&self) -> &pk::ed25519::Ed25519Identity {
        &self.id.ed25519
    }
    fn rsa_identity(&self) -> &pk::rsa::RsaIdentity {
        &self.id.rsa
    }
}

/// The purpose for which we plan to use a guard.
///
/// This can affect the guard selection algorithm.
#[derive(Clone, Debug, Eq, PartialEq, Educe)]
#[educe(Default)]
#[non_exhaustive]
pub enum GuardUsageKind {
    /// We want to use this guard for a data circuit.
    ///
    /// (This encompasses everything except the `OneHopDirectory` case.)
    #[educe(Default)]
    Data,
    /// We want to use this guard for a one-hop, non-anonymous
    /// directory request.
    ///
    /// (Our algorithm allows more parallelism for the guards that we use
    /// for these circuits.)
    OneHopDirectory,
}

/// A set of parameters describing how a single guard should be selected.
///
/// Used as an argument to [`GuardMgr::select_guard`].
#[derive(Clone, Debug, Default, derive_builder::Builder)]
#[builder(build_fn(error = "tor_config::ConfigBuildError"))]
pub struct GuardUsage {
    /// The purpose for which this guard will be used.
    #[builder(default)]
    kind: GuardUsageKind,
    /// A list of restrictions on which guard may be used.
    #[builder(default)]
    restrictions: Vec<GuardRestriction>,
}

impl GuardUsageBuilder {
    /// Create a new empty [`GuardUsageBuilder`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Add `restriction` to the list of restrictions on this guard usage.
    pub fn push_restriction(&mut self, restriction: GuardRestriction) -> &mut Self {
        self.restrictions
            .get_or_insert_with(Vec::new)
            .push(restriction);
        self
    }
}

/// A restriction that applies to a single request for a guard.
///
/// Restrictions differ from filters (see [`GuardFilter`]) in that
/// they apply to single requests, not to our entire set of guards.
/// They're suitable for things like making sure that we don't start
/// and end a circuit at the same relay, or requiring a specific
/// subprotocol version for certain kinds of requests.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum GuardRestriction {
    /// Don't pick a guard with the provided Ed25519 identity.
    AvoidId(pk::ed25519::Ed25519Identity),
    /// Don't pick a guard with any of the provided Ed25519 identities.
    AvoidAllIds(HashSet<pk::ed25519::Ed25519Identity>),
}

/// An error caused while creating or updating a guard manager.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum GuardMgrError {
    /// An error manipulating persistent state.
    #[error("Problem accessing persistent state")]
    State(#[from] tor_persist::Error),

    /// An error that occurred while trying to spawn a daemon task.
    #[error("Unable to spawn {spawning}")]
    Spawn {
        /// What we were trying to spawn.
        spawning: &'static str,
        /// What happened when we tried to spawn it.
        #[source]
        cause: Arc<SpawnError>,
    },
}

impl HasKind for GuardMgrError {
    #[rustfmt::skip] // to preserve table in match
    fn kind(&self) -> ErrorKind {
        use GuardMgrError as G;
        match self {
            G::State(e)               => e.kind(),
            G::Spawn{ cause, .. }     => cause.kind(),
        }
    }
}

impl GuardMgrError {
    /// Construct a new `GuardMgrError` from a `SpawnError`.
    fn from_spawn(spawning: &'static str, err: SpawnError) -> GuardMgrError {
        GuardMgrError::Spawn {
            spawning,
            cause: Arc::new(err),
        }
    }
}

#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use tor_persist::TestingStateMgr;
    use tor_rtcompat::test_with_all_runtimes;

    #[test]
    fn guard_param_defaults() {
        let p1 = GuardParams::default();
        let p2: GuardParams = (&NetParameters::default()).try_into().unwrap();
        assert_eq!(p1, p2);
    }

    fn init<R: Runtime>(rt: R) -> (GuardMgr<R>, TestingStateMgr, NetDir) {
        use tor_netdir::{testnet, MdReceiver, PartialNetDir};
        let statemgr = TestingStateMgr::new();
        let have_lock = statemgr.try_lock().unwrap();
        assert!(have_lock.held());
        let guardmgr = GuardMgr::new(rt, statemgr.clone()).unwrap();
        let (con, mds) = testnet::construct_network().unwrap();
        let override_p = "guard-min-filtered-sample-size=5 guard-n-primary-guards=2"
            .parse()
            .unwrap();
        let mut netdir = PartialNetDir::new(con, Some(&override_p));
        for md in mds {
            netdir.add_microdesc(md);
        }
        let netdir = netdir.unwrap_if_sufficient().unwrap();

        (guardmgr, statemgr, netdir)
    }

    #[test]
    #[allow(clippy::clone_on_copy)]
    fn simple_case() {
        test_with_all_runtimes!(|rt| async move {
            let (guardmgr, statemgr, netdir) = init(rt.clone());
            let usage = GuardUsage::default();

            guardmgr.update_network(&netdir);

            let (id, mon, usable) = guardmgr.select_guard(usage, Some(&netdir)).unwrap();
            // Report that the circuit succeeded.
            mon.succeeded();

            // May we use the circuit?
            let usable = usable.await.unwrap();
            assert!(usable);

            // Save the state...
            guardmgr.flush_msg_queue().await;
            guardmgr.store_persistent_state().unwrap();
            drop(guardmgr);

            // Try reloading from the state...
            let guardmgr2 = GuardMgr::new(rt.clone(), statemgr.clone()).unwrap();
            guardmgr2.update_network(&netdir);

            // Since the guard was confirmed, we should get the same one this time!
            let usage = GuardUsage::default();
            let (id2, _mon, _usable) = guardmgr2.select_guard(usage, Some(&netdir)).unwrap();
            assert_eq!(id2, id);
        });
    }

    #[test]
    fn simple_waiting() {
        // TODO(nickm): This test fails in rare cases; I suspect a
        // race condition somewhere.
        //
        // I've doubled up on the queue flushing in order to try to make the
        // race less likely, but we should investigate.
        test_with_all_runtimes!(|rt| async move {
            let (guardmgr, _statemgr, netdir) = init(rt);
            let u = GuardUsage::default();
            guardmgr.update_network(&netdir);

            // We'll have the first two guards fail, which should make us
            // try a non-primary guard.
            let (id1, mon, _usable) = guardmgr.select_guard(u.clone(), Some(&netdir)).unwrap();
            mon.failed();
            guardmgr.flush_msg_queue().await; // avoid race
            guardmgr.flush_msg_queue().await; // avoid race
            let (id2, mon, _usable) = guardmgr.select_guard(u.clone(), Some(&netdir)).unwrap();
            mon.failed();
            guardmgr.flush_msg_queue().await; // avoid race
            guardmgr.flush_msg_queue().await; // avoid race

            assert!(id1 != id2);

            // Now we should get two sampled guards. They should be different.
            let (id3, mon3, usable3) = guardmgr.select_guard(u.clone(), Some(&netdir)).unwrap();
            let (id4, mon4, usable4) = guardmgr.select_guard(u.clone(), Some(&netdir)).unwrap();
            assert!(id3 != id4);

            let (u3, u4) = futures::join!(
                async {
                    mon3.failed();
                    guardmgr.flush_msg_queue().await; // avoid race
                    usable3.await.unwrap()
                },
                async {
                    mon4.succeeded();
                    usable4.await.unwrap()
                }
            );

            assert_eq!((u3, u4), (false, true));
        });
    }

    #[test]
    fn filtering_basics() {
        test_with_all_runtimes!(|rt| async move {
            let (guardmgr, _statemgr, netdir) = init(rt);
            let u = GuardUsage::default();
            guardmgr.update_network(&netdir);
            guardmgr.set_filter(GuardFilter::TestingLimitKeys, &netdir);

            let (guard, _mon, _usable) = guardmgr.select_guard(u, Some(&netdir)).unwrap();
            // Make sure that the filter worked.
            assert_eq!(guard.id().rsa.as_bytes()[0] % 4, 0);
        });
    }
}