1
//! Implementation for the primary directory state machine.
2
//!
3
//! There are three (active) states that a download can be in: looking
4
//! for a consensus ([`GetConsensusState`]), looking for certificates
5
//! to validate that consensus ([`GetCertsState`]), and looking for
6
//! microdescriptors ([`GetMicrodescsState`]).
7
//!
8
//! These states have no contact with the network, and are purely
9
//! reactive to other code that drives them.  See the
10
//! [`bootstrap`](crate::bootstrap) module for functions that actually
11
//! load or download directory information.
12

            
13
use rand::Rng;
14
use std::collections::{HashMap, HashSet};
15
use std::fmt::Debug;
16
use std::sync::{Arc, Mutex, Weak};
17
use std::time::{Duration, SystemTime};
18
use time::OffsetDateTime;
19
use tor_error::internal;
20
use tor_netdir::{MdReceiver, NetDir, PartialNetDir};
21
use tor_netdoc::doc::netstatus::Lifetime;
22
use tracing::{info, warn};
23

            
24
use crate::event::{DirStatus, DirStatusInner};
25

            
26
use crate::storage::{DynStore, EXPIRATION_DEFAULTS};
27
use crate::{
28
    docmeta::{AuthCertMeta, ConsensusMeta},
29
    retry::DownloadSchedule,
30
    shared_ref::SharedMutArc,
31
    CacheUsage, ClientRequest, DirMgrConfig, DirState, DocId, DocumentText, Error, Readiness,
32
    Result,
33
};
34
use crate::{DirEvent, DocSource};
35
use tor_checkable::{ExternallySigned, SelfSigned, Timebound};
36
use tor_llcrypto::pk::rsa::RsaIdentity;
37
use tor_netdoc::doc::{
38
    microdesc::{MdDigest, Microdesc},
39
    netstatus::MdConsensus,
40
};
41
use tor_netdoc::{
42
    doc::{
43
        authcert::{AuthCert, AuthCertKeyIds},
44
        microdesc::MicrodescReader,
45
        netstatus::{ConsensusFlavor, UnvalidatedMdConsensus},
46
    },
47
    AllowAnnotations,
48
};
49
use tor_rtcompat::Runtime;
50

            
51
/// An object where we can put a usable netdir.
///
/// Note that there's only one implementation for this trait: DirMgr.
/// We make this a trait anyway to make sure that the different states
/// in this module can _only_ interact with the DirMgr through
/// modifying the NetDir and looking at the configuration.
pub(crate) trait WriteNetDir: 'static + Sync + Send {
    /// Return a DirMgrConfig to use when asked how to retry downloads,
    /// or when we need to find a list of descriptors.
    fn config(&self) -> Arc<DirMgrConfig>;

    /// Return a reference where we can write or modify a NetDir.
    fn netdir(&self) -> &SharedMutArc<NetDir>;

    /// Called to note that the consensus stored in [`Self::netdir()`] has been
    /// changed.
    fn netdir_consensus_changed(&self);

    /// Called to note that the descriptors stored in
    /// [`Self::netdir()`] have been changed.
    fn netdir_descriptors_changed(&self);

    /// Checks whether the given `netdir` is ready to replace the previous
    /// one.
    ///
    /// This is in addition to checks used when upgrading from a PartialNetDir.
    ///
    /// The default implementation accepts every netdir; implementors override
    /// this when they have extra requirements (e.g. descriptors for guards).
    fn netdir_is_sufficient(&self, _netdir: &NetDir) -> bool {
        true
    }

    /// Called to find the current time.
    ///
    /// This is just `SystemTime::now()` in production, but for
    /// testing it is helpful to be able to mock out our current view
    /// of the time.
    fn now(&self) -> SystemTime;
}
88

            
89
impl<R: Runtime> WriteNetDir for crate::DirMgr<R> {
    fn config(&self) -> Arc<DirMgrConfig> {
        self.config.get()
    }
    fn netdir(&self) -> &SharedMutArc<NetDir> {
        &self.netdir
    }
    fn netdir_consensus_changed(&self) {
        // Notify all subscribers (e.g. the bootstrap driver) of the new consensus.
        self.events.publish(DirEvent::NewConsensus);
    }
    fn netdir_descriptors_changed(&self) {
        // Notify all subscribers that new descriptors have been added.
        self.events.publish(DirEvent::NewDescriptors);
    }
    fn netdir_is_sufficient(&self, netdir: &NetDir) -> bool {
        // Delegate the "good enough?" decision to the circuit manager,
        // which knows whether it has the descriptors it needs (e.g. guards).
        match &self.circmgr {
            Some(circmgr) => circmgr.netdir_is_sufficient(netdir),
            None => true, // no circmgr? then we can use anything.
        }
    }
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}
112

            
113
/// Initial state: fetching or loading a consensus directory.
#[derive(Clone, Debug)]
pub(crate) struct GetConsensusState<DM: WriteNetDir> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,

    /// If present, a time after which we want our consensus to have
    /// been published.
    //
    // TODO: This is not yet used everywhere it could be.  In the future maybe
    // it should be inserted into the DocId::LatestConsensus  alternative rather
    // than being recalculated in make_consensus_request,
    after: Option<SystemTime>,

    /// If present, our next state.
    ///
    /// (This is present once we have a consensus.)
    next: Option<GetCertsState<DM>>,

    /// A list of RsaIdentity for the authorities that we believe in.
    ///
    /// No consensus can be valid unless it purports to be signed by
    /// more than half of these authorities.
    authority_ids: Vec<RsaIdentity>,

    /// A weak reference to the directory manager that wants us to
    /// fetch this information.  When this reference goes away, we exit.
    writedir: Weak<DM>,
}
142

            
143
impl<DM: WriteNetDir> GetConsensusState<DM> {
144
    /// Create a new GetConsensusState from a weak reference to a
145
    /// directory manager and a `cache_usage` flag.
146
7
    pub(crate) fn new(writedir: Weak<DM>, cache_usage: CacheUsage) -> Result<Self> {
147
7
        let (authority_ids, after) = if let Some(writedir) = Weak::upgrade(&writedir) {
148
7
            let ids: Vec<_> = writedir
149
7
                .config()
150
7
                .authorities()
151
7
                .iter()
152
21
                .map(|auth| *auth.v3ident())
153
7
                .collect();
154
7
            let after = writedir
155
7
                .netdir()
156
7
                .get()
157
7
                .map(|nd| nd.lifetime().valid_after());
158
7

            
159
7
            (ids, after)
160
        } else {
161
            return Err(Error::ManagerDropped);
162
        };
163
7
        Ok(GetConsensusState {
164
7
            cache_usage,
165
7
            after,
166
7
            next: None,
167
7
            authority_ids,
168
7
            writedir,
169
7
        })
170
7
    }
171
}
172

            
173
impl<DM: WriteNetDir> DirState for GetConsensusState<DM> {
    fn describe(&self) -> String {
        // Once `next` is set, we already have a consensus and are about
        // to move on to fetching certificates.
        if self.next.is_some() {
            "About to fetch certificates."
        } else {
            match self.cache_usage {
                CacheUsage::CacheOnly => "Looking for a cached consensus.",
                CacheUsage::CacheOkay => "Looking for a consensus.",
                CacheUsage::MustDownload => "Downloading a consensus.",
            }
        }
        .to_string()
    }
    fn missing_docs(&self) -> Vec<DocId> {
        // If we can advance, we already have a consensus: nothing is missing.
        if self.can_advance() {
            return Vec::new();
        }
        let flavor = ConsensusFlavor::Microdesc;
        vec![DocId::LatestConsensus {
            flavor,
            cache_usage: self.cache_usage,
        }]
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        // This state can never be "ready": a consensus alone is not usable.
        false
    }
    fn can_advance(&self) -> bool {
        self.next.is_some()
    }
    fn bootstrap_status(&self) -> DirStatus {
        // Once we have a consensus, report the status of the follow-on state.
        if let Some(next) = &self.next {
            next.bootstrap_status()
        } else {
            DirStatusInner::NoConsensus { after: self.after }.into()
        }
    }
    fn dl_config(&self) -> Result<DownloadSchedule> {
        if let Some(wd) = Weak::upgrade(&self.writedir) {
            Ok(*wd.config().schedule().retry_consensus())
        } else {
            Err(Error::ManagerDropped)
        }
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        _storage: Option<&Mutex<DynStore>>,
    ) -> Result<bool> {
        // We asked for exactly one document (the consensus), so only the
        // first entry matters; it must be a microdesc-flavored consensus.
        let text = match docs.into_iter().next() {
            None => return Ok(false),
            Some((
                DocId::LatestConsensus {
                    flavor: ConsensusFlavor::Microdesc,
                    ..
                },
                text,
            )) => text,
            _ => return Err(Error::Unwanted("Not an md consensus")),
        };

        let source = DocSource::LocalCache;
        // Returns true iff the consensus was timely and accepted.
        self.add_consensus_text(source, text.as_str().map_err(Error::BadUtf8InCache)?)
            .map(|meta| meta.is_some())
    }
    fn add_from_download(
        &mut self,
        text: &str,
        _request: &ClientRequest,
        storage: Option<&Mutex<DynStore>>,
    ) -> Result<bool> {
        let source = DocSource::DirServer {};
        if let Some(meta) = self.add_consensus_text(source, text)? {
            // Persist the consensus (still marked "pending") so a later run
            // can resume from it.
            if let Some(store) = storage {
                let mut w = store.lock().expect("Directory storage lock poisoned");
                w.store_consensus(meta, ConsensusFlavor::Microdesc, true, text)?;
            }
            Ok(true)
        } else {
            Ok(false)
        }
    }
    fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
        // Move to GetCertsState if we have a consensus; otherwise stay put.
        Ok(match self.next {
            Some(next) => Box::new(next),
            None => self,
        })
    }
    fn reset_time(&self) -> Option<SystemTime> {
        // There is no consensus yet, so there is no deadline to reset at.
        None
    }
    fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
        // Resetting the initial state is a no-op.
        Ok(self)
    }
}
267

            
268
impl<DM: WriteNetDir> GetConsensusState<DM> {
    /// Helper: try to set the current consensus text from an input
    /// string `text`.  Refuse it if the authorities could never be
    /// correct, or if it is ill-formed.
    ///
    /// On success, returns `Some(meta)` for the accepted consensus (and
    /// sets `self.next`); returns `Ok(None)` if the consensus was parseable
    /// but not currently timely.
    fn add_consensus_text(
        &mut self,
        source: DocSource,
        text: &str,
    ) -> Result<Option<&ConsensusMeta>> {
        // Try to parse it and get its metadata.
        let (consensus_meta, unvalidated) = {
            let (signedval, remainder, parsed) =
                MdConsensus::parse(text).map_err(|e| Error::from_netdoc(source.clone(), e))?;
            let now = current_time(&self.writedir)?;
            if let Ok(timely) = parsed.check_valid_at(&now) {
                let meta = ConsensusMeta::from_unvalidated(signedval, remainder, &timely);
                (meta, timely)
            } else {
                // A stale or not-yet-valid consensus is not an error; we
                // simply don't accept it.
                return Ok(None);
            }
        };

        // Check out what authorities we believe in, and see if enough
        // of them are purported to have signed this consensus.
        let n_authorities = self.authority_ids.len() as u16;
        let unvalidated = unvalidated.set_n_authorities(n_authorities);

        let id_refs: Vec<_> = self.authority_ids.iter().collect();
        if !unvalidated.authorities_are_correct(&id_refs[..]) {
            return Err(Error::UnrecognizedAuthorities);
        }

        // Make a set of all the certificates we want -- the subset of
        // those listed on the consensus that we would indeed accept as
        // authoritative.
        let desired_certs = unvalidated
            .signing_cert_ids()
            .filter(|m| self.recognizes_authority(&m.id_fingerprint))
            .collect();

        // Prepare the next state: it will gather `desired_certs` and then
        // validate the consensus.
        self.next = Some(GetCertsState {
            cache_usage: self.cache_usage,
            consensus_source: source,
            unvalidated,
            consensus_meta,
            missing_certs: desired_certs,
            certs: Vec::new(),
            writedir: Weak::clone(&self.writedir),
        });

        // Unwrap should be safe because `next` was just assigned
        #[allow(clippy::unwrap_used)]
        Ok(Some(&self.next.as_ref().unwrap().consensus_meta))
    }

