1
1
//! `tor-dirmgr`: Code to fetch, store, and update Tor directory information.
2
//!
3
//! # Overview
4
//!
5
//! This crate is part of
6
//! [Arti](https://gitlab.torproject.org/tpo/core/arti/), a project to
7
//! implement [Tor](https://www.torproject.org/) in Rust.
8
//!
9
//! In its current design, Tor requires a set of up-to-date
10
//! authenticated directory documents in order to build multi-hop
11
//! anonymized circuits through the network.
12
//!
13
//! This directory manager crate is responsible for figuring out which
14
//! directory information we lack, downloading what we're missing, and
15
//! keeping a cache of it on disk.
16
//!
17
//! # Compile-time features
18
//!
19
//! `mmap` (default) -- Use memory mapping to reduce the memory load for
20
//! reading large directory objects from disk.
21
//!
22
//! `static` -- Try to link with a static copy of sqlite3.
23
//!
24
//! `routerdesc` -- (Incomplete) support for downloading and storing
25
//!      router descriptors.
26

            
27
#![deny(missing_docs)]
28
#![warn(noop_method_call)]
29
#![deny(unreachable_pub)]
30
#![warn(clippy::all)]
31
#![deny(clippy::await_holding_lock)]
32
#![deny(clippy::cargo_common_metadata)]
33
#![deny(clippy::cast_lossless)]
34
#![deny(clippy::checked_conversions)]
35
#![warn(clippy::cognitive_complexity)]
36
#![deny(clippy::debug_assert_with_mut_call)]
37
#![deny(clippy::exhaustive_enums)]
38
#![deny(clippy::exhaustive_structs)]
39
#![deny(clippy::expl_impl_clone_on_copy)]
40
#![deny(clippy::fallible_impl_from)]
41
#![deny(clippy::implicit_clone)]
42
#![deny(clippy::large_stack_arrays)]
43
#![warn(clippy::manual_ok_or)]
44
#![deny(clippy::missing_docs_in_private_items)]
45
#![deny(clippy::missing_panics_doc)]
46
#![warn(clippy::needless_borrow)]
47
#![warn(clippy::needless_pass_by_value)]
48
#![warn(clippy::option_option)]
49
#![warn(clippy::rc_buffer)]
50
#![deny(clippy::ref_option_ref)]
51
#![warn(clippy::semicolon_if_nothing_returned)]
52
#![warn(clippy::trait_duplication_in_bounds)]
53
#![deny(clippy::unnecessary_wraps)]
54
#![warn(clippy::unseparated_literal_suffix)]
55
#![deny(clippy::unwrap_used)]
56

            
57
pub mod authority;
58
mod bootstrap;
59
pub mod config;
60
mod docid;
61
mod docmeta;
62
mod err;
63
mod event;
64
mod retry;
65
mod shared_ref;
66
mod state;
67
mod storage;
68

            
69
use crate::docid::{CacheUsage, ClientRequest, DocQuery};
70
#[cfg(not(feature = "experimental-api"))]
71
use crate::shared_ref::SharedMutArc;
72
#[cfg(feature = "experimental-api")]
73
pub use crate::shared_ref::SharedMutArc;
74
use crate::storage::DynStore;
75
use postage::watch;
76
pub use retry::DownloadSchedule;
77
use tor_circmgr::CircMgr;
78
use tor_netdir::NetDir;
79
use tor_netdoc::doc::netstatus::ConsensusFlavor;
80

            
81
use async_trait::async_trait;
82
use futures::{channel::oneshot, stream::BoxStream, task::SpawnExt};
83
use tor_rtcompat::{Runtime, SleepProviderExt};
84
use tracing::{debug, info, trace, warn};
85

            
86
use std::sync::atomic::{AtomicBool, Ordering};
87
use std::sync::{Arc, Mutex};
88
use std::{collections::HashMap, sync::Weak};
89
use std::{fmt::Debug, time::SystemTime};
90

            
91
pub use authority::{Authority, AuthorityBuilder};
92
pub use config::{
93
    DirMgrConfig, DirMgrConfigBuilder, DownloadScheduleConfig, DownloadScheduleConfigBuilder,
94
    NetworkConfig, NetworkConfigBuilder,
95
};
96
pub use docid::DocId;
97
pub use err::Error;
98
pub use event::{DirBootstrapEvents, DirBootstrapStatus, DirEvent, DirStatus};
99
pub use storage::DocumentText;
100
pub use tor_netdir::fallback::{FallbackDir, FallbackDirBuilder};
101

            
102
/// A Result as returned by this crate.
///
/// The error type is always this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
104

            
105
/// Trait for DirMgr implementations
///
/// Abstracts the operations that directory consumers need, so callers are not
/// tied to a concrete runtime type. Implemented in this file for
/// `Arc<DirMgr<R>>`.
#[async_trait]
pub trait DirProvider {
    /// Return a handle to our latest directory, if we have one.
    fn latest_netdir(&self) -> Option<Arc<NetDir>>;

    /// Return a new asynchronous stream that will receive notification
    /// whenever the consensus has changed.
    ///
    /// Multiple events may be batched up into a single item: each time
    /// this stream yields an event, all you can assume is that the event has
    /// occurred at least once.
    fn events(&self) -> BoxStream<'static, DirEvent>;

    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError>;

    /// Bootstrap a `DirProvider` that hasn't been bootstrapped yet.
    async fn bootstrap(&self) -> Result<()>;

    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
    /// in the latest directory's bootstrap status.
    ///
    /// Note that this stream can be lossy: the caller will not necessarily
    /// observe every event on the stream.
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus>;
}
138

            
139
#[async_trait]
140
impl<R: Runtime> DirProvider for Arc<DirMgr<R>> {
141
5
    fn latest_netdir(&self) -> Option<Arc<NetDir>> {
142
5
        self.opt_netdir()
143
5
    }
144

            
145
2
    fn events(&self) -> BoxStream<'static, DirEvent> {
146
2
        Box::pin(self.events.subscribe())
147
2
    }
148

            
149
    fn reconfigure(
150
        &self,
151
        new_config: &DirMgrConfig,
152
        how: tor_config::Reconfigure,
153
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
154
        DirMgr::reconfigure(self, new_config, how)
155
    }
156

            
157
    async fn bootstrap(&self) -> Result<()> {
158
        DirMgr::bootstrap(self).await
159
    }
160

            
161
2
    fn bootstrap_events(&self) -> BoxStream<'static, DirBootstrapStatus> {
