1
//! Facilities to build circuits directly, instead of via a circuit manager.
2

            
3
use crate::path::{OwnedPath, TorPath};
4
use crate::timeouts::{self, Action};
5
use crate::{Error, Result};
6
use async_trait::async_trait;
7
use futures::channel::oneshot;
8
use futures::task::SpawnExt;
9
use futures::Future;
10
use std::convert::TryInto;
11
use std::sync::{
12
    atomic::{AtomicU32, Ordering},
13
    Arc,
14
};
15
use std::time::{Duration, Instant};
16
use tor_chanmgr::ChanMgr;
17
use tor_guardmgr::GuardStatus;
18
use tor_linkspec::{ChanTarget, OwnedChanTarget, OwnedCircTarget};
19
use tor_proto::circuit::{CircParameters, ClientCirc, PendingClientCirc};
20
use tor_rtcompat::{Runtime, SleepProviderExt};
21

            
22
mod guardstatus;
23

            
24
pub(crate) use guardstatus::GuardStatusHandle;
25

            
26
/// Represents an objects that can be constructed in a circuit-like way.
27
///
28
/// This is only a separate trait for testing purposes, so that we can swap
29
/// our some other type when we're testing Builder.
30
///
31
/// TODO: I'd like to have a simpler testing strategy here; this one
32
/// complicates things a bit.
33
#[async_trait]
34
pub(crate) trait Buildable: Sized {
35
    /// Launch a new one-hop circuit to a given relay, given only a
36
    /// channel target `ct` specifying that relay.
37
    ///
38
    /// (Since we don't have a CircTarget here, we can't extend the circuit
39
    /// to be multihop later on.)
40
    async fn create_chantarget<RT: Runtime>(
41
        chanmgr: &ChanMgr<RT>,
42
        rt: &RT,
43
        ct: &OwnedChanTarget,
44
        params: &CircParameters,
45
    ) -> Result<Self>;
46

            
47
    /// Launch a new circuit through a given relay, given a circuit target
48
    /// `ct` specifying that relay.
49
    async fn create<RT: Runtime>(
50
        chanmgr: &ChanMgr<RT>,
51
        rt: &RT,
52
        ct: &OwnedCircTarget,
53
        params: &CircParameters,
54
    ) -> Result<Self>;
55

            
56
    /// Extend this circuit-like object by one hop, to the location described
57
    /// in `ct`.
58
    async fn extend<RT: Runtime>(
59
        &self,
60
        rt: &RT,
61
        ct: &OwnedCircTarget,
62
        params: &CircParameters,
63
    ) -> Result<()>;
64
}
65

            
66
/// Try to make a [`PendingClientCirc`] to a given relay, and start its
67
/// reactor.
68
///
69
/// This is common code, shared by all the first-hop functions in the
70
/// implementation of `Buildable` for `Arc<ClientCirc>`.
71
async fn create_common<RT: Runtime, CT: ChanTarget>(
72
    chanmgr: &ChanMgr<RT>,
73
    rt: &RT,
74
    target: &CT,
75
) -> Result<PendingClientCirc> {
76
    let chan = chanmgr
77
        .get_or_launch(target)
78
        .await
79
        .map_err(|cause| Error::Channel {
80
            peer: OwnedChanTarget::from_chan_target(target),
81
            cause,
82
        })?;
83
    let (pending_circ, reactor) = chan.new_circ().await?;
84

            
85
    rt.spawn(async {
86
        let _ = reactor.run().await;
87
    })
88
    .map_err(|e| Error::from_spawn("circuit reactor task", e))?;
89

            
90
    Ok(pending_circ)
91
}
92

            
93
#[async_trait]
94
impl Buildable for ClientCirc {
95
    async fn create_chantarget<RT: Runtime>(
96
        chanmgr: &ChanMgr<RT>,
97
        rt: &RT,
98
        ct: &OwnedChanTarget,
99
        params: &CircParameters,
100
    ) -> Result<Self> {
101
        let circ = create_common(chanmgr, rt, ct).await?;
102
        Ok(circ.create_firsthop_fast(params).await?)
103
    }
104
    async fn create<RT: Runtime>(
105
        chanmgr: &ChanMgr<RT>,
106
        rt: &RT,
107
        ct: &OwnedCircTarget,
108
        params: &CircParameters,
109
    ) -> Result<Self> {
110
        let circ = create_common(chanmgr, rt, ct).await?;
111
        Ok(circ.create_firsthop_ntor(ct, params.clone()).await?)
112
    }
113
    async fn extend<RT: Runtime>(
114
        &self,
115
        _rt: &RT,
116
        ct: &OwnedCircTarget,
117
        params: &CircParameters,
118
    ) -> Result<()> {
119
        self.extend_ntor(ct, params).await?;
120
        Ok(())
121
    }
122
}
123

            
124
/// An implementation type for [`CircuitBuilder`].
125
///
126
/// A `CircuitBuilder` holds references to all the objects that are needed
127
/// to build circuits correctly.
128
///
129
/// In general, you should not need to construct or use this object yourself,
130
/// unless you are choosing your own paths.
131
struct Builder<R: Runtime, C: Buildable + Sync + Send + 'static> {
132
    /// The runtime used by this circuit builder.
