//! Abstract code to manage a set of circuits.
//!
//! This module implements the real logic for deciding when and how to
//! launch circuits, and for which circuits to hand out in response to
//! which requests.
//!
//! For testing and abstraction purposes, this module _does not_
//! actually know anything about circuits _per se_.  Instead,
//! everything is handled using a set of traits that are internal to this
//! crate:
//!
//!  * [`AbstractCirc`] is a view of a circuit.
//!  * [`AbstractSpec`] represents a circuit's possible usages.
//!  * [`AbstractCircBuilder`] knows how to build an `AbstractCirc`.
//!
//! Using these traits, the [`AbstractCircMgr`] object manages a set of
//! circuits, launching them as necessary, and keeping track of the
//! restrictions on their use.
19

            
20
// TODO:
21
// - Testing
22
//    - Error from prepare_action()
23
//    - Error reported by restrict_mut?
24

            
25
use crate::config::CircuitTiming;
26
use crate::{DirInfo, Error, Result};
27

            
28
use retry_error::RetryError;
29
use tor_config::MutCfg;
30
use tor_error::internal;
31
use tor_rtcompat::{Runtime, SleepProviderExt};
32

            
33
use async_trait::async_trait;
34
use futures::channel::{mpsc, oneshot};
35
use futures::future::{FutureExt, Shared};
36
use futures::stream::{FuturesUnordered, StreamExt};
37
use futures::task::SpawnExt;
38
use std::collections::HashMap;
39
use std::convert::TryInto;
40
use std::fmt::Debug;
41
use std::hash::Hash;
42
use std::panic::AssertUnwindSafe;
43
use std::sync::{self, Arc, Weak};
44
use std::time::{Duration, Instant};
45
use tracing::{debug, info, warn};
46
use weak_table::PtrWeakHashSet;
47

            
48
mod streams;
49

            
50
/// Represents restrictions on circuit usage.
51
///
52
/// An `AbstractSpec` describes what a circuit can be used for.  Each
53
/// `AbstractSpec` type has an associated `Usage` type that
54
/// describes a _single_ operation that the circuit might support or
55
/// not.
56
///
57
/// (For example, an `AbstractSpec` can describe a set of ports
58
/// supported by the exit relay on a circuit.  In that case, its
59
/// `Usage` type could be a single port that a client wants to
60
/// connect to.)
61
///
62
/// If an `AbstractSpec` A allows every operation described in a
63
/// `Usage` B, we say that A "supports" B.
64
///
65
/// If one `AbstractSpec` A supports every operation supported by
66
/// another `AbstractSpec` B, we say that A "contains" B.
67
///
68
/// Some circuits can be used for either of two operations, but not both.
69
/// For example, a circuit that is used as a rendezvous point can't
70
/// be used as an introduction point.  To represent these transitions,
71
/// we use a `restrict` operation.  Every time a circuit is used for something
72
/// new, that new use "restricts" the circuit's spec, and narrows
73
/// what the circuit can be used for.
74
pub(crate) trait AbstractSpec: Clone + Debug {
75
    /// A type to represent the kind of usages that this circuit permits.
76
    type Usage: Clone + Debug + Send + Sync;
77

            
78
    /// Return true if this spec permits the usage described by `other`.
79
    ///
80
    /// If this function returns `true`, then it is okay to use a circuit
81
    /// with this spec for the target usage described by `other`.
82
    fn supports(&self, other: &Self::Usage) -> bool;
83

            
84
    /// Change the value of this spec based on the circuit having
85
    /// been used for `usage`.
86
    ///
87
    /// # Requirements
88
    ///
89
    /// Must return an error and make no changes to `self` if `usage`
90
    /// was not supported by this spec.
91
    ///
92
    /// If this function returns Ok, the resulting spec must be
93
    /// contained by the original spec, and must support `usage`.
94
    fn restrict_mut(&mut self, usage: &Self::Usage) -> Result<()>;
95

            
96
    /// Find all open circuits in `list` whose specifications permit
97
    /// `usage`.
98
    ///
99
    /// By default, this calls `abstract_spec_find_supported`.
100
108
    fn find_supported<'a, 'b, C: AbstractCirc>(
101
108
        list: impl Iterator<Item = &'b mut OpenEntry<Self, C>>,
102
108
        usage: &Self::Usage,
103
108
    ) -> Vec<&'b mut OpenEntry<Self, C>> {
104
108
        abstract_spec_find_supported(list, usage)
105
108
    }
106
}
107

            
108
/// Default implementation of `AbstractSpec::find_supported`; provided as a separate function
109
/// so it can be used in overridden implementations.
110
///
111
/// This returns the all circuits in `list` for which `circuit.spec.supports(usage)` returns
112
/// `true`.
113
116
pub(crate) fn abstract_spec_find_supported<'a, 'b, S: AbstractSpec, C: AbstractCirc>(
114
116
    list: impl Iterator<Item = &'b mut OpenEntry<S, C>>,
115
116
    usage: &S::Usage,
116
116
) -> Vec<&'b mut OpenEntry<S, C>> {
117
116
    list.filter(|circ| circ.supports(usage)).collect()
118
116
}
119

            
120
/// The smallest view of a circuit that this module needs.
///
/// As far as this module is concerned, a circuit is just an object that
/// has a unique identity and that may or may not still be usable.
pub(crate) trait AbstractCirc: Clone + Debug {
    /// The type used to uniquely identify a circuit.
    type Id: Clone + Debug + Hash + Eq + Send + Sync;

    /// Return this circuit's unique identifier.
    ///
    /// # Requirements
    ///
    /// Distinct circuits must return distinct values from this
    /// function.
    fn id(&self) -> Self::Id;

    /// Return true if this circuit can still be used for some purpose.
    ///
    /// One reason for a circuit to be unusable is that it has been
    /// closed.
    fn usable(&self) -> bool;
}
140

            
141
/// A plan for an `AbstractCircBuilder`, with a hook that tests can use
/// to mutate it.
///
/// Code that is not test code should implement this trait by relying on
/// the default methods only.
pub(crate) trait MockablePlan {
    /// Record a reason string that was given to
    /// `SleepProvider::block_advance()`, so that this object knows what
    /// to hand to `::release_advance()` later.
    ///
    /// The default implementation does nothing.
    fn add_blocked_advance_reason(&mut self, _reason: String) {}
}
149

            
150
/// An object that knows how to build circuits.
151
///
152
/// AbstractCircBuilder creates circuits in two phases.  First, a plan is
153
/// made for how to build the circuit.  This planning phase should be
154
/// relatively fast, and must not suspend or block.  Its purpose is to
155
/// get an early estimate of which operations the circuit will be able
156
/// to support when it's done.
157
///
158
/// Second, the circuit is actually built, using the plan as input.
159
#[async_trait]
160
pub(crate) trait AbstractCircBuilder: Send + Sync {
161
    /// The specification type describing what operations circuits can
162
    /// be used for.
163
    type Spec: AbstractSpec + Debug + Send + Sync;
164
    /// The circuit type that this builder knows how to build.
165
    type Circ: AbstractCirc + Send + Sync;
166
    /// An opaque type describing how a given circuit will be built.
167
    /// It may represent some or all of a path-or it may not.
168
    // TODO: It would be nice to have this parameterized on a lifetime,
169
    // and have that lifetime depend on the lifetime of the directory.
170
    // But I don't think that rust can do that.
171

            
172
    // HACK(eta): I don't like the fact that `MockablePlan` is necessary here.
173
    type Plan: Send + Debug + MockablePlan;
174

            
175
    // TODO: I'd like to have a Dir type here to represent
176
    // create::DirInfo, but that would need to be parameterized too,
177
    // and would make everything complicated.
178

            
179
    /// Form a plan for how to build a new circuit that supports `usage`.
180
    ///
181
    /// Return an opaque Plan object, and a new spec describing what
182
    /// the circuit will actually support when it's built.  (For
183
    /// example, if the input spec requests a circuit that connect to
184
    /// port 80, then "planning" the circuit might involve picking an
185
    /// exit that supports port 80, and the resulting spec might be
186
    /// the exit's complete list of supported ports.)
187
    ///
188
    /// # Requirements
189
    ///
190
    /// The resulting Spec must support `usage`.
191
    fn plan_circuit(
192
        &self,
193
        usage: &<Self::Spec as AbstractSpec>::Usage,
194
        dir: DirInfo<'_>,
195
    ) -> Result<(Self::Plan, Self::Spec)>;
196

            
197
    /// Construct a circuit according to a given plan.
198
    ///
199
    /// On success, return a spec describing what the circuit can be used for,
200
    /// and the circuit that was just constructed.
201
    ///
202
    /// This function should implement some kind of a timeout for
203
    /// circuits that are taking too long.
204
    ///
205
    /// # Requirements
206
    ///
207
    /// The spec that this function returns _must_ support the usage
208
    /// that was originally passed to `plan_circuit`.  It _must_ also
209
    /// contain the spec that was originally returned by
210
    /// `plan_circuit`.
211
    async fn build_circuit(&self, plan: Self::Plan) -> Result<(Self::Spec, Self::Circ)>;
212

            
213
    /// Return a "parallelism factor" with which circuits should be
214
    /// constructed for a given purpose.
215
    ///
216
    /// If this function returns N, then whenever we launch circuits
217
    /// for this purpose, then we launch N in parallel.
218
    ///
219
    /// The default implementation returns 1.  The value of 0 is
220
    /// treated as if it were 1.
221
83
    fn launch_parallelism(&self, usage: &<Self::Spec as AbstractSpec>::Usage) -> usize {
222
83
        let _ = usage; // default implementation ignores this.
223
83
        1
224
83
    }
225

            
226
    /// Return a "parallelism factor" for which circuits should be
227
    /// used for a given purpose.
228
    ///
229
    /// If this function returns N, then whenever we select among
230
    /// open circuits for this purpose, we choose at random from the
231
    /// best N.
232
    ///
233
    /// The default implementation returns 1.  The value of 0 is
234
    /// treated as if it were 1.
235
    // TODO: Possibly this doesn't belong in this trait.
236
18
    fn select_parallelism(&self, usage: &<Self::Spec as AbstractSpec>::Usage) -> usize {
237
18
        let _ = usage; // default implementation ignores this.
238
18
        1
239
18
    }
240

            
241
    /// Return true if we are currently attempting to learn circuit
242
    /// timeouts by building testing circuits.
243
    fn learning_timeouts(&self) -> bool;
244
}
245

            
246
/// The expiration state of a circuit.
///
/// A circuit can either be unused (in which case it should expire if it
/// is _still unused_ by a certain time), or dirty (in which case it
/// should expire after a certain duration).
///
/// Every circuit starts out "unused", and becomes "dirty" the first time
/// its spec is restricted -- that is, the first time it is handed out to
/// be used for a request.
#[derive(Debug, Clone, PartialEq)]
enum ExpirationInfo {
    /// The circuit has not been used yet.
    Unused {
        /// The time at which the circuit should expire.
        use_before: Instant,
    },
    /// The circuit has been used (or at least restricted for use with a
    /// request) one or more times.
    Dirty {
        /// The time at which the circuit's spec was first restricted.
        dirty_since: Instant,
    },
}
269

            
270
impl ExpirationInfo {
271
    /// Return an ExpirationInfo for a newly created circuit.
272
13
    fn new(use_before: Instant) -> Self {
273
13
        ExpirationInfo::Unused { use_before }
274
13
    }
275

            
276
    /// Mark this ExpirationInfo as dirty, if it is not already dirty.
277
34
    fn mark_dirty(&mut self, now: Instant) {
278
34
        if matches!(self, ExpirationInfo::Unused { .. }) {
279
12
            *self = ExpirationInfo::Dirty { dirty_since: now };
280
22
        }
281
34
    }
282
}
283

            
284
/// An entry for an open circuit held by an `AbstractCircMgr`.
285
7
#[derive(PartialEq, Debug, Clone)]
286
pub(crate) struct OpenEntry<S, C> {
287
    /// Current AbstractCircSpec for this circuit's permitted usages.
288
    spec: S,
289
    /// The circuit under management.
290
    circ: C,
291
    /// When does this circuit expire?
292
    ///
293
    /// (Note that expired circuits are removed from the manager,
294
    /// which does not actually close them until there are no more
295
    /// references to them.)
296
    expiration: ExpirationInfo,
297
}
298

            
299
impl<S: AbstractSpec, C: AbstractCirc> OpenEntry<S, C> {
300
    /// Make a new OpenEntry for a given circuit and spec.
301
16
    fn new(spec: S, circ: C, expiration: ExpirationInfo) -> Self {
302
16
        OpenEntry {
303
16
            spec,
304
16
            circ,
305
16
            expiration,
306
16
        }
307
16
    }
308

            
309
    /// Return true if this circuit can be used for `usage`.
310
54
    fn supports(&self, usage: &<S as AbstractSpec>::Usage) -> bool {
311
54
        self.circ.usable() && self.spec.supports(usage)
312
54
    }
313

            
314
    /// Change this circuit's permissible usage, based on its having
315
    /// been used for `usage` at time `now`.
316
    ///
317
    /// Return an error if this circuit may not be used for `usage`.
318
    fn restrict_mut(&mut self, usage: &<S as AbstractSpec>::Usage, now: Instant) -> Result<()> {
319
35
        self.spec.restrict_mut(usage)?;
320
34
        self.expiration.mark_dirty(now);
321
34
        Ok(())
322
35
    }
323

            
324
    /// Find the "best" entry from a slice of OpenEntry for supporting
325
    /// a given `usage`.
326
    ///
327
    /// If `parallelism` is some N greater than 1, we pick randomly
328
    /// from the best `N` circuits.
329
    ///
330
    /// # Requirements
331
    ///
332
    /// Requires that `ents` is nonempty, and that every element of `ents`
333
    /// supports `spec`.
334
18
    fn find_best<'a>(
335
18
        // we do not mutate `ents`, but to return `&mut Self` we must have a mutable borrow
336
18
        ents: &'a mut [&'a mut Self],