162
2
        Box::pin(DirMgr::bootstrap_events(self))
163
2
    }
164
}
165

            
166
/// A directory manager to download, fetch, and cache a Tor directory.
///
/// A DirMgr can operate in three modes:
///   * In **offline** mode, it only reads from the cache, and can
///     only read once.
///   * In **read-only** mode, it reads from the cache, but checks
///     whether it can acquire an associated lock file.  If it can, then
///     it enters read-write mode.  If not, it checks the cache
///     periodically for new information.
///   * In **read-write** mode, it knows that no other process will be
///     writing to the cache, and it takes responsibility for fetching
///     data from the network and updating the directory with new
///     directory information.
pub struct DirMgr<R: Runtime> {
    /// Configuration information: where to find directories, how to
    /// validate them, and so on.
    config: tor_config::MutCfg<DirMgrConfig>,
    /// Handle to our sqlite cache.
    // TODO(nickm): I'd like to use an rwlock, but that's not feasible, since
    // rusqlite::Connection isn't Sync.
    // TODO(review): is the Mutex here still needed? -- confirm
    store: Mutex<DynStore>,
    /// Our latest sufficiently bootstrapped directory, if we have one.
    ///
    /// We use the RwLock so that we can give this out to a bunch of other
    /// users, and replace it once a new directory is bootstrapped.
    netdir: SharedMutArc<NetDir>,

    /// A publisher handle that we notify whenever the consensus changes.
    events: event::FlagPublisher<DirEvent>,

    /// A publisher handle that we notify whenever our bootstrapping status
    /// changes.
    send_status: Mutex<watch::Sender<event::DirBootstrapStatus>>,

    /// A receiver handle that gets notified whenever our bootstrapping status
    /// changes.
    ///
    /// We don't need to keep this drained, since `postage::watch` already knows
    /// to discard unread events.
    receive_status: DirBootstrapEvents,

    /// A circuit manager, if this DirMgr supports downloading.
    circmgr: Option<Arc<CircMgr<R>>>,

    /// Our asynchronous runtime.
    runtime: R,

    /// Whether or not we're operating in offline mode.
    offline: bool,

    /// If we're not in offline mode, stores whether or not the `DirMgr` has attempted
    /// to bootstrap yet or not.
    ///
    /// This exists in order to prevent starting two concurrent bootstrap tasks.
    ///
    /// (In offline mode, this does nothing.)
    bootstrap_started: AtomicBool,
}
225

            
226
/// RAII guard to reset an AtomicBool on drop.
///
/// Used by [`DirMgr::bootstrap`] to roll back the `bootstrap_started` flag if
/// bootstrapping is abandoned partway through; call [`BoolResetter::disarm`]
/// to keep the flag's current value instead.
struct BoolResetter<'a> {
    /// The bool to reset.
    inner: &'a AtomicBool,
    /// What value to store.
    reset_to: bool,
    /// What atomic ordering to use.
    ordering: Ordering,
}
235

            
236
impl<'a> Drop for BoolResetter<'a> {
237
1
    fn drop(&mut self) {
238
1
        self.inner.store(self.reset_to, self.ordering);
239
1
    }
240
}
241

            
242
impl<'a> BoolResetter<'a> {
243
    /// Disarm the guard, consuming it to make it not reset any more.
244
1
    fn disarm(self) {
245
1
        std::mem::forget(self);
246
1
    }
247
}
248

            
249
/// The possible origins of a document.
///
/// Used (for example) to report where we got a document from if it fails to
/// parse.
#[derive(Debug, Clone, derive_more::Display)]
#[non_exhaustive]
pub enum DocSource {
    /// We loaded the document from our cache.
    #[display(fmt = "local cache")]
    LocalCache,
    /// We fetched the document from a server.
    //
    // TODO: we should add a lot more information here in the future, once
    // it's available from tor_dirclient::DirSource.
    #[display(fmt = "directory server")]
    DirServer {},
}
266

            
267
impl<R: Runtime> DirMgr<R> {
268
    /// Try to load the directory from disk, without launching any
269
    /// kind of update process.
270
    ///
271
    /// This function runs in **offline** mode: it will give an error
272
    /// if the result is not up-to-date, or not fully downloaded.
273
    ///
274
    /// In general, you shouldn't use this function in a long-running
275
    /// program; it's only suitable for command-line or batch tools.
276
    // TODO: I wish this function didn't have to be async or take a runtime.
277
    pub async fn load_once(runtime: R, config: DirMgrConfig) -> Result<Arc<NetDir>> {
278
        let dirmgr = Arc::new(Self::from_config(config, runtime, None, true)?);
279

            
280
        // TODO: add some way to return a directory that isn't up-to-date
281
        let _success = dirmgr.load_directory().await?;
282

            
283
        dirmgr.opt_netdir().ok_or(Error::DirectoryNotPresent)
284
    }
285

            
286
    /// Return a current netdir, either loading it or bootstrapping it
287
    /// as needed.
288
    ///
289
    /// Like load_once, but will try to bootstrap (or wait for another
290
    /// process to bootstrap) if we don't have an up-to-date
291
    /// bootstrapped directory.
292
    ///
293
    /// In general, you shouldn't use this function in a long-running
294
    /// program; it's only suitable for command-line or batch tools.
295
    pub async fn load_or_bootstrap_once(
296
        config: DirMgrConfig,
297
        runtime: R,
298
        circmgr: Arc<CircMgr<R>>,
299
    ) -> Result<Arc<NetDir>> {
300
        let dirmgr = DirMgr::bootstrap_from_config(config, runtime, circmgr).await?;
301
        dirmgr.netdir()
302
    }
303

            
304
    /// Create a new `DirMgr` in online mode, but don't bootstrap it yet.
305
    ///
306
    /// The `DirMgr` can be bootstrapped later with `bootstrap`.
307
1
    pub fn create_unbootstrapped(
308
1
        config: DirMgrConfig,
309
1
        runtime: R,
310
1
        circmgr: Arc<CircMgr<R>>,
311
1
    ) -> Result<Arc<Self>> {
312
1
        Ok(Arc::new(DirMgr::from_config(
313
1
            config,
314
1
            runtime,
315
1
            Some(circmgr),
316
1
            false,
317
1
        )?))
318
1
    }
319

            
320
    /// Bootstrap a `DirMgr` created in online mode that hasn't been bootstrapped yet.
    ///
    /// This function will not return until the directory is bootstrapped enough to build circuits.
    /// It will also launch a background task that fetches any missing information, and that
    /// replaces the directory when a new one is available.
    ///
    /// This function is intended to be used together with `create_unbootstrapped`. There is no
    /// need to call this function otherwise.
    ///
    /// If bootstrapping has already successfully taken place, returns early with success.
    ///
    /// # Errors
    ///
    /// Returns an error if bootstrapping fails. If the error is [`Error::CantAdvanceState`],
    /// it may be possible to successfully bootstrap later on by calling this function again.
    ///
    /// # Panics
    ///
    /// Panics if the `DirMgr` passed to this function was not created in online mode, such as
    /// via `load_once`.
    pub async fn bootstrap(self: &Arc<Self>) -> Result<()> {
        // Offline managers cannot bootstrap at all.
        if self.offline {
            return Err(Error::OfflineMode);
        }

        // The semantics of this are "attempt to replace a 'false' value with 'true'.
        // If the value in bootstrap_started was not 'false' when the attempt was made, returns
        // `Err`; this means another bootstrap attempt is in progress or has completed, so we
        // return early.

        // NOTE(eta): could potentially weaken the `Ordering` here in future
        if self
            .bootstrap_started
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            debug!("Attempted to bootstrap twice; ignoring.");
            return Ok(());
        }

