1
//! Declare the lowest level of stream: a stream that operates on raw
//! cells.
3

            
4
use crate::circuit::{sendme, StreamTarget};
5
use crate::{Error, Result};
6
use tor_cell::relaycell::msg::RelayMsg;
7

            
8
use crate::circuit::sendme::StreamRecvWindow;
9
use futures::channel::mpsc;
10
use futures::stream::StreamExt;
11

            
12
/// The read part of a stream on a particular circuit.
13
#[derive(Debug)]
14
pub struct StreamReader {
15
    /// The underlying `StreamTarget` for this stream.
16
    pub(crate) target: StreamTarget,
17
    /// Channel to receive stream messages from the reactor.
18
    pub(crate) receiver: mpsc::Receiver<RelayMsg>,
19
    /// Congestion control receive window for this stream.
20
    ///
21
    /// Having this here means we're only going to update it when the end consumer of this stream
22
    /// actually reads things, meaning we don't ask for more data until it's actually needed (as
23
    /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
24
    /// with having to buffer data).
25
    pub(crate) recv_window: StreamRecvWindow,
26
    /// Whether or not this stream has ended.
27
    pub(crate) ended: bool,
28
}
29

            
30
impl StreamReader {
31
    /// Try to read the next relay message from this stream.
32
20
    async fn recv_raw(&mut self) -> Result<RelayMsg> {
33
20
        if self.ended {
34
            // Prevent reading from streams after they've ended.
35
            return Err(Error::NotConnected);
36
20
        }
37
20
        let msg = self
38
20
            .receiver
39
31
            .next()
40
31
            .await
41
            // This probably means that the other side closed the
42
            // mpsc channel.  I'm not sure the error type is correct though?
43
20
            .ok_or_else(|| {
44
                Error::StreamProto("stream channel disappeared without END cell?".into())
45
20
            })?;
46

            
47
20
        if sendme::msg_counts_towards_windows(&msg) && self.recv_window.take()? {
48
            self.target.send_sendme()?;
49
            self.recv_window.put();
50
20
        }
51

            
52
20
        Ok(msg)
53
20
    }
54

            
55
    /// As recv_raw, but if there is an error or an end cell, note that this
56
    /// stream has ended.
57
20
    pub async fn recv(&mut self) -> Result<RelayMsg> {
58
31
        let val = self.recv_raw().await;
59
20
        match val {
60
4
            Err(_) | Ok(RelayMsg::End(_)) => {
61
4
                self.ended = true;
62
4
            }
63
16
            _ => {}
64
        }
65
20
        val
66
20
    }
67

            
68
    /// Shut down this stream.
69
    pub fn protocol_error(&mut self) {
70
        self.target.protocol_error();
71
    }
72
}