337
18
        usage: &<S as AbstractSpec>::Usage,
338
18
        parallelism: usize,
339
18
    ) -> &'a mut Self {
340
18
        let _ = usage; // not yet used.
341
18
        use rand::seq::SliceRandom;
342
18
        let parallelism = parallelism.clamp(1, ents.len());
343
18
        // TODO: Actually look over the whole list to see which is better.
344
18
        let slice = &mut ents[0..parallelism];
345
18
        let mut rng = rand::thread_rng();
346
18
        slice.choose_mut(&mut rng).expect("Input list was empty")
347
18
    }
348

            
349
    /// Return true if this circuit has been marked as dirty before
350
    /// `dirty_cutoff`, or if it is an unused circuit set to expire before
351
    /// `unused_cutoff`.
352
2
    fn should_expire(&self, unused_cutoff: Instant, dirty_cutoff: Instant) -> bool {
353
2
        match self.expiration {
354
            ExpirationInfo::Unused { use_before } => use_before <= unused_cutoff,
355
2
            ExpirationInfo::Dirty { dirty_since } => dirty_since <= dirty_cutoff,
356
        }
357
2
    }
358
}
359

            
360
/// A result type whose "Ok" value is the Id for a circuit from B.
361
type PendResult<B> = Result<<<B as AbstractCircBuilder>::Circ as AbstractCirc>::Id>;
362

            
363
/// An in-progress circuit request tracked by an `AbstractCircMgr`.
364
///
365
/// (In addition to tracking circuits, `AbstractCircMgr` tracks
366
/// _requests_ for circuits.  The manager uses these entries if it
367
/// finds that some circuit created _after_ a request first launched
368
/// might meet the request's requirements.)
369
struct PendingRequest<B: AbstractCircBuilder> {
370
    /// Usage for the operation requested by this request
371
    usage: <B::Spec as AbstractSpec>::Usage,
372
    /// A channel to use for telling this request about circuits that it
373
    /// might like.
374
    notify: mpsc::Sender<PendResult<B>>,
375
}
376

            
377
impl<B: AbstractCircBuilder> PendingRequest<B> {
    /// Return true if `spec` supports the usage that this request is
    /// asking for.
    fn supported_by(&self, spec: &B::Spec) -> bool {
        let wanted = &self.usage;
        spec.supports(wanted)
    }
}
383

            
384
/// An entry for an under-construction in-progress circuit tracked by
385
/// an `AbstractCircMgr`.
386
#[derive(Debug)]
387
struct PendingEntry<B: AbstractCircBuilder> {
388
    /// Specification that this circuit will support, if every pending
389
    /// request that is waiting for it is attached to it.
390
    ///
391
    /// This spec becomes more and more restricted as more pending
392
    /// requests are waiting for this circuit.
393
    ///
394
    /// This spec is contained by circ_spec, and must support the usage
395
    /// of every pending request that's waiting for this circuit.
396
    tentative_assignment: sync::Mutex<B::Spec>,
397
    /// A shared future for requests to use when waiting for
398
    /// notification of this circuit's success.
399
    receiver: Shared<oneshot::Receiver<PendResult<B>>>,
400
}
401

            
402
impl<B: AbstractCircBuilder> PendingEntry<B> {
403
    /// Make a new PendingEntry that starts out supporting a given
404
    /// spec.  Return that PendingEntry, along with a Sender to use to
405
    /// report the result of building this circuit.
406
52
    fn new(circ_spec: &B::Spec) -> (Self, oneshot::Sender<PendResult<B>>) {
407
52
        let tentative_assignment = sync::Mutex::new(circ_spec.clone());
408
52
        let (sender, receiver) = oneshot::channel();
409
52
        let receiver = receiver.shared();
410
52
        let entry = PendingEntry {
411
52
            tentative_assignment,
412
52
            receiver,
413
52
        };
414
52
        (entry, sender)
415
52
    }
416

            
417
    /// Return true if this circuit's current tentative assignment
418
    /// supports `usage`.
419
11
    fn supports(&self, usage: &<B::Spec as AbstractSpec>::Usage) -> bool {
420
11
        let assignment = self.tentative_assignment.lock().expect("poisoned lock");
421
11
        assignment.supports(usage)
422
11
    }
423

            
424
    /// Try to change the tentative assignment of this circuit by
425
    /// restricting it for use with `usage`.
426
    ///
427
    /// Return an error if the current tentative assignment didn't
428
    /// support `usage` in the first place.
429
    fn tentative_restrict_mut(&self, usage: &<B::Spec as AbstractSpec>::Usage) -> Result<()> {
430
7
        if let Ok(mut assignment) = self.tentative_assignment.lock() {
431
7
            assignment.restrict_mut(usage)?;
432
        }
433
7
        Ok(())
434
7
    }
435

            
436
    /// Find the best PendingEntry values from a slice for use with
437
    /// `usage`.
438
    ///
439
    /// # Requirements
440
    ///
441
    /// The `ents` slice must not be empty.  Every element of `ents`
442
    /// must support the given spec.
443
7
    fn find_best(ents: &[Arc<Self>], usage: &<B::Spec as AbstractSpec>::Usage) -> Vec<Arc<Self>> {
444
7
        // TODO: Actually look over the whole list to see which is better.
445
7
        let _ = usage; // currently unused
446
7
        vec![Arc::clone(&ents[0])]
447
7
    }
448
}
449

            
450
/// Wrapper type to represent the state between planning to build a
451
/// circuit and constructing it.
452
#[derive(Debug)]
453
struct CircBuildPlan<B: AbstractCircBuilder> {
454
    /// The Plan object returned by [`AbstractCircBuilder::plan_circuit`].
455
    plan: B::Plan,
456
    /// A sender to notify any pending requests when this circuit is done.
457
    sender: oneshot::Sender<PendResult<B>>,
458
    /// A strong entry to the PendingEntry for this circuit build attempt.
459
    pending: Arc<PendingEntry<B>>,
460
}
461

            
462
/// The inner state of an [`AbstractCircMgr`].
463
struct CircList<B: AbstractCircBuilder> {
464
    /// A map from circuit ID to [`OpenEntry`] values for all managed
465
    /// open circuits.
466
    #[allow(clippy::type_complexity)]
467
    open_circs: HashMap<<B::Circ as AbstractCirc>::Id, OpenEntry<B::Spec, B::Circ>>,
468
    /// Weak-set of PendingEntry for circuits that are being built.
469
    ///
470
    /// Because this set only holds weak references, and the only
471
    /// strong reference to the PendingEntry is held by the task
472
    /// building the circuit, this set's members are lazily removed
473
    /// after the circuit is either built or fails to build.
474
    pending_circs: PtrWeakHashSet<Weak<PendingEntry<B>>>,
475
    /// Weak-set of PendingRequest for requests that are waiting for a
476
    /// circuit to be built.
477
    ///
478
    /// Because this set only holds weak references, and the only
479
    /// strong reference to the PendingRequest is held by the task
480
    /// waiting for the circuit to be built, this set's members are
481
    /// lazily removed after the request succeeds or fails.
482
    pending_requests: PtrWeakHashSet<Weak<PendingRequest<B>>>,
483
}
484

            
485
impl<B: AbstractCircBuilder> CircList<B> {
486
    /// Make a new empty `CircList`
487
13
    fn new() -> Self {
488
13
        CircList {
489
13
            open_circs: HashMap::new(),
490
13
            pending_circs: PtrWeakHashSet::new(),
491
13
            pending_requests: PtrWeakHashSet::new(),
492
13
        }
493
13
    }
494

            
495
    /// Add `e` to the list of open circuits.
496
13
    fn add_open(&mut self, e: OpenEntry<B::Spec, B::Circ>) {
497
13
        let id = e.circ.id();
498
13
        self.open_circs.insert(id, e);
499
13
    }
500

            
501
    /// Find all the usable open circuits that support `usage`.
502
    ///
503
    /// Return None if there are no such circuits.
504
108
    fn find_open(
505
108
        &mut self,
506
108
        usage: &<B::Spec as AbstractSpec>::Usage,
507
108
    ) -> Option<Vec<&mut OpenEntry<B::Spec, B::Circ>>> {
508
108
        let list = self.open_circs.values_mut();
509
108
        let v = <B::Spec as AbstractSpec>::find_supported(list, usage);
510
108
        if v.is_empty() {
511
90
            None
512
        } else {
513
18
            Some(v)
514
        }
515
108
    }
516

            
517
    /// Find an open circuit by ID.
518
    ///
519
    /// Return None if no such circuit exists in this list.
520
18
    fn get_open_mut(
521
18
        &mut self,
522
18
        id: &<B::Circ as AbstractCirc>::Id,
523
18
    ) -> Option<&mut OpenEntry<B::Spec, B::Circ>> {
524
18
        self.open_circs.get_mut(id)
525
18
    }
526

            
527
    /// Extract an open circuit by ID, removing it from this list.
528
    ///
529
    /// Return None if no such circuit exists in this list.
530
2
    fn take_open(
531
2
        &mut self,
532
2
        id: &<B::Circ as AbstractCirc>::Id,
533
2
    ) -> Option<OpenEntry<B::Spec, B::Circ>> {
534
2
        self.open_circs.remove(id)
535
2
    }
536

            
537
    /// Remove circuits based on expiration times.
538
    ///
539
    /// We remove every unused circuit that is set to expire by
540
    /// `unused_cutoff`, and every dirty circuit that has been dirty
541
    /// since before `dirty_cutoff`.
542
1
    fn expire_circs(&mut self, unused_cutoff: Instant, dirty_cutoff: Instant) {
543
1
        self.open_circs
544
2
            .retain(|_k, v| !v.should_expire(unused_cutoff, dirty_cutoff));
545
1
    }
546

            
547
    /// Remove the circuit with given `id`, if it is scheduled to
548
    /// expire now, according to the provided expiration times.
549
    fn expire_circ(
550
        &mut self,
551
        id: &<B::Circ as AbstractCirc>::Id,
552
        unused_cutoff: Instant,
553
        dirty_cutoff: Instant,
554
    ) {
555
        let should_expire = self
556
            .open_circs
557
            .get(id)
558
            .map(|v| v.should_expire(unused_cutoff, dirty_cutoff))
559
            .unwrap_or_else(|| false);
560
        if should_expire {
561
            self.open_circs.remove(id);
562
        }
563
    }
564

            
565
    /// Add `pending` to the set of in-progress circuits.
566
52
    fn add_pending_circ(&mut self, pending: Arc<PendingEntry<B>>) {
567
52
        self.pending_circs.insert(pending);
568
52
    }
569

            
570
    /// Find all pending circuits that support `usage`.
571
    ///
572
    /// If no such circuits are currently being built, return None.
573
90
    fn find_pending_circs(
574
90
        &self,
575
90
        usage: &<B::Spec as AbstractSpec>::Usage,
576
90
    ) -> Option<Vec<Arc<PendingEntry<B>>>> {
577
90
        let result: Vec<_> = self
578
90
            .pending_circs
579
90
            .iter()
580
90
            .filter(|p| p.supports(usage))
581
90
            .filter(|p| !matches!(p.receiver.peek(), Some(Err(_))))
582
90
            .collect();
583
90

            
584
90
        if result.is_empty() {
585
83
            None
586
        } else {
587
7
            Some(result)
588
        }
589
90
    }
590

            
591
    /// Return true if `circ` is still pending.
592
    ///
593
    /// A circuit will become non-pending when finishes (successfully or not), or when it's
594
    /// removed from this list via `clear_all_circuits()`.
595
13
    fn circ_is_pending(&self, circ: &Arc<PendingEntry<B>>) -> bool {
596
13
        self.pending_circs.contains(circ)
597
13
    }
598

            
599
    /// Construct and add a new entry to the set of request waiting
600
    /// for a circuit.
601
    ///
602
    /// Return the request, and a new receiver stream that it should
603
    /// use for notification of possible circuits to use.
604
56
    fn add_pending_request(&mut self, pending: &Arc<PendingRequest<B>>) {
605
56
        self.pending_requests.insert(Arc::clone(pending));
606
56
    }
607

            
608
    /// Return all pending requests that would be satisfied by a circuit
609
    /// that supports `circ_spec`.
610
8
    fn find_pending_requests(&self, circ_spec: &B::Spec) -> Vec<Arc<PendingRequest<B>>> {
611
8
        self.pending_requests
612
8
            .iter()
613
8
            .filter(|pend| pend.supported_by(circ_spec))
614
8
            .collect()
615
8
    }
616

            
617
    /// Clear all pending circuits and open circuits.
618
    fn clear_all_circuits(&mut self) {
619
        self.pending_circs.clear();
620
        self.open_circs.clear();
621
    }
622
}
623

            
624
/// Timing parameters for circuits that were built but have never been
/// used.
///
/// These values currently come from the network parameters.
struct UnusedTimings {
    /// Minimum lifetime for a circuit that was created while we were
    /// learning circuit timeouts.
    learning: Duration,
    /// Minimum lifetime for a circuit that was created while we were
    /// not learning circuit timeouts.
    not_learning: Duration,
}
635

            
636
// This isn't really fallible, given the definitions of the underlying
637
// types.
638
#[allow(clippy::fallible_impl_from)]
639
impl From<&tor_netdir::params::NetParameters> for UnusedTimings {
640
22
    fn from(v: &tor_netdir::params::NetParameters) -> Self {
641
22
        // These try_into() calls can't fail, so unwrap() can't panic.
642
22
        #[allow(clippy::unwrap_used)]
643
22
        UnusedTimings {
644
22
            learning: v
645
22
                .unused_client_circ_timeout_while_learning_cbt
646
22
                .try_into()
647
22
                .unwrap(),
648
22
            not_learning: v.unused_client_circ_timeout.try_into().unwrap(),
649
22
        }
650
22
    }
651
}
652

            
653
/// Abstract implementation for circuit management.
654
///
655
/// The algorithm provided here is fairly simple. In its simplest form:
656
///
657
/// When somebody asks for a circuit for a given operation: if we find
658
/// one open already, we return it.  If we find in-progress circuits
659
/// that would meet our needs, we wait for one to finish (or for all
660
/// to fail).  And otherwise, we launch one or more circuits to meet the
661
/// request's needs.
662
///
663
/// If this process fails, then we retry it, up to a timeout or a
664
/// numerical limit.
665
///
666
/// If a circuit not previously considered for a given request
667
/// finishes before the request is satisfied, and if the circuit would
668
/// satisfy the request, we try to give that circuit as an answer to
669
/// that request even if it was not one of the circuits that request
670
/// was waiting for.
671
pub(crate) struct AbstractCircMgr<B: AbstractCircBuilder, R: Runtime> {
672
    /// Builder used to construct circuits.
673
    builder: B,
674
    /// An asynchronous runtime to use for launching tasks and
675
    /// checking timeouts.
676
    runtime: R,
677
    /// A CircList to manage our list of circuits, requests, and
678
    /// pending circuits.
679
    circs: sync::Mutex<CircList<B>>,
680

            
681
    /// Configured information about when to expire circuits and requests.
682
    circuit_timing: MutCfg<CircuitTiming>,
683

            
684
    /// Minimum lifetime of an unused circuit.
685
    ///
686
    /// Derived from the network parameters.
687
    unused_timing: sync::Mutex<UnusedTimings>,
688
}
689

            
690
/// An action to take in order to satisfy a request for a circuit.
///
/// Values of this type are produced by `prepare_action()` and consumed
/// by `take_action()` (or, for the `Build` case only, by
/// `ensure_circuit()`).
enum Action<B: AbstractCircBuilder> {
    /// We found an open circuit: return immediately.
    Open(B::Circ),
    /// We found one or more pending circuits: wait until one succeeds,
    /// or all fail.
    ///
    /// Each element is a clone of a pending circuit's shared receiver.
    Wait(FuturesUnordered<Shared<oneshot::Receiver<PendResult<B>>>>),
    /// We should launch circuits: here are the instructions for how
    /// to do so.
    ///
    /// There is one plan per circuit to launch.
    Build(Vec<CircBuildPlan<B>>),
}
701

            
702
impl<B: AbstractCircBuilder + 'static, R: Runtime> AbstractCircMgr<B, R> {
703
    /// Construct a new AbstractCircMgr.