    /// Return true if `id` is an authority identity we recognize
    fn recognizes_authority(&self, id: &RsaIdentity) -> bool {
        self.authority_ids.iter().any(|auth| auth == id)
    }
}
328

            
329
/// Second state: fetching or loading authority certificates.
///
/// TODO: we should probably do what C tor does, and try to use the
/// same directory that gave us the consensus.
///
/// TODO SECURITY: This needs better handling for the DOS attack where
/// we are given a bad consensus signed with fictional certificates
/// that we can never find.
#[derive(Clone, Debug)]
struct GetCertsState<DM: WriteNetDir> {
    /// The cache usage we had in mind when we began.  Used to reset.
    cache_usage: CacheUsage,
    /// Where did we get our consensus?
    consensus_source: DocSource,
    /// The consensus that we are trying to validate.
    unvalidated: UnvalidatedMdConsensus,
    /// Metadata for the consensus.
    consensus_meta: ConsensusMeta,
    /// A set of the certificate keypairs for the certificates we don't
    /// have yet.
    missing_certs: HashSet<AuthCertKeyIds>,
    /// A list of the certificates we've been able to load or download.
    certs: Vec<AuthCert>,
    /// Reference to our directory manager.
    writedir: Weak<DM>,
}
355

            
356
impl<DM: WriteNetDir> DirState for GetCertsState<DM> {
    fn describe(&self) -> String {
        let total = self.certs.len() + self.missing_certs.len();
        format!(
            "Downloading certificates for consensus (we are missing {}/{}).",
            self.missing_certs.len(),
            total
        )
    }
    fn missing_docs(&self) -> Vec<DocId> {
        // Each missing certificate becomes one requestable DocId.
        self.missing_certs
            .iter()
            .map(|id| DocId::AuthCert(*id))
            .collect()
    }
    fn is_ready(&self, _ready: Readiness) -> bool {
        // Certificates alone never make a usable directory.
        false
    }
    fn can_advance(&self) -> bool {
        // We can advance once the certificates we hold are enough to
        // check the consensus signatures.
        self.unvalidated.key_is_correct(&self.certs[..]).is_ok()
    }
    fn bootstrap_status(&self) -> DirStatus {
        let n_certs = self.certs.len();
        let n_missing_certs = self.missing_certs.len();
        let total_certs = n_missing_certs + n_certs;
        DirStatusInner::FetchingCerts {
            lifetime: self.consensus_meta.lifetime().clone(),
            n_certs: (n_certs as u16, total_certs as u16),
        }
        .into()
    }
    fn dl_config(&self) -> Result<DownloadSchedule> {
        if let Some(wd) = Weak::upgrade(&self.writedir) {
            Ok(*wd.config().schedule().retry_certs())
        } else {
            Err(Error::ManagerDropped)
        }
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        _storage: Option<&Mutex<DynStore>>,
    ) -> Result<bool> {
        let mut changed = false;
        // Here we iterate over the documents we want, taking them from
        // our input and remembering them.
        for id in &self.missing_docs() {
            if let Some(cert) = docs.get(id) {
                let text = cert.as_str().map_err(Error::BadUtf8InCache)?;
                let parsed = AuthCert::parse(text)
                    .map_err(|e| Error::from_netdoc(DocSource::LocalCache, e))?
                    .check_signature()?;
                let now = current_time(&self.writedir)?;
                if let Ok(cert) = parsed.check_valid_at(&now) {
                    self.missing_certs.remove(cert.key_ids());
                    self.certs.push(cert);
                    changed = true;
                } else {
                    // NOTE(review): this branch is reached when the cert is
                    // expired/not-yet-valid, not when parsing fails — the
                    // message text is misleading.
                    warn!("Got a cert from our cache that we couldn't parse");
                }
            }
        }
        Ok(changed)
    }
    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        storage: Option<&Mutex<DynStore>>,
    ) -> Result<bool> {
        // Remember exactly which certs we asked for, so we can reject
        // any extras a hostile cache might bundle in.
        let asked_for: HashSet<_> = match request {
            ClientRequest::AuthCert(a) => a.keys().collect(),
            _ => return Err(internal!("expected an AuthCert request").into()),
        };

        let mut newcerts = Vec::new();
        for cert in AuthCert::parse_multiple(text) {
            if let Ok(parsed) = cert {
                // Recover the exact byte span of this cert within `text`,
                // so we can store it verbatim later.
                let s = parsed
                    .within(text)
                    .expect("Certificate was not in input as expected");
                if let Ok(wellsigned) = parsed.check_signature() {
                    let now = current_time(&self.writedir)?;
                    if let Ok(timely) = wellsigned.check_valid_at(&now) {
                        newcerts.push((timely, s));
                    }
                } else {
                    // TODO: note the source.
                    warn!("Badly signed certificate received and discarded.");
                }
            } else {
                // TODO: note the source.
                warn!("Unparsable certificate received and discarded.");
            }
        }

        // Now discard any certs we didn't ask for.
        let len_orig = newcerts.len();
        newcerts.retain(|(cert, _)| asked_for.contains(cert.key_ids()));
        if newcerts.len() != len_orig {
            warn!("Discarding certificates that we didn't ask for.");
        }

        // We want to exit early if we aren't saving any certificates.
        if newcerts.is_empty() {
            return Ok(false);
        }

        if let Some(store) = storage {
            // Write the certificates to the store.
            let v: Vec<_> = newcerts[..]
                .iter()
                .map(|(cert, s)| (AuthCertMeta::from_authcert(cert), *s))
                .collect();
            let mut w = store.lock().expect("Directory storage lock poisoned");
            w.store_authcerts(&v[..])?;
        }

        // Remember the certificates in this state, and remove them
        // from our list of missing certs.
        let mut changed = false;
        for (cert, _) in newcerts {
            let ids = cert.key_ids();
            if self.missing_certs.contains(ids) {
                self.missing_certs.remove(ids);
                self.certs.push(cert);
                changed = true;
            }
        }

        Ok(changed)
    }
    fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
        if self.can_advance() {
            // Fully validate the consensus signatures, then move on to
            // fetching microdescriptors.
            let consensus_source = self.consensus_source.clone();
            let validated = self
                .unvalidated
                .check_signature(&self.certs[..])
                .map_err(|e| Error::from_netdoc(consensus_source, e))?;
            Ok(Box::new(GetMicrodescsState::new(
                self.cache_usage,
                validated,
                self.consensus_meta,
                self.writedir,
            )?))
        } else {
            Ok(self)
        }
    }
    fn reset_time(&self) -> Option<SystemTime> {
        // Give up on this consensus when it stops being valid.
        Some(self.consensus_meta.lifetime().valid_until())
    }
    fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
        // Start over, looking for a fresh consensus with the original
        // cache policy.
        Ok(Box::new(GetConsensusState::new(
            self.writedir,
            self.cache_usage,
        )?))
    }
}
515

            
516
/// Final state: we're fetching or loading microdescriptors
#[derive(Debug, Clone)]
struct GetMicrodescsState<DM: WriteNetDir> {
    /// How should we get the consensus from the cache, if at all?
    cache_usage: CacheUsage,
    /// The digests of the microdescriptors we are missing.
    missing: HashSet<MdDigest>,
    /// Total number of microdescriptors listed in the consensus.
    n_microdescs: usize,
    /// The dirmgr to inform about a usable directory.
    writedir: Weak<DM>,
    /// The current status of our netdir, if it is not yet ready to become the
    /// main netdir in use for the TorClient.
    ///
    /// `None` means the netdir has already been handed off to the dirmgr.
    partial: Option<PendingNetDir>,
    /// Metadata for the current consensus.
    meta: ConsensusMeta,
    /// A pending list of microdescriptor digests whose
    /// "last-listed-at" times we should update.
    newly_listed: Vec<MdDigest>,
    /// A time after which we should try to replace this directory and
    /// find a new one.  Since this is randomized, we only compute it
    /// once.
    reset_time: SystemTime,
    /// If true, we should tell the storage to expire any outdated
    /// information when we finish getting a usable consensus.
    ///
    /// Only cleared for testing.
    expire_when_complete: bool,
}
545

            
546
/// A network directory that is not yet ready to become _the_ current network directory.
#[derive(Debug, Clone)]
enum PendingNetDir {
    /// A NetDir for which we have a consensus, but not enough microdescriptors.
    Partial(PartialNetDir),
    /// A NetDir that is "good enough to build circuits", but which we can't yet
    /// use because our `writedir` says that it isn't yet sufficient. Probably
    /// that is because we're waiting to download a microdescriptor for one or
    /// more primary guards.
    WaitingForGuards(NetDir),
}
557

            
558
impl PendingNetDir {
559
    /// Add the provided microdescriptor to this pending directory.
560
    ///
561
    /// Return true if we indeed wanted (and added) this descriptor.
562
4
    fn add_microdesc(&mut self, md: Microdesc) -> bool {
563
4
        match self {
564
4
            PendingNetDir::Partial(partial) => partial.add_microdesc(md),
565
            PendingNetDir::WaitingForGuards(netdir) => netdir.add_microdesc(md),
566
        }
567
4
    }
568

            
569
    /// Try to move `self` as far as possible towards a complete, netdir with
570
    /// enough directory information (according to `writedir`).
571
    ///
572
    /// On success, return `Ok(netdir)` with a new usable [`NetDir`].  On error,
573
    /// return a [`PendingNetDir`] representing any progress we were able to
574
    /// make.
575
5
    fn upgrade<WD: WriteNetDir>(mut self, writedir: &WD) -> std::result::Result<NetDir, Self> {
576
6
        loop {
577
6
            match self {
578
5
                PendingNetDir::Partial(partial) => match partial.unwrap_if_sufficient() {
579
1
                    Ok(netdir) => {
580
1
                        self = PendingNetDir::WaitingForGuards(netdir);
581
1
                    }
582
4
                    Err(partial) => return Err(PendingNetDir::Partial(partial)),
583
                },
584
1
                PendingNetDir::WaitingForGuards(netdir) => {
585
1
                    if writedir.netdir_is_sufficient(&netdir) {
586
1
                        return Ok(netdir);
587
                    } else {
588
                        return Err(PendingNetDir::WaitingForGuards(netdir));
589
                    }
590
                }
591
            }
592
        }
593
5
    }
594
}
595

            
596
impl<DM: WriteNetDir> GetMicrodescsState<DM> {
    /// Create a new [`GetMicrodescsState`] from a provided
    /// microdescriptor consensus.
    fn new(
        cache_usage: CacheUsage,
        consensus: MdConsensus,
        meta: ConsensusMeta,
        writedir: Weak<DM>,
    ) -> Result<Self> {
        // Provisional reset time; replaced with a randomized download time
        // in consider_upgrade() once the netdir becomes usable.
        let reset_time = consensus.lifetime().valid_until();
        let n_microdescs = consensus.relays().len();

        let partial_dir = match Weak::upgrade(&writedir) {
            Some(wd) => {
                let config = wd.config();
                let params = config.override_net_params();
                let mut dir = PartialNetDir::new(consensus, Some(params));
                // Carry over microdescriptors (and related data) from the
                // previous directory, if we have one.
                if let Some(old_dir) = wd.netdir().get() {
                    dir.fill_from_previous_netdir(&old_dir);
                }
                dir
            }
            None => return Err(Error::ManagerDropped),
        };

        let missing = partial_dir.missing_microdescs().map(Clone::clone).collect();
        let mut result = GetMicrodescsState {
            cache_usage,
            n_microdescs,
            missing,
            writedir,
            partial: Some(PendingNetDir::Partial(partial_dir)),
            meta,
            newly_listed: Vec::new(),
            reset_time,
            expire_when_complete: true,
        };

        // The carried-over microdescriptors may already be enough.
        result.consider_upgrade();
        Ok(result)
    }

