1
//! Implement a timer for retrying a single failed fetch or object,
2
//! using the [decorrelated jitter] algorithm.
3
//!
4
//! For a fuller specification, see [`dir-spec.txt`].
5
//!
6
//! [decorrelated jitter]: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
7
//! [`dir-spec.txt`]: https://spec.torproject.org/dir-spec
8

            
9
use rand::Rng;
10
use std::convert::TryInto;
11
use std::num::{NonZeroU32, NonZeroU8};
12
use std::time::Duration;
13

            
14
use serde::Deserialize;
15

            
16
/// An implementation for retrying downloads based on a decorrelated jitter
/// schedule.
///
/// The algorithm used here has several desirable properties:
///    * It is randomized, so that multiple timeouts don't have a
///      danger of getting synchronized with each other and hammering the
///      same directory servers all at once.
///    * It tends on average to wait longer and longer over time, so
///      that if the directory server is down, it won't get pummeled by
///      a zillion failing clients when it comes back up.
///    * It has a chance of retrying promptly, which results in better
///      client performance on average.
pub struct RetryDelay {
    /// The last delay that this retry delay returned (in msec), or 0
    /// if this never returned a delay.
    last_delay_ms: u32,
    /// The lowest allowable delay (in msec).
    ///
    /// Invariant: always within [`MIN_LOW_BOUND`, `MAX_LOW_BOUND`],
    /// enforced by the clamp in `from_msec`.
    low_bound_ms: u32,
}
35

            
36
/// Lowest possible lower bound, in milliseconds.
// We're doing this in MS, and Tor does it in seconds, so I'm
// multiplying the minimum by 1000 here.
const MIN_LOW_BOUND: u32 = 1000;

/// Largest possible lower bound, in milliseconds.
///
/// This is one less than `u32::MAX`, so that `delay_bounds` can always
/// compute an exclusive upper bound of `low + 1` without overflowing.
const MAX_LOW_BOUND: u32 = u32::MAX - 1;

