1
1
//! `tor-circmgr`: circuits through the Tor network on demand.
2
//!
3
//! # Overview
4
//!
5
//! This crate is part of
6
//! [Arti](https://gitlab.torproject.org/tpo/core/arti/), a project to
7
//! implement [Tor](https://www.torproject.org/) in Rust.
8
//!
9
//! In Tor, a circuit is an encrypted multi-hop tunnel over multiple
10
//! relays.  This crate's purpose, long-term, is to manage a set of
11
//! circuits for a client.  It should construct circuits in response
12
//! to a client's needs, and preemptively construct circuits so as to
13
//! anticipate those needs.  If a client request can be satisfied with
14
//! an existing circuit, it should return that circuit instead of
15
//! constructing a new one.
16
//!
17
//! # Limitations
18
//!
19
//! But for now, this `tor-circmgr` code is extremely preliminary; its
20
//! data structures are all pretty bad, and it's likely that the API
21
//! is wrong too.
22

            
23
#![deny(missing_docs)]
24
#![warn(noop_method_call)]
25
#![deny(unreachable_pub)]
26
#![warn(clippy::all)]
27
#![deny(clippy::await_holding_lock)]
28
#![deny(clippy::cargo_common_metadata)]
29
#![deny(clippy::cast_lossless)]
30
#![deny(clippy::checked_conversions)]
31
#![warn(clippy::cognitive_complexity)]
32
#![deny(clippy::debug_assert_with_mut_call)]
33
#![deny(clippy::exhaustive_enums)]
34
#![deny(clippy::exhaustive_structs)]
35
#![deny(clippy::expl_impl_clone_on_copy)]
36
#![deny(clippy::fallible_impl_from)]
37
#![deny(clippy::implicit_clone)]
38
#![deny(clippy::large_stack_arrays)]
39
#![warn(clippy::manual_ok_or)]
40
#![deny(clippy::missing_docs_in_private_items)]
41
#![deny(clippy::missing_panics_doc)]
42
#![warn(clippy::needless_borrow)]
43
#![warn(clippy::needless_pass_by_value)]
44
#![warn(clippy::option_option)]
45
#![warn(clippy::rc_buffer)]
46
#![deny(clippy::ref_option_ref)]
47
#![warn(clippy::semicolon_if_nothing_returned)]
48
#![warn(clippy::trait_duplication_in_bounds)]
49
#![deny(clippy::unnecessary_wraps)]
50
#![warn(clippy::unseparated_literal_suffix)]
51
#![deny(clippy::unwrap_used)]
52

            
53
use tor_chanmgr::ChanMgr;
54
use tor_netdir::{fallback::FallbackDir, NetDir};
55
use tor_proto::circuit::{CircParameters, ClientCirc, UniqId};
56
use tor_rtcompat::Runtime;
57

            
58
use std::convert::TryInto;
59
use std::sync::{Arc, Mutex};
60
use std::time::Instant;
61
use tracing::{debug, error, info, warn};
62

            
63
pub mod build;
64
mod config;
65
mod err;
66
mod impls;
67
mod mgr;
68
pub mod path;
69
mod preemptive;
70
mod timeouts;
71
mod usage;
72

            
73
pub use err::Error;
74
pub use usage::{IsolationToken, StreamIsolation, StreamIsolationBuilder, TargetPort, TargetPorts};
75

            
76
pub use config::{
77
    CircMgrConfig, CircMgrConfigBuilder, CircuitTiming, CircuitTimingBuilder, PathConfig,
78
    PathConfigBuilder, PreemptiveCircuitConfig, PreemptiveCircuitConfigBuilder,
79
};
80

            
81
use crate::preemptive::PreemptiveCircuitPredictor;
82
use usage::TargetCircUsage;
83

            
84
/// A Result type as returned from this crate.
85
pub type Result<T> = std::result::Result<T, Error>;
86

            
87
/// Type alias for dynamic StorageHandle that can handle our timeout state.
88
type TimeoutStateHandle = tor_persist::DynStorageHandle<timeouts::pareto::ParetoTimeoutState>;
89

            
90
/// Key used to load timeout state information.
91
const PARETO_TIMEOUT_DATA_KEY: &str = "circuit_timeouts";
92

            
93
/// Represents what we know about the Tor network.
94
///
95
/// This can either be a complete directory, or a list of fallbacks.
96
///
97
/// Not every DirInfo can be used to build every kind of circuit:
98
/// if you try to build a path with an inadequate DirInfo, you'll get a
99
/// NeedConsensus error.
100
#[derive(Debug, Copy, Clone)]
101
#[non_exhaustive]
102
pub enum DirInfo<'a> {
103
    /// A list of fallbacks, for use when we don't know a network directory.