    /// Add a bunch of microdescriptors to the in-progress netdir.
    ///
    /// Return true if the netdir has just become usable.
    fn register_microdescs<I>(&mut self, mds: I) -> bool
    where
        I: IntoIterator<Item = Microdesc>,
    {
        if let Some(p) = &mut self.partial {
            // Still building: accumulate descriptors in the pending netdir
            // and check whether it has just become usable.
            for md in mds {
                self.newly_listed.push(*md.digest());
                p.add_microdesc(md);
            }
            return self.consider_upgrade();
        } else if let Some(wd) = Weak::upgrade(&self.writedir) {
            // Already handed off: add the descriptors directly to the live
            // netdir held by the dirmgr.
            let _ = wd.netdir().mutate(|netdir| {
                for md in mds {
                    netdir.add_microdesc(md);
                }
                wd.netdir_descriptors_changed();
                Ok(())
            });
        }
        false
    }

    /// Check whether this netdir we're building has _just_ become
    /// usable when it was not previously usable.  If so, tell the
    /// dirmgr about it and return true; otherwise return false.
    fn consider_upgrade(&mut self) -> bool {
        if let Some(p) = self.partial.take() {
            if let Some(wd) = Weak::upgrade(&self.writedir) {
                match p.upgrade(wd.as_ref()) {
                    Ok(mut netdir) => {
                        self.reset_time = pick_download_time(netdir.lifetime());
                        // We re-set the parameters here, in case they have been
                        // reconfigured.
                        netdir.replace_overridden_parameters(wd.config().override_net_params());
                        wd.netdir().replace(netdir);
                        wd.netdir_consensus_changed();
                        wd.netdir_descriptors_changed();
                        // Note: self.partial stays None from here on.
                        return true;
                    }
                    Err(pending) => self.partial = Some(pending),
                }
            }
        }
        false
    }