        // Use a RAII guard to reset `bootstrap_started` to `false` if we return early without
        // completing bootstrap.
        //
        // NOTE(review): on the cache-hit path below (`have_directory == true`)
        // there is no receiver, so this guard is never disarmed and the flag is
        // reset to `false` when we return Ok -- confirm that re-arming
        // bootstrap in that case is intended.
        let resetter = BoolResetter {
            inner: &self.bootstrap_started,
            reset_to: false,
            ordering: Ordering::SeqCst,
        };

        // Try to load from the cache.
        let have_directory = self.load_directory().await?;

        // If the cache had nothing usable, set up a oneshot channel so the
        // background task can tell us when enough directory info has arrived.
        let (mut sender, receiver) = if have_directory {
            info!("Loaded a good directory from cache.");
            (None, None)
        } else {
            info!("Didn't get usable directory from cache.");
            let (sender, receiver) = oneshot::channel();
            (Some(sender), Some(receiver))
        };

        // Whether we loaded or not, we now start downloading.
        let dirmgr_weak = Arc::downgrade(self);
        self.runtime
            .spawn(async move {
                // NOTE: This is a daemon task.  It should eventually get
                // treated as one.

                // Don't warn when these are Error::ManagerDropped: that
                // means that the DirMgr has been shut down.
                if let Err(e) = Self::reload_until_owner(&dirmgr_weak, &mut sender).await {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn!("Unrecovered error while waiting for bootstrap: {}", e),
                    }
                } else if let Err(e) = Self::download_forever(dirmgr_weak, sender).await {
                    match e {
                        Error::ManagerDropped => {}
                        _ => warn!("Unrecovered error while downloading: {}", e),
                    }
                }
            })
            .map_err(|e| Error::from_spawn("directory updater task", e))?;

        // If we had no cached directory, block until the background task
        // signals success (or drops the sender, meaning it gave up).
        if let Some(receiver) = receiver {
            match receiver.await {
                Ok(()) => {
                    info!("We have enough information to build circuits.");
                    // Disarm the RAII guard, since we succeeded.
                    resetter.disarm();
                }
                Err(_) => {
                    warn!("Bootstrapping task exited before finishing.");
                    return Err(Error::CantAdvanceState);
                }
            }
        }
        Ok(())
    }
418

            
419
    /// Returns `true` if a bootstrap attempt is in progress, or successfully completed.
    ///
    /// (Reads the `bootstrap_started` flag with `SeqCst`, matching the
    /// `compare_exchange` ordering used in [`DirMgr::bootstrap`].)
    pub fn bootstrap_started(&self) -> bool {
        self.bootstrap_started.load(Ordering::SeqCst)
    }
423

            
424
    /// Return a new directory manager from a given configuration,
425
    /// bootstrapping from the network as necessary.
426
    pub async fn bootstrap_from_config(
427
        config: DirMgrConfig,
428
        runtime: R,
429
        circmgr: Arc<CircMgr<R>>,
430
    ) -> Result<Arc<Self>> {
431
        let dirmgr = Self::create_unbootstrapped(config, runtime, circmgr)?;
432

            
433
        dirmgr.bootstrap().await?;
434

            
435
        Ok(dirmgr)
436
    }
437

            
438
    /// Try forever to either lock the storage (and thereby become the
    /// owner), or to reload the database.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    ///
    /// If we eventually become the owner, return Ok().
    async fn reload_until_owner(
        weak: &Weak<Self>,
        on_complete: &mut Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // `logged` makes sure we only announce the "another process owns the
        // cache" situation once, not on every loop iteration.
        let mut logged = false;
        let mut bootstrapped;
        let runtime;
        {
            // Scope the strong reference so we don't keep the DirMgr alive
            // while sleeping below.
            let dirmgr = upgrade_weak_ref(weak)?;
            runtime = dirmgr.runtime.clone();
            bootstrapped = dirmgr.netdir.get().is_some();
        }

        loop {
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to take ownership of the directory cache lock");
                if dirmgr.try_upgrade_to_readwrite()? {
                    // We now own the lock!  (Maybe we owned it before; the
                    // upgrade_to_readwrite() function is idempotent.)  We can
                    // do our own bootstrapping.
                    if logged {
                        info!("The previous owning process has given up the lock. We are now in charge of managing the directory.");
                    }
                    return Ok(());
                }
            }

            if !logged {
                logged = true;
                if bootstrapped {
                    info!("Another process is managing the directory. We'll use its cache.");
                } else {
                    info!("Another process is bootstrapping the directory. Waiting till it finishes or exits.");
                }
            }

            // We don't own the lock.  Somebody else owns the cache.  They
            // should be updating it.  Wait a bit, then try again.
            // (Poll slowly once bootstrapped; quickly while still waiting
            // for a first usable directory.)
            let pause = if bootstrapped {
                std::time::Duration::new(120, 0)
            } else {
                std::time::Duration::new(5, 0)
            };
            runtime.sleep(pause).await;
            // TODO: instead of loading the whole thing we should have a
            // database entry that says when the last update was, or use
            // our state functions.
            {
                let dirmgr = upgrade_weak_ref(weak)?;
                trace!("Trying to load from the directory cache");
                if dirmgr.load_directory().await? {
                    // Successfully loaded a bootstrapped directory.
                    if let Some(send_done) = on_complete.take() {
                        let _ = send_done.send(());
                    }
                    if !bootstrapped {
                        info!("The directory is now bootstrapped.");
                    }
                    bootstrapped = true;
                }
            }
        }
    }