704
12
    pub(crate) fn new(builder: B, runtime: R, circuit_timing: CircuitTiming) -> Self {
705
12
        let circs = sync::Mutex::new(CircList::new());
706
12
        let dflt_params = tor_netdir::params::NetParameters::default();
707
12
        let unused_timing = (&dflt_params).into();
708
12
        AbstractCircMgr {
709
12
            builder,
710
12
            runtime,
711
12
            circs,
712
12
            circuit_timing: circuit_timing.into(),
713
12
            unused_timing: sync::Mutex::new(unused_timing),
714
12
        }
715
12
    }
716

            
717
    /// Reconfigure this manager using the latest set of network parameters.
718
    pub(crate) fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
719
        let mut u = self
720
            .unused_timing
721
            .lock()
722
            .expect("Poisoned lock for unused_timing");
723
        *u = p.into();
724
    }
725

            
726
    /// Return this manager's current [`CircuitTiming`].
    ///
    /// The value is handed out as an `Arc`, so callers can keep using it
    /// without holding any borrow on the manager.
    pub(crate) fn circuit_timing(&self) -> Arc<CircuitTiming> {
        self.circuit_timing.get()
    }
730

            
731
    /// Replace this manager's [`CircuitTiming`] with `new_config`.
    pub(crate) fn set_circuit_timing(&self, new_config: CircuitTiming) {
        self.circuit_timing.replace(new_config);
    }