    /// Mark the consensus that we're getting MDs for as non-pending in the
    /// storage.
    ///
    /// Called when a consensus is no longer pending.
    fn mark_consensus_usable(&self, storage: Option<&Mutex<DynStore>>) -> Result<()> {
        if let Some(store) = storage {
            let mut store = store.lock().expect("Directory storage lock poisoned");
            info!("Marked consensus usable.");
            store.mark_consensus_usable(&self.meta)?;
            // Now that a consensus is usable, older consensuses may
            // need to expire.
            if self.expire_when_complete {
                store.expire_all(&EXPIRATION_DEFAULTS)?;
            }
        }
        Ok(())
    }
}
705

            
706
impl<DM: WriteNetDir> DirState for GetMicrodescsState<DM> {
    fn describe(&self) -> String {
        format!(
            "Downloading microdescriptors (we are missing {}).",
            self.missing.len()
        )
    }
    fn missing_docs(&self) -> Vec<DocId> {
        self.missing.iter().map(|d| DocId::Microdesc(*d)).collect()
    }
    fn is_ready(&self, ready: Readiness) -> bool {
        match ready {
            // "Complete" means we have every listed microdescriptor.
            Readiness::Complete => self.missing.is_empty(),
            // "Usable" means the netdir has been handed to the dirmgr
            // (`partial` is cleared in consider_upgrade on success).
            Readiness::Usable => self.partial.is_none(),
        }
    }
    fn can_advance(&self) -> bool {
        // This is the final state; there is nothing to advance to.
        false
    }
    fn bootstrap_status(&self) -> DirStatus {
        let n_present = self.n_microdescs - self.missing.len();
        DirStatusInner::Validated {
            lifetime: self.meta.lifetime().clone(),
            n_mds: (n_present as u32, self.n_microdescs as u32),
            usable: self.is_ready(Readiness::Usable),
        }
        .into()
    }
    fn dl_config(&self) -> Result<DownloadSchedule> {
        if let Some(wd) = Weak::upgrade(&self.writedir) {
            Ok(*wd.config().schedule().retry_microdescs())
        } else {
            Err(Error::ManagerDropped)
        }
    }
    fn add_from_cache(
        &mut self,
        docs: HashMap<DocId, DocumentText>,
        storage: Option<&Mutex<DynStore>>,
    ) -> Result<bool> {
        let mut microdescs = Vec::new();
        for (id, text) in docs {
            if let DocId::Microdesc(digest) = id {
                if !self.missing.remove(&digest) {
                    warn!("Bug: loaded a microdesc that we didn't want from the cache.");
                    continue;
                }
                // Only accept the document if it really hashes to the digest
                // under which the cache filed it.
                if let Ok(md) = Microdesc::parse(text.as_str().map_err(Error::BadUtf8InCache)?) {
                    if md.digest() == &digest {
                        microdescs.push(md);
                        continue;
                    }
                }
                warn!("Found a mismatched microdescriptor in cache; ignoring");
            }
        }

        let changed = !microdescs.is_empty();
        if self.register_microdescs(microdescs) {
            // Just stopped being pending.
            self.mark_consensus_usable(storage)?;
        }

        Ok(changed)
    }