509

            
510
    /// Try to fetch our directory info and keep it updated, indefinitely.
    ///
    /// If we begin to have a bootstrapped directory, send a
    /// message using `on_complete`.
    async fn download_forever(
        weak: Weak<Self>,
        mut on_complete: Option<oneshot::Sender<()>>,
    ) -> Result<()> {
        // Start from the first phase of the download state machine: fetching
        // a consensus (cache permitted).
        let mut state: Box<dyn DirState> = Box::new(state::GetConsensusState::new(
            Weak::clone(&weak),
            CacheUsage::CacheOkay,
        )?);

        // Clone the runtime handle up front so we can sleep without holding a
        // strong reference to the DirMgr.
        let runtime = {
            let dirmgr = upgrade_weak_ref(&weak)?;
            dirmgr.runtime.clone()
        };

        // Outer loop: one full bootstrap per iteration; sleeps until the
        // directory needs refreshing, then resets the state machine.
        loop {
            let mut usable = false;

            let retry_config = {
                let dirmgr = upgrade_weak_ref(&weak)?;
                // TODO(nickm): instead of getting this every time we loop, it
                // might be a good idea to refresh it with each attempt, at
                // least at the point of checking the number of attempts.
                *dirmgr.config.get().schedule().retry_bootstrap()
            };
            let mut retry_delay = retry_config.schedule();

            // Inner loop: bounded retries for this bootstrap attempt.
            'retry_attempt: for _ in retry_config.attempts() {
                let (newstate, recoverable_err) =
                    bootstrap::download(Weak::clone(&weak), state, &mut on_complete).await?;
                state = newstate;

                if let Some(err) = recoverable_err {
                    // A partial download can still be usable; if so, stop
                    // retrying and wait for the next refresh instead.
                    if state.is_ready(Readiness::Usable) {
                        usable = true;
                        info!("Unable to completely download a directory: {}.  Nevertheless, the directory is usable, so we'll pause for now.", err);
                        break 'retry_attempt;
                    }

                    // Back off (with jitter from the retry schedule), then
                    // restart the state machine from scratch.
                    let delay = retry_delay.next_delay(&mut rand::thread_rng());
                    warn!(
                        "Unable to download a usable directory: {}.  We will restart in {:?}.",
                        err, delay
                    );
                    runtime.sleep(delay).await;
                    state = state.reset()?;
                } else {
                    info!("Directory is complete.");
                    usable = true;
                    break 'retry_attempt;
                }
            }

            if !usable {
                // we ran out of attempts.
                warn!(
                    "We failed {} times to bootstrap a directory. We're going to give up.",
                    retry_config.n_attempts()
                );
                return Err(Error::CantAdvanceState);
            } else {
                // Report success, if appropriate.
                if let Some(send_done) = on_complete.take() {
                    let _ = send_done.send(());
                }
            }

            // Sleep until the state machine says it's time to start over;
            // if it never needs resetting, we're done for good.
            let reset_at = state.reset_time();
            match reset_at {
                Some(t) => runtime.sleep_until_wallclock(t).await,
                None => return Ok(()),
            }
            state = state.reset()?;
        }
    }
588

            
589
    /// Get a reference to the circuit manager, if we have one.
590
1
    fn circmgr(&self) -> Result<Arc<CircMgr<R>>> {
591
1
        self.circmgr
592
1
            .as_ref()
593
1
            .map(Arc::clone)
594
1
            .ok_or(Error::NoDownloadSupport)
595
1
    }
596

            
597
    /// Try to change our configuration to `new_config`.
    ///
    /// Actual behavior will depend on the value of `how`.
    pub fn reconfigure(
        &self,
        new_config: &DirMgrConfig,
        how: tor_config::Reconfigure,
    ) -> std::result::Result<(), tor_config::ReconfigureError> {
        // Snapshot the current configuration to compare against.
        let config = self.config.get();
        // We don't support changing these: doing so basically would require us
        // to abort all our in-progress downloads, since they might be based on
        // no-longer-viable information.
        if new_config.cache_path() != config.cache_path() {
            how.cannot_change("storage.cache_path")?;
        }
        if new_config.authorities() != config.authorities() {
            how.cannot_change("network.authorities")?;
        }

        // Everything past this point is changeable; if the caller only wanted
        // a feasibility check, we can stop here.
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        let params_changed = new_config.override_net_params() != config.override_net_params();

        // Commit the new configuration.
        self.config
            .map_and_replace(|cfg| cfg.update_from_config(new_config));

        if params_changed {
            // Apply the new parameter overrides to the live netdir, and tell
            // subscribers something changed.
            let _ignore_err = self.netdir.mutate(|netdir| {
                netdir.replace_overridden_parameters(new_config.override_net_params());
                Ok(())
            });
            // (It's okay to ignore the error, since it just means that there
            // was no current netdir.)
            self.events.publish(DirEvent::NewConsensus);
        }

        Ok(())
    }
637

            
638
    /// Return a stream of [`DirBootstrapStatus`] events to tell us about changes
    /// in the latest directory's bootstrap status.
    ///
    /// Note that this stream can be lossy: the caller will not necessarily
    /// observe every event on the stream.
    pub fn bootstrap_events(&self) -> event::DirBootstrapEvents {
        // Cloning the receiver is all that's needed: `postage::watch` discards
        // unread events, so each clone just sees the most recent status.
        self.receive_status.clone()
    }
646

            
647
    /// Replace the latest status with `new_status` and broadcast to anybody
648
    /// watching via a [`DirBootstrapEvents`] stream.
649
7
    fn update_status(&self, new_status: DirStatus) {
650
7
        // TODO(nickm): can I kill off this lock by having something else own the sender?
651
7
        let mut sender = self.send_status.lock().expect("poisoned lock");
652
7
        let mut status = sender.borrow_mut();
653
7

            
654
7
        status.update(new_status);
655
7
    }
656

            
657
    /// Try to make this a directory manager with read-write access to its
658
    /// storage.
659
    ///
660
    /// Return true if we got the lock, or if we already had it.