/// Maximum amount to multiply the previous delay by.
const MAX_DELAY_MULT: u32 = 3;
46

            
47
impl RetryDelay {
48
    /// Construct a new RetryDelay from a given base delay in
49
    /// milliseconds.
50
    ///
51
    /// The base delay defines the lowest possible interval that can
52
    /// be returned.
53
    ///
54
    /// # Limitations
55
    ///
56
    /// If the base delay is less than 1000, a base delay of 1000 is
57
    /// used instead, since that's what the C tor implementation does.
58
8
    pub fn from_msec(base_delay_msec: u32) -> Self {
59
8
        let low_bound_ms = base_delay_msec.clamp(MIN_LOW_BOUND, MAX_LOW_BOUND);
60
8
        RetryDelay {
61
8
            last_delay_ms: 0,
62
8
            low_bound_ms,
63
8
        }
64
8
    }
65

            
66
    /// Construct a new RetryDelay from a given base delay.
67
    ///
68
    /// See from_msec for more information.
69
4
    pub fn from_duration(d: Duration) -> Self {
70
4
        let msec = d.as_millis();
71
4
        let msec = std::cmp::min(msec, u128::from(MAX_LOW_BOUND)) as u32;
72
4
        RetryDelay::from_msec(msec)
73
4
    }
74

            
75
    /// Helper: Return a lower and upper bound for the next delay to
76
    /// be yielded.
77
201
    fn delay_bounds(&self) -> (u32, u32) {
78
201
        let low = self.low_bound_ms;
79
201
        let high = std::cmp::max(
80
201
            // We don't need a saturating_add here, since low is always
81
201
            // less than high, so low cannot be equal to u32::MAX.
82
201
            low + 1,
83
201
            self.last_delay_ms.saturating_mul(MAX_DELAY_MULT),
84
201
        );
85
201
        (low, high)
86
201
    }
87

            
88
    /// Return the next delay to be used (in milliseconds), according
89
    /// to a given random number generator.
90
99
    pub fn next_delay_msec<R: Rng>(&mut self, rng: &mut R) -> u32 {
91
99
        let (low, high) = self.delay_bounds();
92
99
        assert!(low < high);
93

            
94
99
        let val = rng.gen_range(low..high);
95
99
        self.last_delay_ms = val;
96
99
        val
97
99
    }
98

            
99
    /// Return the next delay to be used (as a [`Duration`]),
100
    /// according to a given random number generator.
101
99
    pub fn next_delay<R: Rng>(&mut self, rng: &mut R) -> Duration {
102
99
        Duration::from_millis(u64::from(self.next_delay_msec(rng)))
103
99
    }
104
}
105

            
106
impl Default for RetryDelay {
107
    fn default() -> Self {
108
        RetryDelay::from_msec(0)
109
    }
110
}
111

            
112
/// Configuration for how many times to retry a download, with what
/// frequency.
#[derive(Debug, Copy, Clone, Deserialize, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct DownloadSchedule {
    /// How many times to retry before giving up?
    ///
    /// NonZeroU32 makes a zero-attempt schedule unrepresentable: at
    /// least one attempt is always made.
    num_retries: NonZeroU32,

    /// The amount of time to delay after the first failure, and a
    /// lower-bound for future delays.
    #[serde(with = "humantime_serde")]
    initial_delay: Duration,

    /// When we want to download a bunch of these at a time, how many
    /// attempts should we try to launch at once?
    #[serde(default = "default_parallelism")]
    parallelism: NonZeroU8,
}
130

            
131
impl Default for DownloadSchedule {
132
265
    fn default() -> Self {
133
265
        DownloadSchedule::new(3, Duration::from_millis(1000), 1)
134
265
    }
135
}
136

            
137
/// Return the default parallelism for DownloadSchedule.
fn default_parallelism() -> NonZeroU8 {
    // 1 is nonzero, so this cannot fail.
    NonZeroU8::new(1).expect("1 is not zero")
}
142

            
143
impl DownloadSchedule {
144
    /// Create a new DownloadSchedule to control our logic for retrying
145
    /// a given download.
146
    ///
147
    /// The resulting configuration will always make at least one
148
    /// attempt, and at most `attempts`.  After a failure, it will
149
    /// wait at least `initial_delay` before trying again.
150
    #[allow(clippy::missing_panics_doc)] // can't really panic.
151
570
    pub fn new(attempts: u32, initial_delay: Duration, parallelism: u8) -> Self {
152
570
        // If unwrapping `1.try_into()` is not safe there are bigger problems
153
570
        #![allow(clippy::unwrap_used)]
154
570
        let num_retries = attempts
155
570
            .try_into()
156
570
            .unwrap_or_else(|_| 1.try_into().unwrap());
157
570
        let parallelism = parallelism
158
570
            .try_into()
159
570
            .unwrap_or_else(|_| 1.try_into().unwrap());
160
570
        DownloadSchedule {
161
570
            num_retries,
162
570
            initial_delay,
163
570
            parallelism,
164
570
        }
165
570
    }
166

            
167
    /// Return an iterator to use over all the supported attempts for
168
    /// this configuration.
169
3
    pub fn attempts(&self) -> impl Iterator<Item = u32> {
170
3
        0..(self.num_retries.into())
171
3
    }
172

            
173
    /// Return the number of times that we're supposed to retry, according
174
    /// to this DownloadSchedule.
175
8
    pub fn n_attempts(&self) -> u32 {
176
8
        self.num_retries.into()
177
8
    }
178

            
179
    /// Return the number of parallel attempts that we're supposed to launch,
180
    /// according to this DownloadSchedule.
181
7
    pub fn parallelism(&self) -> u8 {
182
7
        self.parallelism.into()
183
7
    }
184

            
185
    /// Return a RetryDelay object for this configuration.
186
    ///
187
    /// If the initial delay is longer than 32
188
3
    pub fn schedule(&self) -> RetryDelay {
189
3
        RetryDelay::from_duration(self.initial_delay)
190
3
    }
191
}
192

            
193
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn init() {
        // A base above the minimum is kept as-is.
        let delay = RetryDelay::from_msec(2000);
        assert_eq!(delay.last_delay_ms, 0);
        assert_eq!(delay.low_bound_ms, 2000);

        // A base below the minimum is clamped up to 1000 msec.
        let delay = RetryDelay::from_msec(0);
        assert_eq!(delay.last_delay_ms, 0);
        assert_eq!(delay.low_bound_ms, 1000);

        // Durations are converted to whole milliseconds.
        let delay = RetryDelay::from_duration(Duration::new(1, 500_000_000));
        assert_eq!(delay.last_delay_ms, 0);
        assert_eq!(delay.low_bound_ms, 1500);
    }

    #[test]
    fn bounds() {
        let mut delay = RetryDelay::from_msec(1000);
        // With no previous delay, the range is [low, low + 1).
        assert_eq!(delay.delay_bounds(), (1000, 1001));
        // Otherwise the upper bound is 3x the previous delay...
        delay.last_delay_ms = 1500;
        assert_eq!(delay.delay_bounds(), (1000, 4500));
        // ...saturating at u32::MAX.
        delay.last_delay_ms = 3_000_000_000;
        assert_eq!(delay.delay_bounds(), (1000, u32::MAX));
    }

    #[test]
    fn rng() {
        let mut delay = RetryDelay::from_msec(50);
        let expected_low = std::cmp::max(50, MIN_LOW_BOUND);

        let mut rng = rand::thread_rng();
        for _ in 0..99 {
            let (lo, hi) = delay.delay_bounds();
            assert_eq!(lo, expected_low);
            assert!(hi > lo);
            let picked = delay.next_delay(&mut rng).as_millis() as u32;
            assert_eq!(picked, delay.last_delay_ms);
            assert!((lo..hi).contains(&picked));
        }
    }

    #[test]
    fn config() {
        // default configuration is 3 tries, 1000 msec initial delay
        let cfg = DownloadSchedule::default();

        assert_eq!(cfg.n_attempts(), 3);
        let attempts: Vec<_> = cfg.attempts().collect();
        assert_eq!(&attempts[..], &[0, 1, 2]);

        let sched = cfg.schedule();
        assert_eq!(sched.last_delay_ms, 0);
        assert_eq!(sched.low_bound_ms, 1000);

        // A zero-attempt, zero-parallelism schedule gets remapped to
        // one attempt, one parallel request.
        let cfg = DownloadSchedule::new(0, Duration::new(0, 0), 0);
        assert_eq!(cfg.n_attempts(), 1);
        assert_eq!(cfg.parallelism(), 1);
        let attempts: Vec<_> = cfg.attempts().collect();
        assert_eq!(&attempts[..], &[0]);

        let sched = cfg.schedule();
        assert_eq!(sched.last_delay_ms, 0);
        assert_eq!(sched.low_bound_ms, 1000);
    }
}