133
    runtime: R,
134
    /// A channel manager that this circuit builder uses to make channels.
135
    chanmgr: Arc<ChanMgr<R>>,
136
    /// An estimator to determine the correct timeouts for circuit building.
137
    timeouts: timeouts::Estimator,
138
    /// We don't actually hold any clientcircs, so we need to put this
139
    /// type here so the compiler won't freak out.
140
    _phantom: std::marker::PhantomData<C>,
141
}
142

            
143
impl<R: Runtime, C: Buildable + Sync + Send + 'static> Builder<R, C> {
144
    /// Construct a new [`Builder`].
145
17
    fn new(runtime: R, chanmgr: Arc<ChanMgr<R>>, timeouts: timeouts::Estimator) -> Self {
146
17
        Builder {
147
17
            runtime,
148
17
            chanmgr,
149
17
            timeouts,
150
17
            _phantom: std::marker::PhantomData,
151
17
        }
152
17
    }
153

            
154
    /// Build a circuit, without performing any timeout operations.
155
    ///
156
    /// After each hop is built, increments n_hops_built.  Make sure that
157
    /// `guard_status` has its pending status set correctly to correspond
158
    /// to a circuit failure at any given stage.
159
    ///
160
    /// (TODO: Find
161
    /// a better design there.)
162
16
    async fn build_notimeout(
163
16
        self: Arc<Self>,
164
16
        path: OwnedPath,
165
16
        params: CircParameters,
166
16
        start_time: Instant,
167
16
        n_hops_built: Arc<AtomicU32>,
168
16
        guard_status: Arc<GuardStatusHandle>,
169
16
    ) -> Result<C> {
170
16
        match path {
171
4
            OwnedPath::ChannelOnly(target) => {
172
4
                // If we fail now, it's the guard's fault.
173
4
                guard_status.pending(GuardStatus::Failure);
174
4
                let circ =
175
4
                    C::create_chantarget(&self.chanmgr, &self.runtime, &target, &params).await?;
176
4
                self.timeouts
177
4
                    .note_hop_completed(0, self.runtime.now() - start_time, true);
178
4
                n_hops_built.fetch_add(1, Ordering::SeqCst);
179
4
                Ok(circ)
180
            }
181
12
            OwnedPath::Normal(p) => {
182
12
                assert!(!p.is_empty());
183
12
                let n_hops = p.len() as u8;
184
12
                // If we fail now, it's the guard's fault.
185
12
                guard_status.pending(GuardStatus::Failure);
186
12
                let circ = C::create(&self.chanmgr, &self.runtime, &p[0], &params).await?;
187
12
                self.timeouts
188
12
                    .note_hop_completed(0, self.runtime.now() - start_time, n_hops == 0);
189
12
                // If we fail after this point, we can't tell whether it's
190
12
                // the fault of the guard or some later relay.
191
12
                guard_status.pending(GuardStatus::Indeterminate);
192
12
                n_hops_built.fetch_add(1, Ordering::SeqCst);
193
12
                let mut hop_num = 1;
194
24
                for relay in p[1..].iter() {
195
24
                    circ.extend(&self.runtime, relay, &params).await?;
196
20
                    n_hops_built.fetch_add(1, Ordering::SeqCst);
197
20
                    self.timeouts.note_hop_completed(
198
20
                        hop_num,
199
20
                        self.runtime.now() - start_time,
200
20
                        hop_num == (n_hops - 1),
201
20
                    );
202
20
                    hop_num += 1;
203
                }
204
8
                Ok(circ)
205
            }
206
        }
207
12
    }
208

            
209
    /// Build a circuit from an [`OwnedPath`].