735
    /// Return a circuit suitable for use with a given `usage`,
736
    /// creating that circuit if necessary, and restricting it
737
    /// under the assumption that it will be used for that spec.
738
    ///
739
    /// This is the primary entry point for AbstractCircMgr.
740
38
    pub(crate) async fn get_or_launch(
741
38
        self: &Arc<Self>,
742
38
        usage: &<B::Spec as AbstractSpec>::Usage,
743
38
        dir: DirInfo<'_>,
744
38
    ) -> Result<B::Circ> {
745
38
        let circuit_timing = self.circuit_timing();
746
38
        let wait_for_circ = circuit_timing.request_timeout;
747
38
        let timeout_at = self.runtime.now() + wait_for_circ;
748
38
        let max_tries = circuit_timing.request_max_retries;
749
38

            
750
38
        let mut retry_err = RetryError::<Box<Error>>::in_attempt_to("find or build a circuit");
751

            
752
106
        for n in 1..(max_tries + 1) {
753
            // How much time is remaining?
754
106
            let remaining = match timeout_at.checked_duration_since(self.runtime.now()) {
755
                None => {
756
                    retry_err.push(Error::RequestTimeout);
757
                    break;
758
                }
759
106
                Some(t) => t,
760
106
            };
761
106

            
762
106
            match self.prepare_action(usage, dir, true) {
763
74
                Ok(action) => {
764
                    // We successfully found an action: Take that action.
765
74
                    let outcome = self
766
74
                        .runtime
767
200
                        .timeout(remaining, Arc::clone(self).take_action(action, usage))
768
200
                        .await;
769

            
770
72
                    match outcome {
771
34
                        Ok(Ok(circ)) => return Ok(circ),
772
38
                        Ok(Err(e)) => {
773
                            info!("Circuit attempt {} failed.", n);
774
38
                            retry_err.extend(e);
775
                        }
776
                        Err(_) => {
777
2
                            retry_err.push(Error::RequestTimeout);
778
2
                            break;
779
                        }
780
                    }
781
                }
782
32
                Err(e) => {
783
                    // We couldn't pick the action! This is unusual; wait
784
                    // a little while before we try again.
785
                    info!("Couldn't pick action for circuit attempt {}: {}", n, &e);
786
32
                    retry_err.push(e);
787
32
                    let wait_for_action = Duration::from_millis(50);
788
32
                    self.runtime
789
32
                        .sleep(std::cmp::min(remaining, wait_for_action))
790
32
                        .await;
791
                }
792
            };
793
        }
794

            
795
4
        Err(Error::RequestFailed(retry_err))
796
38
    }
797

            
798
    /// Make sure a circuit exists, without actually asking for it.
799
    ///
800
    /// Make sure that there is a circuit (built or in-progress) that could be
801
    /// used for `usage`, and launch one or more circuits in a background task
802
    /// if there is not.
803
    // TODO: This should probably take some kind of parallelism parameter.
804
    #[allow(dead_code)]
805
2
    pub(crate) async fn ensure_circuit(
806
2
        self: &Arc<Self>,
807
2
        usage: &<B::Spec as AbstractSpec>::Usage,
808
2
        dir: DirInfo<'_>,
809
2
    ) -> Result<()> {
810
2
        let action = self.prepare_action(usage, dir, false)?;
811
2
        if let Action::Build(plans) = action {
812
4
            for plan in plans {
813
2
                let self_clone = Arc::clone(self);
814
2
                let _ignore_receiver = self_clone.spawn_launch(usage, plan);
815
2
            }
816
        }
817

            
818
2
        Ok(())
819
2
    }
820

            
821
    /// Choose which action we should take in order to provide a circuit
822
    /// for a given `usage`.
823
    ///
824
    /// If `restrict_circ` is true, we restrict the spec of any
825
    /// circ we decide to use to mark that it _is_ being used for
826
    /// `usage`.
827
108
    fn prepare_action(
828
108
        &self,
829
108
        usage: &<B::Spec as AbstractSpec>::Usage,
830
108
        dir: DirInfo<'_>,
831
108
        restrict_circ: bool,
832
108
    ) -> Result<Action<B>> {
833
108
        let mut list = self.circs.lock().expect("poisoned lock");
834

            
835
108
        if let Some(mut open) = list.find_open(usage) {
836
            // We have open circuits that meet the spec: return the best one.
837
18
            let parallelism = self.builder.select_parallelism(usage);
838
18
            let best = OpenEntry::find_best(&mut open, usage, parallelism);
839
18
            if restrict_circ {
840
18
                let now = self.runtime.now();
841
18
                best.restrict_mut(usage, now)?;
842
            }
843
            // TODO: If we have fewer circuits here than our select
844
            // parallelism, perhaps we should launch more?
845

            
846
18
            return Ok(Action::Open(best.circ.clone()));
847
90
        }
848

            
849
90
        if let Some(pending) = list.find_pending_circs(usage) {
850
            // There are pending circuits that could meet the spec.
851
            // Restrict them under the assumption that they could all
852
            // be used for this, and then wait until one is ready (or
853
            // all have failed)
854
7
            let best = PendingEntry::find_best(&pending, usage);
855
7
            if restrict_circ {
856
14
                for item in &best {
857
                    // TODO: Do we want to tentatively restrict _all_ of these?
858
                    // not clear to me.
859
7
                    item.tentative_restrict_mut(usage)?;
860
                }
861
            }
862
7
            let stream = best.iter().map(|item| item.receiver.clone()).collect();
863
7
            // TODO: if we have fewer circuits here than our launch
864
7
            // parallelism, we might want to launch more.
865
7

            
866
7
            return Ok(Action::Wait(stream));
867
83
        }
868
83

            
869
83
        // Okay, we need to launch circuits here.
870
83
        let parallelism = std::cmp::max(1, self.builder.launch_parallelism(usage));
871
83
        let mut plans = Vec::new();
872
83
        for _ in 0..parallelism {
873
83
            let (pending, plan) = self.plan_by_usage(dir, usage)?;
874
51
            list.add_pending_circ(pending);
875
51
            plans.push(plan);
876
        }
877
51
        Ok(Action::Build(plans))
878
108
    }
879

            
880
    /// Execute an action returned by pick-action, and return the
881
    /// resulting circuit or error.
882
73
    async fn take_action(
883
73
        self: Arc<Self>,
884
73
        act: Action<B>,
885
73
        usage: &<B::Spec as AbstractSpec>::Usage,
886
73
    ) -> std::result::Result<B::Circ, RetryError<Box<Error>>> {
887
        // Get or make a stream of futures to wait on.
888
73
        let wait_on_stream = match act {
889
18
            Action::Open(c) => {
890
18
                return Ok(c);
891
            }
892
7
            Action::Wait(f) => f,
893
48
            Action::Build(plans) => {
894
48
                let futures = FuturesUnordered::new();
895
96
                for plan in plans {
896
48
                    let self_clone = Arc::clone(&self);
897
48
                    // (This is where we actually launch circuits.)
898
48
                    futures.push(self_clone.spawn_launch(usage, plan));
899
48
                }
900
49
                futures
901
            }
902
        };
903

            
904
        // Insert ourself into the list of pending requests, and make a
905
        // stream for us to listen on for notification from pending circuits
906
        // other than those we are pending on.
907
56
        let (pending_request, additional_stream) = {
908
56
            let (send, recv) = mpsc::channel(8);
909
56
            let pending = Arc::new(PendingRequest {
910
56
                usage: usage.clone(),
911
56
                notify: send,
912
56
            });
913
56

            
914
56
            let mut list = self.circs.lock().expect("poisoned lock");
915
56
            list.add_pending_request(&pending);
916
56

            
917
56
            (pending, recv)
918
56
        };
919
56

            
920
56
        // We use our "select_biased" stream combiner here to ensure
921
56
        // that:
922
56
        //   1) Circuits from wait_on_stream (the ones we're pending
923
56
        //      on) are preferred.
924
56
        //   2) We exit this function when those circuits are exhausted.
925
56
        //   3) We still get notified about other circuits that might
926
56
        //      meet our interests.
927
56
        let mut incoming = streams::select_biased(wait_on_stream, additional_stream.map(Ok));
928
56

            
929
56
        let mut retry_error = RetryError::in_attempt_to("wait for circuits");
930

            
931
200
        while let Some((src, id)) = incoming.next().await {
932
55
            match id {
933
18
                Ok(Ok(ref id)) => {
934
18
                    // Great, we have a circuit. See if we can use it!
935
18
                    let mut list = self.circs.lock().expect("poisoned lock");
936
18
                    if let Some(ent) = list.get_open_mut(id) {
937
17
                        let now = self.runtime.now();
938
17
                        match ent.restrict_mut(usage, now) {
939
                            Ok(()) => {
940
                                // Great, this will work.  We drop the
941
                                // pending request now explicitly to remove
942
                                // it from the list.
943
16
                                drop(pending_request);
944
16
                                if matches!(ent.expiration, ExpirationInfo::Unused { .. }) {
945
                                    // Since this circuit hasn't been used yet, schedule expiration task after `max_dirtiness` from now.
946
                                    spawn_expiration_task(
947
                                        &self.runtime,
948
                                        Arc::downgrade(&self),
949
                                        ent.circ.id(),
950
                                        now + self.circuit_timing().max_dirtiness,
951
                                    );
952
16
                                }
953
16
                                return Ok(ent.circ.clone());
954
                            }
955
1
                            Err(e) => {
956
1
                                // TODO: as below, improve this log message.
957
1
                                if src == streams::Source::Left {
958
                                    info!(
959
                                        "{:?} suggested we use {:?}, but restrictions failed: {:?}",
960
                                        src, id, &e
961
                                    );
962
                                } else {
963
                                    debug!(
964
                                        "{:?} suggested we use {:?}, but restrictions failed: {:?}",
965
                                        src, id, &e
966
                                    );
967
                                }
968
1
                                if src == streams::Source::Left {
969
1
                                    retry_error.push(e);
970
1
                                }
971
1
                                continue;
972
                            }
973
                        }
974
1
                    }
975
                }
976
37
                Ok(Err(ref e)) => {
977
                    debug!("{:?} sent error {:?}", src, e);
978
37
                    if src == streams::Source::Left {
979
37
                        retry_error.push(e.clone());
980
37
                    }
981
                }
982
                Err(oneshot::Canceled) => {
983
                    debug!(
984
                        "{:?} went away (Canceled), quitting take_action right away",
985
                        src
986
                    );
987
                    retry_error.push(Error::PendingCanceled);
988
                    return Err(retry_error);
989
                }
990
            }
991

            
992
            // TODO: Improve this log message; using :? here will make it
993
            // hard to understand.
994
            info!("While waiting on circuit: {:?} from {:?}", id, src);
995
        }
996

            
997
        // Nothing worked.  We drop the pending request now explicitly
998
        // to remove it from the list.  (We could just let it get dropped
999
        // implicitly, but that's a bit confusing.)
38
        drop(pending_request);
38

            
38
        Err(retry_error)
72
    }

            
    /// Given a directory and usage, compute the necessary objects to
    /// build a circuit: A [`PendingEntry`] to keep track of the in-process
    /// circuit, and a [`CircBuildPlan`] that we'll give to the thread
    /// that will build the circuit.
    ///
    /// The caller should probably add the resulting `PendingEntry` to
    /// `self.circs`.
    ///
    /// This is an internal function that we call when we're pretty sure
    /// we want to build a circuit.