104
    Fallbacks(&'a [FallbackDir]),
105
    /// A complete network directory
106
    Directory(&'a NetDir),
107
}
108

            
109
impl<'a> From<&'a [FallbackDir]> for DirInfo<'a> {
110
43
    fn from(v: &'a [FallbackDir]) -> DirInfo<'a> {
111
43
        DirInfo::Fallbacks(v)
112
43
    }
113
}
114
impl<'a> From<&'a NetDir> for DirInfo<'a> {
115
16
    fn from(v: &'a NetDir) -> DirInfo<'a> {
116
16
        DirInfo::Directory(v)
117
16
    }
118
}
119
impl<'a> DirInfo<'a> {
120
    /// Return a set of circuit parameters for this DirInfo.
121
3
    fn circ_params(&self) -> CircParameters {
122
3
        use tor_netdir::params::NetParameters;
123
3
        /// Extract a CircParameters from the NetParameters from a
124
3
        /// consensus.  We use a common function for both cases here
125
3
        /// to be sure that we look at the defaults from NetParameters
126
3
        /// code.
127
3
        fn from_netparams(inp: &NetParameters) -> CircParameters {
128
3
            let mut p = CircParameters::default();
129
3
            if let Err(e) = p.set_initial_send_window(inp.circuit_window.get() as u16) {
130
3
                warn!("Invalid parameter in directory: {}", e);
131
3
            }
132
3
            p.set_extend_by_ed25519_id(inp.extend_by_ed25519_id.into());
133
3
            p
134
3
        }
135
3

            
136
3
        match self {
137
1
            DirInfo::Fallbacks(_) => from_netparams(&NetParameters::default()),
138
2
            DirInfo::Directory(d) => from_netparams(d.params()),
139
        }
140
3
    }
141
}
142

            
143
/// A Circuit Manager (CircMgr) manages a set of circuits, returning them
144
/// when they're suitable, and launching them if they don't already exist.
145
///
146
/// Right now, its notion of "suitable" is quite rudimentary: it just
147
/// believes in two kinds of circuits: Exit circuits, and directory
148
/// circuits.  Exit circuits are ones that were created to connect to
149
/// a set of ports; directory circuits were made to talk to directory caches.
150
#[derive(Clone)]
151
pub struct CircMgr<R: Runtime> {
152
    /// The underlying circuit manager object that implements our behavior.
153
    mgr: Arc<mgr::AbstractCircMgr<build::CircuitBuilder<R>, R>>,
154
    /// A preemptive circuit predictor, for, uh, building circuits preemptively.
155
    predictor: Arc<Mutex<PreemptiveCircuitPredictor>>,
156
}
157

            
158
impl<R: Runtime> CircMgr<R> {
159
    /// Construct a new circuit manager.
160
2
    pub fn new<SM>(
161
2
        config: CircMgrConfig,
162
2
        storage: SM,
163
2
        runtime: &R,
164
2
        chanmgr: Arc<ChanMgr<R>>,
165
2
    ) -> Result<Arc<Self>>
166
2
    where
167
2
        SM: tor_persist::StateMgr + Send + Sync + 'static,
168
2
    {
169
2
        let CircMgrConfig {
170
2
            path_rules,
171
2
            circuit_timing,
172
2
            preemptive_circuits,
173
2
        } = config;
174
2

            
175
2
        let preemptive = Arc::new(Mutex::new(PreemptiveCircuitPredictor::new(
176
2
            preemptive_circuits,
177
2
        )));
178

            
179
2
        let guardmgr = tor_guardmgr::GuardMgr::new(runtime.clone(), storage.clone())?;
180

            
181
2
        let storage_handle = storage.create_handle(PARETO_TIMEOUT_DATA_KEY);
182
2

            
183
2
        let builder = build::CircuitBuilder::new(
184
2
            runtime.clone(),
185
2
            chanmgr,
186
2
            path_rules,
187
2
            storage_handle,
188
2
            guardmgr,
189
2
        );
190
2
        let mgr = mgr::AbstractCircMgr::new(builder, runtime.clone(), circuit_timing);
191
2
        let circmgr = Arc::new(CircMgr {
192
2
            mgr: Arc::new(mgr),
193
2
            predictor: preemptive,
194
2
        });
195
2

            
196
2
        Ok(circmgr)
197
2
    }
198

            
199
    /// Try to change our configuration settings to `new_config`.
200
    ///
201
    /// The actual behavior here will depend on the value of `how`.
202
    pub fn reconfigure(
203
        &self,
204
        new_config: &CircMgrConfig,
205
        how: tor_config::Reconfigure,
206
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
207
        let old_path_rules = self.mgr.peek_builder().path_config();
208
        let predictor = self.predictor.lock().expect("poisoned lock");
209
        let preemptive_circuits = predictor.config();
210
        if preemptive_circuits.initial_predicted_ports
211
            != new_config.preemptive_circuits.initial_predicted_ports
212
        {
213
            // This change has no effect, since the list of ports was _initial_.
214
            how.cannot_change("preemptive_circuits.initial_predicted_ports")?;
215
        }
216

            
217
        if how == tor_config::Reconfigure::CheckAllOrNothing {
218
            return Ok(());
219
        }
220

            
221
        let discard_circuits = !new_config
222
            .path_rules
223
            .at_least_as_permissive_as(&old_path_rules);
224

            
225
        self.mgr
226
            .peek_builder()
227
            .set_path_config(new_config.path_rules.clone());
228
        self.mgr
229
            .set_circuit_timing(new_config.circuit_timing.clone());
230
        predictor.set_config(new_config.preemptive_circuits.clone());
231

            
232
        if discard_circuits {
233
            // TODO(nickm): Someday, we might want to take a more lenient approach, and only
234
            // retire those circuits that do not conform to the new path rules.
235
            info!("Path configuration has become more restrictive: retiring existing circuits.");
236
            self.retire_all_circuits();
237
        }
238
        Ok(())
239
    }
240

            
241
    /// Reload state from the state manager.
242
    ///
243
    /// We only call this method if we _don't_ have the lock on the state
244
    /// files.  If we have the lock, we only want to save.
245
    pub fn reload_persistent_state(&self) -> Result<()> {
246
        self.mgr.peek_builder().reload_state()?;
247
        Ok(())
248
    }
249

            
250
    /// Switch from having an unowned persistent state to having an owned one.
251
    ///
252
    /// Requires that we hold the lock on the state files.
253
    pub fn upgrade_to_owned_persistent_state(&self) -> Result<()> {
254
2
        self.mgr.peek_builder().upgrade_to_owned_state()?;
255
2
        Ok(())
256
2
    }
257

            
258
    /// Flush state to the state manager, if there is any unsaved state and
259
    /// we have the lock.
260
    ///
261
    /// Return true if we saved something; false if we didn't have the lock.
262
    pub fn store_persistent_state(&self) -> Result<bool> {
263
        self.mgr.peek_builder().save_state()
264
    }
265

            
266
    /// Reconfigure this circuit manager using the latest set of
267
    /// network parameters.
268
    ///
269
    /// (NOTE: for now, this only affects circuit timeout estimation.)
270
    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
271
        self.mgr.update_network_parameters(p);
272
        self.mgr.peek_builder().update_network_parameters(p);
273
    }
274

            
275
    /// Return true if `netdir` has enough information to be used for this
276
    /// circuit manager.
277
    ///
278
    /// (This will check whether the netdir is missing any primary guard
279
    /// microdescriptors)
280
    pub fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
281
        self.mgr
282
            .peek_builder()
283
            .guardmgr()
284
            .netdir_is_sufficient(netdir)
285
    }