210
16
    async fn build_owned(
211
16
        self: &Arc<Self>,
212
16
        path: OwnedPath,
213
16
        params: &CircParameters,
214
16
        guard_status: Arc<GuardStatusHandle>,
215
16
    ) -> Result<C> {
216
16
        let action = Action::BuildCircuit { length: path.len() };
217
16
        let (timeout, abandon_timeout) = self.timeouts.timeouts(&action);
218
16
        let start_time = self.runtime.now();
219
16

            
220
16
        // TODO: This is probably not the best way for build_notimeout to
221
16
        // tell us how many hops it managed to build, but at least it is
222
16
        // isolated here.
223
16
        let hops_built = Arc::new(AtomicU32::new(0));
224
16

            
225
16
        let self_clone = Arc::clone(self);
226
16
        let params = params.clone();
227
16

            
228
16
        let circuit_future = self_clone.build_notimeout(
229
16
            path,
230
16
            params,
231
16
            start_time,
232
16
            Arc::clone(&hops_built),
233
16
            guard_status,
234
16
        );
235
16

            
236
75
        match double_timeout(&self.runtime, circuit_future, timeout, abandon_timeout).await {
237
8
            Ok(circuit) => Ok(circuit),
238
            Err(Error::CircTimeout) => {
239
8
                let n_built = hops_built.load(Ordering::SeqCst);
240
8
                self.timeouts
241
8
                    .note_circ_timeout(n_built as u8, self.runtime.now() - start_time);
242
8
                Err(Error::CircTimeout)
243
            }
244
            Err(e) => Err(e),
245
        }
246
16
    }
247
}
248

            
249
/// A factory object to build circuits.
250
///
251
/// A `CircuitBuilder` holds references to all the objects that are needed
252
/// to build circuits correctly.
253
///
254
/// In general, you should not need to construct or use this object yourself,
255
/// unless you are choosing your own paths.
256
pub struct CircuitBuilder<R: Runtime> {
257
    /// The underlying [`Builder`] object
258
    builder: Arc<Builder<R, ClientCirc>>,
259
    /// Configuration for how to choose paths for circuits.
260
    path_config: tor_config::MutCfg<crate::PathConfig>,
261
    /// State-manager object to use in storing current state.
262
    storage: crate::TimeoutStateHandle,
263
    /// Guard manager to tell us which guards nodes to use for the circuits
264
    /// we build.
265
    guardmgr: tor_guardmgr::GuardMgr<R>,
266
}
267

            
268
impl<R: Runtime> CircuitBuilder<R> {
269
    /// Construct a new [`CircuitBuilder`].
270
    // TODO: eventually I'd like to make this a public function, but
271
    // TimeoutStateHandle is private.
272
2
    pub(crate) fn new(
273
2
        runtime: R,
274
2
        chanmgr: Arc<ChanMgr<R>>,
275
2
        path_config: crate::PathConfig,
276
2
        storage: crate::TimeoutStateHandle,
277
2
        guardmgr: tor_guardmgr::GuardMgr<R>,
278
2
    ) -> Self {
279
2
        let timeouts = timeouts::Estimator::from_storage(&storage);
280
2

            
281
2
        CircuitBuilder {
282
2
            builder: Arc::new(Builder::new(runtime, chanmgr, timeouts)),
283
2
            path_config: path_config.into(),
284
2
            storage,
285
2
            guardmgr,
286
2
        }
287
2
    }
288

            
289
    /// Return this builder's [`PathConfig`](crate::PathConfig).
290
    pub(crate) fn path_config(&self) -> Arc<crate::PathConfig> {
291
        self.path_config.get()
292
    }
293

            
294
    /// Replace this builder's [`PathConfig`](crate::PathConfig).
295
    pub(crate) fn set_path_config(&self, new_config: crate::PathConfig) {
296
        self.path_config.replace(new_config);
297
    }
298

            
299
    /// Flush state to the state manager if we own the lock.
300
    ///
301
    /// Return `Ok(true)` if we saved, and `Ok(false)` if we didn't hold the lock.
302
    pub(crate) fn save_state(&self) -> Result<bool> {
303
        if !self.storage.can_store() {
304
            return Ok(false);
305
        }
306
        // TODO: someday we'll want to only do this if there is something
307
        // changed.
308
        self.builder.timeouts.save_state(&self.storage)?;
309
        self.guardmgr.store_persistent_state()?;
310
        Ok(true)
311
    }
312

            
313
    /// Replace our state with a new owning state, assuming we have
314
    /// storage permission.
315
2
    pub(crate) fn upgrade_to_owned_state(&self) -> Result<()> {
316
2
        self.builder
317
2
            .timeouts
318
2
            .upgrade_to_owning_storage(&self.storage);
319
2
        self.guardmgr.upgrade_to_owned_persistent_state()?;
320
2
        Ok(())
321
2
    }
322
    /// Reload persistent state from disk, if we don't have storage permission.
323
    pub(crate) fn reload_state(&self) -> Result<()> {
324
        if !self.storage.can_store() {
325
            self.builder
326
                .timeouts
327
                .reload_readonly_from_storage(&self.storage);
328
        }
329
        self.guardmgr.reload_persistent_state()?;
330
        Ok(())
331
    }
332

            
333
    /// Reconfigure this builder using the latest set of network parameters.
334
    ///
335
    /// (NOTE: for now, this only affects circuit timeout estimation.)
336
    pub fn update_network_parameters(&self, p: &tor_netdir::params::NetParameters) {
337
        self.builder.timeouts.update_params(p);
338
    }
339

            
340
    /// Like `build`, but construct a new circuit from an [`OwnedPath`].
341
    pub(crate) async fn build_owned(
342
        &self,
343
        path: OwnedPath,
344
        params: &CircParameters,
345
        guard_status: Arc<GuardStatusHandle>,
346
    ) -> Result<ClientCirc> {
347
        self.builder.build_owned(path, params, guard_status).await
348
    }
349

            
350
    /// Try to construct a new circuit from a given path, using appropriate
351
    /// timeouts.
352
    ///
353
    /// This circuit is _not_ automatically registered with any
354
    /// circuit manager; if you don't hang on it it, it will
355
    /// automatically go away when the last reference is dropped.
356
    pub async fn build(&self, path: &TorPath<'_>, params: &CircParameters) -> Result<ClientCirc> {
357
        let owned = path.try_into()?;
358
        self.build_owned(owned, params, Arc::new(None.into())).await
359
    }
360

            
361
    /// Return true if this builder is currently learning timeout info.
362
    pub(crate) fn learning_timeouts(&self) -> bool {
363
        self.builder.timeouts.learning_timeouts()
364
    }
365

            
366
    /// Return a reference to this builder's `GuardMgr`.
367
    pub(crate) fn guardmgr(&self) -> &tor_guardmgr::GuardMgr<R> {
368
        &self.guardmgr
369
    }
370
}
371

            
372
/// Helper function: spawn a future as a background task, and run it with
373
/// two separate timeouts.
374
///
375
/// If the future does not complete by `timeout`, then return a
376
/// timeout error immediately, but keep running the future in the
377
/// background.
378
///
379
/// If the future does not complete by `abandon`, then abandon the
380
/// future completely.
381
32
async fn double_timeout<R, F, T>(
382
32
    runtime: &R,
383
32
    fut: F,
384
32
    timeout: Duration,
385
32
    abandon: Duration,
386
32
) -> Result<T>
387
32
where
388
32
    R: Runtime,