84
    fn plan_by_usage(
84
        &self,
84
        dir: DirInfo<'_>,
84
        usage: &<B::Spec as AbstractSpec>::Usage,
84
    ) -> Result<(Arc<PendingEntry<B>>, CircBuildPlan<B>)> {
84
        let (plan, bspec) = self.builder.plan_circuit(usage, dir)?;
52
        let (pending, sender) = PendingEntry::new(&bspec);
52
        let pending = Arc::new(pending);
52

            
52
        let plan = CircBuildPlan {
52
            plan,
52
            sender,
52
            pending: Arc::clone(&pending),
52
        };
52

            
52
        Ok((pending, plan))
84
    }

            
    /// Launch a managed circuit for a target usage, without checking
    /// whether one already exists or is pending.
    ///
    /// Return a listener that will be informed when the circuit is done.
1
    pub(crate) fn launch_by_usage(
1
        self: &Arc<Self>,
1
        usage: &<B::Spec as AbstractSpec>::Usage,
1
        dir: DirInfo<'_>,
1
    ) -> Result<Shared<oneshot::Receiver<PendResult<B>>>> {
1
        let (pending, plan) = self.plan_by_usage(dir, usage)?;

            
1
        self.circs
1
            .lock()
1
            .expect("Poisoned lock for circuit list")
1
            .add_pending_circ(pending);
1

            
1
        Ok(Arc::clone(self).spawn_launch(usage, plan))
1
    }

            
    /// Spawn a background task to launch a circuit, and report its status.
    ///
    /// The `usage` argument is the usage from the original request that made
    /// us build this circuit.
51
    fn spawn_launch(
51
        self: Arc<Self>,
51
        usage: &<B::Spec as AbstractSpec>::Usage,
51
        plan: CircBuildPlan<B>,
51
    ) -> Shared<oneshot::Receiver<PendResult<B>>> {
51
        let _ = usage; // Currently unused.
51
        let CircBuildPlan {
51
            mut plan,
51
            sender,
51
            pending,
51
        } = plan;
51
        let request_loyalty = self.circuit_timing().request_loyalty;
51

            
51
        let wait_on_future = pending.receiver.clone();
51
        let runtime = self.runtime.clone();
51
        let runtime_copy = self.runtime.clone();
51

            
51
        let tid = rand::random::<u64>();
51
        // We release this block when the circuit builder task terminates.
51
        let reason = format!("circuit builder task {}", tid);
51
        runtime.block_advance(reason.clone());
51
        // During tests, the `FakeBuilder` will need to release the block in order to fake a timeout
51
        // correctly.
51
        plan.add_blocked_advance_reason(reason);
51

            
51
        runtime
52
            .spawn(async move {
52
                let self_clone = Arc::clone(&self);
52
                let future = AssertUnwindSafe(self_clone.do_launch(plan, pending)).catch_unwind();
52
                let (new_spec, reply) = match future.await {
48
                    Ok(x) => x, // Success or regular failure
                    Err(e) => {
                        // Okay, this is a panic.  We have to tell the calling
                        // thread about it, then exit this circuit builder task.
                        let _ = sender.send(Err(internal!("circuit build task panicked").into()));
                        std::panic::panic_any(e);
                    }
                };

            
                // Tell anybody who was listening about it that this
                // circuit is now usable or failed.
                //
                // (We ignore any errors from `send`: That just means that nobody
                // was waiting for this circuit.)
48
                let _ = sender.send(reply.clone());

            
48
                if let Some(new_spec) = new_spec {
                    // Wait briefly before we notify opportunistically.  This
                    // delay will give the circuits that were originally
                    // specifically intended for a request a little more time
                    // to finish, before we offer it this circuit instead.
13
                    let sl = runtime_copy.sleep(request_loyalty);
13
                    runtime_copy.allow_one_advance(request_loyalty);
13
                    sl.await;

            
8
                    let pending = {
8
                        let list = self.circs.lock().expect("poisoned lock");
8
                        list.find_pending_requests(&new_spec)
                    };
10
                    for pending_request in pending {
2
                        let _ = pending_request.notify.clone().try_send(reply.clone());
2
                    }
35
                }
43
                runtime_copy.release_advance(format!("circuit builder task {}", tid));
51
            })
51
            .expect("Couldn't spawn circuit-building task");
51

            
51
        wait_on_future
51
    }

            
    /// Run in the background to launch a circuit. Return a 2-tuple of the new
    /// circuit spec and the outcome that should be sent to the initiator.
52
    async fn do_launch(
52
        self: Arc<Self>,
52
        plan: <B as AbstractCircBuilder>::Plan,
52
        pending: Arc<PendingEntry<B>>,
52
    ) -> (Option<<B as AbstractCircBuilder>::Spec>, PendResult<B>) {
52
        let outcome = self.builder.build_circuit(plan).await;

            
48
        match outcome {
35
            Err(e) => (None, Err(e)),
13
            Ok((new_spec, circ)) => {
13
                let id = circ.id();
13

            
13
                let use_duration = self.pick_use_duration();
13
                let exp_inst = self.runtime.now() + use_duration;
13
                let runtime_copy = self.runtime.clone();
13
                spawn_expiration_task(&runtime_copy, Arc::downgrade(&self), circ.id(), exp_inst);
13
                // I used to call restrict_mut here, but now I'm not so
13
                // sure. Doing restrict_mut makes sure that this
13
                // circuit will be suitable for the request that asked
13
                // for us in the first place, but that should be
13
                // ensured anyway by our tracking its tentative
13
                // assignment.
13
                //
13
                // new_spec.restrict_mut(&usage_copy).unwrap();
13
                let use_before = ExpirationInfo::new(exp_inst);
13
                let open_ent = OpenEntry::new(new_spec.clone(), circ, use_before);
13
                {
13
                    let mut list = self.circs.lock().expect("poisoned lock");
13
                    if list.circ_is_pending(&pending) {
13
                        list.add_open(open_ent);
13
                        // We drop our reference to 'pending' here:
13
                        // this should make all the weak references to
13
                        // the `PendingEntry` become dangling.
13
                        drop(pending);
13
                        (Some(new_spec), Ok(id))
                    } else {
                        // This circuit is no longer pending! It must have been cancelled.
                        drop(pending); // ibid
                        (None, Err(Error::CircCanceled))
                    }
                }
            }
        }
48
    }

            
    /// Remove the circuit with a given `id` from this manager.
    ///
    /// After this function is called, that circuit will no longer be handed
    /// out to any future requests.
    ///
    /// Return None if we have no circuit with the given ID.
2
    pub(crate) fn take_circ(&self, id: &<B::Circ as AbstractCirc>::Id) -> Option<B::Circ> {
2
        let mut list = self.circs.lock().expect("poisoned lock");
2
        list.take_open(id).map(|e| e.circ)
2
    }

            
    /// Clear every circuit out of this manager, so that none of them can be
    /// given out for any more requests.
    pub(crate) fn retire_all_circuits(&self) {
        self.circs
            .lock()
            .expect("poisoned lock")
            .clear_all_circuits();
    }

            
    /// Expire circuits according to the rules in `config` and the
    /// current time `now`.
    ///
    /// Expired circuits will not be automatically closed, but they will
    /// no longer be given out for new circuits.
1
    pub(crate) fn expire_circs(&self, now: Instant) {
1
        let mut list = self.circs.lock().expect("poisoned lock");
1
        let dirty_cutoff = now - self.circuit_timing().max_dirtiness;
1
        list.expire_circs(now, dirty_cutoff);
1
    }

            
    /// Consider expiring the circuit with given circuit `id`,
    /// according to the rules in `config` and the current time `now`.
    ///
    /// If `now` is so early that `now - max_dirtiness` cannot be represented
    /// as an `Instant`, this call does nothing instead of panicking.
    pub(crate) fn expire_circ(&self, circ_id: &<B::Circ as AbstractCirc>::Id, now: Instant) {
        // `Instant - Duration` panics when the result is unrepresentable.
        // Compute the cutoff with `checked_sub`, and do it before taking
        // the lock so that no failure here can poison it.
        let dirty_cutoff = match now.checked_sub(self.circuit_timing().max_dirtiness) {
            Some(cutoff) => cutoff,
            None => return,
        };
        let mut list = self.circs.lock().expect("poisoned lock");
        list.expire_circ(circ_id, now, dirty_cutoff);
    }

            
    /// Return the number of open circuits held by this circuit manager.
6
    pub(crate) fn n_circs(&self) -> usize {
6
        let list = self.circs.lock().expect("poisoned lock");
6
        list.open_circs.len()
6
    }

            
    /// Count the pending circuits currently tracked by this circuit manager.
    #[cfg(test)]
    pub(crate) fn n_pending_circs(&self) -> usize {
        self.circs
            .lock()
            .expect("poisoned lock")
            .pending_circs
            .len()
    }

            
    /// Get a reference to this manager's runtime.
    ///
    /// This is the runtime that was passed to `new()`.
    pub(crate) fn peek_runtime(&self) -> &R {
        &self.runtime
    }

            
    /// Get a reference to this manager's builder.
    ///
    /// This is the builder that was passed to `new()`.
    pub(crate) fn peek_builder(&self) -> &B {
        &self.builder
    }

            
    /// Pick a duration by when a new circuit should expire from now
    /// if it has not yet been used
