1
//! Code for notifying other modules about changes in the directory.
2

            
3
// TODO(nickm): After we have enough experience with this FlagPublisher, we
4
// might want to make it a public interface. If we do it should probably move
5
// into another crate.
6

            
7
use std::{
8
    fmt,
9
    marker::PhantomData,
10
    pin::Pin,
11
    sync::{
12
        atomic::{AtomicUsize, Ordering},
13
        Arc,
14
    },
15
    task::Poll,
16
    time::SystemTime,
17
};
18

            
19
use educe::Educe;
20
use futures::{stream::Stream, Future, StreamExt};
21
use time::OffsetDateTime;
22
use tor_basic_utils::skip_fmt;
23
use tor_netdoc::doc::netstatus;
24

            
25
/// An event that a DirMgr can broadcast to indicate that a change in
26
/// the status of its directory.
27
16
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28
#[non_exhaustive]
29
pub enum DirEvent {
30
    /// A new consensus has been received, and has enough information to be
31
    /// used.
32
    ///
33
    /// This event is also broadcast when a new set of consensus parameters is
34
    /// available, even if that set of parameters comes from a configuration
35
    /// change rather than from the latest consensus.
36
    NewConsensus,
37

            
38
    /// New descriptors have been received for the current consensus.
39
    ///
40
    /// (This event is _not_ broadcast when receiving new descriptors for a
41
    /// consensus which is not yet ready to replace the current consensus.)
42
    NewDescriptors,
43
}
44

            
45
/// A trait to indicate something that can be published with [`FlagPublisher`].
46
///
47
/// Since the implementation of `FlagPublisher` requires that its events be
48
/// represented as small integers, this trait is mainly about converting to and
49
/// from those integers.
50
pub(crate) trait FlagEvent: Sized {
51
    /// The maximum allowed integer value that [`FlagEvent::to_index()`] can return
52
    /// for this type.
53
    ///
54
    /// This is limited to u16 because the [`FlagPublisher`] uses a vector of all
55
    /// known flags, and sometimes iterates over the whole vector.
56
    const MAXIMUM: u16;
57
    /// Convert this event into an index.
58
    ///
59
    /// For efficiency, indices should be small and densely packed.
60
    fn to_index(self) -> u16;
61
    /// Try to reconstruct an event from its index.  Return None if the index is
62
    /// out-of-bounds.
63
    fn from_index(flag: u16) -> Option<Self>;
64
}
65

            
66
impl FlagEvent for DirEvent {
67
    const MAXIMUM: u16 = 1;
68
1616
    fn to_index(self) -> u16 {
69
1616
        match self {
70
812
            DirEvent::NewConsensus => 0,
71
804
            DirEvent::NewDescriptors => 1,
72
        }
73
1616
    }
74
817
    fn from_index(flag: u16) -> Option<Self> {
75
817
        match flag {
76
412
            0 => Some(DirEvent::NewConsensus),
77
404
            1 => Some(DirEvent::NewDescriptors),
78
1
            _ => None,
79
        }
80
817
    }
81
}
82

            
83
/// A publisher that broadcasts flag-level events to multiple subscribers.
84
///
85
/// Events with the same flag value may be coalesced: that is, if the same event
86
/// is published ten times in a row, a subscriber may receive only a single
87
/// notification of the event.
88
///
89
/// FlagPublisher supports an MPMC model: cloning a Publisher creates a new handle
90
/// that can also broadcast events to everybody listening on the channel.
91
///  Dropping the last handle closes all streams subscribed to it.
92
pub(crate) struct FlagPublisher<F> {
93
    /// Inner data shared by publishers and streams.
94
    inner: Arc<Inner<F>>,
95
}
96

            
97
/// Shared structure to implement [`FlagPublisher`] and [`FlagListener`].
98
struct Inner<F> {
99
    /// An event that we use to broadcast whenever a new [`FlagEvent`] event has occurred.
100
    event: event_listener::Event,
101
    /// How many times has each event occurred, ever.
102
    ///
103
    /// (It is safe for this to wrap around.)
104
    // TODO(nickm): I wish this could be an array, but const generics don't
105
    // quite support that yet.
106
    counts: Vec<AtomicUsize>, // I wish this could be an array.
107
    /// How many publishers remain?
108
    n_publishers: AtomicUsize,
109
    /// Phantom member to provide correct covariance.
110
    ///
111
    /// The `fn` business is a covariance trick to include `F` without affecting
112
    /// this object's Send/Sync status.
113
    _phantom: PhantomData<fn(F) -> F>,
114
}
115

            
116
/// A [`Stream`] that returns a series of event [`FlagEvent`]s broadcast by a
117
/// [`FlagPublisher`].
118
pub(crate) struct FlagListener<F> {
119
    /// What value of each flag's count have we seen most recently?  
120
    ///
121
    /// Note that we count the event as "received" only once for each observed
122
    /// change in the flag's count, even if that count has changed by more than
123
    /// 1.
124
    my_counts: Vec<usize>,
125
    /// An an `EventListener` that will be notified when events are published,
126
    /// or when the final publisher is dropped.
127
    ///
128
    /// We must always have one of these available _before_ we check any counts
129
    /// in self.inner.
130
    listener: event_listener::EventListener,
131
    /// Reference to shared data.
132
    inner: Arc<Inner<F>>,
133
}
134

            
135
impl<F: FlagEvent> FlagPublisher<F> {
136
    /// Construct a new FlagPublisher.
137
25
    pub(crate) fn new() -> Self {
138
25
        // We can't use vec![AtomicUsize::new(0); F::MAXIMUM+1]: that would
139
25
        // require AtomicUsize to be Clone.
140
25
        let counts = std::iter::repeat_with(AtomicUsize::default)
141
25
            .take(F::MAXIMUM as usize + 1)
142
25
            .collect();
143
25
        FlagPublisher {
144
25
            inner: Arc::new(Inner {
145
25
                event: event_listener::Event::new(),
146
25
                counts,
147
25
                n_publishers: AtomicUsize::new(1),
148
25
                _phantom: PhantomData,
149
25
            }),
150
25
        }
151
25
    }
152

            
153
    /// Create a new subscription to this FlagPublisher.
154
22
    pub(crate) fn subscribe(&self) -> FlagListener<F> {
155
22
        // We need to do this event.listen before we check the counts; otherwise
156
22
        // we could have a sequence where: we check the count, then the
157
22
        // publisher increments the count, then the publisher calls
158
22
        // event.notify(), and we call event.listen(). That would cause us to
159
22
        // miss the increment.
160
22
        let listener = self.inner.event.listen();
161
22

            
162
22
        FlagListener {
163
22
            my_counts: self
164
22
                .inner
165
22
                .counts
166
22
                .iter()
167
44
                .map(|a| a.load(Ordering::SeqCst))
168
22
                .collect(),
169
22
            listener,
170
22
            inner: Arc::clone(&self.inner),
171
22
        }
172
22
    }
173

            
174
    /// Tell every listener that the provided flag has been published.
175
816
    pub(crate) fn publish(&self, flag: F) {
176
816
        self.inner.counts[flag.to_index() as usize].fetch_add(1, Ordering::SeqCst);
177
816
        self.inner.event.notify(usize::MAX);
178
816
    }
179
}
180

            
181
impl<F> Clone for FlagPublisher<F> {
182
4
    fn clone(&self) -> FlagPublisher<F> {
183
4
        self.inner.n_publishers.fetch_add(1, Ordering::SeqCst);
184
4
        FlagPublisher {
185
4
            inner: Arc::clone(&self.inner),
186
4
        }
187
4
    }
188
}
189

            
190
// We must implement Drop to keep count publishers, and so that when the last
191
// publisher goes away, we can wake up every listener  so that it notices that
192
// the stream is now ended.
193
impl<F> Drop for FlagPublisher<F> {
194
27
    fn drop(&mut self) {
195
27
        if self.inner.n_publishers.fetch_sub(1, Ordering::SeqCst) == 1 {
196
23
            // That was the last reference; we must notify the listeners.
197
23
            self.inner.event.notify(usize::MAX);
198
23
        }
199
27
    }
200
}
201

            
202
impl<F: FlagEvent> Stream for FlagListener<F> {
203
    type Item = F;
204

            
205
1250
    fn poll_next(
206
1250
        mut self: std::pin::Pin<&mut Self>,
207
1250
        cx: &mut std::task::Context<'_>,
208
1250
    ) -> std::task::Poll<Option<Self::Item>> {
209
        loop {
210
            // Notify the caller if any events are ready to fire.
211
2904
            for idx in 0..F::MAXIMUM as usize + 1 {
212
2904
                let cur = self.inner.counts[idx].load(Ordering::SeqCst);
213
2904
                // We don't have to use < here specifically, since any change
214
2904
                // indicates that the count has been modified. That lets us
215
2904
                // survive usize wraparound.
216
2904
                if cur != self.my_counts[idx] {
217
816
                    self.my_counts[idx] = cur;
218
816
                    return Poll::Ready(Some(F::from_index(idx as u16).expect("Internal error")));
219
2088
                }
220
            }
221

            
222
            // At this point, notify the caller if there are no more publishers.
223
842
            if self.inner.n_publishers.load(Ordering::SeqCst) == 0 {
224
16
                return Poll::Ready(None);
225
826
            }
226
826

            
227
826
            if let Poll::Ready(()) = Pin::new(&mut self.listener).poll(cx) {
228
408
                // Got a new notification; we must create a new event and continue the loop.
229
408
                //
230
408
                // See discussion in `FlagPublisher::subscribe()` for why we must always create
231
408
                // this listener _before_ checking any flags.
232
408
                self.listener = self.inner.event.listen();
233
408
            } else {
234
                // Nothing to do yet: put the listener back.
235
418
                return Poll::Pending;
236
            }
237
        }
238
1250
    }
239
}
240

            
241
/// Description of the directory manager's current bootstrapping status.
242
///
243
/// This status does not necessarily increase monotonically: it can go backwards
244
/// if (for example) our directory information expires before we're able to get
245
/// new information.
246
30
#[derive(Clone, Debug, Default)]
247
pub struct DirBootstrapStatus {
248
    /// The status for the current directory that we're using right now.
249
    pub(crate) current: DirStatus,
250
    /// The status for a directory that we're downloading to replace the current
251
    /// directory.
252
    ///
253
    /// This is "None" if we haven't started fetching the next consensus yet.
254
    pub(crate) next: Option<DirStatus>,
255
}
256

            
257
/// The status for a single directory.
258
37
#[derive(Clone, Debug, Default)]
259
pub struct DirStatus(DirStatusInner);
260

            
261
/// The contents of a single DirStatus.
262
///
263
/// This is a separate type so that we don't make the variants public.
264
37
#[derive(Clone, Debug, Educe)]
265
#[educe(Default)]
266
pub(crate) enum DirStatusInner {
267
    /// We don't have any information yet.
268
    #[educe(Default)]
269
    NoConsensus {
270
        /// If present, we are fetching a consensus whose valid-after time
271
        /// postdates this time.
272
        after: Option<SystemTime>,
273
    },
274
    /// We've downloaded a consensus, but we haven't validated it yet.
275
    FetchingCerts {
276
        /// The lifetime of the consensus.
277
        lifetime: netstatus::Lifetime,
278
        /// A fraction (in (numerator,denominator) format) of the certificates
279
        /// we have for this consensus.
280
        n_certs: (u16, u16),
281
    },
282
    /// We've validated a consensus and we're fetching (or have fetched) its
283
    /// microdescriptors.
284
    Validated {
285
        /// The lifetime of the consensus.
286
        lifetime: netstatus::Lifetime,
287
        /// A fraction (in (numerator,denominator) form) of the microdescriptors
288
        /// that we have for this consensus.
289
        n_mds: (u32, u32),
290
        /// True iff we've decided that the consensus is usable.
291
        usable: bool,
292
        // TODO(nickm) Someday we could add a field about whether any primary
293
        // guards are missing microdescriptors, to give a better explanation for
294
        // the case where we won't switch our consensus because of that.
295
    },
296
}
297

            
298
impl From<DirStatusInner> for DirStatus {
299
7
    fn from(inner: DirStatusInner) -> DirStatus {
300
7
        DirStatus(inner)
301
7
    }
302
}
303

            
304
impl fmt::Display for DirStatus {
305
11
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
306
11
        /// Format this time in a format useful for displaying
307
11
        /// lifetime boundaries.
308
11
        fn fmt_time(t: SystemTime) -> String {
309
4
            use once_cell::sync::Lazy;
310
4
            /// Formatter object for lifetime boundaries.
311
4
            ///
312
4
            /// We use "YYYY-MM-DD HH:MM:SS UTC" here, since we never have
313
4
            /// sub-second times here, and using non-UTC offsets is confusing
314
4
            /// in this context.
315
4
            static FORMAT: Lazy<Vec<time::format_description::FormatItem>> = Lazy::new(|| {
316
1
                time::format_description::parse("[year]-[month]-[day] [hour]:[minute]:[second] UTC")
317
1
                    .expect("Invalid time format")
318
1
            });
319
4
            OffsetDateTime::from(t)
320
4
                .format(&FORMAT)
321
4
                .unwrap_or_else(|_| "(could not format)".into())
322
4
        }
323
11

            
324
11
        match &self.0 {
325
2
            DirStatusInner::NoConsensus { .. } => write!(f, "fetching a consensus"),
326
3
            DirStatusInner::FetchingCerts { n_certs, .. } => write!(
327
3
                f,
328
3
                "fetching authority certificates ({}/{})",
329
3
                n_certs.0, n_certs.1
330
3
            ),
331
            DirStatusInner::Validated {
332
                usable: false,
333
4
                n_mds,
334
4
                ..
335
4
            } => write!(f, "fetching microdescriptors ({}/{})", n_mds.0, n_mds.1),
336
            DirStatusInner::Validated {
337
                usable: true,
338
2
                lifetime,
339
2
                ..
340
2
            } => write!(
341
2
                f,
342
2
                "usable, fresh until {}, and valid until {}",
343
2
                fmt_time(lifetime.fresh_until()),
344
2
                fmt_time(lifetime.valid_until())
345
2
            ),
346
        }
347
11
    }
