1
//! Types and code for mapping StreamIDs to streams on a circuit.
2

            
3
use crate::circuit::halfstream::HalfStream;
4
use crate::circuit::sendme;
5
use crate::{Error, Result};
6
/// Mapping from stream ID to streams.
7
// NOTE: This is a work in progress and I bet I'll refactor it a lot;
8
// it needs to stay opaque!
9
use tor_cell::relaycell::{msg::RelayMsg, StreamId};
10

            
11
use futures::channel::mpsc;
12
use std::collections::hash_map::Entry;
13
use std::collections::HashMap;
14
use tor_error::internal;
15

            
16
use rand::Rng;
17

            
18
use crate::circuit::reactor::RECV_WINDOW_INIT;
19
use crate::circuit::sendme::StreamRecvWindow;
20
use tracing::info;
21

            
22
/// The entry for a stream.
23
pub(super) enum StreamEnt {
24
    /// An open stream.
25
    Open {
26
        /// Sink to send relay cells tagged for this stream into.
27
        sink: mpsc::Sender<RelayMsg>,
28
        /// Stream for cells that should be sent down this stream.
29
        rx: mpsc::Receiver<RelayMsg>,
30
        /// Send window, for congestion control purposes.
31
        send_window: sendme::StreamSendWindow,
32
        /// Number of cells dropped due to the stream disappearing before we can
33
        /// transform this into an `EndSent`.
34
        dropped: u16,
35
        /// True iff we've received a CONNECTED cell on this stream.
36
        /// (This is redundant with `DataStreamReader::connected`.)
37
        received_connected: bool,
38
    },
39
    /// A stream for which we have received an END cell, but not yet
40
    /// had the stream object get dropped.
41
    EndReceived,
42
    /// A stream for which we have sent an END cell but not yet received an END
43
    /// cell.
44
    ///
45
    /// TODO(arti#264) Can we ever throw this out? Do we really get END cells for
46
    /// these?
47
    EndSent(HalfStream),
48
}
49

            
50
impl StreamEnt {
51
    /// Retrieve the send window for this stream, if it is open.
52
2412
    pub(super) fn send_window(&mut self) -> Option<&mut sendme::StreamSendWindow> {
53
2412
        match self {
54
            StreamEnt::Open {
55
2412
                ref mut send_window,
56
2412
                ..
57
2412
            } => Some(send_window),
58
            _ => None,
59
        }
60
2412
    }
61
}
62

            
63
/// Return value to indicate whether or not we send an END cell upon
64
/// terminating a given stream.
65
2
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
66
pub(super) enum ShouldSendEnd {
67
    /// An END cell should be sent.
68
    Send,
69
    /// An END cell should not be sent.
70
    DontSend,
71
}
72

            
73
/// A map from stream IDs to stream entries. Each circuit has one for each
74
/// hop.
75
pub(super) struct StreamMap {
76
    /// Map from StreamId to StreamEnt.  If there is no entry for a
77
    /// StreamId, that stream doesn't exist.
78
    m: HashMap<StreamId, StreamEnt>,
79
    /// The next StreamId that we should use for a newly allocated
80
    /// circuit.  (0 is not a valid streamID).
81
    next_stream_id: u16,
82
}
83

            
84
impl StreamMap {
85
    /// Make a new empty StreamMap.
86
120
    pub(super) fn new() -> Self {
87
120
        let mut rng = rand::thread_rng();
88
120
        let next_stream_id: u16 = loop {
89
120
            let v: u16 = rng.gen();
90
120
            if v != 0 {
91
120
                break v;
92
            }
93
        };
94
120
        StreamMap {
95
120
            m: HashMap::new(),
96
120
            next_stream_id,
97
120
        }
98
120
    }
99

            
100
    /// Get the `HashMap` inside this stream map.
101
8170
    pub(super) fn inner(&mut self) -> &mut HashMap<StreamId, StreamEnt> {
102
8170
        &mut self.m
103
8170
    }
104

            
105
    /// Add an entry to this map; return the newly allocated StreamId.
106
140
    pub(super) fn add_ent(
107
140
        &mut self,
108
140
        sink: mpsc::Sender<RelayMsg>,
109
140
        rx: mpsc::Receiver<RelayMsg>,
110
140
        send_window: sendme::StreamSendWindow,
111
140
    ) -> Result<StreamId> {
112
140
        let stream_ent = StreamEnt::Open {
113
140
            sink,
114
140
            rx,
115
140
            send_window,
116
140
            dropped: 0,
117
140
            received_connected: false,
118
140
        };
119
        // This "65536" seems too aggressive, but it's what tor does.
120
        //
121
        // Also, going around in a loop here is (sadly) needed in order
122
        // to look like Tor clients.
123
140
        for _ in 1..=65536 {
124
140
            let id: StreamId = self.next_stream_id.into();
125
140
            self.next_stream_id = self.next_stream_id.wrapping_add(1);
126
140
            if id.is_zero() {
127
                continue;
128
140
            }
129
140
            let ent = self.m.entry(id);
130
140
            if let Entry::Vacant(_) = ent {
131
140
                ent.or_insert(stream_ent);
132
140
                return Ok(id);
133
            }
134
        }
135

            
136
        Err(Error::IdRangeFull)
137
140
    }
138

            
139
    /// Return the entry for `id` in this map, if any.
140
2442
    pub(super) fn get_mut(&mut self, id: StreamId) -> Option<&mut StreamEnt> {
141
2442
        self.m.get_mut(&id)
142
2442
    }
143

            
144
    /// Note that we received an END cell on the stream with `id`.
145
    ///
146
    /// Returns true if there was really a stream there.
147
8
    pub(super) fn end_received(&mut self, id: StreamId) -> Result<()> {
148
        // Check the hashmap for the right stream. Bail if not found.
149
        // Also keep the hashmap handle so that we can do more efficient inserts/removals
150
8
        let mut stream_entry = match self.m.entry(id) {
151
            Entry::Vacant(_) => {
152
1
                return Err(Error::CircProto(
153
1
                    "Received END cell on nonexistent stream".into(),
154
1
                ))
155
            }
156
7
            Entry::Occupied(o) => o,
157
7
        };
158
7

            
159
7
        // Progress the stream's state machine accordingly
160
7
        match stream_entry.get() {
161
1
            StreamEnt::EndReceived => Err(Error::CircProto(
162
1
                "Received two END cells on same stream".into(),
163
1
            )),
164
            StreamEnt::EndSent(_) => {
165
1
                info!("Actually got an end cell on a half-closed stream!");
166
                // We got an END, and we already sent an END. Great!
167
                // we can forget about this stream.
168
1
                stream_entry.remove_entry();
169
1
                Ok(())
170
            }
171
            StreamEnt::Open { .. } => {
172
5
                stream_entry.insert(StreamEnt::EndReceived);
173
5
                Ok(())
174
            }
175
        }
176
8
    }
177

            
178
    /// Handle a termination of the stream with `id` from this side of
179
    /// the circuit. Return true if the stream was open and an END
180
    /// ought to be sent.
181
3
    pub(super) fn terminate(&mut self, id: StreamId) -> Result<ShouldSendEnd> {
182
3
        // Progress the stream's state machine accordingly
183
3
        match self
184
3
            .m
185
3
            .remove(&id)
186
3
            .ok_or_else(|| Error::from(internal!("Somehow we terminated a nonexistent stream?")))?
187
        {
188
1
            StreamEnt::EndReceived => Ok(ShouldSendEnd::DontSend),
189
            StreamEnt::Open {
190
1
                send_window,
191
1
                dropped,
192
1
                received_connected,
193
1
                // notably absent: the channels for sink and stream, which will get dropped and
194
1
                // closed (meaning reads/writes from/to this stream will now fail)
195
1
                ..
196
1
            } => {
197
1
                // FIXME(eta): we don't copy the receive window, instead just creating a new one,
198
1
                //             so a malicious peer can send us slightly more data than they should
199
1
                //             be able to; see arti#230.
200
1
                let mut recv_window = StreamRecvWindow::new(RECV_WINDOW_INIT);
201
1
                recv_window.decrement_n(dropped)?;
202
                // TODO: would be nice to avoid new_ref.
203
                // If we haven't gotten a CONNECTED already, we accept one on the half-stream.
204
1
                let connected_ok = !received_connected;
205
1
                let halfstream = HalfStream::new(send_window, recv_window, connected_ok);
206
1
                self.m.insert(id, StreamEnt::EndSent(halfstream));
207
1
                Ok(ShouldSendEnd::Send)
208
            }
209
            StreamEnt::EndSent(_) => {
210
                panic!("Hang on! We're sending an END on a stream where we already sent an END‽");
211
            }
212
        }
213
3
    }
214

            
215
    // TODO: Eventually if we want relay support, we'll need to support
216
    // stream IDs chosen by somebody else. But for now, we don't need those.
217
}
218

            
219
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::circuit::sendme::StreamSendWindow;

    /// Exercise allocation, lookup, END reception and local termination on
    /// a single `StreamMap`.
    #[test]
    fn streammap_basics() -> Result<()> {
        let mut map = StreamMap::new();
        // Raw value we expect the next allocated ID to have.
        let mut expected_raw = map.next_stream_id;
        let mut ids = Vec::new();

        // Allocation: IDs are handed out sequentially, skipping zero.
        for _ in 0..128 {
            let (sink, _) = mpsc::channel(128);
            let (_, rx) = mpsc::channel(2);
            let id = map.add_ent(sink, rx, StreamSendWindow::new(500))?;
            let want: StreamId = expected_raw.into();
            assert_eq!(want, id);
            expected_raw = match expected_raw.wrapping_add(1) {
                0 => 1,
                n => n,
            };
            ids.push(id);
        }

        // Lookup: an allocated ID is open; the next unallocated one is absent.
        let unused_id: StreamId = expected_raw.into();
        assert!(matches!(map.get_mut(ids[0]), Some(StreamEnt::Open { .. })));
        assert!(map.get_mut(unused_id).is_none());

        // Receiving an END: fails on unknown streams and on a second END.
        assert!(map.end_received(unused_id).is_err());
        assert!(map.end_received(ids[1]).is_ok());
        assert!(matches!(map.get_mut(ids[1]), Some(StreamEnt::EndReceived)));
        assert!(map.end_received(ids[1]).is_err());

        // Local termination.
        assert!(map.terminate(unused_id).is_err());
        assert_eq!(map.terminate(ids[2]).unwrap(), ShouldSendEnd::Send);
        assert!(matches!(map.get_mut(ids[2]), Some(StreamEnt::EndSent(_))));
        assert_eq!(map.terminate(ids[1]).unwrap(), ShouldSendEnd::DontSend);
        assert!(map.get_mut(ids[1]).is_none());

        // An END arriving after we terminated removes the half-closed entry.
        assert!(map.end_received(ids[2]).is_ok());
        assert!(map.get_mut(ids[2]).is_none());

        Ok(())
    }
}