389
32
    F: Future<Output = Result<T>> + Send + 'static,
390
32
    T: Send + 'static,
391
32
{
392
32
    let (snd, rcv) = oneshot::channel();
393
32
    let rt = runtime.clone();
394
32
    // We create these futures now, since we want them to look at the current
395
32
    // time when they decide when to expire.
396
32
    let inner_timeout_future = rt.timeout(abandon, fut);
397
32
    let outer_timeout_future = rt.timeout(timeout, rcv);
398
32

            
399
32
    runtime
400
32
        .spawn(async move {
401
48
            let result = inner_timeout_future.await;
402
28
            let _ignore_cancelled_error = snd.send(result);
403
32
        })
404
32
        .map_err(|e| Error::from_spawn("circuit construction task", e))?;
405

            
406
104
    let outcome = outer_timeout_future.await;
407
    // 4 layers of error to collapse:
408
    //     One from the receiver being cancelled.
409
    //     One from the outer timeout.
410
    //     One from the inner timeout.
411
    //     One from the actual future's result.
412
    //
413
    // (Technically, we could refrain from unwrapping the future's result,
414
    // but doing it this way helps make it more certain that we really are
415
    // collapsing all the layers into one.)
416
32
    outcome???
417
32
}
418

            
419
#[cfg(test)]
420
mod test {
421
    #![allow(clippy::unwrap_used)]
422
    use super::*;
423
    use crate::timeouts::TimeoutEstimator;
424
    use futures::channel::oneshot;
425
    use std::sync::Mutex;
426
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
427
    use tor_rtcompat::{test_with_all_runtimes, SleepProvider};
428
    use tracing::trace;
429

            
430
    /// Make a new nonfunctional `Arc<GuardStatusHandle>`
431
    fn gs() -> Arc<GuardStatusHandle> {
432
        Arc::new(None.into())
433
    }
434

            
435
    #[test]
    // TODO: re-enable this test after arti#149 is fixed. For now, it
    // is not reliable enough.
    fn test_double_timeout() {
        let t1 = Duration::from_secs(1);
        let t10 = Duration::from_secs(10);
        /// Return true if d1 is in range [d2...d2 + 0.5sec]
        fn duration_close_to(d1: Duration, d2: Duration) -> bool {
            let upper = d2 + Duration::from_millis(500);
            (d2..=upper).contains(&d1)
        }

        test_with_all_runtimes!(|rto| async move {
            #[allow(clippy::clone_on_copy)]
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 1: the future is ready immediately.
            let x = double_timeout(&rt, async { Ok(3_u32) }, t1, t10).await;
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 3_u32);

            trace!("acquiesce after test1");
            #[allow(clippy::clone_on_copy)]
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 2: the future is ready after a short delay.
            let rt_clone = rt.clone();
            // (Only the short delay should fire, and none of the other
            // timeouts.)
            rt_clone.block_advance("manually controlling advances");
            let short_delay_fut = double_timeout(
                &rt,
                async move {
                    let sl = rt_clone.sleep(Duration::from_millis(100));
                    rt_clone.allow_one_advance(Duration::from_millis(100));
                    sl.await;
                    Ok(4_u32)
                },
                t1,
                t10,
            );
            let x = rt.wait_for(short_delay_fut).await;
            assert!(x.is_ok());
            assert_eq!(x.unwrap(), 4_u32);

            trace!("acquiesce after test2");
            #[allow(clippy::clone_on_copy)]
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 3: the future outlasts the first timeout; check that it
            // keeps running after the timeout has been reported.
            let rt_clone = rt.clone();
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            rt.block_advance("manually controlling advances");
            let slow_fut = double_timeout(
                &rt,
                async move {
                    let sl = rt_clone.sleep(Duration::from_secs(2));
                    rt_clone.allow_one_advance(Duration::from_secs(2));
                    sl.await;
                    snd.send(()).unwrap();
                    Ok(4_u32)
                },
                t1,
                t10,
            );
            let x = rt.wait_for(slow_fut).await;
            assert!(matches!(x, Err(Error::CircTimeout)));
            let end = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            let waited = rt.wait_for(rcv).await;
            assert_eq!(waited, Ok(()));

            trace!("acquiesce after test3");
            #[allow(clippy::clone_on_copy)]
            let rt = tor_rtmock::MockSleepRuntime::new(rto.clone());

            // Case 4: the future times out and then gets abandoned.
            let rt_clone = rt.clone();
            rt.block_advance("manually controlling advances");
            let (snd, rcv) = oneshot::channel();
            let start = rt.now();
            // Let it hit the first timeout...
            rt.allow_one_advance(Duration::from_secs(1));
            let abandoned_fut = double_timeout(
                &rt,
                async move {
                    rt_clone.sleep(Duration::from_secs(30)).await;
                    snd.send(()).unwrap();
                    Ok(4_u32)
                },
                t1,
                t10,
            );
            let x = rt.wait_for(abandoned_fut).await;
            assert!(matches!(x, Err(Error::CircTimeout)));
            let end = rt.now();
            // ...and let it hit the second, too.
            rt.allow_one_advance(Duration::from_secs(9));
            let waited = rt.wait_for(rcv).await;
            assert!(waited.is_err());
            let end2 = rt.now();
            assert!(duration_close_to(end - start, Duration::from_secs(1)));
            assert!(duration_close_to(end2 - start, Duration::from_secs(10)));
        });
    }
