1
//! Functions to download or load directory objects, using the
2
//! state machines in the `states` module.
3

            
4
use std::{
5
    collections::HashMap,
6
    sync::{Arc, Weak},
7
    time::{Duration, SystemTime},
8
};
9

            
10
use crate::{
11
    docid::{self, ClientRequest},
12
    upgrade_weak_ref, DirMgr, DirState, DocId, DocumentText, Error, Readiness, Result,
13
};
14

            
15
use futures::channel::oneshot;
16
use futures::FutureExt;
17
use futures::StreamExt;
18
use tor_dirclient::DirResponse;
19
use tor_rtcompat::{Runtime, SleepProviderExt};
20
use tracing::{info, trace, warn};
21

            
22
#[cfg(test)]
23
use once_cell::sync::Lazy;
24
#[cfg(test)]
25
use std::sync::Mutex;
26

            
27
/// Try to read a set of documents from `dirmgr` by ID.
28
6
fn load_all<R: Runtime>(
29
6
    dirmgr: &DirMgr<R>,
30
6
    missing: Vec<DocId>,
31
6
) -> Result<HashMap<DocId, DocumentText>> {
32
6
    let mut loaded = HashMap::new();
33
6
    for query in docid::partition_by_type(missing.into_iter()).values() {
34
6
        dirmgr.load_documents_into(query, &mut loaded)?;
35
    }
36
6
    Ok(loaded)
37
6
}
38

            
39
/// Testing helper: if this is `Some`, then `fetch_single` returns its
/// contents as the response body in place of performing a real fetch.
///
/// Note that only one test uses this: otherwise there would be a race
/// condition, since the value is shared by every test in the process.
#[cfg(test)]
static CANNED_RESPONSE: Lazy<Mutex<Option<String>>> = Lazy::new(|| Mutex::new(None));
46

            
47
/// Launch a single client request and get an associated response.
48
1
async fn fetch_single<R: Runtime>(
49
1
    dirmgr: Arc<DirMgr<R>>,
50
1
    request: ClientRequest,
51
1
) -> Result<(ClientRequest, DirResponse)> {
52
1
    #[cfg(test)]
53
1
    {
54
1
        let m = CANNED_RESPONSE.lock().expect("Poisoned mutex");
55
1
        if let Some(s) = m.as_ref() {
56
1
            return Ok((request, DirResponse::from_body(s)));
57
        }
58
    }
59
    let circmgr = dirmgr.circmgr()?;
60
    let cur_netdir = dirmgr.opt_netdir();
61
    let config = dirmgr.config.get();
62
    let dirinfo = match cur_netdir {
63
        Some(ref netdir) => netdir.as_ref().into(),
64
        None => config.fallbacks().into(),
65
    };
66
    let resource =
67
        tor_dirclient::get_resource(request.as_requestable(), dirinfo, &dirmgr.runtime, circmgr)
68
            .await?;
69

            
70
    Ok((request, resource))
71
1
}
72

            
73
/// Launch a set of download requests for a set of missing objects in
74
/// `missing`, and return each request along with the response it received.
75
///
76
/// Don't launch more than `parallelism` requests at once.
77
1
async fn fetch_multiple<R: Runtime>(
78
1
    dirmgr: Arc<DirMgr<R>>,
79
1
    missing: Vec<DocId>,
80
1
    parallelism: usize,
81
1
) -> Result<Vec<(ClientRequest, DirResponse)>> {
82
1
    let mut requests = Vec::new();
83
1
    for (_type, query) in docid::partition_by_type(missing.into_iter()) {
84
1
        requests.extend(dirmgr.query_into_requests(query)?);
85
    }
86

            
87
    // TODO: instead of waiting for all the queries to finish, we
88
    // could stream the responses back or something.
89
1
    let responses: Vec<Result<(ClientRequest, DirResponse)>> = futures::stream::iter(requests)
90
1
        .map(|query| fetch_single(Arc::clone(&dirmgr), query))
91
1
        .buffer_unordered(parallelism)
92
1
        .collect()
93
        .await;
94

            
95
1
    let mut useful_responses = Vec::new();
96
2
    for r in responses {
97
        // TODO: on some error cases we might want to stop using this source.
98
1
        match r {
99
1
            Ok((request, response)) => {
100
1
                if response.status_code() == 200 {
101
1
                    useful_responses.push((request, response));
102
1
                } else {
103
                    trace!(
104
                        "cache declined request; reported status {:?}",
105
                        response.status_code()
106
                    );
107
                }
108
            }
109
            Err(e) => warn!("error while downloading: {:?}", e),
110
        }
111
    }
112

            
113
1
    Ok(useful_responses)
114
1
}
115

            
116
/// Try tp update `state` by loading cached information from `dirmgr`.
117
/// Return true if anything changed.
118
7
async fn load_once<R: Runtime>(
119
7
    dirmgr: &Arc<DirMgr<R>>,
120
7
    state: &mut Box<dyn DirState>,
121
7
) -> Result<bool> {
122
7
    let missing = state.missing_docs();
123
7
    let outcome = if missing.is_empty() {
124
        trace!("Found no missing documents; can't advance current state");
125
1
        Ok(false)
126
    } else {
127
        trace!(
128
            "Found {} missing documents; trying to load them",
129
            missing.len()
130
        );
131
6
        let documents = load_all(dirmgr, missing)?;
132
6
        state.add_from_cache(documents, dirmgr.store_if_rw())
133
    };
134

            
135
7
    if matches!(outcome, Ok(true)) {
136
6
        dirmgr.update_status(state.bootstrap_status());
137
6
    }
138

            
139
7
    outcome
140
7
}
141

            
142
/// Try to load as much state as possible for a provided `state` from the
143
/// cache in `dirmgr`, advancing the state to the extent possible.
144
///
145
/// No downloads are performed; the provided state will not be reset.
146
1
pub(crate) async fn load<R: Runtime>(
147
1
    dirmgr: Arc<DirMgr<R>>,
148
1
    mut state: Box<dyn DirState>,
149
1
) -> Result<Box<dyn DirState>> {
150
1
    let mut safety_counter = 0_usize;
151
    loop {
152
        trace!(state=%state.describe(), "Loading from cache");
153
3
        let changed = load_once(&dirmgr, &mut state).await?;
154

            
155
3
        if state.can_advance() {
156
1
            state = state.advance()?;
157
1
            safety_counter = 0;
158
        } else {
159
2
            if !changed {
160
1
                break;
161
1
            }
162
1
            safety_counter += 1;
163
1
            assert!(
164
1
                safety_counter < 100,
165
1
                "Spent 100 iterations in the same state: this is a bug"
166
1
            );
167
        }
168
    }
169

            
170
1
    Ok(state)
171
1
}
172

            
173
/// Helper: Make a set of download attempts for the current directory state,
174
/// and on success feed their results into the state object.
175
///
176
/// This can launch one or more download requests, but will not launch more
177
/// than `parallelism` requests at a time.
178
///
179
/// Return true if the state reports that it changed.
180
1
async fn download_attempt<R: Runtime>(
181
1
    dirmgr: &Arc<DirMgr<R>>,
182
1
    state: &mut Box<dyn DirState>,
183
1
    parallelism: usize,
184
1
) -> Result<bool> {
185
1
    let mut changed = false;
186
1
    let missing = state.missing_docs();
187
1
    let fetched = fetch_multiple(Arc::clone(dirmgr), missing, parallelism).await?;
188
2
    for (client_req, dir_response) in fetched {
189
1
        let text =
190
1
            String::from_utf8(dir_response.into_output()).map_err(Error::BadUtf8FromDirectory)?;
191
1
        match dirmgr.expand_response_text(&client_req, text) {
192
1
            Ok(text) => {
193
1
                let outcome = state.add_from_download(&text, &client_req, Some(&dirmgr.store));
194
1
                match outcome {
195
1
                    Ok(b) => changed |= b,
196
                    // TODO: in this case we might want to stop using this source.
197
                    Err(e) => warn!("error while adding directory info: {}", e),
198
                }
199
            }
200
            Err(e) => {
201
                // TODO: in this case we might want to stop using this source.
202
                warn!("Error when expanding directory text: {}", e);
203
            }
204
        }
205
    }
206

            
207
1
    if changed {
208
1
        dirmgr.update_status(state.bootstrap_status());
209
1
    }
210

            
211
1
    Ok(changed)
212
1
}
213

            
214
/// Download information into a DirState state machine until it is
215
/// ["complete"](Readiness::Complete), or until we hit a
216
/// non-recoverable error.
217
///
218
/// Use `dirmgr` to load from the cache or to launch downloads.
219
///
220
/// Keep resetting the state as needed.
221
///
222
/// The first time that the state becomes ["usable"](Readiness::Usable),
223
/// notify the sender in `on_usable`.
224
///
225
/// Return Err only on a non-recoverable error.  On an error that
226
/// merits another bootstrap attempt with the same state, return the
227
/// state and an Error object in an option.
228
2
pub(crate) async fn download<R: Runtime>(
229
2
    dirmgr: Weak<DirMgr<R>>,
230
2
    mut state: Box<dyn DirState>,
231
2
    on_usable: &mut Option<oneshot::Sender<()>>,
232
2
) -> Result<(Box<dyn DirState>, Option<Error>)> {
233
2
    let runtime = upgrade_weak_ref(&dirmgr)?.runtime.clone();
234

            
235
    'next_state: loop {
236
4
        let retry_config = state.dl_config()?;
237
4
        let parallelism = retry_config.parallelism();
238

            
239
        // In theory this could be inside the loop below maybe?  If we
240
        // want to drop the restriction that the missing() members of a
241
        // state must never grow, then we'll need to move it inside.
242
        {
243
4
            let dirmgr = upgrade_weak_ref(&dirmgr)?;
244
4
            load_once(&dirmgr, &mut state).await?;
245
        }
246

            
247
        // Skip the downloads if we can...
248
4
        if state.can_advance() {
249
2
            state = state.advance()?;
250
2
            continue 'next_state;
251
2
        }
252
2
        if state.is_ready(Readiness::Complete) {
253
1
            return Ok((state, None));
254
1
        }
255
1

            
256
1
        let mut retry = retry_config.schedule();
257

            
258
        // Make several attempts to fetch whatever we're missing,
259
        // until either we can advance, or we've got a complete
260
        // document, or we run out of tries, or we run out of time.
261
1
        'next_attempt: for attempt in retry_config.attempts() {
262
            info!("{}: {}", attempt + 1, state.describe());
263
1
            let reset_time = no_more_than_a_week_from(SystemTime::now(), state.reset_time());
264

            
265
            {
266
1
                let dirmgr = upgrade_weak_ref(&dirmgr)?;
267
2
                futures::select_biased! {
268
1
                    outcome = download_attempt(&dirmgr, &mut state, parallelism.into()).fuse() => {
269
1
                        match outcome {
270
1
                            Err(e) => {
271
1
                                warn!("Error while downloading: {}", e);
272
1
                                continue 'next_attempt;
273
1
                            }
274
1
                            Ok(changed) => {
275
1
                                changed
276
1
                            }
277
1
                        }
278
1
                    }
279
1
                    _ = runtime.sleep_until_wallclock(reset_time).fuse() => {
280
1
                        // We need to reset. This can happen if (for
281
1
                        // example) we're downloading the last few
282
1
                        // microdescriptors on a consensus that now
283
1
                        // we're ready to replace.
284
1
                        state = state.reset()?;
285
1
                        continue 'next_state;
286
1
                    },
287
1
                };
288
            }
289

            
290
            // Exit if there is nothing more to download.
291
1
            if state.is_ready(Readiness::Complete) {
292
1
                return Ok((state, None));
293
            }
294

            
295
            // Report usable-ness if appropriate.
296
            if on_usable.is_some() && state.is_ready(Readiness::Usable) {
297
                // Unwrap should be safe due to parent `.is_some()` check
298
                #[allow(clippy::unwrap_used)]
299
                let _ = on_usable.take().unwrap().send(());
300
            }
301

            
302
            if state.can_advance() {
303
                // We have enough info to advance to another state.
304
                state = state.advance()?;
305
                continue 'next_state;
306
            } else {
307
                // We should wait a bit, and then retry.
308
                // TODO: we shouldn't wait on the final attempt.
309
                let reset_time = no_more_than_a_week_from(SystemTime::now(), state.reset_time());
310
                let delay = retry.next_delay(&mut rand::thread_rng());
311
                futures::select_biased! {
312
                    _ = runtime.sleep_until_wallclock(reset_time).fuse() => {
313
                        state = state.reset()?;
314
                        continue 'next_state;
315
                    }
316
                    _ = FutureExt::fuse(runtime.sleep(delay)) => {}
317
                };
318
            }
319
        }
