1
//! Management for flow control windows.
2
//!
3
//! Tor maintains a separate windows on circuits and on streams.
4
//! These are controlled by SENDME cells, which (confusingly) are
5
//! applied either at the circuit or the stream level depending on
6
//! whether they have a stream ID set.
7
//!
8
//! Circuit sendmes are _authenticated_: they include a cryptographic
9
//! tag generated by the cryptography layer.  This tag proves that the
10
//! other side of the circuit really has read all of the data that it's
11
//! acknowledging.
12

            
13
use std::collections::VecDeque;
14

            
15
use tor_cell::relaycell::msg::RelayMsg;
16
use tor_cell::relaycell::RelayCell;
17
use tor_error::internal;
18

            
19
use crate::{Error, Result};
20

            
21
/// Tag type used in regular v1 sendme cells.
22
///
23
// TODO(nickm):
24
// Three problems with this tag:
25
//  - First, we need to support unauthenticated flow control, but we
26
//    still record the tags that we _would_ expect.
27
//  - Second, this tag type could be different for each layer, if we
28
//    eventually have an authenticator that isn't 20 bytes long.
29
20
#[derive(Clone, Debug)]
30
pub(crate) struct CircTag([u8; 20]);
31

            
32
impl From<[u8; 20]> for CircTag {
33
36
    fn from(v: [u8; 20]) -> CircTag {
34
36
        Self(v)
35
36
    }
36
}
37
impl PartialEq for CircTag {
38
12
    fn eq(&self, other: &Self) -> bool {
39
12
        crate::util::ct::bytes_eq(&self.0, &other.0)
40
12
    }
41
}
42
impl Eq for CircTag {}
43
impl PartialEq<[u8; 20]> for CircTag {
44
8
    fn eq(&self, other: &[u8; 20]) -> bool {
45
8
        crate::util::ct::bytes_eq(&self.0, &other[..])
46
8
    }
47
}
48

            
49
/// Absence of a tag, as with stream cells.
50
pub(crate) type NoTag = ();
51

            
52
/// A circuit's send window.
53
pub(crate) type CircSendWindow = SendWindow<CircParams, CircTag>;
54
/// A stream's send window.
55
pub(crate) type StreamSendWindow = SendWindow<StreamParams, NoTag>;
56

            
57
/// A circuit's receive window.
58
pub(crate) type CircRecvWindow = RecvWindow<CircParams>;
59
/// A stream's receive window.
60
pub(crate) type StreamRecvWindow = RecvWindow<StreamParams>;
61

            
62
/// Tracks how many cells we can safely send on a circuit or stream.
63
///
64
/// Additionally, remembers a list of tags that could be used to
65
/// acknowledge the cells we have already sent, so we know it's safe
66
/// to send more.
67
pub(crate) struct SendWindow<P, T>
68
where
69
    P: WindowParams,
70
    T: PartialEq + Eq + Clone,
71
{
72
    /// Current value for this window
73
    window: u16,
74
    /// Tag values that incoming "SENDME" messages need to match in order
75
    /// for us to send more data.
76
    tags: VecDeque<T>,
77
    /// Marker type to tell the compiler that the P type is used.
78
    _dummy: std::marker::PhantomData<P>,
79
}
80

            
81
/// Helper: parametrizes a window to determine its maximum and its increment.
82
pub(crate) trait WindowParams {
83
    /// Largest allowable value for this window.
84
    fn maximum() -> u16;
85
    /// Increment for this window.
86
    fn increment() -> u16;
87
}
88

            
89
/// Parameters used for SENDME windows on circuits: limit at 1000 cells,
90
/// and each SENDME adjusts by 100.
91
pub(crate) struct CircParams;
92
impl WindowParams for CircParams {
93
    fn maximum() -> u16 {
94
        1000
95
    }
96
4196
    fn increment() -> u16 {
97
4196
        100
98
4196
    }
99
}
100

            
101
/// Parameters used for SENDME windows on streams: limit at 500 cells,
102
/// and each SENDME adjusts by 50.
103
#[derive(Clone, Debug)]
104
pub(crate) struct StreamParams;
105
impl WindowParams for StreamParams {
106
    fn maximum() -> u16 {
107
        500
108
    }
109
2638
    fn increment() -> u16 {
110
2638
        50
111
2638
    }
112
}
113

            
114
impl<P, T> SendWindow<P, T>
115
where
116
    P: WindowParams,