661
    ///
662
    /// Return false if another process has the lock
663
    fn try_upgrade_to_readwrite(&self) -> Result<bool> {
664
        self.store
665
            .lock()
666
            .expect("Directory storage lock poisoned")
667
            .upgrade_to_readwrite()
668
    }
669

            
670
    /// Return a reference to the store, if it is currently read-write.
671
8
    fn store_if_rw(&self) -> Option<&Mutex<DynStore>> {
672
8
        let rw = !self
673
8
            .store
674
8
            .lock()
675
8
            .expect("Directory storage lock poisoned")
676
8
            .is_readonly();
677
8
        // A race-condition is possible here, but I believe it's harmless.
678
8
        if rw {
679
8
            Some(&self.store)
680
        } else {
681
            None
682
        }
683
8
    }
684

            
685
    /// Construct a DirMgr from a DirMgrConfig.
686
    ///
687
    /// If `offline` is set, opens the SQLite store read-only and sets the offline flag in the
688
    /// returned manager.
689
8
    fn from_config(
690
8
        config: DirMgrConfig,
691
8
        runtime: R,
692
8
        circmgr: Option<Arc<CircMgr<R>>>,
693
8
        offline: bool,
694
8
    ) -> Result<Self> {
695
8
        let store = Mutex::new(config.open_store(offline)?);
696
8
        let netdir = SharedMutArc::new();
697
8
        let events = event::FlagPublisher::new();
698
8

            
699
8
        let (send_status, receive_status) = postage::watch::channel();
700
8
        let send_status = Mutex::new(send_status);
701
8
        let receive_status = DirBootstrapEvents {
702
8
            inner: receive_status,
703
8
        };
704
8

            
705
8
        Ok(DirMgr {
706
8
            config: config.into(),
707
8
            store,
708
8
            netdir,
709
8
            events,
710
8
            send_status,
711
8
            receive_status,
712
8
            circmgr,
713
8
            runtime,
714
8
            offline,
715
8
            bootstrap_started: AtomicBool::new(false),
716
8
        })
717
8
    }
718

            
719
    /// Load the latest non-pending non-expired directory from the
720
    /// cache, if it is newer than the one we have.
721
    ///
722
    /// Return false if there is no such consensus.
723
    async fn load_directory(self: &Arc<Self>) -> Result<bool> {
724
        let state = state::GetConsensusState::new(Arc::downgrade(self), CacheUsage::CacheOnly)?;
725
        let _ = bootstrap::load(Arc::clone(self), Box::new(state)).await?;
726

            
727
        Ok(self.netdir.get().is_some())
728
    }
729

            
730
    /// Return an Arc handle to our latest directory, if we have one.
    ///
    /// Returns `None` until a directory has been loaded or bootstrapped.
    pub fn opt_netdir(&self) -> Option<Arc<NetDir>> {
        self.netdir.get()
    }
734

            
735
    /// Return an Arc handle to our latest directory, returning an error if there is none.
736
    ///
737
    /// # Errors
738
    ///
739
    /// Errors with [`Error::DirectoryNotPresent`] if the `DirMgr` hasn't been bootstrapped yet.
740
    // TODO: Add variants of this that make sure that it's up-to-date?
741
    pub fn netdir(&self) -> Result<Arc<NetDir>> {
742
        self.opt_netdir().ok_or(Error::DirectoryNotPresent)
743
    }
744

            
745
    /// Return a new asynchronous stream that will receive notification
    /// whenever the consensus has changed.
    ///
    /// Multiple events may be batched up into a single item: each time
    /// this stream yields an event, all you can assume is that the event has
    /// occurred at least once.
    ///
    /// (See also [`DirProvider::events`], which returns this stream boxed.)
    pub fn events(&self) -> impl futures::Stream<Item = DirEvent> {
        self.events.subscribe()
    }
754

            
755
    /// Try to load the text of a single document described by `doc` from
756
    /// storage.
757
3
    pub fn text(&self, doc: &DocId) -> Result<Option<DocumentText>> {
758
3
        use itertools::Itertools;
759
3
        let mut result = HashMap::new();
760
3
        let query = (*doc).into();
761
3
        self.load_documents_into(&query, &mut result)?;
762
3
        let item = result.into_iter().at_most_one().map_err(|_| {
763
            Error::CacheCorruption("Found more than one entry in storage for given docid")
764
3
        })?;
765
3
        if let Some((docid, doctext)) = item {
766
2
            if &docid != doc {
767
                return Err(Error::CacheCorruption(
768
                    "Item from storage had incorrect docid.",
769
                ));
770
2
            }
771
2
            Ok(Some(doctext))
772
        } else {
773
1
            Ok(None)
774
        }
775
3
    }
776

            
777
    /// Load the text for a collection of documents.
778
    ///
779
    /// If many of the documents have the same type, this can be more
780
    /// efficient than calling [`text`](Self::text).
781
1
    pub fn texts<T>(&self, docs: T) -> Result<HashMap<DocId, DocumentText>>
782
1
    where
783
1
        T: IntoIterator<Item = DocId>,
784
1
    {
785
1
        let partitioned = docid::partition_by_type(docs);
786
1
        let mut result = HashMap::new();
787
3
        for (_, query) in partitioned.into_iter() {
788
3
            self.load_documents_into(&query, &mut result)?;
789
        }
790
1
        Ok(result)
791
1
    }
792

            
793
    /// Load all the documents for a single DocumentQuery from the store.
    ///
    /// Found documents are inserted into `result`, keyed by their DocId;
    /// documents that are absent from storage are silently skipped.
    fn load_documents_into(
        &self,
        query: &DocQuery,
        result: &mut HashMap<DocId, DocumentText>,
    ) -> Result<()> {
        use DocQuery::*;
        let store = self.store.lock().expect("Directory storage lock poisoned");
        match query {
            LatestConsensus {
                flavor,
                cache_usage,
            } => {
                if *cache_usage == CacheUsage::MustDownload {
                    // Do nothing: we don't want a cached consensus.
                    trace!("MustDownload is set; not checking for cached consensus.");
                } else if let Some(c) =
                    store.latest_consensus(*flavor, cache_usage.pending_requirement())?
                {
                    trace!("Found a reasonable consensus in the cache");
                    // Key the result with the same cache_usage we were asked
                    // for, so the caller can find it again.
                    let id = DocId::LatestConsensus {
                        flavor: *flavor,
                        cache_usage: *cache_usage,
                    };
                    result.insert(id, c.into());
                }
            }
            AuthCert(ids) => result.extend(
                store
                    .authcerts(ids)?
                    .into_iter()
                    .map(|(id, c)| (DocId::AuthCert(id), DocumentText::from_string(c))),
            ),
            Microdesc(digests) => {
                result.extend(
                    store
                        .microdescs(digests)?
                        .into_iter()
                        .map(|(id, md)| (DocId::Microdesc(id), DocumentText::from_string(md))),
                );
            }
            #[cfg(feature = "routerdesc")]
            RouterDesc(digests) => result.extend(
                store
                    .routerdescs(digests)?
                    .into_iter()
                    .map(|(id, rd)| (DocId::RouterDesc(id), DocumentText::from_string(rd))),
            ),
        }
        Ok(())
    }

    /// Convert a DocQuery into a set of ClientRequests, suitable for sending