320

            
321
        // We didn't advance the state, after all the retries.
322
        warn!(n_attempts=retry_config.n_attempts(),
323
              state=%state.describe(),
324
              "Unable to advance downloading state");
325
        return Ok((state, Some(Error::CantAdvanceState)));
326
    }
327
2
}
328

            
329
/// Helper: Clamp `v` so that it is no more than one week from `now`.
///
/// If `v` is absent, return the time that's one week from now.
///
/// Times earlier than `now + 1 week` (including times in the past) are
/// returned unchanged.
///
/// We use this to determine a reset time when no reset time is
/// available, or when it is too far in the future.
fn no_more_than_a_week_from(now: SystemTime, v: Option<SystemTime>) -> SystemTime {
    /// One week, as a `Duration`.
    const ONE_WEEK: Duration = Duration::from_secs(86400 * 7);

    let one_week_later = now + ONE_WEEK;
    v.map_or(one_week_later, |t| t.min(one_week_later))
}
342

            
343
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::storage::DynStore;
    use crate::test::new_mgr;
    use crate::DownloadSchedule;
    use std::convert::TryInto;
    use std::sync::Mutex;
    use tor_netdoc::doc::microdesc::MdDigest;

    /// Check that `no_more_than_a_week_from` supplies a default, leaves
    /// near and past times alone, and clamps far-future times.
    #[test]
    fn week() {
        let now = SystemTime::now();
        let one_day = Duration::new(86400, 0);

        assert_eq!(no_more_than_a_week_from(now, None), now + one_day * 7);
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + one_day)),
            now + one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now - one_day)),
            now - one_day
        );
        assert_eq!(
            no_more_than_a_week_from(now, Some(now + 30 * one_day)),
            now + one_day * 7
        );
    }

    /// A fake implementation of DirState that just wants a fixed set
    /// of microdescriptors.  It doesn't care if it gets them: it just
    /// wants to be told that the IDs exist.
    #[derive(Debug, Clone)]
    struct DemoState {
        /// False in the first phase (`new1`), true in the second (`new2`).
        second_time_around: bool,
        /// For each digest we want, whether we have been told about it yet.
        got_items: HashMap<MdDigest, bool>,
    }

    // Constants from Lou Reed
    const H1: MdDigest = *b"satellite's gone up to the skies";
    const H2: MdDigest = *b"things like that drive me out of";
    const H3: MdDigest = *b"my mind i watched it for a littl";
    const H4: MdDigest = *b"while i like to watch things on ";
    const H5: MdDigest = *b"TV Satellite of love Satellite--";

    impl DemoState {
        /// Return a first-phase state that wants H1 and H2.
        fn new1() -> Self {
            DemoState {
                second_time_around: false,
                got_items: vec![(H1, false), (H2, false)].into_iter().collect(),
            }
        }
        /// Return a second-phase state that wants H3, H4, and H5.
        fn new2() -> Self {
            DemoState {
                second_time_around: true,
                got_items: vec![(H3, false), (H4, false), (H5, false)]
                    .into_iter()
                    .collect(),
            }
        }
        /// Return the number of wanted items that we have been told about.
        fn n_ready(&self) -> usize {
            self.got_items.values().filter(|x| **x).count()
        }
    }

    impl DirState for DemoState {
        fn describe(&self) -> String {
            format!("{:?}", &self)
        }
        fn bootstrap_status(&self) -> crate::event::DirStatus {
            crate::event::DirStatus::default()
        }
        /// The first phase is never ready.  The second phase is complete
        /// when every item is present, and usable when at most one is
        /// still missing.
        fn is_ready(&self, ready: Readiness) -> bool {
            match (ready, self.second_time_around) {
                (_, false) => false,
                (Readiness::Complete, true) => self.n_ready() == self.got_items.len(),
                (Readiness::Usable, true) => self.n_ready() >= self.got_items.len() - 1,
            }
        }
        /// Only the first phase can advance, and only once it has
        /// every item.
        fn can_advance(&self) -> bool {
            if self.second_time_around {
                false
            } else {
                self.n_ready() == self.got_items.len()
            }
        }
        fn missing_docs(&self) -> Vec<DocId> {
            self.got_items
                .iter()
                .filter_map(|(id, have)| {
                    if *have {
                        None
                    } else {
                        Some(DocId::Microdesc(*id))
                    }
                })
                .collect()
        }
        /// Mark every wanted microdescriptor ID that appears as a key in
        /// `docs` as present; the document text itself is ignored.
        fn add_from_cache(
            &mut self,
            docs: HashMap<DocId, DocumentText>,
            _storage: Option<&Mutex<DynStore>>,
        ) -> Result<bool> {
            let mut changed = false;
            for id in docs.keys() {
                if let DocId::Microdesc(id) = id {
                    if self.got_items.get(id) == Some(&false) {
                        self.got_items.insert(*id, true);
                        changed = true;
                    }
                }
            }
            Ok(changed)
        }
        /// Treat `text` as a whitespace-separated list of hex-encoded
        /// digests, and mark each wanted one as present.  Tokens that
        /// aren't hex, or that have the wrong length, are ignored.
        fn add_from_download(
            &mut self,
            text: &str,
            _request: &ClientRequest,
            _storage: Option<&Mutex<DynStore>>,
        ) -> Result<bool> {
            let mut changed = false;
            for token in text.split_ascii_whitespace() {
                if let Ok(v) = hex::decode(token) {
                    if let Ok(id) = v.try_into() {
                        if self.got_items.get(&id) == Some(&false) {
                            self.got_items.insert(id, true);
                            changed = true;
                        }
                    }
                }
            }
            Ok(changed)
        }
        fn dl_config(&self) -> Result<DownloadSchedule> {
            Ok(DownloadSchedule::default())
        }
        /// Move to a fresh second-phase state if we can advance;
        /// otherwise stay as we are.
        fn advance(self: Box<Self>) -> Result<Box<dyn DirState>> {
            if self.can_advance() {
                Ok(Box::new(Self::new2()))
            } else {
                Ok(self)
            }
        }
        fn reset_time(&self) -> Option<SystemTime> {
            None
        }
        /// Go back to a fresh first-phase state.
        fn reset(self: Box<Self>) -> Result<Box<dyn DirState>> {
            Ok(Box::new(Self::new1()))
        }
    }

    #[test]
    fn all_in_cache() {
        // Let's try bootstrapping when everything is in the cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3, H4, H5] {
                    store
                        .store_microdescs(&[("ignore", &h)], SystemTime::now())
                        .unwrap();
                }
            }
            let mgr = Arc::new(mgr);

            // Try just a load.
            let state = Box::new(DemoState::new1());
            let result = super::load(Arc::clone(&mgr), state).await.unwrap();
            assert!(result.is_ready(Readiness::Complete));

            // Try a bootstrap that could (but won't!) download.
            let state = Box::new(DemoState::new1());

            let mut on_usable = None;
            let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
                .await
                .unwrap();
            assert!(result.0.is_ready(Readiness::Complete));
        });
    }

    #[test]
    fn partly_in_cache() {
        // Let's try bootstrapping with all of phase1 and part of
        // phase 2 in cache.
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let (_tempdir, mgr) = new_mgr(rt);

            {
                let mut store = mgr.store_if_rw().unwrap().lock().unwrap();
                for h in [H1, H2, H3] {
                    store
                        .store_microdescs(&[("ignore", &h)], SystemTime::now())
                        .unwrap();
                }
            }
            {
                let mut resp = CANNED_RESPONSE.lock().unwrap();
                // H4 and H5, hex-encoded.  (The line break and indentation
                // inside the literal are part of the string; they are
                // skipped as whitespace when the text is tokenized.)
                *resp = Some(
                    "7768696c652069206c696b6520746f207761746368207468696e6773206f6e20
                     545620536174656c6c697465206f66206c6f766520536174656c6c6974652d2d"
                        .to_owned(),
                );
            }
            let mgr = Arc::new(mgr);
            let mut on_usable = None;

            let state = Box::new(DemoState::new1());
            let result = super::download(Arc::downgrade(&mgr), state, &mut on_usable)
                .await
                .unwrap();
            assert!(result.0.is_ready(Readiness::Complete));
        });
    }
}