117
    T: PartialEq + Eq + Clone,
118
{
119
    /// Construct a new SendWindow.
120
266
    pub(crate) fn new(window: u16) -> SendWindow<P, T> {
121
266
        let increment = P::increment();
122
266
        let capacity = (window + increment - 1) / increment;
123
266
        SendWindow {
124
266
            window,
125
266
            tags: VecDeque::with_capacity(capacity as usize),
126
266
            _dummy: std::marker::PhantomData,
127
266
        }
128
266
    }
129

            
130
    /// Remove one item from this window (since we've sent a cell).
131
    /// If the window was empty, returns an error.
132
    ///
133
    /// The provided tag is the one associated with the crypto layer that
134
    /// originated the cell.  It will get cloned and recorded if we'll
135
    /// need to check for it later.
136
    ///
137
    /// Return the number of cells left in the window.
138
    pub(crate) fn take<U>(&mut self, tag: &U) -> Result<u16>
139
    where
140
        U: Clone + Into<T>,
141
    {
142
6477
        if let Some(val) = self.window.checked_sub(1) {
143
6476
            self.window = val;
144
6476
            if self.window % P::increment() == 0 {
145
89
                // We record this tag.
146
89
                // TODO: I'm not saying that this cell in particular
147
89
                // matches the spec, but Tor seems to like it.
148
89
                self.tags.push_back(tag.clone().into());
149
6387
            }
150

            
151
6476
            Ok(val)
152
        } else {
153
1
            Err(Error::CircProto(
154
1
                "Called SendWindow::take() on empty SendWindow".into(),
155
1
            ))
156
        }
157
6477
    }
158

            
159
    /// Handle an incoming sendme with a provided tag.
160
    ///
161
    /// If the tag is None, then we don't enforce tag requirements. (We can
162
    /// remove this option once we no longer support getting SENDME cells
163
    /// from relays without the FlowCtrl=1 protocol.)
164
    ///
165
    /// On success, return the number of cells left in the window.
166
    ///
167
    /// On failure, return None: the caller should close the stream
168
    /// or circuit with a protocol error.
169
    #[must_use = "didn't check whether SENDME was expected and tag was right."]
170
    pub(crate) fn put<U>(&mut self, tag: Option<U>) -> Result<u16>
171
    where
172
        T: PartialEq<U>,
173
    {
174
21
        match (self.tags.front(), tag) {
175
17
            (Some(t), Some(tag)) if t == &tag => {} // this is the right tag.
176
1
            (Some(_), None) => {}                   // didn't need a tag.
177
            (Some(_), Some(_)) => {
178
5
                return Err(Error::CircProto("Mismatched tag on circuit SENDME".into()));
179
            }
180
            (None, _) => {
181
3
                return Err(Error::CircProto(
182
3
                    "Received a SENDME when none was expected".into(),
183
3
                ));
184
            }
185
        }
186
13
        self.tags.pop_front();
187

            
188
13
        let v = self
189
13
            .window
190
13
            .checked_add(P::increment())
191
13
            .ok_or_else(|| Error::from(internal!("Overflow on SENDME window")))?;
192
13
        self.window = v;
193
13
        Ok(v)
194
21
    }
195

            
196
    /// Return the current send window value.
197
15599
    pub(crate) fn window(&self) -> u16 {
198
15599
        self.window
199
15599
    }
200

            
201
    /// For testing: get a copy of the current send window, and the
202
    /// expected incoming tags.
203
    #[cfg(test)]
204
8
    pub(crate) fn window_and_expected_tags(&self) -> (u16, Vec<T>) {
205
8
        let tags = self.tags.iter().map(Clone::clone).collect();
206
8
        (self.window, tags)
207
8
    }
208
}
209

            
210
/// Structure to track when we need to send SENDME cells for incoming data.
211
#[derive(Clone, Debug)]
212
pub(crate) struct RecvWindow<P: WindowParams> {
213
    /// Number of cells that we'd be willing to receive on this window
214
    /// before sending a SENDME.
215
    window: u16,
216
    /// Marker type to tell the compiler that the P type is used.
217
    _dummy: std::marker::PhantomData<P>,
218
}
219

            
220
impl<P: WindowParams> RecvWindow<P> {
221
    /// Create a new RecvWindow.
222
138
    pub(crate) fn new(window: u16) -> RecvWindow<P> {
223
138
        RecvWindow {
224
138
            window,
225
138
            _dummy: std::marker::PhantomData,
226
138
        }
227
138
    }
228

            
229
    /// Called when we've just received a cell; return true if we need to send
230
    /// a sendme, and false otherwise.
231
    ///
232
    /// Returns None if we should not have sent the cell, and we just
233
    /// violated the window.
234
80
    pub(crate) fn take(&mut self) -> Result<bool> {
235
80
        let v = self.window.checked_sub(1);
236
80
        if let Some(x) = v {
237
78
            self.window = x;
238
78
            // TODO: same note as in SendWindow.take(). I don't know if
239
78
            // this truly matches the spec, but tor accepts it.
240
78
            Ok(x % P::increment() == 0)
241
        } else {
242
2
            Err(Error::CircProto(
243
2
                "Received a data cell in violation of a window".into(),
244
2
            ))
245
        }
246
80
    }
247

            
248
    /// Reduce this window by `n`; give an error if this is not possible.
249
4
    pub(crate) fn decrement_n(&mut self, n: u16) -> crate::Result<()> {
250
4
        let v = self.window.checked_sub(n);
251
4
        if let Some(x) = v {
252
3
            self.window = x;
253
3
            Ok(())
254
        } else {
255
1
            Err(crate::Error::CircProto(
256
1
                "Received too many cells on a stream".into(),
257
1
            ))
258
        }
259
4
    }
260

            
261
    /// Called when we've just sent a SENDME.
262
1
    pub(crate) fn put(&mut self) {
263
1
        self.window = self
264
1
            .window
265
1
            .checked_add(P::increment())
266
1
            .expect("Overflow detected while attempting to increment window");
267
1
    }
268
}
269

            
270
/// Return true if this message is counted by flow-control windows.
271
2519
pub(crate) fn msg_counts_towards_windows(msg: &RelayMsg) -> bool {
272
2519
    matches!(msg, RelayMsg::Data(_))
273
2519
}
274

            
275
/// Return true if this message is counted by flow-control windows.
276
2497
pub(crate) fn cell_counts_towards_windows(cell: &RelayCell) -> bool {
277
2497
    msg_counts_towards_windows(cell.msg())
278
2497
}
279

            
280
#[cfg(test)]
281
mod test {
282
    #![allow(clippy::unwrap_used)]
283
    use super::*;
284
    use tor_cell::relaycell::{msg, RelayCell};
285

            
286
    #[test]
287
    fn what_counts() {
288
        let m = msg::Begin::new("www.torproject.org", 443, 0)
289
            .unwrap()
290
            .into();
291
        assert!(!msg_counts_towards_windows(&m));
292
        assert!(!cell_counts_towards_windows(&RelayCell::new(77.into(), m)));
293

            
294
        let m = msg::Data::new(&b"Education is not a prerequisite to political control-political control is the cause of popular education."[..]).unwrap().into(); // Du Bois
295
        assert!(msg_counts_towards_windows(&m));
296
        assert!(cell_counts_towards_windows(&RelayCell::new(128.into(), m)));
297
    }
298

            
299
    #[test]
300
    fn recvwindow() {
301
        let mut w: RecvWindow<StreamParams> = RecvWindow::new(500);
302

            
303
        for _ in 0..49 {
304
            assert!(!w.take().unwrap());
305
        }
306
        assert!(w.take().unwrap());
307
        assert_eq!(w.window, 450);
308

            
309
        assert!(w.decrement_n(123).is_ok());
310
        assert_eq!(w.window, 327);
311

            
312
        w.put();
313
        assert_eq!(w.window, 377);
314

            
315
        // failing decrement.
316
        assert!(w.decrement_n(400).is_err());
317
        // failing take.
318
        assert!(w.decrement_n(377).is_ok());
319
        assert!(w.take().is_err());
320
    }
321

            
322
    fn new_sendwindow() -> SendWindow<CircParams, &'static str> {
323
        SendWindow::new(1000)
324
    }