846
    /// to a directory cache.
847
    ///
848
    /// This conversion has to be a function of the dirmgr, since it may
849
    /// require knowledge about our current state.
850
7
    fn query_into_requests(&self, q: DocQuery) -> Result<Vec<ClientRequest>> {
851
7
        let mut res = Vec::new();
852
9
        for q in q.split_for_download() {
853
9
            match q {
854
2
                DocQuery::LatestConsensus { flavor, .. } => {
855
2
                    res.push(self.make_consensus_request(flavor)?);
856
                }
857
1
                DocQuery::AuthCert(ids) => {
858
1
                    res.push(ClientRequest::AuthCert(ids.into_iter().collect()));
859
1
                }
860
4
                DocQuery::Microdesc(ids) => {
861
4
                    res.push(ClientRequest::Microdescs(ids.into_iter().collect()));
862
4
                }
863
                #[cfg(feature = "routerdesc")]
864
2
                DocQuery::RouterDesc(ids) => {
865
2
                    res.push(ClientRequest::RouterDescs(ids.into_iter().collect()));
866
2
                }
867
            }
868
        }
869
7
        Ok(res)
870
7
    }
871

            
872
    /// Construct an appropriate ClientRequest to download a consensus
    /// of the given flavor.
    fn make_consensus_request(&self, flavor: ConsensusFlavor) -> Result<ClientRequest> {
        #![allow(clippy::unnecessary_wraps)]
        let mut request = tor_dirclient::request::ConsensusRequest::new(flavor);

        // If we have a cached consensus, tell the directory server about it,
        // so it can send us a diff instead of a full document.
        let r = self.store.lock().expect("Directory storage lock poisoned");
        match r.latest_consensus_meta(flavor) {
            Ok(Some(meta)) => {
                request.set_last_consensus_date(meta.lifetime().valid_after());
                request.push_old_consensus_digest(*meta.sha3_256_of_signed());
            }
            Ok(None) => {}
            Err(e) => {
                // A storage error here is not fatal: we just ask for a full
                // consensus instead of a diff.
                warn!("Error loading directory metadata: {}", e);
            }
        }

        Ok(ClientRequest::Consensus(request))
    }

    /// Given a request we sent and the response we got from a
    /// directory server, see whether we should expand that response
    /// into "something larger".
    ///
    /// Currently, this handles expanding consensus diffs, and nothing
    /// else.  We do it at this stage of our downloading operation
    /// because it requires access to the store.
    fn expand_response_text(&self, req: &ClientRequest, text: String) -> Result<String> {
        if let ClientRequest::Consensus(req) = req {
            if tor_consdiff::looks_like_diff(&text) {
                // We only asked for a diff against one digest at most, so
                // that's the only base consensus worth looking up.
                if let Some(old_d) = req.old_consensus_digests().next() {
                    // Scope the lock: don't hold the storage mutex while
                    // applying the diff.
                    let db_val = {
                        let s = self.store.lock().expect("Directory storage lock poisoned");
                        s.consensus_by_sha3_digest_of_signed_part(old_d)?
                    };
                    if let Some((old_consensus, meta)) = db_val {
                        info!("Applying a consensus diff");
                        let new_consensus = tor_consdiff::apply_diff(
                            old_consensus.as_str()?,
                            &text,
                            Some(*meta.sha3_256_of_signed()),
                        )?;
                        // Verify the result matches the digest the diff claims.
                        new_consensus.check_digest()?;
                        return Ok(new_consensus.to_string());
                    }
                }
                // A diff arrived but we either didn't ask for one or no
                // longer have the base consensus: reject it.
                return Err(Error::Unwanted(
                    "Received a consensus diff we did not ask for",
                ));
            }
        }
        // Not a diff: pass the text through unchanged.
        Ok(text)
    }
}

/// A degree of readiness for a given directory state object.
#[derive(Debug, Copy, Clone)]
enum Readiness {
    /// There is no more information to download.
    Complete,
    /// There is more information to download, but we don't need to
    /// download it before the directory can be used.
    Usable,
}