13
    fn pick_use_duration(&self) -> Duration {
13
        let timings = self
13
            .unused_timing
13
            .lock()
13
            .expect("Poisoned lock for unused_timing");
13

            
13
        if self.builder.learning_timeouts() {
            timings.learning
        } else {
            // TODO: In Tor, this calculation also depends on
            // stuff related to predicted ports and channel
            // padding.
            use rand::Rng;
13
            let mut rng = rand::thread_rng();
13
            rng.gen_range(timings.not_learning..timings.not_learning * 2)
        }
13
    }
}

            
/// Spawn an expiration task that expires a circuit at given instant.
///
/// If given instant is earlier than now, expire the circuit immediately.
/// Otherwise, spawn a timer expiration task on given runtime.
///
/// When the timeout occurs, if the circuit manager is still present,
/// the task will ask the manager to expire the circuit, if the circuit
/// is ready to expire.
13
fn spawn_expiration_task<B, R>(
13
    runtime: &R,
13
    circmgr: Weak<AbstractCircMgr<B, R>>,
13
    circ_id: <<B as AbstractCircBuilder>::Circ as AbstractCirc>::Id,
13
    exp_inst: Instant,
13
) where
13
    R: Runtime,
13
    B: 'static + AbstractCircBuilder,
13
{
13
    let now = runtime.now();
13
    let rt_copy = runtime.clone();
13
    let duration = exp_inst.saturating_duration_since(now);
13

            
13
    if duration == Duration::ZERO {
        // Circuit should already expire. Expire it now.
        let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
            cm
        } else {
            // Circuits manager has already been dropped, so are the references it held.
            return;
        };
        cm.expire_circ(&circ_id, now);
    } else {
        // Spawn a timer expiration task with given expiration instant.
13
        if let Err(e) = runtime.spawn(async move {
13
            rt_copy.sleep(duration).await;
            let cm = if let Some(cm) = Weak::upgrade(&circmgr) {
                cm
            } else {
                return;
            };
            cm.expire_circ(&circ_id, exp_inst);
13
        }) {
            warn!("Unable to launch expiration task: {}", e);
13
        }
    }