543

            
544
    /// Get a pair of timeouts that we've encoded as an Ed25519 identity.
545
    ///
546
    /// In our FakeCircuit code below, the first timeout is the amount of
547
    /// time that we should sleep while building a hop to this key,
548
    /// and the second timeout is the length of time-advance we should allow
549
    /// after the hop is built.
550
    ///
551
    /// (This is pretty silly, but it's good enough for testing.)
552
    fn timeouts_from_key(id: &Ed25519Identity) -> (Duration, Duration) {
553
        let mut be = [0; 8];
554
        be[..].copy_from_slice(&id.as_bytes()[0..8]);
555
        let dur = u64::from_be_bytes(be);
556
        be[..].copy_from_slice(&id.as_bytes()[8..16]);
557
        let dur2 = u64::from_be_bytes(be);
558
        (Duration::from_millis(dur), Duration::from_millis(dur2))
559
    }
560
    /// Encode a pair of timeouts as an Ed25519 identity.
561
    ///
562
    /// In our FakeCircuit code below, the first timeout is the amount of
563
    /// time that we should sleep while building a hop to this key,
564
    /// and the second timeout is the length of time-advance we should allow
565
    /// after the hop is built.
566
    ///
567
    /// (This is pretty silly but it's good enough for testing.)
568
    fn key_from_timeouts(d1: Duration, d2: Duration) -> Ed25519Identity {
569
        let mut bytes = [0; 32];
570
        let dur = (d1.as_millis() as u64).to_be_bytes();
571
        bytes[0..8].copy_from_slice(&dur);
572
        let dur = (d2.as_millis() as u64).to_be_bytes();
573
        bytes[8..16].copy_from_slice(&dur);
574
        bytes.into()
575
    }