/// A "state" object used to represent our progress in downloading a
938
/// directory.
939
///
940
/// These state objects are not meant to know about the network, or
941
/// how to fetch documents at all.  Instead, they keep track of what
942
/// information they are missing, and what to do when they get that
943
/// information.
944
///
945
/// Every state object has two possible transitions: "resetting", and
946
/// "advancing".  Advancing happens when a state has no more work to
947
/// do, and needs to transform into a different kind of object.
948
/// Resetting happens when this state needs to go back to an initial
949
/// state in order to start over -- either because of an error or
950
/// because the information it has downloaded is no longer timely.
951
trait DirState: Send {
952
    /// Return a human-readable description of this state.
953
    fn describe(&self) -> String;
954
    /// Return a list of the documents we're missing.
955
    ///
956
    /// If every document on this list were to be loaded or downloaded, then
957
    /// the state should either become "ready to advance", or "complete."
958
    ///
959
    /// This list should never _grow_ on a given state; only advancing
960
    /// or resetting the state should add new DocIds that weren't
961
    /// there before.
962
    fn missing_docs(&self) -> Vec<DocId>;
963
    /// Describe whether this state has reached `ready` status.
964
    fn is_ready(&self, ready: Readiness) -> bool;
965
    /// Return true if this state can advance to another state via its
966
    /// `advance` method.
967
    fn can_advance(&self) -> bool;
968
    /// Add one or more documents from our cache; returns 'true' if there
969
    /// was any change in this state.
970
    ///
971
    /// If `storage` is provided, then we should write any state changes into
972
    /// it.  (We don't read from it in this method.)
973
    fn add_from_cache(
974
        &mut self,
975
        docs: HashMap<DocId, DocumentText>,
976
        storage: Option<&Mutex<DynStore>>,
977
    ) -> Result<bool>;
978

            
979
    /// Add information that we have just downloaded to this state; returns
980
    /// 'true' if there as any change in this state.
981
    ///
982
    /// This method receives a copy of the original request, and
983
    /// should reject any documents that do not pertain to it.
984
    ///
985
    /// If `storage` is provided, then we should write any accepted documents
986
    /// into `storage` so they can be saved in a cache.
987
    // TODO: It might be good to say "there was a change but also an
988
    // error" in this API if possible.
989
    // TODO: It would be better to not have this function be async,
990
    // once the `must_not_suspend` lint is stable.
991
    // TODO: this should take a "DirSource" too.
992
    fn add_from_download(
993
        &mut self,
994
        text: &str,
995
        request: &ClientRequest,
996
        storage: Option<&Mutex<DynStore>>,
997
    ) -> Result<bool>;
998
    /// Return a summary of this state as a [`DirStatus`].
999
    fn bootstrap_status(&self) -> event::DirStatus;

            
    /// Return a configuration for attempting downloads.
    fn dl_config(&self) -> Result<DownloadSchedule>;
    /// If possible, advance to the next state.
    fn advance(self: Box<Self>) -> Result<Box<dyn DirState>>;
    /// Return a time (if any) when downloaders should stop attempting to
    /// advance this state, and should instead reset it and start over.
    fn reset_time(&self) -> Option<SystemTime>;
    /// Reset this state and start over.
    fn reset(self: Box<Self>) -> Result<Box<dyn DirState>>;
}

            
/// Try to upgrade a weak reference to a DirMgr, and give an error on
/// failure.
7
fn upgrade_weak_ref<T>(weak: &Weak<T>) -> Result<Arc<T>> {
7
    Weak::upgrade(weak).ok_or(Error::ManagerDropped)
7
}

            
#[cfg(test)]
mod test {
    //! Unit tests for the DirMgr storage and request-building helpers.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::docmeta::{AuthCertMeta, ConsensusMeta};
    use std::time::Duration;
    use tempfile::TempDir;
    use tor_netdoc::doc::{authcert::AuthCertKeyIds, netstatus::Lifetime};

    /// Build a DirMgr backed by a fresh temporary cache directory.
    ///
    /// Returns the TempDir too, so it stays alive as long as the manager.
    pub(crate) fn new_mgr<R: Runtime>(runtime: R) -> (TempDir, DirMgr<R>) {
        let dir = TempDir::new().unwrap();
        let config = DirMgrConfig::builder()
            .cache_path(dir.path())
            .build()
            .unwrap();
        let dirmgr = DirMgr::from_config(config, runtime, None, false).unwrap();

        (dir, dirmgr)
    }