348
}
349

            
350
impl fmt::Display for DirBootstrapStatus {
351
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352
1
        write!(f, "directory is {}", self.current)?;
353
1
        if let Some(ref next) = self.next {
354
1
            write!(f, "; next directory is {}", next)?;
355
        }
356
1
        Ok(())
357
1
    }
358
}
359

            
360
impl DirBootstrapStatus {
361
    /// Return the fraction of completion for directory download, in a form
362
    /// suitable for a progress bar at some particular time.
363
    ///
364
    /// This value is not monotonic, and can go down as one directory is
365
    /// replaced with another.
366
    ///
367
    /// Callers _should not_ depend on the specific meaning of any particular
368
    /// fraction; we may change these fractions in the future.
369
2
    pub fn frac_at(&self, when: SystemTime) -> f32 {
370
2
        self.current
371
2
            .frac_at(when)
372
2
            .or_else(|| self.next.as_ref().and_then(|next| next.frac_at(when)))
373
2
            .unwrap_or(0.0)
374
2
    }
375

            
376
    /// Return true if this status indicates that we have a current usable
377
    /// directory.
378
    pub fn usable_at(&self, now: SystemTime) -> bool {
379
        self.current.usable() && self.current.valid_at(now)
380
    }
381

            
382
    /// Update this status by replacing its current status (or its next status)
383
    /// with `new_status`, as appropriate.
384
11
    pub(crate) fn update(&mut self, new_status: DirStatus) {
385
11
        if new_status.usable() {
386
            // This is a usable directory, but it might be a stale one still
387
            // getting updated.  Make sure that it is at least as new as the one
388
            // in `current` before we set `current`.
389
2
            if new_status.at_least_as_new_as(&self.current) {
390
                // This one will be `current`. Should we clear `next`? Only if
391
                // this one is at least as recent as `next` too.
392
1
                if let Some(ref next) = self.next {
393
1
                    if new_status.at_least_as_new_as(next) {
394
1
                        self.next = None;
395
1
                    }
396
                }
397
1
                self.current = new_status;
398
1
            }
399
9
        } else if !self.current.usable() {
400
8
            // Not a usable directory, but we don't _have_ a usable directory. This is therefore current.
401
8
            self.current = new_status;
402
8
        } else {
403
1
            // This is _not_ a usable directory, so it can only be `next`.
404
1
            self.next = Some(new_status);
405
1
        }
406
11
    }
407
}
408

            
409
impl DirStatus {
410
    /// Return the consensus lifetime for this directory, if we have one.
411
16
    fn lifetime(&self) -> Option<&netstatus::Lifetime> {
412
16
        match &self.0 {
413
4
            DirStatusInner::NoConsensus { .. } => None,
414
3
            DirStatusInner::FetchingCerts { lifetime, .. } => Some(lifetime),
415
9
            DirStatusInner::Validated { lifetime, .. } => Some(lifetime),
416
        }
417
16
    }
418

            
419
    /// Return true if the directory is valid at the given time.
420
    fn valid_at(&self, when: SystemTime) -> bool {
421
9
        if let Some(lifetime) = self.lifetime() {
422
7
            lifetime.valid_after() <= when && when < lifetime.valid_until()
423
        } else {
424
2
            false
425
        }
426
9
    }
427

            
428
    /// As frac_at, but return None if this consensus is not valid at the given time.
429
9
    fn frac_at(&self, when: SystemTime) -> Option<f32> {
430
9
        if self.valid_at(when) {
431
4
            Some(self.frac())
432
        } else {
433
5
            None
434
        }
435
9
    }
436

            
437
    /// Return true if this status indicates a usable directory.
438
21
    fn usable(&self) -> bool {
439
21
        matches!(self.0, DirStatusInner::Validated { usable: true, .. })
440
21
    }
441

            
442
    /// Return the fraction of completion for directory download, in a form
443
    /// suitable for a progress bar.
444
    ///
445
    /// This is monotonically increasing for a single directory, but can go down
446
    /// as one directory is replaced with another.
447
    ///
448
    /// Callers _should not_ depend on the specific meaning of any particular
449
    /// fraction; we may change these fractions in the future.
450
7
    fn frac(&self) -> f32 {
451
7
        // We arbitrarily decide that 25% is downloading the consensus, 10% is
452
7
        // downloading the certificates, and the remaining 65% is downloading
453
7
        // the microdescriptors until we become usable.  We may want to re-tune that in the future, but
454
7
        // the documentation of this function should allow us to do so.
455
7
        match &self.0 {
456
1
            DirStatusInner::NoConsensus { .. } => 0.0,
457
2
            DirStatusInner::FetchingCerts { n_certs, .. } => {
458
2
                0.25 + f32::from(n_certs.0) / f32::from(n_certs.1) * 0.10
459
            }
460
            DirStatusInner::Validated {
461
                usable: false,
462
3
                n_mds,
463
3
                ..
464
3
            } => 0.35 + (n_mds.0 as f32) / (n_mds.1 as f32) * 0.65,
465
1
            DirStatusInner::Validated { usable: true, .. } => 1.0,
466
        }
467
7
    }
468

            
469
    /// Return true if the consensus in this DirStatus (if any) is at least as
470
    /// new as the one in `other`.
471
    fn at_least_as_new_as(&self, other: &DirStatus) -> bool {
472
        /// return a candidate "valid after" time for a DirStatus, for comparison purposes.
473
        fn start_time(st: &DirStatus) -> Option<SystemTime> {
474
3
            match &st.0 {
475
                DirStatusInner::NoConsensus { after: Some(t) } => {
476
                    Some(*t + std::time::Duration::new(1, 0)) // Make sure this sorts _after_ t.
477
                }
478
5
                DirStatusInner::FetchingCerts { lifetime, .. } => Some(lifetime.valid_after()),
479
10
                DirStatusInner::Validated { lifetime, .. } => Some(lifetime.valid_after()),
480
3
                _ => None,
481
            }
482
18
        }
483

            
484
9
        match (start_time(self), start_time(other)) {
485
            // If both have a lifetime, compare their valid_after times.
486
7
            (Some(l1), Some(l2)) => l1 >= l2,
487
            // Any consensus is newer than none.
488
1
            (Some(_), None) => true,
489
            // No consensus is never newer than anything.
490
1
            (None, _) => false,
491
        }
492
9
    }
493
}
494

            
495
/// A stream of [`DirBootstrapStatus`] events.
496
2
#[derive(Clone, Educe)]
497
#[educe(Debug)]
498
pub struct DirBootstrapEvents {
499
    /// The `postage::watch::Receiver` that we're wrapping.
500
    ///
501
    /// We wrap this type so that we don't expose its entire API, and so that we
502
    /// can migrate to some other implementation in the future if we want.
503
    #[educe(Debug(method = "skip_fmt"))]
504
    pub(crate) inner: postage::watch::Receiver<DirBootstrapStatus>,
505
}
506

            
507
impl Stream for DirBootstrapEvents {
508
    type Item = DirBootstrapStatus;
509

            
510
40
    fn poll_next(
511
40
        mut self: Pin<&mut Self>,
512
40
        cx: &mut std::task::Context<'_>,
513
40
    ) -> Poll<Option<Self::Item>> {
514
40
        self.inner.poll_next_unpin(cx)
515
40
    }
516
}
517

            
518
#[allow(clippy::unwrap_used)]
519
#[cfg(test)]
520
mod test {
521
    use std::time::Duration;
522

            
523
    use super::*;
524
    use float_eq::assert_float_eq;
525
    use futures::stream::StreamExt;
526
    use tor_rtcompat::test_with_all_runtimes;
527

            
528
    #[test]
529
    fn subscribe_and_publish() {
530
        test_with_all_runtimes!(|_rt| async {
531
            let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
532
            let mut sub1 = publish.subscribe();
533
            publish.publish(DirEvent::NewConsensus);
534
            let mut sub2 = publish.subscribe();
535
            let ev = event_listener::Event::new();
536
            let lis = ev.listen();
537

            
538
            futures::join!(
539
                async {
540
                    // sub1 was created in time to see this event...
541
                    let val1 = sub1.next().await;
542
                    assert_eq!(val1, Some(DirEvent::NewConsensus));
543
                    ev.notify(1); // Tell the third task below to drop the publisher.
544
                    let val2 = sub1.next().await;
545
                    assert_eq!(val2, None);
546
                },
547
                async {
548
                    let val = sub2.next().await;
549
                    assert_eq!(val, None);
550
                },
551
                async {
552
                    lis.await;
553
                    drop(publish);
554
                }
555
            );
556
        });
557
    }
558

            
559
    #[test]
560
    fn receive_two() {
561
        test_with_all_runtimes!(|_rt| async {
562
            let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
563

            
564
            let mut sub = publish.subscribe();
565
            let ev = event_listener::Event::new();
566
            let ev_lis = ev.listen();
567
            futures::join!(
568
                async {
569
                    let val1 = sub.next().await;
570
                    assert_eq!(val1, Some(DirEvent::NewDescriptors));
571
                    ev.notify(1);
572
                    let val2 = sub.next().await;
573
                    assert_eq!(val2, Some(DirEvent::NewConsensus));
574
                },
575
                async {
576
                    publish.publish(DirEvent::NewDescriptors);
577
                    ev_lis.await;
578
                    publish.publish(DirEvent::NewConsensus);
579
                }
580
            );
581
        });
582
    }
583

            
584
    #[test]
585
    fn two_publishers() {
586
        test_with_all_runtimes!(|_rt| async {
587
            let publish1: FlagPublisher<DirEvent> = FlagPublisher::new();
588
            let publish2 = publish1.clone();
589

            
590
            let mut sub = publish1.subscribe();
591
            let ev1 = event_listener::Event::new();
592
            let ev2 = event_listener::Event::new();
593
            let ev1_lis = ev1.listen();
594
            let ev2_lis = ev2.listen();
595
            futures::join!(
596
                async {
597
                    let mut count = [0_usize; 2];
598
                    // These awaits guarantee that we will see at least one event flag of each
599
                    // type, before the stream is dropped.
600
                    ev1_lis.await;
601
                    ev2_lis.await;
602
                    while let Some(e) = sub.next().await {
603
                        count[e.to_index() as usize] += 1;
604
                    }
605
                    assert!(count[0] > 0);
606
                    assert!(count[1] > 0);
607
                    assert!(count[0] <= 100);
608
                    assert!(count[1] <= 100);
609
                },
610
                async {
611
                    for _ in 0..100 {
612
                        publish1.publish(DirEvent::NewDescriptors);
613
                        ev1.notify(1);
614
                        tor_rtcompat::task::yield_now().await;
615
                    }
616
                    drop(publish1);
617
                },
618
                async {
619
                    for _ in 0..100 {
620
                        publish2.publish(DirEvent::NewConsensus);
621
                        ev2.notify(1);
622
                        tor_rtcompat::task::yield_now().await;
623
                    }
624
                    drop(publish2);
625
                }
626
            );
627
        });
628
    }
629

            
630
    #[test]
631
    fn receive_after_publishers_are_gone() {
632
        test_with_all_runtimes!(|_rt| async {
633
            let publish: FlagPublisher<DirEvent> = FlagPublisher::new();
634

            
635
            let mut sub = publish.subscribe();
636

            
637
            publish.publish(DirEvent::NewConsensus);
638
            drop(publish);
639
            let v = sub.next().await;
640
            assert_eq!(v, Some(DirEvent::NewConsensus));
641
            let v = sub.next().await;
642
            assert!(v.is_none());
643
        });
644
    }
645

            
646
    #[test]
647
    fn failed_conversion() {
648
        assert_eq!(DirEvent::from_index(999), None);
649
    }
650

            
651
    #[test]
652
    fn dir_status_basics() {
653
        let now = SystemTime::now();
654
        let hour = Duration::new(3600, 0);
655

            
656
        let nothing = DirStatus(DirStatusInner::NoConsensus { after: None });
657
        let unval = DirStatus(DirStatusInner::FetchingCerts {
658
            lifetime: netstatus::Lifetime::new(now, now + hour, now + hour * 2).unwrap(),
659
            n_certs: (3, 5),
660
        });
661
        let with_c = DirStatus(DirStatusInner::Validated {
662
            lifetime: netstatus::Lifetime::new(now + hour, now + hour * 2, now + hour * 3).unwrap(),
663
            n_mds: (30, 40),
664
            usable: false,
665
        });
666

            
667
        // lifetime()
668
        assert!(nothing.lifetime().is_none());
669
        assert_eq!(unval.lifetime().unwrap().valid_after(), now);
670
        assert_eq!(with_c.lifetime().unwrap().valid_until(), now + hour * 3);
671

            
672
        // at_least_as_new_as()
673
        assert!(!nothing.at_least_as_new_as(&nothing));
674
        assert!(unval.at_least_as_new_as(&nothing));
675
        assert!(unval.at_least_as_new_as(&unval));
676
        assert!(!unval.at_least_as_new_as(&with_c));
677
        assert!(with_c.at_least_as_new_as(&unval));
678
        assert!(with_c.at_least_as_new_as(&with_c));
679

            
680
        // frac() (It's okay if we change the actual numbers here later; the
681
        // current ones are more or less arbitrary.)
682
        const TOL: f32 = 0.00001;
683
        assert_float_eq!(nothing.frac(), 0.0, abs <= TOL);
684
        assert_float_eq!(unval.frac(), 0.25 + 0.06, abs <= TOL);
685
        assert_float_eq!(with_c.frac(), 0.35 + 0.65 * 0.75, abs <= TOL);
686

            
687
        // frac_at()
688
        let t1 = now + hour / 2;
689
        let t2 = t1 + hour * 2;
690
        assert!(nothing.frac_at(t1).is_none());
691
        assert_float_eq!(unval.frac_at(t1).unwrap(), 0.25 + 0.06, abs <= TOL);
692
        assert!(with_c.frac_at(t1).is_none());
693
        assert!(nothing.frac_at(t2).is_none());
694
        assert!(unval.frac_at(t2).is_none());
695
        assert_float_eq!(with_c.frac_at(t2).unwrap(), 0.35 + 0.65 * 0.75, abs <= TOL);
696
    }
697

            
698
    #[test]
699
    fn dir_status_display() {
700
        use time::macros::datetime;
701
        let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
702
        let hour = Duration::new(3600, 0);
703
        let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
704

            
705
        let ds = DirStatus(DirStatusInner::NoConsensus { after: None });
706
        assert_eq!(ds.to_string(), "fetching a consensus");
707

            
708
        let ds = DirStatus(DirStatusInner::FetchingCerts {
709
            lifetime: lifetime.clone(),
710
            n_certs: (3, 5),
711
        });
712
        assert_eq!(ds.to_string(), "fetching authority certificates (3/5)");
713

            
714
        let ds = DirStatus(DirStatusInner::Validated {
715
            lifetime: lifetime.clone(),
716
            n_mds: (30, 40),
717
            usable: false,
718
        });
719
        assert_eq!(ds.to_string(), "fetching microdescriptors (30/40)");
720

            
721
        let ds = DirStatus(DirStatusInner::Validated {
722
            lifetime,
723
            n_mds: (30, 40),
724
            usable: true,
725
        });
726
        assert_eq!(
727
            ds.to_string(),
728
            "usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC"
729
        );
730
    }
731

            
732
    #[test]
733
    fn bootstrap_status() {
734
        use time::macros::datetime;
735
        let t1: SystemTime = datetime!(2022-01-17 11:00:00 UTC).into();
736
        let hour = Duration::new(3600, 0);
737
        let lifetime = netstatus::Lifetime::new(t1, t1 + hour, t1 + hour * 3).unwrap();
738
        let lifetime2 = netstatus::Lifetime::new(t1 + hour, t1 + hour * 2, t1 + hour * 4).unwrap();
739

            
740
        let ds1: DirStatus = DirStatusInner::Validated {
741
            lifetime: lifetime.clone(),
742
            n_mds: (3, 40),
743
            usable: true,
744
        }
745
        .into();
746
        let ds2: DirStatus = DirStatusInner::Validated {
747
            lifetime: lifetime2.clone(),
748
            n_mds: (5, 40),
749
            usable: false,
750
        }
751
        .into();
752

            
753
        let bs = DirBootstrapStatus {
754
            current: ds1.clone(),
755
            next: Some(ds2.clone()),
756
        };
757

            
758
        assert_eq!(bs.to_string(),
759
            "directory is usable, fresh until 2022-01-17 12:00:00 UTC, and valid until 2022-01-17 14:00:00 UTC; next directory is fetching microdescriptors (5/40)"
760
        );
761

            
762
        const TOL: f32 = 0.00001;
763
        assert_float_eq!(bs.frac_at(t1 + hour / 2), 1.0, abs <= TOL);
764
        assert_float_eq!(
765
            bs.frac_at(t1 + hour * 3 + hour / 2),
766
            0.35 + 0.65 * 0.125,
767
            abs <= TOL
768
        );
769

            
770
        // Now try updating.
771

            
772
        // Case 1: we have a usable directory and the updated status isn't usable.
773
        let mut bs = bs;
774
        let ds3 = DirStatus(DirStatusInner::Validated {
775
            lifetime: lifetime2.clone(),
776
            n_mds: (10, 40),
777
            usable: false,
778
        });
779
        bs.update(ds3);
780
        assert!(matches!(
781
            bs.next.as_ref().unwrap().0,
782
            DirStatusInner::Validated {
783
                n_mds: (10, 40),
784
                ..
785
            }
786
        ));
787

            
788
        // Case 2: The new directory _is_ usable and newer.  It will replace the old one.
789
        let ds4 = DirStatus(DirStatusInner::Validated {
790
            lifetime: lifetime2.clone(),
791
            n_mds: (20, 40),
792
            usable: true,
793
        });
794
        bs.update(ds4);
795
        assert!(bs.next.as_ref().is_none());
796
        assert_eq!(
797
            bs.current.lifetime().unwrap().valid_after(),
798
            lifetime2.valid_after()
799
        );
800

            
801
        // Case 3: The new directory is usable but older. Nothing will happen.
802
        bs.update(ds1);
803
        assert!(bs.next.as_ref().is_none());
804
        assert_ne!(
805
            bs.current.lifetime().unwrap().valid_after(),
806
            lifetime.valid_after()
807
        );
808

            
809
        // Case 4: starting with an unusable directory, we always replace.
810
        let mut bs = DirBootstrapStatus::default();
811
        assert!(!ds2.usable());
812
        assert!(bs.current.lifetime().is_none());
813
        bs.update(ds2);
814
        assert!(bs.current.lifetime().is_some());
815
    }
816
}