286

            
287
    /// Reconfigure this circuit manager using the latest network directory.
288
    ///
289
    /// This should be called on _any_ change to the network, as opposed to
290
    /// [`CircMgr::update_network_parameters`], which should only be
291
    /// called when the parameters change.
292
    pub fn update_network(&self, netdir: &NetDir) {
293
        self.mgr.peek_builder().guardmgr().update_network(netdir);
294
    }
295

            
296
    /// Return a circuit suitable for sending one-hop BEGINDIR streams,
297
    /// launching it if necessary.
298
    pub async fn get_or_launch_dir(&self, netdir: DirInfo<'_>) -> Result<ClientCirc> {
299
        self.expire_circuits();
300
        let usage = TargetCircUsage::Dir;
301
        self.mgr.get_or_launch(&usage, netdir).await
302
    }
303

            
304
    /// Return a circuit suitable for exiting to all of the provided
305
    /// `ports`, launching it if necessary.
306
    ///
307
    /// If the list of ports is empty, then the chosen circuit will
308
    /// still end at _some_ exit.
309
    pub async fn get_or_launch_exit(
310
        &self,
311
        netdir: DirInfo<'_>, // TODO: This has to be a NetDir.
312
        ports: &[TargetPort],
313
        isolation: StreamIsolation,
314
    ) -> Result<ClientCirc> {
315
        self.expire_circuits();
316
        let time = Instant::now();
317
        {
318
            let mut predictive = self.predictor.lock().expect("preemptive lock poisoned");
319
            if ports.is_empty() {
320
                predictive.note_usage(None, time);
321
            } else {
322
                for port in ports.iter() {
323
                    predictive.note_usage(Some(*port), time);
324
                }
325
            }
326
        }
327
        let ports = ports.iter().map(Clone::clone).collect();
328
        let usage = TargetCircUsage::Exit { ports, isolation };
329
        self.mgr.get_or_launch(&usage, netdir).await
330
    }
