1
//! Implement background tasks used by guard managers.
2
//!
3
//! These background tasks keep a weak reference to the [`GuardMgrInner`]
4
//! and use that to notice when they should shut down.
5

            
6
use crate::pending::{GuardStatus, RequestId};
7
use crate::GuardMgrInner;
8

            
9
#[cfg(test)]
10
use futures::channel::oneshot;
11
use futures::{channel::mpsc, stream::StreamExt};
12

            
13
use std::sync::{Mutex, Weak};
14

            
15
/// A message sent by to the [`report_status_events()`] task.
16
#[derive(Debug)]
17
pub(crate) enum Msg {
18
    /// A message sent by a [`GuardMonitor`](crate::GuardMonitor) to
19
    /// report the status of an attempt to use a guard.
20
    Status(RequestId, GuardStatus),
21
    /// Tells the task to reply on the provided oneshot::Sender once
22
    /// it has seen this message.  Used to indicate that the message
23
    /// queue is flushed.
24
    #[cfg(test)]
25
    Ping(oneshot::Sender<()>),
26
}
27

            
28
/// Background task: wait for messages about guard statuses, and
29
/// tell a guard manager about them.  Runs indefinitely.
30
///
31
/// Takes the [`GuardMgrInner`] by weak reference; if the guard
32
/// manager goes away, then this task exits.
33
///
34
/// Requires a `mpsc::Receiver` that is used to tell the task about
35
/// new status events to wait for.
36
26
pub(crate) async fn report_status_events(
37
26
    runtime: impl tor_rtcompat::SleepProvider,
38
26
    inner: Weak<Mutex<GuardMgrInner>>,
39
26
    mut events: mpsc::UnboundedReceiver<Msg>,
40
26
) {
41
    loop {
42
317
        match events.next().await {
43
276
            Some(Msg::Status(id, status)) => {
44
                // We've got a report about a guard status.
45
276
                if let Some(inner) = inner.upgrade() {
46
267
                    let mut inner = inner.lock().expect("Poisoned lock");
47
267
                    inner.handle_msg(id, status, &runtime);
48
267
                } else {
49
                    // The guard manager has gone away.
50
9
                    return;
51
                }
52
            }
53
            #[cfg(test)]
54
24
            Some(Msg::Ping(sender)) => {
55
24
                let _ignore = sender.send(());
56
24
            }
57
            // The streams have all closed.  (I think this is impossible?)
58
15
            None => return,
59
        }
60
        // TODO: Is this task guaranteed to exit?
61
    }
62
24
}
63

            
64
/// Background task to run periodic events on the guard manager.
65
///
66
/// The only role of this task is to invoke
67
/// [`GuardMgrInner::run_periodic_events`] from time to time, so that
68
/// it can perform housekeeping tasks.
69
///
70
/// Takes the [`GuardMgrInner`] by weak reference; if the guard
71
/// manager goes away, then this task exits.
72
26
pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
73
26
    runtime: R,
74
26
    inner: Weak<Mutex<GuardMgrInner>>,
75
26
) {
76
    loop {
77
33
        let delay = if let Some(inner) = inner.upgrade() {
78
26
            let mut inner = inner.lock().expect("Poisoned lock");
79
26
            let wallclock = runtime.wallclock();
80
26
            let now = runtime.now();
81
26
            inner.run_periodic_events(wallclock, now)
82
        } else {
83
            // The guard manager has gone away.
84
7
            return;
85
        };
86
26
        runtime.sleep(delay).await;
87
    }
88
7
}