576

            
577
    /// Replacement type for circuit, to implement buildable.
578
    #[derive(Clone)]
579
    struct FakeCirc {
580
        hops: Vec<Ed25519Identity>,
581
        onehop: bool,
582
    }
583
    #[async_trait]
584
    impl Buildable for Mutex<FakeCirc> {
585
        async fn create_chantarget<RT: Runtime>(
586
            _: &ChanMgr<RT>,
587
            rt: &RT,
588
            ct: &OwnedChanTarget,
589
            _: &CircParameters,
590
        ) -> Result<Self> {
591
            let ed_id = ct.ed_identity();
592
            let (d1, d2) = timeouts_from_key(ed_id);
593
            rt.sleep(d1).await;
594
            if !d2.is_zero() {
595
                rt.allow_one_advance(d2);
596
            }
597

            
598
            let c = FakeCirc {
599
                hops: vec![*ct.ed_identity()],
600
                onehop: true,
601
            };
602
            Ok(Mutex::new(c))
603
        }
604
        async fn create<RT: Runtime>(
605
            _: &ChanMgr<RT>,
606
            rt: &RT,
607
            ct: &OwnedCircTarget,
608
            _: &CircParameters,
609
        ) -> Result<Self> {
610
            let ed_id = ct.ed_identity();
611
            let (d1, d2) = timeouts_from_key(ed_id);
612
            rt.sleep(d1).await;
613
            if !d2.is_zero() {
614
                rt.allow_one_advance(d2);
615
            }
616

            
617
            let c = FakeCirc {
618
                hops: vec![*ct.ed_identity()],
619
                onehop: false,
620
            };
621
            Ok(Mutex::new(c))
622
        }
623
        async fn extend<RT: Runtime>(
624
            &self,
625
            rt: &RT,
626
            ct: &OwnedCircTarget,
627
            _: &CircParameters,
628
        ) -> Result<()> {
629
            let ed_id = ct.ed_identity();
630
            let (d1, d2) = timeouts_from_key(ed_id);
631
            rt.sleep(d1).await;
632
            if !d2.is_zero() {
633
                rt.allow_one_advance(d2);
634
            }
635

            
636
            {
637
                let mut c = self.lock().unwrap();
638
                c.hops.push(*ed_id);
639
            }
640
            Ok(())
641
        }
642
    }
643

            
644
    /// Fake implementation of TimeoutEstimator that just records its inputs.
645
    struct TimeoutRecorder<R> {
646
        runtime: R,
647
        hist: Vec<(bool, u8, Duration)>,
648
        // How much advance to permit after being told of a timeout?
649
        on_timeout: Duration,
650
        // How much advance to permit after being told of a success?
651
        on_success: Duration,
652

            
653
        snd_success: Option<oneshot::Sender<()>>,
654
        rcv_success: Option<oneshot::Receiver<()>>,
655
    }
656

            
657
    impl<R> TimeoutRecorder<R> {
658
        fn new(runtime: R) -> Self {
659
            Self::with_delays(runtime, Duration::from_secs(0), Duration::from_secs(0))
660
        }
661

            
662
        fn with_delays(runtime: R, on_timeout: Duration, on_success: Duration) -> Self {
663
            let (snd_success, rcv_success) = oneshot::channel();
664
            Self {
665
                runtime,
666
                hist: Vec::new(),
667
                on_timeout,
668
                on_success,
669
                rcv_success: Some(rcv_success),
670
                snd_success: Some(snd_success),
671
            }
672
        }
673
    }