331

            
332
    /// Launch circuits preemptively, using the preemptive circuit predictor's predictions.
333
    ///
334
    /// # Note
335
    ///
336
    /// This function is invoked periodically from the
337
    /// `arti-client` crate, based on timings from the network
338
    /// parameters. As with `launch_timeout_testing_circuit_if_appropriate`, this
339
    /// should ideally be refactored to be internal to this crate, and not be a
340
    /// public API here.
341
    pub async fn launch_circuits_preemptively(&self, netdir: DirInfo<'_>) {
342
        debug!("Checking preemptive circuit predictions.");
343
        let (circs, threshold) = {
344
            let preemptive = self.predictor.lock().expect("preemptive lock poisoned");
345
            let threshold = preemptive.config().disable_at_threshold;
346
            (preemptive.predict(), threshold)
347
        };
348

            
349
        if self.mgr.n_circs() >= threshold {
350
            return;
351
        }
352

            
353
        let futures = circs
354
            .iter()
355
            .map(|usage| self.mgr.get_or_launch(usage, netdir));
356
        let results = futures::future::join_all(futures).await;
357
        for (i, result) in results.iter().enumerate() {
358
            match result {
359
                Ok(_) => debug!("Circuit exists (or was created) for {:?}", circs[i]),
360
                Err(e) => warn!("Failed to build preemptive circuit {:?}: {}", circs[i], e),
361
            }
362
        }
363
    }
364

            
365
    /// If `circ_id` is the unique identifier for a circuit that we're
366
    /// keeping track of, don't give it out for any future requests.
367
    pub fn retire_circ(&self, circ_id: &UniqId) {
368
        let _ = self.mgr.take_circ(circ_id);
369
    }
370

            
371
    /// Mark every circuit that we have launched so far as unsuitable for
372
    /// any future requests.  This won't close existing circuits that have
373
    /// streams attached to them, but it will prevent any future streams from
374
    /// being attached.
375
    ///
376
    /// TODO: we may want to expose this eventually.  If we do, we should
377
    /// be very clear that you don't want to use it haphazardly.
378
    pub(crate) fn retire_all_circuits(&self) {
379
        self.mgr.retire_all_circuits();
380
    }
381

            
382
    /// Expire every circuit that has been dirty for too long.
383
    ///
384
    /// Expired circuits are not closed while they still have users,
385
    /// but they are no longer given out for new requests.
386
    fn expire_circuits(&self) {
387
        // TODO: I would prefer not to call this at every request, but
388
        // it should be fine for now.  (At some point we may no longer
389
        // need this, or might not need to call it so often, now that
390
        // our circuit expiration runs on scheduld timers via
391
        // spawn_expiration_task.)
392
        let now = self.mgr.peek_runtime().now();
393
        self.mgr.expire_circs(now);
394
    }
395

            
396
    /// If we need to launch a testing circuit to judge our circuit
397
    /// build timeouts timeouts, do so.
398
    ///
399
    /// # Note
400
    ///
401
    /// This function is invoked periodically from the