    /// A brand-new DirMgr has neither a circmgr nor a netdir.
    #[test]
    fn failing_accessors() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            assert!(mgr.circmgr().is_err());
            assert!(mgr.opt_netdir().is_none());
        });
    }

    /// Exercise text() and texts() against seeded storage contents.
    #[test]
    fn load_and_store_internals() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            let now = SystemTime::now();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            // Seed the storage with a bunch of junk.
            let d1 = [5_u8; 32];
            let d2 = [7; 32];
            let d3 = [42; 32];
            let d4 = [99; 20];
            let d5 = [12; 20];
            let certid1 = AuthCertKeyIds {
                id_fingerprint: d4.into(),
                sk_fingerprint: d5.into(),
            };
            let certid2 = AuthCertKeyIds {
                id_fingerprint: d5.into(),
                sk_fingerprint: d4.into(),
            };

            {
                let mut store = mgr.store.lock().unwrap();

                store
                    .store_microdescs(
                        &[
                            ("Fake micro 1", &d1),
                            ("Fake micro 2", &d2),
                            ("Fake micro 3", &d3),
                        ],
                        now,
                    )
                    .unwrap();

                #[cfg(feature = "routerdesc")]
                store
                    .store_routerdescs(&[("Fake rd1", now, &d4), ("Fake rd2", now, &d5)])
                    .unwrap();

                store
                    .store_authcerts(&[
                        (
                            AuthCertMeta::new(certid1, now, tomorrow),
                            "Fake certificate one",
                        ),
                        (
                            AuthCertMeta::new(certid2, now, tomorrow),
                            "Fake certificate two",
                        ),
                    ])
                    .unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    [102; 32],
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Try to get it with text().
            let t1 = mgr.text(&DocId::Microdesc(d1)).unwrap().unwrap();
            assert_eq!(t1.as_str(), Ok("Fake micro 1"));

            let t2 = mgr
                .text(&DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    cache_usage: CacheUsage::CacheOkay,
                })
                .unwrap()
                .unwrap();
            assert_eq!(t2.as_str(), Ok("Fake consensus!"));

            // A digest we never stored should come back as None.
            let t3 = mgr.text(&DocId::Microdesc([255; 32])).unwrap();
            assert!(t3.is_none());

            // Now try texts()
            let d_bogus = DocId::Microdesc([255; 32]);
            let res = mgr
                .texts(vec![
                    DocId::Microdesc(d2),
                    DocId::Microdesc(d3),
                    d_bogus,
                    DocId::AuthCert(certid2),
                    #[cfg(feature = "routerdesc")]
                    DocId::RouterDesc(d5),
                ])
                .unwrap();
            assert_eq!(
                res.get(&DocId::Microdesc(d2)).unwrap().as_str(),
                Ok("Fake micro 2")
            );
            assert_eq!(
                res.get(&DocId::Microdesc(d3)).unwrap().as_str(),
                Ok("Fake micro 3")
            );
            assert!(res.get(&d_bogus).is_none());
            assert_eq!(
                res.get(&DocId::AuthCert(certid2)).unwrap().as_str(),
                Ok("Fake certificate two")
            );
            #[cfg(feature = "routerdesc")]
            assert_eq!(
                res.get(&DocId::RouterDesc(d5)).unwrap().as_str(),
                Ok("Fake rd2")
            );
        });
    }

    /// make_consensus_request should mention a cached consensus when one exists.
    #[test]
    fn make_consensus_request() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            let now = SystemTime::now();
            let tomorrow = now + Duration::from_secs(86400);
            let later = tomorrow + Duration::from_secs(86400);

            // Try with an empty store.
            let req = mgr
                .make_consensus_request(ConsensusFlavor::Microdesc)
                .unwrap();
            match req {
                ClientRequest::Consensus(r) => {
                    assert_eq!(r.old_consensus_digests().count(), 0);
                    assert_eq!(r.last_consensus_date(), None);
                }
                _ => panic!("Wrong request type"),
            }

            // Add a fake consensus record.
            let d_prev = [42; 32];
            {
                let mut store = mgr.store.lock().unwrap();

                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, tomorrow, later).unwrap(),
                    d_prev,
                    [103; 32],
                );
                store
                    .store_consensus(&cmeta, ConsensusFlavor::Microdesc, false, "Fake consensus!")
                    .unwrap();
            }

            // Now try again.
            let req = mgr
                .make_consensus_request(ConsensusFlavor::Microdesc)
                .unwrap();
            match req {
                ClientRequest::Consensus(r) => {
                    let ds: Vec<_> = r.old_consensus_digests().collect();
                    assert_eq!(ds.len(), 1);
                    assert_eq!(ds[0], &d_prev);
                    assert_eq!(r.last_consensus_date(), Some(now));
                }
                _ => panic!("Wrong request type"),
            }
        });
    }

    /// query_into_requests should split large queries and pick the right
    /// ClientRequest variant for each DocQuery type.
    #[test]
    fn make_other_requests() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            use rand::Rng;
            let (_tempdir, mgr) = new_mgr(rt);

            let certid1 = AuthCertKeyIds {
                id_fingerprint: [99; 20].into(),
                sk_fingerprint: [100; 20].into(),
            };
            let mut rng = rand::thread_rng();
            #[cfg(feature = "routerdesc")]
            let rd_ids: Vec<[u8; 20]> = (0..1000).map(|_| rng.gen()).collect();
            let md_ids: Vec<[u8; 32]> = (0..1000).map(|_| rng.gen()).collect();

            // Try an authcert.
            let query = DocQuery::AuthCert(vec![certid1]);
            let reqs = mgr.query_into_requests(query).unwrap();
            assert_eq!(reqs.len(), 1);
            let req = &reqs[0];
            if let ClientRequest::AuthCert(r) = req {
                assert_eq!(r.keys().next(), Some(&certid1));
            } else {
                panic!();
            }

            // Try a bunch of mds. 1000 microdescs don't fit into one request,
            // so this should split into two.
            let query = DocQuery::Microdesc(md_ids);
            let reqs = mgr.query_into_requests(query).unwrap();
            assert_eq!(reqs.len(), 2);
            assert!(matches!(reqs[0], ClientRequest::Microdescs(_)));

            // Try a bunch of rds.
            #[cfg(feature = "routerdesc")]
            {
                let query = DocQuery::RouterDesc(rd_ids);
                let reqs = mgr.query_into_requests(query).unwrap();
                assert_eq!(reqs.len(), 2);
                assert!(matches!(reqs[0], ClientRequest::RouterDescs(_)));
            }
        });
    }

    /// expand_response_text should apply consensus diffs and reject bad ones.
    #[test]
    fn expand_response() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            // Try a simple request: nothing should happen.
            let q = DocId::Microdesc([99; 32]).into();
            let r = &mgr.query_into_requests(q).unwrap()[0];
            let expanded = mgr.expand_response_text(r, "ABC".to_string());
            assert_eq!(&expanded.unwrap(), "ABC");

            // Try a consensus response that doesn't look like a diff in
            // response to a query that doesn't ask for one.
            let latest_id = DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            };
            let q: DocQuery = latest_id.into();
            let r = &mgr.query_into_requests(q.clone()).unwrap()[0];
            let expanded = mgr.expand_response_text(r, "DEF".to_string());
            assert_eq!(&expanded.unwrap(), "DEF");

            // Now stick some metadata and a string into the storage so that
            // we can ask for a diff.
            {
                let mut store = mgr.store.lock().unwrap();
                let now = SystemTime::now();
                let day = Duration::from_secs(86400);
                let d_in = [0x99; 32]; // This one, we can fake.
                let cmeta = ConsensusMeta::new(
                    Lifetime::new(now, now + day, now + 2 * day).unwrap(),
                    d_in,
                    d_in,
                );
                store
                    .store_consensus(
                        &cmeta,
                        ConsensusFlavor::Microdesc,
                        false,
                        "line 1\nline2\nline 3\n",
                    )
                    .unwrap();
            }

            // Try expanding something that isn't a consensus, even if we'd like
            // one.
            let r = &mgr.query_into_requests(q).unwrap()[0];
            let expanded = mgr.expand_response_text(r, "hello".to_string());
            assert_eq!(&expanded.unwrap(), "hello");

            // Finally, try "expanding" a diff (by applying it and checking the
            // digest).
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 8382374ca766873eb0d2530643191c6eaa2c5e04afa554cbac349b5d0592d300
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(r, diff);

            assert_eq!(expanded.unwrap(), "line 1\nreplacement line\nline 3\n");

            // If the digest is wrong, that should get rejected.
            let diff = "network-status-diff-version 1
hash 9999999999999999999999999999999999999999999999999999999999999999 9999999999999999999999999999999999999999999999999999999999999999
2c
replacement line
.
".to_string();
            let expanded = mgr.expand_response_text(r, diff);
            assert!(expanded.is_err());
        });
    }

    /// BoolResetter should reset the flag on drop unless disarmed first.
    #[test]
    #[allow(clippy::bool_assert_comparison)]
    fn bool_resetter_works() {
        let bool = AtomicBool::new(false);
        // Resetter dropped while armed: the flag goes back to `reset_to`.
        fn example(bool: &AtomicBool) {
            bool.store(true, Ordering::SeqCst);
            let _resetter = BoolResetter {
                inner: bool,
                reset_to: false,
                ordering: Ordering::SeqCst,
            };
        }
        // Disarmed resetter: the flag keeps its new value after drop.
        fn example_disarm(bool: &AtomicBool) {
            bool.store(true, Ordering::SeqCst);
            let resetter = BoolResetter {
                inner: bool,
                reset_to: false,
                ordering: Ordering::SeqCst,
            };
            resetter.disarm();
        }
        example(&bool);
        assert_eq!(bool.load(Ordering::SeqCst), false);
        example_disarm(&bool);
        assert_eq!(bool.load(Ordering::SeqCst), true);
    }
}