674
    impl<R: Runtime> TimeoutEstimator for Arc<Mutex<TimeoutRecorder<R>>> {
675
        fn note_hop_completed(&mut self, hop: u8, delay: Duration, is_last: bool) {
676
            if !is_last {
677
                return;
678
            }
679
            let (rt, advance) = {
680
                let mut this = self.lock().unwrap();
681
                this.hist.push((true, hop, delay));
682
                let _ = this.snd_success.take().unwrap().send(());
683
                (this.runtime.clone(), this.on_success)
684
            };
685
            if !advance.is_zero() {
686
                rt.allow_one_advance(advance);
687
            }
688
        }
689
        fn note_circ_timeout(&mut self, hop: u8, delay: Duration) {
690
            let (rt, advance) = {
691
                let mut this = self.lock().unwrap();
692
                this.hist.push((false, hop, delay));
693
                (this.runtime.clone(), this.on_timeout)
694
            };
695
            if !advance.is_zero() {
696
                rt.allow_one_advance(advance);
697
            }
698
        }
699
        fn timeouts(&mut self, _action: &Action) -> (Duration, Duration) {
700
            (Duration::from_secs(3), Duration::from_secs(100))
701
        }
702
        fn learning_timeouts(&self) -> bool {
703
            false
704
        }
705
        fn update_params(&mut self, _params: &tor_netdir::params::NetParameters) {}
706

            
707
        fn build_state(&mut self) -> Option<crate::timeouts::pareto::ParetoTimeoutState> {
708
            None
709
        }
710
    }
711

            
712
    /// Testing only: create a bogus circuit target
713
    fn circ_t(id: Ed25519Identity) -> OwnedCircTarget {
714
        OwnedCircTarget::new(chan_t(id), [0x33; 32].into(), "".parse().unwrap())
715
    }
716
    /// Testing only: create a bogus channel target
717
    fn chan_t(id: Ed25519Identity) -> OwnedChanTarget {
718
        OwnedChanTarget::new(vec![], id, [0x20; 20].into())
719
    }
720

            
721
    async fn run_builder_test<R: Runtime>(
722
        rt: tor_rtmock::MockSleepRuntime<R>,
723
        advance_initial: Duration,
724
        path: OwnedPath,
725
        advance_on_timeout: Option<(Duration, Duration)>,
726
    ) -> (Result<FakeCirc>, Vec<(bool, u8, Duration)>) {
727
        let chanmgr = Arc::new(ChanMgr::new(rt.clone()));
728
        // always has 3 second timeout, 100 second abandon.
729
        let timeouts = match advance_on_timeout {
730
            Some((d1, d2)) => TimeoutRecorder::with_delays(rt.clone(), d1, d2),
731
            None => TimeoutRecorder::new(rt.clone()),
732
        };
733
        let timeouts = Arc::new(Mutex::new(timeouts));
734
        let builder: Builder<_, Mutex<FakeCirc>> = Builder::new(
735
            rt.clone(),
736
            chanmgr,
737
            timeouts::Estimator::new(Arc::clone(&timeouts)),
738
        );
739

            
740
        let params = CircParameters::default();
741

            
742
        rt.block_advance("manually controlling advances");
743
        rt.allow_one_advance(advance_initial);
744
        let outcome = rt
745
            .wait_for(Arc::new(builder).build_owned(path, &params, gs()))
746
            .await;
747

            
748
        // Now we wait for a success to finally, finally be reported.
749
        if advance_on_timeout.is_some() {
750
            let receiver = { timeouts.lock().unwrap().rcv_success.take().unwrap() };
751
            let _ = rt.wait_for(receiver).await;
752
        }
753

            
754
        let circ = outcome.map(|m| m.lock().unwrap().clone());
755
        let timeouts = timeouts.lock().unwrap().hist.clone();
756

            
757
        (circ, timeouts)
758
    }
759

            
760
    /// A channel-only path builds a one-hop circuit and records exactly one
    /// success, for hop 0, after 100 ms.
    #[test]
    fn build_onehop() {
        test_with_all_runtimes!(|rt| async move {
            let rt = tor_rtmock::MockSleepRuntime::new(rt);
            let hop_delay = Duration::from_millis(100);
            let id_100ms = key_from_timeouts(hop_delay, Duration::from_millis(0));
            let path = OwnedPath::ChannelOnly(chan_t(id_100ms));

            let (outcome, timeouts) =
                run_builder_test(rt, Duration::from_millis(100), path, None).await;
            let circ = outcome.unwrap();
            assert!(circ.onehop);
            assert_eq!(circ.hops, [id_100ms]);

            assert_eq!(timeouts.len(), 1);
            let (success, hop, delay) = timeouts[0];
            assert!(success); // success
            assert_eq!(hop, 0); // one-hop
            assert_eq!(delay, Duration::from_millis(100));
        });
    }
