1
//! Declarations for a [`TimeoutEstimator`] type that can change implementation.
2

            
3
use crate::timeouts::{
4
    pareto::{ParetoTimeoutEstimator, ParetoTimeoutState},
5
    readonly::ReadonlyTimeoutEstimator,
6
    Action, TimeoutEstimator,
7
};
8
use crate::TimeoutStateHandle;
9
use std::sync::Mutex;
10
use std::time::Duration;
11
use tor_netdir::params::NetParameters;
12
use tracing::{debug, warn};
13

            
14
/// A timeout estimator that can change its inner implementation and share its
15
/// implementation among multiple threads.
16
pub(crate) struct Estimator {
17
    /// The estimator we're currently using.
18
    inner: Mutex<Box<dyn TimeoutEstimator + Send + 'static>>,
19
}
20

            
21
impl Estimator {
22
    /// Construct a new estimator from some variant.
23
    #[cfg(test)]
24
16
    pub(crate) fn new(est: impl TimeoutEstimator + Send + 'static) -> Self {
25
16
        Self {
26
16
            inner: Mutex::new(Box::new(est)),
27
16
        }
28
16
    }
29

            
30
    /// Create this estimator based on the values stored in `storage`, and whether
31
    /// this storage is read-only.
32
24
    pub(crate) fn from_storage(storage: &TimeoutStateHandle) -> Self {
33
24
        let (_, est) = estimator_from_storage(storage);
34
24
        Self {
35
24
            inner: Mutex::new(est),
36
24
        }
37
24
    }
38

            
39
    /// Assuming that we can read and write to `storage`, replace our state with
40
    /// a new state that estimates timeouts.
41
23
    pub(crate) fn upgrade_to_owning_storage(&self, storage: &TimeoutStateHandle) {
42
23
        let (readonly, est) = estimator_from_storage(storage);
43
23
        if readonly {
44
            warn!("Unable to upgrade to owned persistent storage.");
45
            return;
46
23
        }
47
23
        *self.inner.lock().expect("Timeout estimator lock poisoned") = est;
48
23
    }
49

            
50
    /// Replace the contents of this estimator with a read-only state estimator
51
    /// based on the contents of `storage`.
52
    pub(crate) fn reload_readonly_from_storage(&self, storage: &TimeoutStateHandle) {
53
1
        if let Ok(Some(v)) = storage.load() {
54
1
            let est = ReadonlyTimeoutEstimator::from_state(&v);
55
1
            *self.inner.lock().expect("Timeout estimator lock poisoned") = Box::new(est);
56
1
        } else {
57
            debug!("Unable to reload timeout state.");
58
        }
59
1
    }
60

            
61
    /// Record that a given circuit hop has completed.
62
    ///
63
    /// The `hop` number is a zero-indexed value for which hop just completed.
64
    ///
65
    /// The `delay` value is the amount of time after we first launched the
66
    /// circuit.
67
    ///
68
    /// If this is the last hop of the circuit, then `is_last` is true.
69
1736
    pub(crate) fn note_hop_completed(&self, hop: u8, delay: Duration, is_last: bool) {
70
1736
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
71
1736

            
72
1736
        inner.note_hop_completed(hop, delay, is_last);
73
1736
    }
74

            
75
    /// Record that a circuit failed to complete because it took too long.
76
    ///
77
    /// The `hop` number is a the number of hops that were successfully
78
    /// completed.
79
    ///
80
    /// The `delay` number is the amount of time after we first launched the
81
    /// circuit.
82
8
    pub(crate) fn note_circ_timeout(&self, hop: u8, delay: Duration) {
83
8
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
84
8
        inner.note_circ_timeout(hop, delay);
85
8
    }
86

            
87
    /// Return the current estimation for how long we should wait for a given
88
    /// [`Action`] to complete.
89
    ///
90
    /// This function should return a 2-tuple of `(timeout, abandon)`
91
    /// durations.  After `timeout` has elapsed since circuit launch,
92
    /// the circuit should no longer be used, but we should still keep
93
    /// building it in order see how long it takes.  After `abandon`
94
    /// has elapsed since circuit launch, the circuit should be
95
    /// abandoned completely.
96
24
    pub(crate) fn timeouts(&self, action: &Action) -> (Duration, Duration) {
97
24
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
98
24

            
99
24
        inner.timeouts(action)
100
24
    }
101

            
102
    /// Return true if we're currently trying to learn more timeouts
103
    /// by launching testing circuits.
104
3
    pub(crate) fn learning_timeouts(&self) -> bool {
105
3
        let inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
106
3
        inner.learning_timeouts()
107
3
    }
108

            
109
    /// Replace the network parameters used by this estimator (if any)
110
    /// with ones derived from `params`.
111
2
    pub(crate) fn update_params(&self, params: &NetParameters) {
112
2
        let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
113
2
        inner.update_params(params);
114
2
    }
115

            
116
    /// Store any state associated with this timeout estimator into `storage`.
117
2
    pub(crate) fn save_state(&self, storage: &TimeoutStateHandle) -> crate::Result<()> {
118
2
        let state = {
119
2
            let mut inner = self.inner.lock().expect("Timeout estimator lock poisoned.");
120
2
            inner.build_state()
121
        };
122
2
        if let Some(state) = state {
123
2
            storage.store(&state)?;
124
        }
125
2
        Ok(())
126
2
    }
127
}
128

            
129
/// Try to construct a new boxed TimeoutEstimator based on the contents of
130
/// storage, and whether it is read-only.
131
///
132
/// Returns true on a read-only state.
133
47
fn estimator_from_storage(
134
47
    storage: &TimeoutStateHandle,
135
47
) -> (bool, Box<dyn TimeoutEstimator + Send + 'static>) {
136
47
    let state = match storage.load() {
137
2
        Ok(Some(v)) => v,
138
45
        Ok(None) => ParetoTimeoutState::default(),
139
        Err(e) => {
140
            warn!("Unable to load timeout state: {}", e);
141
            return (true, Box::new(ReadonlyTimeoutEstimator::new()));
142
        }
143
    };
144

            
145
47
    if storage.can_store() {
146
        // We own the lock, so we're going to use a full estimator.
147
24
        (false, Box::new(ParetoTimeoutEstimator::from_state(state)))
148
    } else {
149
23
        (true, Box::new(ReadonlyTimeoutEstimator::from_state(&state)))
150
    }
151
47
}
152

            
153
#[cfg(test)]
154
mod test {
155
    #![allow(clippy::unwrap_used)]
156
    use super::*;
157
    use tor_persist::StateMgr;
158

            
159
    #[test]
160
    fn load_estimator() {
161
        let params = NetParameters::default();
162

            
163
        // Construct an estimator with write access to a state manager.
164
        let storage = tor_persist::TestingStateMgr::new();
165
        assert!(storage.try_lock().unwrap().held());
166
        let handle = storage.clone().create_handle("paretorama");
167

            
168
        let est = Estimator::from_storage(&handle);
169
        assert!(est.learning_timeouts());
170
        est.save_state(&handle).unwrap();
171

            
172
        // Construct another estimator that is looking at the same data,
173
        // but which only gets read-only access
174
        let storage2 = storage.new_manager();
175
        assert!(!storage2.try_lock().unwrap().held());
176
        let handle2 = storage2.clone().create_handle("paretorama");
177

            
178
        let est2 = Estimator::from_storage(&handle2);
179
        assert!(!est2.learning_timeouts());
180

            
181
        est.update_params(&params);
182
        est2.update_params(&params);
183

            
184
        // Initial timeouts, since no data is present yet.
185
        let act = Action::BuildCircuit { length: 3 };
186
        assert_eq!(
187
            est.timeouts(&act),
188
            (Duration::from_secs(60), Duration::from_secs(60))
189
        );
190
        assert_eq!(
191
            est2.timeouts(&act),
192
            (Duration::from_secs(60), Duration::from_secs(60))
193
        );
194

            
195
        // Pretend both estimators have gotten a bunch of observations...
196
        for _ in 0..500 {
197
            est.note_hop_completed(2, Duration::from_secs(7), true);
198
            est.note_hop_completed(2, Duration::from_secs(2), true);
199
            est2.note_hop_completed(2, Duration::from_secs(4), true);
200
        }
201
        assert!(!est.learning_timeouts());
202

            
203
        // Have est save and est2 load.
204
        est.save_state(&handle).unwrap();
205
        let to_1 = est.timeouts(&act);
206
        assert_ne!(
207
            est.timeouts(&act),
208
            (Duration::from_secs(60), Duration::from_secs(60))
209
        );
210
        assert_eq!(
211
            est2.timeouts(&act),
212
            (Duration::from_secs(60), Duration::from_secs(60))
213
        );
214
        est2.reload_readonly_from_storage(&handle2);
215
        let to_1_secs = to_1.0.as_secs_f64();
216
        let timeouts = est2.timeouts(&act);
217
        assert!((timeouts.0.as_secs_f64() - to_1_secs).abs() < 0.001);
218
        assert!((timeouts.1.as_secs_f64() - to_1_secs).abs() < 0.001);
219

            
220
        drop(est);
221
        drop(handle);
222
        drop(storage);
223

            
224
        // Now storage2 can upgrade...
225
        assert!(storage2.try_lock().unwrap().held());
226
        est2.upgrade_to_owning_storage(&handle2);
227
        let to_2 = est2.timeouts(&act);
228
        // This will be similar but not the same.
229
        assert!(to_2.0 > to_1.0 - Duration::from_secs(1));
230
        assert!(to_2.0 < to_1.0 + Duration::from_secs(1));
231
        // Make sure est2 is now mutable...
232
        for _ in 0..200 {
233
            est2.note_hop_completed(2, Duration::from_secs(1), true);
234
        }
235
        let to_3 = est2.timeouts(&act);
236
        assert!(to_3.0 < to_2.0);
237
    }
238
}