13
}

            
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::usage::{ExitPolicy, SupportedCircUsage};
    use crate::{Error, StreamIsolation, TargetCircUsage, TargetPort};
    use std::collections::BTreeSet;
    use std::sync::atomic::{self, AtomicUsize};
    use tor_error::bad_api_usage;
    use tor_netdir::testnet;
    use tor_rtcompat::SleepProvider;
    use tor_rtmock::MockSleepRuntime;
    use tracing::trace;

            
    /// Identifier given to each fake circuit built by these tests.
    #[derive(Debug, Clone, Eq, PartialEq, Hash, Copy)]
    struct FakeId {
        id: usize,
    }

    /// Process-wide counter from which `FakeId::next` draws its values.
    static NEXT_FAKE_ID: AtomicUsize = AtomicUsize::new(0);
    impl FakeId {
        /// Allocate the next identifier from the shared counter.
        fn next() -> Self {
            Self {
                id: NEXT_FAKE_ID.fetch_add(1, atomic::Ordering::SeqCst),
            }
        }
    }

            
    /// Stand-in circuit for these tests; it carries nothing but its identity.
    #[derive(Debug, PartialEq, Clone)]
    struct FakeCirc {
        id: FakeId,
    }

    impl FakeCirc {
        /// Report whether `self` and `other` carry the same id.
        ///
        /// The tests call this as `FakeCirc::eq(&a, &b)`.
        fn eq(&self, other: &Self) -> bool {
            let (mine, theirs) = (self.id, other.id);
            mine == theirs
        }
    }

            
    impl AbstractCirc for FakeCirc {
        type Id = FakeId;
        fn id(&self) -> FakeId {
            self.id
        }
        fn usable(&self) -> bool {
            true
        }
    }

            
    /// Fake spec/usage type for these tests: a set of ports plus an optional
    /// isolation group.  It serves as both the spec and its own `Usage`.
    #[derive(Clone, Debug, Eq, PartialEq, Hash)]
    struct FakeSpec {
        /// Ports covered by a circuit, or required by a request.
        ports: BTreeSet<u16>,
        /// Isolation group, if any; `None` is treated as compatible with
        /// every group.
        isolation_group: Option<u8>,
    }

            
    impl AbstractSpec for FakeSpec {
        type Usage = FakeSpec;

        /// A spec supports `other` when it covers every port `other` asks for
        /// and the isolation groups do not conflict (an unset group on either
        /// side never conflicts).
        fn supports(&self, other: &FakeSpec) -> bool {
            let iso_compatible = match (self.isolation_group, other.isolation_group) {
                (Some(mine), Some(theirs)) => mine == theirs,
                _ => true,
            };
            self.ports.is_superset(&other.ports) && iso_compatible
        }

        /// Narrow this spec's isolation group to account for `other`.
        ///
        /// Fails, leaving `self` untouched, when `other` asks for a port this
        /// spec lacks or when both sides name different isolation groups.
        /// The port set itself is never changed.
        fn restrict_mut(&mut self, other: &FakeSpec) -> Result<()> {
            if !self.ports.is_superset(&other.ports) {
                return Err(bad_api_usage!("not supported").into());
            }
            self.isolation_group = match (self.isolation_group, other.isolation_group) {
                (Some(mine), Some(theirs)) if mine != theirs => {
                    return Err(bad_api_usage!("not supported").into());
                }
                // At most one distinct group remains; keep whichever is set.
                (mine, theirs) => mine.or(theirs),
            };
            Ok(())
        }
    }

            
    impl FakeSpec {
        fn new<T>(ports: T) -> Self
        where
            T: IntoIterator,
            T::Item: Into<u16>,
        {
            let ports = ports.into_iter().map(Into::into).collect();
            FakeSpec {
                ports,
                isolation_group: None,
            }
        }
        fn isolated(self, group: u8) -> Self {
            FakeSpec {
                ports: self.ports,
                isolation_group: Some(group),
            }
        }
    }

            
    /// Plan produced by `FakeBuilder::plan_circuit`: the requested spec plus
    /// the scripted operation that `build_circuit` should act out.
    #[derive(Debug, Clone)]
    struct FakePlan {
        /// Spec the circuit was requested for.
        spec: FakeSpec,
        /// Scripted outcome to perform when the plan is built.
        op: FakeOp,
    }

            
    /// Test circuit builder whose behavior is driven by a per-spec script of
    /// `FakeOp`s.
    #[derive(Debug)]
    struct FakeBuilder<RT: Runtime> {
        /// Runtime used for the sleeps performed in `build_circuit`.
        runtime: RT,
        /// Remaining scripted operations for each spec.  `set` stores them in
        /// reverse so that `next_op` can pop the next one off the end.
        script: sync::Mutex<HashMap<FakeSpec, Vec<FakeOp>>>,
    }

            
    /// One scripted step for `FakeBuilder`.
    #[derive(Debug, Clone)]
    enum FakeOp {
        /// The build succeeds with the planned spec.
        Succeed,
        /// The build fails with `Error::PendingCanceled`.
        Fail,
        /// The build sleeps for the given extra duration, then fails with
        /// `Error::PendingCanceled`.
        Delay(Duration),
        /// The build should hang.  `add_blocked_advance_reason` rewrites this
        /// into `TimeoutReleaseAdvance`; `build_circuit` treats a bare
        /// `Timeout` as unreachable.
        Timeout,
        /// The build calls `release_advance` with the stored reason and then
        /// never completes.
        TimeoutReleaseAdvance(String),
        /// Planning fails with `Error::NoPath`; no build is attempted.
        NoPlan,
        /// The build succeeds but reports the given spec instead of the
        /// planned one.
        WrongSpec(FakeSpec),
    }

            
    impl MockablePlan for FakePlan {
        /// Turn a scripted `Timeout` into `TimeoutReleaseAdvance(reason)`;
        /// plans carrying any other operation are left unchanged.
        fn add_blocked_advance_reason(&mut self, reason: String) {
            if matches!(self.op, FakeOp::Timeout) {
                self.op = FakeOp::TimeoutReleaseAdvance(reason);
            }
        }
    }

            
    /// Time that every call to `FakeBuilder::build_circuit` sleeps before
    /// acting on its scripted operation.
    const FAKE_CIRC_DELAY: Duration = Duration::from_millis(30);

    /// Empty fallback-directory list backing the `DirInfo` returned by `di`.
    static DI_EMPTY: [tor_netdir::fallback::FallbackDir; 0] = [];

    /// Return a `DirInfo` built from the empty fallback list `DI_EMPTY`.
    fn di() -> DirInfo<'static> {
        DI_EMPTY[..].into()
    }

            
    #[async_trait]
    impl<RT: Runtime> AbstractCircBuilder for FakeBuilder<RT> {
        type Spec = FakeSpec;
        type Circ = FakeCirc;
        type Plan = FakePlan;

        /// Take the next scripted operation for `spec` and wrap it in a plan.
        ///
        /// A scripted `FakeOp::NoPlan` is reported as `Error::NoPath`; every
        /// other operation is carried in the plan for `build_circuit` to act
        /// on.  The spec returned alongside the plan is a copy of `spec`.
        fn plan_circuit(&self, spec: &FakeSpec, _dir: DirInfo<'_>) -> Result<(FakePlan, FakeSpec)> {
            let next_op = self.next_op(spec);
            if matches!(next_op, FakeOp::NoPlan) {
                return Err(Error::NoPath("No relays for you".into()));
            }
            let plan = FakePlan {
                spec: spec.clone(),
                op: next_op,
            };
            Ok((plan, spec.clone()))
        }

        /// Act out the operation stored in `plan`, after first sleeping for
        /// `FAKE_CIRC_DELAY`.
        async fn build_circuit(&self, plan: FakePlan) -> Result<(FakeSpec, FakeCirc)> {
            let op = plan.op;
            // The sleep future is created before `allow_one_advance` is
            // called, and only awaited afterwards.
            // NOTE(review): presumably `allow_one_advance` lets the mock clock
            // move forward far enough for this sleep to finish -- confirm
            // against the mock runtime's documentation.
            let sl = self.runtime.sleep(FAKE_CIRC_DELAY);
            self.runtime.allow_one_advance(FAKE_CIRC_DELAY);
            sl.await;
            match op {
                FakeOp::Succeed => Ok((plan.spec, FakeCirc { id: FakeId::next() })),
                FakeOp::WrongSpec(s) => Ok((s, FakeCirc { id: FakeId::next() })),
                FakeOp::Fail => Err(Error::PendingCanceled),
                FakeOp::Delay(d) => {
                    // Same create / allow-advance / await sequence as above,
                    // this time for the scripted extra delay.
                    let sl = self.runtime.sleep(d);
                    self.runtime.allow_one_advance(d);
                    sl.await;
                    Err(Error::PendingCanceled)
                }
                FakeOp::Timeout => unreachable!(), // should be converted to the below
                FakeOp::TimeoutReleaseAdvance(reason) => {
                    trace!("releasing advance to fake a timeout");
                    self.runtime.release_advance(reason);
                    // Never resolves, so this build cannot finish on its own.
                    let () = futures::future::pending().await;
                    unreachable!()
                }
                // `plan_circuit` turns `NoPlan` into an error, so no plan
                // carrying it is ever produced.
                FakeOp::NoPlan => unreachable!(),
            }
        }

        /// This fake builder never reports that it is learning timeouts.
        fn learning_timeouts(&self) -> bool {
            false
        }
    }

            
    impl<RT: Runtime> FakeBuilder<RT> {
        /// Create a builder that runs on a clone of `rt` and has no script.
        fn new(rt: &RT) -> Self {
            let runtime = rt.clone();
            let script = sync::Mutex::new(HashMap::new());
            Self { runtime, script }
        }

        /// Install `v` as the script for `spec`, replacing any earlier script
        /// for the same spec.
        fn set<I>(&self, spec: FakeSpec, v: I)
        where
            I: IntoIterator<Item = FakeOp>,
        {
            // Stored back-to-front so that `next_op` can `pop` the
            // operations in the order the caller listed them.
            let mut reversed: Vec<FakeOp> = v.into_iter().collect();
            reversed.reverse();
            self.script.lock().unwrap().insert(spec, reversed);
        }

        /// Remove and return the next scripted operation for `spec`.
        ///
        /// Falls back to `FakeOp::Succeed` when `spec` has no script or its
        /// script has run out.
        fn next_op(&self, spec: &FakeSpec) -> FakeOp {
            self.script
                .lock()
                .unwrap()
                .get_mut(spec)
                .and_then(Vec::pop)
                .unwrap_or(FakeOp::Succeed)
        }
    }

            
    /// Walk through the basic lifecycle: launching a circuit, reusing it for
    /// a narrower request, sharing one pending circuit between two
    /// simultaneous requests, removing a circuit with `take_circ`, and
    /// launching with `launch_by_usage`.
    #[test]
    fn basic_tests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);

            let builder = FakeBuilder::new(&rt);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            let webports = FakeSpec::new(vec![80_u16, 443]);

            // Check initialization.
            assert_eq!(mgr.n_circs(), 0);
            assert!(mgr.peek_builder().script.lock().unwrap().is_empty());

            // Launch a circuit; make sure we get it.
            let c1 = rt.wait_for(mgr.get_or_launch(&webports, di())).await;
            let c1 = c1.unwrap();
            assert_eq!(mgr.n_circs(), 1);

            // Make sure we get the one we already made if we ask for it.
            // (Port 80 alone is a subset of the ports c1 was built for.)
            let port80 = FakeSpec::new(vec![80_u16]);
            let c2 = mgr.get_or_launch(&port80, di()).await;

            let c2 = c2.unwrap();
            assert!(FakeCirc::eq(&c1, &c2));
            assert_eq!(mgr.n_circs(), 1);

            // Now try launching two circuits "at once" to make sure that our
            // pending-circuit code works.

            let dnsport = FakeSpec::new(vec![53_u16]);
            let dnsport_restrict = dnsport.clone().isolated(7);

            let (c3, c4) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&dnsport, di()),
                    mgr.get_or_launch(&dnsport_restrict, di()),
                ))
                .await;

            // Both requests must have been served by one new circuit.
            let c3 = c3.unwrap();
            let c4 = c4.unwrap();
            assert!(!FakeCirc::eq(&c1, &c3));
            assert!(FakeCirc::eq(&c3, &c4));
            assert_eq!(c3.id(), c4.id());
            assert_eq!(mgr.n_circs(), 2);

            // Now we're going to remove c3 from consideration.  It's the
            // same as c4, so removing c4 will give us None.
            let c3_taken = mgr.take_circ(&c3.id()).unwrap();
            let now_its_gone = mgr.take_circ(&c4.id());
            assert!(FakeCirc::eq(&c3_taken, &c3));
            assert!(now_its_gone.is_none());
            assert_eq!(mgr.n_circs(), 1);

            // Having removed them, let's launch another dnsport and make
            // sure we get a different circuit.
            let c5 = rt.wait_for(mgr.get_or_launch(&dnsport, di())).await;
            let c5 = c5.unwrap();
            assert!(!FakeCirc::eq(&c3, &c5));
            assert!(!FakeCirc::eq(&c4, &c5));
            assert_eq!(mgr.n_circs(), 2);

            // Now try launch_by_usage.
            let prev = mgr.n_pending_circs();
            assert!(mgr.launch_by_usage(&dnsport, di()).is_ok());
            assert_eq!(mgr.n_pending_circs(), prev + 1);
            // TODO: Actually make sure that launch_by_usage launched
            // the right thing.
        });
    }

            
    /// A request whose first build fails and whose second build never
    /// completes must end in `Error::RequestFailed`.
    #[test]
    fn request_timeout() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);
            let web = FakeSpec::new(vec![80_u16, 443]);

            // Script: fail once, and then completely time out.  The overall
            // result will be a failure.
            let script = vec![FakeOp::Fail, FakeOp::Timeout];
            let builder = FakeBuilder::new(&rt);
            builder.set(web.clone(), script);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            let request = mgr.get_or_launch(&web, di());
            let outcome = mgr.peek_runtime().wait_for(request).await;

            assert!(matches!(outcome, Err(Error::RequestFailed(_))));
        });
    }

            
    /// A build that fails only after a long scripted delay, followed by a
    /// planning failure, must surface as `Error::RequestFailed`.
    #[test]
    fn request_timeout2() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);
            let web = FakeSpec::new(vec![80_u16, 443]);

            // A more complicated case: we'll try to get things so that we
            // wait for a little over our predicted time because of our
            // wait-for-next-action logic.
            let slow_failure = FakeOp::Delay(Duration::from_millis(60_000 - 25));
            let builder = FakeBuilder::new(&rt);
            builder.set(web.clone(), vec![slow_failure, FakeOp::NoPlan]);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            let request = mgr.get_or_launch(&web, di());
            let outcome = mgr.peek_runtime().wait_for(request).await;

            assert!(matches!(outcome, Err(Error::RequestFailed(_))));
        });
    }

            
    /// A request for which no circuit can ever be planned must end in
    /// `Error::RequestFailed`.
    #[test]
    fn request_unplannable() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);
            let web = FakeSpec::new(vec![80_u16, 443]);

            // This will fail at the planning stage, a lot.
            let never_plannable = std::iter::repeat(FakeOp::NoPlan).take(2000);
            let builder = FakeBuilder::new(&rt);
            builder.set(web.clone(), never_plannable);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let outcome = rt.wait_for(mgr.get_or_launch(&web, di())).await;

            assert!(matches!(outcome, Err(Error::RequestFailed(_))));
        });
    }

            
    /// A request whose builds keep failing must eventually give up with
    /// `Error::RequestFailed`.
    #[test]
    fn request_fails_too_much() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);
            let web = FakeSpec::new(vec![80_u16, 443]);

            // This will fail 1000 times, which is above the retry limit.
            let always_failing = std::iter::repeat(FakeOp::Fail).take(1000);
            let builder = FakeBuilder::new(&rt);
            builder.set(web.clone(), always_failing);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let outcome = rt.wait_for(mgr.get_or_launch(&web, di())).await;

            assert!(matches!(outcome, Err(Error::RequestFailed(_))));
        });
    }

            
    /// A request must still succeed when the first circuit built for it
    /// comes back with a spec other than the one that was asked for.
    #[test]
    fn request_wrong_spec() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);
            let web = FakeSpec::new(vec![80_u16, 443]);

            // The first time this is called, it will build a circuit
            // with the wrong spec.  (A circuit builder should never
            // actually _do_ that, but it's something we code for.)
            let ssh_only = FakeSpec::new(vec![22_u16]);
            let builder = FakeBuilder::new(&rt);
            builder.set(web.clone(), vec![FakeOp::WrongSpec(ssh_only)]);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            let outcome = rt.wait_for(mgr.get_or_launch(&web, di())).await;

            assert!(outcome.is_ok());
        });
    }

            
    /// Two simultaneous requests for the same spec must both succeed, with
    /// the same circuit, after the first two build attempts fail.
    #[test]
    fn request_retried() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);
            let ports = FakeSpec::new(vec![80_u16, 443]);

            // This will fail twice, and then succeed. The result will be
            // a success.
            // (Once the script is used up, `next_op` falls back to
            // `FakeOp::Succeed`.)
            let builder = FakeBuilder::new(&rt);
            builder.set(ports.clone(), vec![FakeOp::Fail, FakeOp::Fail]);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            // This test doesn't exercise any timeout behaviour.
            rt.block_advance("test doesn't require advancing");

            let (c1, c2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&ports, di()),
                    mgr.get_or_launch(&ports, di()),
                ))
                .await;

            let c1 = c1.unwrap();
            let c2 = c2.unwrap();

            // Both requests must have been handed the same circuit.
            assert!(FakeCirc::eq(&c1, &c2));
        });
    }

            
    /// Requests in different isolation groups must never share a circuit,
    /// while an un-isolated request must share with one of them, whatever
    /// order the three requests start in.
    #[test]
    fn isolated() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);
            let builder = FakeBuilder::new(&rt);
            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            let ports = FakeSpec::new(vec![443_u16]);
            // Set our isolation so that iso1 and iso2 can't share a circuit,
            // but no_iso can share a circuit with either.
            let iso1 = ports.clone().isolated(1);
            let iso2 = ports.clone().isolated(2);
            let no_iso = ports.clone();

            // We're going to try launching these circuits in 6 different
            // orders, to make sure that the outcome is correct each time.
            use itertools::Itertools;
            // Despite the name, these are the start delays applied to the
            // three requests below.
            let timeouts: Vec<_> = [0_u64, 5, 10]
                .iter()
                .map(|d| Duration::from_millis(*d))
                .collect();

            // Each permutation assigns the three delays to the three
            // requests in a different way.
            for delays in timeouts.iter().permutations(3) {
                let d1 = delays[0];
                let d2 = delays[1];
                let d3 = delays[2];
                let (c_iso1, c_iso2, c_none) = rt
                    .wait_for(futures::future::join3(
                        async {
                            rt.sleep(*d1).await;
                            mgr.get_or_launch(&iso1, di()).await
                        },
                        async {
                            rt.sleep(*d2).await;
                            mgr.get_or_launch(&iso2, di()).await
                        },
                        async {
                            rt.sleep(*d3).await;
                            mgr.get_or_launch(&no_iso, di()).await
                        },
                    ))
                    .await;

                let c_iso1 = c_iso1.unwrap();
                let c_iso2 = c_iso2.unwrap();
                let c_none = c_none.unwrap();

                // The isolated requests must be on distinct circuits, and
                // the un-isolated one must ride along with one of them.
                assert!(!FakeCirc::eq(&c_iso1, &c_iso2));
                assert!(FakeCirc::eq(&c_iso1, &c_none) || FakeCirc::eq(&c_iso2, &c_none));
            }
        });
    }

            
    /// A request whose own circuit build hangs must still be satisfied by a
    /// compatible circuit that a later request causes to be built.
    #[test]
    fn opportunistic() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let rt = MockSleepRuntime::new(rt);

            // The first request will time out completely, but we're
            // making a second request after we launch it.  That
            // request should succeed, and notify the first request.

            let ports1 = FakeSpec::new(vec![80_u16]);
            let ports2 = FakeSpec::new(vec![80_u16, 443]);

            // Only ports1 is scripted; builds for ports2 use the default
            // `FakeOp::Succeed`.
            let builder = FakeBuilder::new(&rt);
            builder.set(ports1.clone(), vec![FakeOp::Timeout]);

            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));
            // Note that ports2 will be wider than ports1, so the second
            // request will have to launch a new circuit.

            let (c1, c2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&ports1, di()),
                    async {
                        // Start the second request 100ms after the first.
                        rt.sleep(Duration::from_millis(100)).await;
                        mgr.get_or_launch(&ports2, di()).await
                    },
                ))
                .await;

            // Both requests must succeed, and with the same circuit.
            if let (Ok(c1), Ok(c2)) = (c1, c2) {
                assert!(FakeCirc::eq(&c1, &c2));
            } else {
                panic!();
            };
        });
    }

            
    /// A circuit requested through `ensure_circuit` must be shared by later
    /// requests whose ports it covers.
    #[test]
    fn prebuild() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            // This time we're going to use ensure_circuit() to make
            // sure that a circuit gets built, and then launch two
            // other circuits that will use it.
            let rt = MockSleepRuntime::new(rt);
            let builder = FakeBuilder::new(&rt);
            let mgr = Arc::new(AbstractCircMgr::new(
                builder,
                rt.clone(),
                CircuitTiming::default(),
            ));

            // ports1 covers both ports2 and ports3; ports2 and ports3 do not
            // cover each other.
            let ports1 = FakeSpec::new(vec![80_u16, 443]);
            let ports2 = FakeSpec::new(vec![80_u16]);
            let ports3 = FakeSpec::new(vec![443_u16]);
            let (ok, c1, c2) = rt
                .wait_for(futures::future::join3(
                    mgr.ensure_circuit(&ports1, di()),
                    async {
                        // First follow-up request, 10ms in.
                        rt.sleep(Duration::from_millis(10)).await;
                        mgr.get_or_launch(&ports2, di()).await
                    },
                    async {
                        // Second follow-up request, 50ms in.
                        rt.sleep(Duration::from_millis(50)).await;
                        mgr.get_or_launch(&ports3, di()).await
                    },
                ))
                .await;

            assert!(ok.is_ok());

            let c1 = c1.unwrap();
            let c2 = c2.unwrap();

            // If we had launched these separately, they wouldn't share
            // a circuit.
            assert!(FakeCirc::eq(&c1, &c2));
        });
    }

            
    /// With `max_dirtiness` set to 15 seconds, a circuit that was handed out
    /// long ago must be replaced after `expire_circs`, while a circuit that
    /// was only just handed out must survive.
    #[test]
    fn expiration() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use crate::config::CircuitTimingBuilder;
            // Now let's make some circuits -- one dirty, one clean, and
            // make sure that one expires and one doesn't.
            let rt = MockSleepRuntime::new(rt);
            let builder = FakeBuilder::new(&rt);

            let circuit_timing = CircuitTimingBuilder::default()
                .max_dirtiness(Duration::from_secs(15))
                .build()
                .unwrap();

            let mgr = Arc::new(AbstractCircMgr::new(builder, rt.clone(), circuit_timing));

            let imap = FakeSpec::new(vec![993_u16]);
            let pop = FakeSpec::new(vec![995_u16]);

            // The imap circuit is only ensured; the pop circuit is actually
            // handed out to us.
            let (ok, pop1) = rt
                .wait_for(futures::future::join(
                    mgr.ensure_circuit(&imap, di()),
                    mgr.get_or_launch(&pop, di()),
                ))
                .await;

            assert!(ok.is_ok());
            let pop1 = pop1.unwrap();

            // Move the mock clock 45 seconds forward in total, then hand out
            // the imap circuit for the first time.
            rt.advance(Duration::from_secs(30)).await;
            rt.advance(Duration::from_secs(15)).await;
            let imap1 = rt.wait_for(mgr.get_or_launch(&imap, di())).await.unwrap();

            // This should expire the pop circuit, since it came from
            // get_or_launch() [which marks the circuit as being
            // used].  It should not expire the imap circuit, since
            // it was not dirty until 15 seconds after the cutoff.
            let now = rt.now();

            mgr.expire_circs(now);

            let (pop2, imap2) = rt
                .wait_for(futures::future::join(
                    mgr.get_or_launch(&pop, di()),
                    mgr.get_or_launch(&imap, di()),
                ))
                .await;

            let pop2 = pop2.unwrap();
            let imap2 = imap2.unwrap();

            // The pop request gets a fresh circuit; the imap request gets
            // the circuit it had before.
            assert!(!FakeCirc::eq(&pop2, &pop1));
            assert!(FakeCirc::eq(&imap2, &imap1));
        });
    }

            
    /// Build three exit policies from relays in the test network, returned
    /// in this order: one that permits nothing, one that permits only ports
    /// 80 and 443, and one that permits all ports.
    fn get_exit_policies() -> (ExitPolicy, ExitPolicy, ExitPolicy) {
        // FIXME(eta): the below is copypasta; would be nice to have a better way of
        //             constructing ExitPolicy objects for testing maybe
        let network = testnet::construct_netdir()
            .unwrap()
            .unwrap_if_sufficient()
            .unwrap();

        // Look up the relay whose identity is `id_byte` repeated 32 times,
        // and extract its exit policy.
        let policy_for = |id_byte: u8| {
            let id = [id_byte; 32].into();
            let relay = network.by_id(&id).unwrap();
            ExitPolicy::from_relay(&relay)
        };

        // Nodes with ID 0x0a through 0x13 and 0x1e through 0x27 are
        // exits.  Odd-numbered ones allow only ports 80 and 443;
        // even-numbered ones allow all ports.
        let ep_none = policy_for(0x05);
        let ep_web = policy_for(0x11);
        let ep_full = policy_for(0x20);
        (ep_none, ep_web, ep_full)
    }

            
    /// Check which open entries `SupportedCircUsage::find_supported` returns
    /// for an exit request and for preemptive requests, using entries whose
    /// exit policies permit nothing, web ports only, and everything.
    #[test]
    fn test_find_supported() {
        let (ep_none, ep_web, ep_full) = get_exit_policies();
        let fake_circ = FakeCirc { id: FakeId::next() };
        // All entries share one circuit and an "unused" expiration one hour
        // from now.
        let expiration = ExpirationInfo::Unused {
            use_before: Instant::now() + Duration::from_secs(60 * 60),
        };

        // Each entry gets a `_c` clone to compare results against, since the
        // entry itself is mutably borrowed by the call under test.
        let mut entry_none = OpenEntry::new(
            SupportedCircUsage::Exit {
                policy: ep_none,
                isolation: None,
            },
            fake_circ.clone(),
            expiration.clone(),
        );
        let mut entry_none_c = entry_none.clone();
        let mut entry_web = OpenEntry::new(
            SupportedCircUsage::Exit {
                policy: ep_web,
                isolation: None,
            },
            fake_circ.clone(),
            expiration.clone(),
        );
        let mut entry_web_c = entry_web.clone();
        let mut entry_full = OpenEntry::new(
            SupportedCircUsage::Exit {
                policy: ep_full,
                isolation: None,
            },
            fake_circ,
            expiration,
        );
        let mut entry_full_c = entry_full.clone();

        let usage_web = TargetCircUsage::Exit {
            ports: vec![TargetPort::ipv4(80)],
            isolation: StreamIsolation::no_isolation(),
        };
        let empty: Vec<&OpenEntry<SupportedCircUsage, FakeCirc>> = vec![];

        // An entry that permits nothing cannot serve a port-80 request.
        assert_eq!(
            SupportedCircUsage::find_supported(vec![&mut entry_none].into_iter(), &usage_web),
            empty
        );

        // HACK(eta): We have to faff around with clones and such because
        //            `abstract_spec_find_supported` has a silly signature that involves `&mut`
        //            refs, which we can't have more than one of.
        // NOTE(review): the function name above looks stale; every call in
        //               this test goes through `SupportedCircUsage::find_supported`.

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_web,
            ),
            vec![&mut entry_web_c]
        );

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_web,
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );

        // Test preemptive circuit usage:

        let usage_preemptive_web = TargetCircUsage::Preemptive {
            port: Some(TargetPort::ipv4(80)),
            circs: 2,
        };
        let usage_preemptive_dns = TargetCircUsage::Preemptive {
            port: None,
            circs: 2,
        };

        // shouldn't return anything unless there are >=2 circuits

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );

        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none].into_iter(),
                &usage_preemptive_dns
            ),
            empty
        );

        // Two entries, but only one of them permits port 80.
        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_web
            ),
            empty
        );

        // With no port requirement, both entries are returned.
        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web].into_iter(),
                &usage_preemptive_dns
            ),
            vec![&mut entry_none_c, &mut entry_web_c]
        );

        // Two of the three entries permit port 80, which meets `circs: 2`.
        assert_eq!(
            SupportedCircUsage::find_supported(
                vec![&mut entry_none, &mut entry_web, &mut entry_full].into_iter(),
                &usage_preemptive_web
            ),
            vec![&mut entry_web_c, &mut entry_full_c]
        );
    }
}