325

            
326
    #[test]
327
    fn sendwindow_basic() -> Result<()> {
328
        let mut w = new_sendwindow();
329

            
330
        let n = w.take(&"Hello")?;
331
        assert_eq!(n, 999);
332
        for _ in 0_usize..98 {
333
            w.take(&"world")?;
334
        }
335
        assert_eq!(w.window, 901);
336
        assert_eq!(w.tags.len(), 0);
337

            
338
        let n = w.take(&"and")?;
339
        assert_eq!(n, 900);
340
        assert_eq!(w.tags.len(), 1);
341
        assert_eq!(w.tags[0], "and");
342

            
343
        let n = w.take(&"goodbye")?;
344
        assert_eq!(n, 899);
345
        assert_eq!(w.tags.len(), 1);
346

            
347
        // Try putting a good tag.
348
        let n = w.put(Some("and"));
349
        assert_eq!(n?, 999);
350
        assert_eq!(w.tags.len(), 0);
351

            
352
        for _ in 0_usize..300 {
353
            w.take(&"dreamland")?;
354
        }
355
        assert_eq!(w.tags.len(), 3);
356

            
357
        // Put without a tag.
358
        let x: Option<&str> = None;
359
        let n = w.put(x);
360
        assert_eq!(n?, 799);
361
        assert_eq!(w.tags.len(), 2);
362

            
363
        Ok(())
364
    }
