Lines
92.45 %
Functions
45.12 %
Branches
100 %
//! Management for flow control windows.
//!
//! Tor maintains a separate windows on circuits and on streams.
//! These are controlled by SENDME cells, which (confusingly) are
//! applied either at the circuit or the stream level depending on
//! whether they have a stream ID set.
//! Circuit sendmes are _authenticated_: they include a cryptographic
//! tag generated by the cryptography layer. This tag proves that the
//! other side of the circuit really has read all of the data that it's
//! acknowledging.
use std::collections::VecDeque;
use tor_cell::relaycell::msg::RelayMsg;
use tor_cell::relaycell::RelayCell;
use tor_error::internal;
use crate::{Error, Result};
/// Tag type used in regular v1 sendme cells.
///
// TODO(nickm):
// Three problems with this tag:
// - First, we need to support unauthenticated flow control, but we
// still record the tags that we _would_ expect.
// - Second, this tag type could be different for each layer, if we
// eventually have an authenticator that isn't 20 bytes long.
#[derive(Clone, Debug)]
pub(crate) struct CircTag([u8; 20]);
impl From<[u8; 20]> for CircTag {
fn from(v: [u8; 20]) -> CircTag {
Self(v)
}
impl PartialEq for CircTag {
fn eq(&self, other: &Self) -> bool {
crate::util::ct::bytes_eq(&self.0, &other.0)
impl Eq for CircTag {}
impl PartialEq<[u8; 20]> for CircTag {
fn eq(&self, other: &[u8; 20]) -> bool {
crate::util::ct::bytes_eq(&self.0, &other[..])
/// Absence of a tag, as with stream cells.
pub(crate) type NoTag = ();
/// A circuit's send window.
pub(crate) type CircSendWindow = SendWindow<CircParams, CircTag>;
/// A stream's send window.
pub(crate) type StreamSendWindow = SendWindow<StreamParams, NoTag>;
/// A circuit's receive window.
pub(crate) type CircRecvWindow = RecvWindow<CircParams>;
/// A stream's receive window.
pub(crate) type StreamRecvWindow = RecvWindow<StreamParams>;
/// Tracks how many cells we can safely send on a circuit or stream.
/// Additionally, remembers a list of tags that could be used to
/// acknowledge the cells we have already sent, so we know it's safe
/// to send more.
pub(crate) struct SendWindow<P, T>
where
P: WindowParams,
T: PartialEq + Eq + Clone,
{
/// Current value for this window
window: u16,
/// Tag values that incoming "SENDME" messages need to match in order
/// for us to send more data.
tags: VecDeque<T>,
/// Marker type to tell the compiler that the P type is used.
_dummy: std::marker::PhantomData<P>,
/// Helper: parametrizes a window to determine its maximum and its increment.
pub(crate) trait WindowParams {
/// Largest allowable value for this window.
fn maximum() -> u16;
/// Increment for this window.
fn increment() -> u16;
/// Parameters used for SENDME windows on circuits: limit at 1000 cells,
/// and each SENDME adjusts by 100.
pub(crate) struct CircParams;
impl WindowParams for CircParams {
fn maximum() -> u16 {
1000
fn increment() -> u16 {
100
/// Parameters used for SENDME windows on streams: limit at 500 cells,
/// and each SENDME adjusts by 50.
pub(crate) struct StreamParams;
impl WindowParams for StreamParams {
500
50
impl<P, T> SendWindow<P, T>
/// Construct a new SendWindow.
pub(crate) fn new(window: u16) -> SendWindow<P, T> {
let increment = P::increment();
let capacity = (window + increment - 1) / increment;
SendWindow {
window,
tags: VecDeque::with_capacity(capacity as usize),
_dummy: std::marker::PhantomData,
/// Remove one item from this window (since we've sent a cell).
/// If the window was empty, returns an error.
/// The provided tag is the one associated with the crypto layer that
/// originated the cell. It will get cloned and recorded if we'll
/// need to check for it later.
/// Return the number of cells left in the window.
pub(crate) fn take<U>(&mut self, tag: &U) -> Result<u16>
U: Clone + Into<T>,
if let Some(val) = self.window.checked_sub(1) {
self.window = val;
if self.window % P::increment() == 0 {
// We record this tag.
// TODO: I'm not saying that this cell in particular
// matches the spec, but Tor seems to like it.
self.tags.push_back(tag.clone().into());
Ok(val)
} else {
Err(Error::CircProto(
"Called SendWindow::take() on empty SendWindow".into(),
))
/// Handle an incoming sendme with a provided tag.
/// If the tag is None, then we don't enforce tag requirements. (We can
/// remove this option once we no longer support getting SENDME cells
/// from relays without the FlowCtrl=1 protocol.)
/// On success, return the number of cells left in the window.
/// On failure, return None: the caller should close the stream
/// or circuit with a protocol error.
#[must_use = "didn't check whether SENDME was expected and tag was right."]
pub(crate) fn put<U>(&mut self, tag: Option<U>) -> Result<u16>
T: PartialEq<U>,
match (self.tags.front(), tag) {
(Some(t), Some(tag)) if t == &tag => {} // this is the right tag.
(Some(_), None) => {} // didn't need a tag.
(Some(_), Some(_)) => {
return Err(Error::CircProto("Mismatched tag on circuit SENDME".into()));
(None, _) => {
return Err(Error::CircProto(
"Received a SENDME when none was expected".into(),
));
self.tags.pop_front();
let v = self
.window
.checked_add(P::increment())
.ok_or_else(|| Error::from(internal!("Overflow on SENDME window")))?;
self.window = v;
Ok(v)
/// Return the current send window value.
pub(crate) fn window(&self) -> u16 {
self.window
/// For testing: get a copy of the current send window, and the
/// expected incoming tags.
#[cfg(test)]
pub(crate) fn window_and_expected_tags(&self) -> (u16, Vec<T>) {
let tags = self.tags.iter().map(Clone::clone).collect();
(self.window, tags)
/// Structure to track when we need to send SENDME cells for incoming data.
pub(crate) struct RecvWindow<P: WindowParams> {
/// Number of cells that we'd be willing to receive on this window
/// before sending a SENDME.
impl<P: WindowParams> RecvWindow<P> {
/// Create a new RecvWindow.
pub(crate) fn new(window: u16) -> RecvWindow<P> {
RecvWindow {
/// Called when we've just received a cell; return true if we need to send
/// a sendme, and false otherwise.
/// Returns None if we should not have sent the cell, and we just
/// violated the window.
pub(crate) fn take(&mut self) -> Result<bool> {
let v = self.window.checked_sub(1);
if let Some(x) = v {
self.window = x;
// TODO: same note as in SendWindow.take(). I don't know if
// this truly matches the spec, but tor accepts it.
Ok(x % P::increment() == 0)
"Received a data cell in violation of a window".into(),
/// Reduce this window by `n`; give an error if this is not possible.
pub(crate) fn decrement_n(&mut self, n: u16) -> crate::Result<()> {
let v = self.window.checked_sub(n);
Ok(())
Err(crate::Error::CircProto(
"Received too many cells on a stream".into(),
/// Called when we've just sent a SENDME.
pub(crate) fn put(&mut self) {
self.window = self
.expect("Overflow detected while attempting to increment window");
/// Return true if this message is counted by flow-control windows.
pub(crate) fn msg_counts_towards_windows(msg: &RelayMsg) -> bool {
matches!(msg, RelayMsg::Data(_))
pub(crate) fn cell_counts_towards_windows(cell: &RelayCell) -> bool {
msg_counts_towards_windows(cell.msg())
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use tor_cell::relaycell::{msg, RelayCell};
#[test]
fn what_counts() {
let m = msg::Begin::new("www.torproject.org", 443, 0)
.unwrap()
.into();
assert!(!msg_counts_towards_windows(&m));
assert!(!cell_counts_towards_windows(&RelayCell::new(77.into(), m)));
let m = msg::Data::new(&b"Education is not a prerequisite to political control-political control is the cause of popular education."[..]).unwrap().into(); // Du Bois
assert!(msg_counts_towards_windows(&m));
assert!(cell_counts_towards_windows(&RelayCell::new(128.into(), m)));
fn recvwindow() {
let mut w: RecvWindow<StreamParams> = RecvWindow::new(500);
for _ in 0..49 {
assert!(!w.take().unwrap());
assert!(w.take().unwrap());
assert_eq!(w.window, 450);
assert!(w.decrement_n(123).is_ok());
assert_eq!(w.window, 327);
w.put();
assert_eq!(w.window, 377);
// failing decrement.
assert!(w.decrement_n(400).is_err());
// failing take.
assert!(w.decrement_n(377).is_ok());
assert!(w.take().is_err());
fn new_sendwindow() -> SendWindow<CircParams, &'static str> {
SendWindow::new(1000)
fn sendwindow_basic() -> Result<()> {
let mut w = new_sendwindow();
let n = w.take(&"Hello")?;
assert_eq!(n, 999);
for _ in 0_usize..98 {
w.take(&"world")?;
assert_eq!(w.window, 901);
assert_eq!(w.tags.len(), 0);
let n = w.take(&"and")?;
assert_eq!(n, 900);
assert_eq!(w.tags.len(), 1);
assert_eq!(w.tags[0], "and");
let n = w.take(&"goodbye")?;
assert_eq!(n, 899);
// Try putting a good tag.
let n = w.put(Some("and"));
assert_eq!(n?, 999);
for _ in 0_usize..300 {
w.take(&"dreamland")?;
assert_eq!(w.tags.len(), 3);
// Put without a tag.
let x: Option<&str> = None;
let n = w.put(x);
assert_eq!(n?, 799);
assert_eq!(w.tags.len(), 2);
fn sendwindow_bad_put() -> Result<()> {
for _ in 0_usize..250 {
w.take(&"correct")?;
// wrong tag: won't work.
assert_eq!(w.window, 750);
let n = w.put(Some("incorrect"));
assert!(n.is_err());
let n = w.put(Some("correct"));
assert_eq!(n?, 850);
assert_eq!(n?, 950);
// no tag expected: won't work.
assert_eq!(w.window, 950);
fn sendwindow_erroring() -> Result<()> {
for _ in 0_usize..1000 {
w.take(&"here a string")?;
assert_eq!(w.window, 0);
let ready = w.take(&"there a string");
assert!(ready.is_err());