402
    /// `arti-client` crate, based on timings from the network
403
    /// parameters.  Please don't invoke it on your own; I hope we can
404
    /// have this API go away in the future.
405
    ///
406
    /// I would much prefer to have this _not_ be a public API, and
407
    /// instead have it be a daemon task.  The trouble is that it
408
    /// needs to get a NetDir as input, and that isn't possible with
409
    /// the current CircMgr design.  See
410
    /// [arti#161](https://gitlab.torproject.org/tpo/core/arti/-/issues/161).
411
    pub fn launch_timeout_testing_circuit_if_appropriate(&self, netdir: &NetDir) -> Result<()> {
412
        if !self.mgr.peek_builder().learning_timeouts() {
413
            return Ok(());
414
        }
415
        // We expire any too-old circuits here, so they don't get
416
        // counted towards max_circs.
417
        self.expire_circuits();
418
        let max_circs: u64 = netdir
419
            .params()
420
            .cbt_max_open_circuits_for_testing
421
            .try_into()
422
            .expect("Out-of-bounds result from BoundedInt32");
423
        if (self.mgr.n_circs() as u64) < max_circs {
424
            // Actually launch the circuit!
425
            let usage = TargetCircUsage::TimeoutTesting;
426
            let dirinfo = netdir.into();
427
            let mgr = Arc::clone(&self.mgr);
428
            debug!("Launching a circuit to test build times.");
429
            let _ = mgr.launch_by_usage(&usage, dirinfo)?;
430
        }
431

            
432
        Ok(())
433
    }
434
}
435

            
436
impl<R: Runtime> Drop for CircMgr<R> {
437
    fn drop(&mut self) {
438
        match self.store_persistent_state() {
439
            Ok(true) => info!("Flushed persistent state at exit."),
440
            Ok(false) => debug!("Lock not held; no state to flush."),
441
            Err(e) => error!("Unable to flush state on circuit manager drop: {}", e),
442
        }
443
    }
444
}
445

            
446
#[cfg(test)]
447
mod test {
448
    #![allow(clippy::unwrap_used)]
449
    use super::*;
450

            
451
    /// Helper type used to help type inference.
452
    pub(crate) type OptDummyGuardMgr<'a> =
453
        Option<&'a tor_guardmgr::GuardMgr<tor_rtcompat::tokio::TokioNativeTlsRuntime>>;
454

            
455
    #[test]
456
    fn get_params() {
457
        use tor_netdir::{MdReceiver, PartialNetDir};
458
        use tor_netdoc::doc::netstatus::NetParams;
459
        // If it's just fallbackdir, we get the default parameters.
460
        let di: DirInfo<'_> = (&[][..]).into();
461

            
462
        let p1 = di.circ_params();
463
        assert!(!p1.extend_by_ed25519_id());
464
        assert_eq!(p1.initial_send_window(), 1000);
465

            
466
        // Now try with a directory and configured parameters.
467
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
468
        let mut params = NetParams::default();
469
        params.set("circwindow".into(), 100);
470
        params.set("ExtendByEd25519ID".into(), 1);
471
        let mut dir = PartialNetDir::new(consensus, Some(&params));
472
        for m in microdescs {
473
            dir.add_microdesc(m);
474
        }
475
        let netdir = dir.unwrap_if_sufficient().unwrap();
476
        let di: DirInfo<'_> = (&netdir).into();
477
        let p2 = di.circ_params();
478
        assert_eq!(p2.initial_send_window(), 100);
479
        assert!(p2.extend_by_ed25519_id());
480

            
481
        // Now try with a bogus circwindow value.
482
        let (consensus, microdescs) = tor_netdir::testnet::construct_network().unwrap();
483
        let mut params = NetParams::default();
484
        params.set("circwindow".into(), 100_000);
485
        params.set("ExtendByEd25519ID".into(), 1);
486
        let mut dir = PartialNetDir::new(consensus, Some(&params));
487
        for m in microdescs {
488
            dir.add_microdesc(m);
489
        }
490
        let netdir = dir.unwrap_if_sufficient().unwrap();
491
        let di: DirInfo<'_> = (&netdir).into();
492
        let p2 = di.circ_params();
493
        assert_eq!(p2.initial_send_window(), 1000); // Not 100_000
494
        assert!(p2.extend_by_ed25519_id());
495
    }
496
}