779

            
780
    /// Build a three-hop path and check the resulting circuit along with
    /// the single timeout-history record produced for it.
    #[test]
    fn build_threehop() {
        test_with_all_runtimes!(|rt| async move {
            // Shorthand for the many millisecond durations used below.
            let ms = Duration::from_millis;

            let mock_rt = tor_rtmock::MockSleepRuntime::new(rt);
            let first = key_from_timeouts(ms(100), ms(200));
            let second = key_from_timeouts(ms(200), ms(300));
            let third = key_from_timeouts(ms(300), ms(0));
            let hops = vec![circ_t(first), circ_t(second), circ_t(third)];

            let (outcome, history) =
                run_builder_test(mock_rt, ms(100), OwnedPath::Normal(hops), None).await;

            // The build must succeed with all three hops, in order.
            let built = outcome.unwrap();
            assert!(!built.onehop);
            assert_eq!(built.hops, [first, second, third]);

            // Exactly one record: (succeeded, hop index, elapsed time).
            assert_eq!(history.len(), 1);
            let record = &history[0];
            assert!(record.0); // success
            assert_eq!(record.1, 2); // three-hop
            assert_eq!(record.2, ms(600));
        });
    }
804

            
805
    /// Build a three-hop path whose last hop is keyed with a one-hour
    /// duration, and check that the build is reported as a circuit timeout
    /// with a single failure record in the timeout history.
    #[test]
    fn build_huge_timeout() {
        test_with_all_runtimes!(|rt| async move {
            // Shorthand for the many millisecond durations used below.
            let ms = Duration::from_millis;

            let mock_rt = tor_rtmock::MockSleepRuntime::new(rt);
            let first = key_from_timeouts(ms(100), ms(200));
            let second = key_from_timeouts(ms(200), ms(2700));
            let hour = key_from_timeouts(Duration::from_secs(3600), Duration::from_secs(0));
            let hops = vec![circ_t(first), circ_t(second), circ_t(hour)];

            let (outcome, history) =
                run_builder_test(mock_rt, ms(100), OwnedPath::Normal(hops), None).await;
            assert!(matches!(outcome, Err(Error::CircTimeout)));

            // Exactly one record: (succeeded, hop index, elapsed time).
            assert_eq!(history.len(), 1);
            let record = &history[0];
            assert!(!record.0); // timeout

            // BUG: the recorded hop index is sometimes 1 and sometimes 2, so
            // the check below stays disabled.
            // assert_eq!(record.1, 2); // at third hop.
            assert_eq!(record.2, ms(3000));
        });
    }
829

            
830
    /// Build a three-hop path whose last hop is keyed with a three-second
    /// duration, passing extra advance delays to the test harness, and check
    /// that the caller sees a circuit timeout while the history holds a
    /// failure record followed by a success record.
    #[test]
    fn build_modest_timeout() {
        test_with_all_runtimes!(|rt| async move {
            // Shorthand for the many millisecond durations used below.
            let ms = Duration::from_millis;

            let rt = tor_rtmock::MockSleepRuntime::new(rt);
            let first = key_from_timeouts(ms(100), ms(200));
            let second = key_from_timeouts(ms(200), ms(2700));
            let slow = key_from_timeouts(ms(3000), ms(0));

            // Delays handed to the harness as its `advance_on_timeout` pair.
            let advance_on_timeout = Some((ms(4000), Duration::from_secs(0)));

            let hops = vec![circ_t(first), circ_t(second), circ_t(slow)];

            let (outcome, history) = run_builder_test(
                rt.clone(),
                ms(100),
                OwnedPath::Normal(hops),
                advance_on_timeout,
            )
            .await;
            assert!(matches!(outcome, Err(Error::CircTimeout)));

            // Two records, each (succeeded, hop index, elapsed time).
            assert_eq!(history.len(), 2);

            let failure = &history[0];
            assert!(!failure.0); // timeout

            // BUG: the recorded hop index is sometimes 1 and sometimes 2, so
            // the check below stays disabled.
            // assert_eq!(failure.1, 2); // at third hop.
            assert_eq!(failure.2, ms(3000));

            let success = &history[1];
            assert!(success.0); // success
            assert_eq!(success.1, 2); // three-hop

            // BUG: this timer is not always reliable, due to races, so the
            // check below stays disabled.
            // assert_eq!(success.2, ms(3300));
        });
    }
866
}