    fn add_from_download(
        &mut self,
        text: &str,
        request: &ClientRequest,
        storage: Option<&Mutex<DynStore>>,
    ) -> Result<bool> {
        // Only accept microdescriptors we actually requested.
        let requested: HashSet<_> = if let ClientRequest::Microdescs(req) = request {
            req.digests().collect()
        } else {
            return Err(internal!("expected a microdesc request").into());
        };
        let mut new_mds = Vec::new();
        for anno in MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed).flatten() {
            // Keep the exact byte span so we can store the document verbatim.
            let txt = anno
                .within(text)
                .expect("annotation not from within text as expected");
            let md = anno.into_microdesc();
            if !requested.contains(md.digest()) {
                warn!(
                    "Received microdescriptor we did not ask for: {:?}",
                    md.digest()
                );
                continue;
            }
            self.missing.remove(md.digest());
            new_mds.push((txt, md));
        }

        // Timestamp new/refreshed descriptors with the consensus's
        // valid-after time.
        let mark_listed = self.meta.lifetime().valid_after();
        if let Some(store) = storage {
            let mut s = store
                .lock()
                .expect("Directory storage lock poisoned");
            if !self.newly_listed.is_empty() {
                s.update_microdescs_listed(&self.newly_listed, mark_listed)?;
                self.newly_listed.clear();
            }
            if !new_mds.is_empty() {
                s.store_microdescs(
                    &new_mds
                        .iter()
                        .map(|(text, md)| (*text, md.digest()))
                        .collect::<Vec<_>>(),
                    mark_listed,
                )?;
            }
        }
        if self.register_microdescs(new_mds.into_iter().map(|(_, md)| md)) {
            // Just stopped being pending.
            self.mark_consensus_usable(storage)?;
        }
        Ok(true)
    }
    fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
        // Final state: advancing is a no-op.
        Ok(self)
    }
    fn reset_time(&self) -> Option<SystemTime> {
        Some(self.reset_time)
    }
    fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
        // Pick the cache policy for the next bootstrap attempt based on
        // how far this one got.
        let cache_usage = if self.cache_usage == CacheUsage::CacheOnly {
            // Cache only means we can't ever download.
            CacheUsage::CacheOnly
        } else if self.is_ready(Readiness::Usable) {
            // If we managed to bootstrap a usable consensus, then we won't
            // accept our next consensus from the cache.
            CacheUsage::MustDownload
        } else {
            // If we didn't manage to bootstrap a usable consensus, then we can
            // indeed try again with the one in the cache.
            // TODO(nickm) is this right?
            CacheUsage::CacheOkay
        };
        Ok(Box::new(GetConsensusState::new(
            self.writedir,
            cache_usage,
        )?))
    }
}
852

            
853
/// Choose a random download time to replace a consensus whose lifetime
854
/// is `lifetime`.
855
101
fn pick_download_time(lifetime: &Lifetime) -> SystemTime {
856
101
    let (lowbound, uncertainty) = client_download_range(lifetime);
857
101
    let zero = Duration::new(0, 0);
858
101
    let t = lowbound + rand::thread_rng().gen_range(zero..uncertainty);
859
101
    info!("The current consensus is fresh until {}, and valid until {}. I've picked {} as the earliest time to replace it.",
860
          OffsetDateTime::from(lifetime.fresh_until()),
861
          OffsetDateTime::from(lifetime.valid_until()),
862
          OffsetDateTime::from(t));
863
101
    t
864
101
}
865

            
866
/// Based on the lifetime for a consensus, return the time range during which
867
/// clients should fetch the next one.
868
102
fn client_download_range(lt: &Lifetime) -> (SystemTime, Duration) {
869
102
    let valid_after = lt.valid_after();
870
102
    let fresh_until = lt.fresh_until();
871
102
    let valid_until = lt.valid_until();
872
102
    let voting_interval = fresh_until
873
102
        .duration_since(valid_after)
874
102
        .expect("valid-after must precede fresh-until");
875
102
    let whole_lifetime = valid_until
876
102
        .duration_since(valid_after)
877
102
        .expect("valid-after must precede valid-until");
878
102

            
879
102
    // From dir-spec:
880
102
    // "This time is chosen uniformly at random from the interval
881
102
    // between the time 3/4 into the first interval after the
882
102
    // consensus is no longer fresh, and 7/8 of the time remaining
883
102
    // after that before the consensus is invalid."
884
102
    let lowbound = voting_interval + (voting_interval * 3) / 4;
885
102
    let remainder = whole_lifetime - lowbound;
886
102
    let uncertainty = (remainder * 7) / 8;
887
102

            
888
102
    (valid_after + lowbound, uncertainty)
889
102
}
890

            
891
/// Helper: call `now` on a Weak<WriteNetDir>.
892
fn current_time<DM: WriteNetDir>(writedir: &Weak<DM>) -> Result<SystemTime> {
893
8
    if let Some(writedir) = Weak::upgrade(writedir) {
894
8
        Ok(writedir.now())
895
    } else {
896
        Err(Error::ManagerDropped)
897
    }
898
8
}
899

            
900
#[cfg(test)]
901
mod test {
902
    #![allow(clippy::unwrap_used)]
903
    #![allow(clippy::cognitive_complexity)]
904
    use super::*;
905
    use crate::{Authority, DownloadScheduleConfig};
906
    use std::convert::TryInto;
907
    use std::sync::{
908
        atomic::{self, AtomicBool},
909
        Arc,
910
    };
911
    use tempfile::TempDir;
912
    use time::macros::datetime;
913
    use tor_netdoc::doc::authcert::AuthCertKeyIds;
914

            
915
    /// Check that the computed download window, and the random times picked
    /// inside it, match the dir-spec schedule for a known lifetime.
    #[test]
    fn download_schedule() {
        // A one-hour voting interval, with the consensus valid for two more
        // hours after it stops being fresh.
        let valid_after = datetime!(2008-08-02 20:00 UTC).into();
        let fresh_until = datetime!(2008-08-02 21:00 UTC).into();
        let valid_until = datetime!(2008-08-02 23:00 UTC).into();
        let lifetime = Lifetime::new(valid_after, fresh_until, valid_until).unwrap();

        // Expected window: opens 45 minutes after freshness ends, and spans
        // 7/8 of the remaining 75 minutes.
        let expected_start: SystemTime = datetime!(2008-08-02 21:45 UTC).into();
        let expected_range = Duration::from_millis((75 * 60 * 1000) * 7 / 8);

        let (start, range) = client_download_range(&lifetime);
        assert_eq!((start, range), (expected_start, expected_range));

        // Every randomly picked download time must land inside the window
        // and strictly inside the consensus lifetime.
        for _ in 0..100 {
            let when = pick_download_time(&lifetime);
            assert!(when > valid_after && when < valid_until);
            assert!(when >= expected_start && when <= expected_start + range);
        }
    }
937

            
938
    /// Makes a memory-backed storage.
939
    fn temp_store() -> (TempDir, Mutex<DynStore>) {
940
        let tempdir = TempDir::new().unwrap();
941

            
942
        let store = crate::storage::SqliteStore::from_path(tempdir.path(), false).unwrap();
943

            
944
        (tempdir, Mutex::new(Box::new(store)))
945
    }
946

            
947
    /// Test stand-in implementing [`WriteNetDir`], so the states have
    /// somewhere to write their results.
    struct DirRcv {
        // Configuration handed out from `config()`.
        cfg: Arc<DirMgrConfig>,
        // The netdir being built; returned by `netdir()`.
        netdir: SharedMutArc<NetDir>,
        // Set to true once `netdir_consensus_changed` is called.
        consensus_changed: AtomicBool,
        // Set to true once `netdir_descriptors_changed` is called.
        descriptors_changed: AtomicBool,
        // Fixed value returned from `now()`, so tests are time-independent.
        now: SystemTime,
    }
954

            
955
    impl DirRcv {
956
        fn new(now: SystemTime, authorities: Option<Vec<Authority>>) -> Self {
957
            let mut netcfg = crate::NetworkConfig::builder();
958
            netcfg.fallback_caches(vec![]);
959
            if let Some(a) = authorities {
960
                netcfg.authorities(a);
961
            }
962
            let cfg = DirMgrConfig::builder()
963
                .cache_path("/we_will_never_use_this/")
964
                .network_config(netcfg.build().unwrap())
965
                .build()
966
                .unwrap();
967
            let cfg = Arc::new(cfg);
968
            DirRcv {
969
                now,
970
                cfg,
971
                netdir: Default::default(),
972
                consensus_changed: false.into(),
973
                descriptors_changed: false.into(),
974
            }
975
        }
976
    }
977

            
978
    impl WriteNetDir for DirRcv {
        fn config(&self) -> Arc<DirMgrConfig> {
            Arc::clone(&self.cfg)
        }
        fn netdir(&self) -> &SharedMutArc<NetDir> {
            &self.netdir
        }
        fn netdir_consensus_changed(&self) {
            // Record the notification so tests can observe it.
            self.consensus_changed.store(true, atomic::Ordering::SeqCst);
        }
        fn netdir_descriptors_changed(&self) {
            // Record the notification so tests can observe it.
            self.descriptors_changed
                .store(true, atomic::Ordering::SeqCst);
        }
        fn now(&self) -> SystemTime {
            // Frozen clock: always report the time this DirRcv was built with.
            self.now
        }
    }
996

            
997
    // Test data: two consensuses and the certificates that sign them,
    // loaded from files in ../testdata.
    const CONSENSUS: &str = include_str!("../testdata/mdconsensus1.txt");
    const CONSENSUS2: &str = include_str!("../testdata/mdconsensus2.txt");
    const AUTHCERT_5696: &str = include_str!("../testdata/cert-5696.txt");
    const AUTHCERT_5A23: &str = include_str!("../testdata/cert-5A23.txt");
    #[allow(unused)]
    const AUTHCERT_7C47: &str = include_str!("../testdata/cert-7C47.txt");
    /// A fixed "current time" for the tests to use as the present moment.
    fn test_time() -> SystemTime {
        datetime!(2020-08-07 12:42:45 UTC).into()
    }
    /// Helper: decode a hex string into an RsaIdentity, panicking on bad input.
    fn rsa(s: &str) -> RsaIdentity {
        RsaIdentity::from_hex(s).unwrap()
    }
    /// The set of authorities that the tests treat as recognized.
    fn test_authorities() -> Vec<Authority> {
        /// Helper: build an Authority from a hex v3 identity fingerprint.
        fn a(s: &str) -> Authority {
            Authority::builder()
                .name("ignore")
                .v3ident(rsa(s))
                .build()
                .unwrap()
        }
        vec![
            a("5696AB38CB3852AFA476A5C07B2D4788963D5567"),
            a("5A23BA701776C9C1AB1C06E734E92AB3D5350D64"),
            // This is an authority according to the consensus, but we'll
            // pretend we don't recognize it, to make sure that we
            // don't fetch or accept it.
            // a("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
        ]
    }
    /// Key IDs for the 5696... authority certificate.
    fn authcert_id_5696() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5696ab38cb3852afa476a5c07b2d4788963d5567"),
            sk_fingerprint: rsa("f6ed4aa64d83caede34e19693a7fcf331aae8a6a"),
        }
    }
    /// Key IDs for the 5A23... authority certificate.
    fn authcert_id_5a23() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("5a23ba701776c9c1ab1c06e734e92ab3d5350d64"),
            sk_fingerprint: rsa("d08e965cc6dcb6cb6ed776db43e616e93af61177"),
        }
    }
    // remember, we're saying that we don't recognize this one as an authority.
    fn authcert_id_7c47() -> AuthCertKeyIds {
        AuthCertKeyIds {
            id_fingerprint: rsa("7C47DCB4A90E2C2B7C7AD27BD641D038CF5D7EBE"),
            sk_fingerprint: rsa("D3C013E0E6C82E246090D1C0798B75FCB7ACF120"),
        }
    }
    /// Parse the bundled microdescriptor file into a digest→text map.
    fn microdescs() -> HashMap<MdDigest, String> {
        const MICRODESCS: &str = include_str!("../testdata/microdescs.txt");
        let text = MICRODESCS;
        MicrodescReader::new(text, &AllowAnnotations::AnnotationsNotAllowed)
            .map(|res| {
                let anno = res.unwrap();
                let text = anno.within(text).unwrap();
                let md = anno.into_microdesc();
                (*md.digest(), text.to_owned())
            })
            .collect()
    }

            
    /// Exercise GetConsensusState end to end: rejecting junk and
    /// wrongly-signed consensuses, accepting a good one from a download,
    /// and accepting one from the cache.
    #[test]
    fn get_consensus_state() {
        let rcv = Arc::new(DirRcv::new(test_time(), None));

        let (_tempdir, store) = temp_store();

        let mut state =
            GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();

        // Is description okay?
        assert_eq!(&state.describe(), "Looking for a consensus.");

        // Basic properties: without a consensus it is not ready to advance.
        assert!(!state.can_advance());
        assert!(!state.is_ready(Readiness::Complete));
        assert!(!state.is_ready(Readiness::Usable));

        // Basic properties: it doesn't want to reset.
        assert!(state.reset_time().is_none());

        // Its starting DirStatus is "fetching a consensus".
        assert_eq!(state.bootstrap_status().to_string(), "fetching a consensus");

        // Download configuration is simple: only 1 request can be done in
        // parallel.  It uses a consensus retry schedule.
        let retry = state.dl_config().unwrap();
        assert_eq!(&retry, DownloadScheduleConfig::default().retry_consensus());

        // Do we know what we want?
        let docs = state.missing_docs();
        assert_eq!(docs.len(), 1);
        let docid = docs[0];

        assert!(matches!(
            docid,
            DocId::LatestConsensus {
                flavor: ConsensusFlavor::Microdesc,
                cache_usage: CacheUsage::CacheOkay,
            }
        ));

        // Now suppose that we get some complete junk from a download.
        let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
        let req = crate::docid::ClientRequest::Consensus(req);
        let outcome = state.add_from_download("this isn't a consensus", &req, Some(&store));
        assert!(matches!(outcome, Err(Error::NetDocError { .. })));
        // make sure it wasn't stored...
        assert!(store
            .lock()
            .unwrap()
            .latest_consensus(ConsensusFlavor::Microdesc, None)
            .unwrap()
            .is_none());

        // Now try again, with a real consensus... but the wrong authorities.
        let outcome = state.add_from_download(CONSENSUS, &req, Some(&store));
        assert!(matches!(outcome, Err(Error::UnrecognizedAuthorities)));
        // ...and again nothing should have been stored.
        assert!(store
            .lock()
            .unwrap()
            .latest_consensus(ConsensusFlavor::Microdesc, None)
            .unwrap()
            .is_none());

        // Great. Change the receiver to use a configuration where these test
        // authorities are recognized.
        let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));

        let mut state =
            GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();
        let outcome = state.add_from_download(CONSENSUS, &req, Some(&store));
        assert!(outcome.unwrap());
        // This time the consensus should have made it into the store.
        assert!(store
            .lock()
            .unwrap()
            .latest_consensus(ConsensusFlavor::Microdesc, None)
            .unwrap()
            .is_some());

        // And with that, we should be asking for certificates
        assert!(state.can_advance());
        assert_eq!(&state.describe(), "About to fetch certificates.");
        assert_eq!(state.missing_docs(), Vec::new());
        let next = Box::new(state).advance().unwrap();
        assert_eq!(
            &next.describe(),
            "Downloading certificates for consensus (we are missing 2/2)."
        );

        // Try again, but this time get the state from the cache.
        let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
        let mut state =
            GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();
        let text: crate::storage::InputString = CONSENSUS.to_owned().into();
        let map = vec![(docid, text.into())].into_iter().collect();
        let outcome = state.add_from_cache(map, None);
        assert!(outcome.unwrap());
        assert!(state.can_advance());
    }

            
    /// Exercise GetCertsState: listing missing certificates, loading one
    /// from the cache, rejecting an unrequested download, accepting a
    /// requested one, and advancing/resetting.
    #[test]
    fn get_certs_state() {
        /// Construct a GetCertsState with our test data
        fn new_getcerts_state() -> (Arc<DirRcv>, Box<dyn DirState>) {
            let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
            let mut state =
                GetConsensusState::new(Arc::downgrade(&rcv), CacheUsage::CacheOkay).unwrap();
            let req = tor_dirclient::request::ConsensusRequest::new(ConsensusFlavor::Microdesc);
            let req = crate::docid::ClientRequest::Consensus(req);
            let outcome = state.add_from_download(CONSENSUS, &req, None);
            assert!(outcome.unwrap());
            (rcv, Box::new(state).advance().unwrap())
        }

        let (_tempdir, store) = temp_store();
        let (_rcv, mut state) = new_getcerts_state();
        // Basic properties: description, status, reset time.
        assert_eq!(
            &state.describe(),
            "Downloading certificates for consensus (we are missing 2/2)."
        );
        assert!(!state.can_advance());
        assert!(!state.is_ready(Readiness::Complete));
        assert!(!state.is_ready(Readiness::Usable));
        let consensus_expires = datetime!(2020-08-07 12:43:20 UTC).into();
        assert_eq!(state.reset_time(), Some(consensus_expires));
        let retry = state.dl_config().unwrap();
        assert_eq!(&retry, DownloadScheduleConfig::default().retry_certs());

        // Bootstrap status okay?
        assert_eq!(
            state.bootstrap_status().to_string(),
            "fetching authority certificates (0/2)"
        );

        // Check that we get the right list of missing docs.
        let missing = state.missing_docs();
        assert_eq!(missing.len(), 2); // We are missing two certificates.
        assert!(missing.contains(&DocId::AuthCert(authcert_id_5696())));
        assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
        // we don't ask for this one because we don't recognize its authority
        assert!(!missing.contains(&DocId::AuthCert(authcert_id_7c47())));

        // Add one from the cache; make sure the list is still right
        let text1: crate::storage::InputString = AUTHCERT_5696.to_owned().into();
        // let text2: crate::storage::InputString = AUTHCERT_5A23.to_owned().into();
        let docs = vec![(DocId::AuthCert(authcert_id_5696()), text1.into())]
            .into_iter()
            .collect();
        let outcome = state.add_from_cache(docs, None);
        assert!(outcome.unwrap()); // no error, and something changed.
        assert!(!state.can_advance()); // But we aren't done yet.
        let missing = state.missing_docs();
        assert_eq!(missing.len(), 1); // Now we're only missing one!
        assert!(missing.contains(&DocId::AuthCert(authcert_id_5a23())));
        assert_eq!(
            state.bootstrap_status().to_string(),
            "fetching authority certificates (1/2)"
        );

        // Now try to add the other from a download ... but fail
        // because we didn't ask for it.
        let mut req = tor_dirclient::request::AuthCertRequest::new();
        req.push(authcert_id_5696()); // it's the wrong id.
        let req = ClientRequest::AuthCert(req);
        let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store));
        assert!(!outcome.unwrap()); // no error, but nothing changed.
        let missing2 = state.missing_docs();
        assert_eq!(missing, missing2); // No change.
        // The unrequested certificate must not have been stored.
        assert!(store
            .lock()
            .unwrap()
            .authcerts(&[authcert_id_5a23()])
            .unwrap()
            .is_empty());

        // Now try to add the other from a download ... for real!
        let mut req = tor_dirclient::request::AuthCertRequest::new();
        req.push(authcert_id_5a23()); // Right idea this time!
        let req = ClientRequest::AuthCert(req);
        let outcome = state.add_from_download(AUTHCERT_5A23, &req, Some(&store));
        assert!(outcome.unwrap()); // No error, _and_ something changed!
        let missing3 = state.missing_docs();
        assert!(missing3.is_empty());
        assert!(state.can_advance());
        // This time, the certificate should have been stored.
        assert!(!store
            .lock()
            .unwrap()
            .authcerts(&[authcert_id_5a23()])
            .unwrap()
            .is_empty());

        let next = state.advance().unwrap();
        assert_eq!(
            &next.describe(),
            "Downloading microdescriptors (we are missing 6)."
        );

        // If we start from scratch and reset, we're back in GetConsensus.
        let (_rcv, state) = new_getcerts_state();
        let state = state.reset().unwrap();
        assert_eq!(&state.describe(), "Looking for a consensus.");

        // TODO: I'd like even more tests to make sure that we never
        // accept a certificate for an authority we don't believe in.
    }

            
    /// Exercise GetMicrodescsState: reset behavior, the missing-microdesc
    /// list, loading one microdescriptor from the cache, and finishing the
    /// rest from a (simulated) download.
    #[test]
    fn get_microdescs_state() {
        /// Construct a GetMicrodescsState with our test data
        fn new_getmicrodescs_state() -> (Arc<DirRcv>, GetMicrodescsState<DirRcv>) {
            let rcv = Arc::new(DirRcv::new(test_time(), Some(test_authorities())));
            let (signed, rest, consensus) = MdConsensus::parse(CONSENSUS2).unwrap();
            // Skip timeliness/signature checks: this is canned test data.
            let consensus = consensus
                .dangerously_assume_timely()
                .dangerously_assume_wellsigned();
            let meta = ConsensusMeta::from_consensus(signed, rest, &consensus);
            let state = GetMicrodescsState::new(
                CacheUsage::CacheOkay,
                consensus,
                meta,
                Arc::downgrade(&rcv),
            )
            .unwrap();

            (rcv, state)
        }
        /// Helper: decode a base64 string into a microdescriptor digest.
        fn d64(s: &str) -> MdDigest {
            base64::decode(s).unwrap().try_into().unwrap()
        }

        // If we start from scratch and reset, we're back in GetConsensus.
        let (_rcv, state) = new_getmicrodescs_state();
        let state = Box::new(state).reset().unwrap();
        assert_eq!(&state.describe(), "Looking for a consensus.");

        // Check the basics.
        let (_rcv, mut state) = new_getmicrodescs_state();
        assert_eq!(
            &state.describe(),
            "Downloading microdescriptors (we are missing 4)."
        );
        assert!(!state.can_advance());
        assert!(!state.is_ready(Readiness::Complete));
        assert!(!state.is_ready(Readiness::Usable));
        {
            // The (randomized) reset time must fall inside the test
            // consensus's fresh-until..valid-until window.
            let reset_time = state.reset_time().unwrap();
            let fresh_until: SystemTime = datetime!(2021-10-27 21:27:00 UTC).into();
            let valid_until: SystemTime = datetime!(2021-10-27 21:27:20 UTC).into();
            assert!(reset_time >= fresh_until);
            assert!(reset_time <= valid_until);
        }
        let retry = state.dl_config().unwrap();
        assert_eq!(&retry, DownloadScheduleConfig::default().retry_microdescs());
        assert_eq!(
            state.bootstrap_status().to_string(),
            "fetching microdescriptors (0/4)"
        );

        // Now check whether we're missing all the right microdescs.
        let missing = state.missing_docs();
        let md_text = microdescs();
        assert_eq!(missing.len(), 4);
        assert_eq!(md_text.len(), 4);
        let md1 = d64("LOXRj8YZP0kwpEAsYOvBZWZWGoWv5b/Bp2Mz2Us8d8g");
        let md2 = d64("iOhVp33NyZxMRDMHsVNq575rkpRViIJ9LN9yn++nPG0");
        let md3 = d64("/Cd07b3Bl0K0jX2/1cAvsYXJJMi5d8UBU+oWKaLxoGo");
        let md4 = d64("z+oOlR7Ga6cg9OoC/A3D3Ey9Rtc4OldhKlpQblMfQKo");
        for md_digest in [md1, md2, md3, md4] {
            assert!(missing.contains(&DocId::Microdesc(md_digest)));
            assert!(md_text.contains_key(&md_digest));
        }

        // Try adding a microdesc from the cache.
        let (_tempdir, store) = temp_store();
        let doc1: crate::storage::InputString = md_text.get(&md1).unwrap().clone().into();
        let docs = vec![(DocId::Microdesc(md1), doc1.into())]
            .into_iter()
            .collect();
        let outcome = state.add_from_cache(docs, Some(&store));
        assert!(outcome.unwrap()); // successfully loaded one MD.
        assert!(!state.can_advance());
        assert!(!state.is_ready(Readiness::Complete));
        assert!(!state.is_ready(Readiness::Usable));

        // Now we should be missing 3.
        let missing = state.missing_docs();
        assert_eq!(missing.len(), 3);
        assert!(!missing.contains(&DocId::Microdesc(md1)));
        assert_eq!(
            state.bootstrap_status().to_string(),
            "fetching microdescriptors (1/4)"
        );

        // Try adding the rest as if from a download.
        let mut req = tor_dirclient::request::MicrodescRequest::new();
        // Clear this flag so that the test consensus won't expire the moment
        // we're done.
        state.expire_when_complete = false;
        let mut response = "".to_owned();
        for md_digest in [md2, md3, md4] {
            response.push_str(md_text.get(&md_digest).unwrap());
            req.push(md_digest);
        }
        let req = ClientRequest::Microdescs(req);
        let outcome = state.add_from_download(response.as_str(), &req, Some(&store));
        assert!(outcome.unwrap()); // successfully loaded MDs
        assert!(state.is_ready(Readiness::Complete));
        assert!(state.is_ready(Readiness::Usable));
        // All three downloaded microdescriptors should have been stored.
        assert_eq!(
            store
                .lock()
                .unwrap()
                .microdescs(&[md2, md3, md4])
                .unwrap()
                .len(),
            3
        );

        let missing = state.missing_docs();
        assert!(missing.is_empty());
    }
}