365

            
366
    #[test]
367
    fn sendwindow_bad_put() -> Result<()> {
368
        let mut w = new_sendwindow();
369
        for _ in 0_usize..250 {
370
            w.take(&"correct")?;
371
        }
372

            
373
        // wrong tag: won't work.
374
        assert_eq!(w.window, 750);
375
        let n = w.put(Some("incorrect"));
376
        assert!(n.is_err());
377

            
378
        let n = w.put(Some("correct"));
379
        assert_eq!(n?, 850);
380
        let n = w.put(Some("correct"));
381
        assert_eq!(n?, 950);
382

            
383
        // no tag expected: won't work.
384
        let n = w.put(Some("correct"));
385
        assert!(n.is_err());
386
        assert_eq!(w.window, 950);
387

            
388
        let x: Option<&str> = None;
389
        let n = w.put(x);
390
        assert!(n.is_err());
391
        assert_eq!(w.window, 950);
392

            
393
        Ok(())
394
    }
395

            
396
    #[test]
397
    fn sendwindow_erroring() -> Result<()> {
398
        let mut w = new_sendwindow();
399
        for _ in 0_usize..1000 {
400
            w.take(&"here a string")?;
401
        }
402
        assert_eq!(w.window, 0);
403

            
404
        let ready = w.take(&"there a string");
405
        assert!(ready.is_err());
406
        Ok(())
407
    }
408
}