1
//! Code to handle incoming cells on a channel.
//!
//! The role of this code is to run in a separate asynchronous task,
//! and route cells to the right circuits.
//!
//! TODO: I have zero confidence in the close-and-cleanup behavior here,
//! or in the error handling behavior.
8

            
9
use super::circmap::{CircEnt, CircMap};
10
use crate::circuit::halfcirc::HalfCirc;
11
use crate::util::err::ReactorError;
12
use crate::{Error, Result};
13
use tor_cell::chancell::msg::{Destroy, DestroyReason};
14
use tor_cell::chancell::{msg::ChanMsg, ChanCell, CircId};
15

            
16
use futures::channel::{mpsc, oneshot};
17

            
18
use futures::sink::SinkExt;
19
use futures::stream::Stream;
20
use futures::Sink;
21
use tor_error::internal;
22

            
23
use std::convert::TryInto;
24
use std::fmt;
25
use std::pin::Pin;
26
use std::sync::atomic::Ordering;
27
use std::sync::Arc;
28
use std::task::Poll;
29

            
30
use crate::channel::{codec::CodecError, unique_id, ChannelDetails};
31
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
32
use tracing::{debug, trace};
33

            
34
/// A boxed trait object that can provide `ChanCell`s.
35
pub(super) type BoxedChannelStream =
36
    Box<dyn Stream<Item = std::result::Result<ChanCell, CodecError>> + Send + Unpin + 'static>;
37
/// A boxed trait object that can sink `ChanCell`s.
38
pub(super) type BoxedChannelSink =
39
    Box<dyn Sink<ChanCell, Error = CodecError> + Send + Unpin + 'static>;
40
/// The type of a oneshot channel used to inform reactor users of the result of an operation.
41
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
42

            
43
/// Convert `err` to an Error, under the assumption that it's happening on an
44
/// open channel.
45
19
fn codec_err_to_chan(err: CodecError) -> Error {
46
19
    match err {
47
        CodecError::Io(e) => crate::Error::ChanIoErr(Arc::new(e)),
48
19
        CodecError::Cell(e) => e.into(),
49
    }
50
19
}
51

            
52
/// A message telling the channel reactor to do something.
53
#[derive(Debug)]
54
pub(super) enum CtrlMsg {
55
    /// Shut down the reactor.
56
    Shutdown,
57
    /// Tell the reactor that a given circuit has gone away.
58
    CloseCircuit(CircId),
59
    /// Allocate a new circuit in this channel's circuit map, generating an ID for it
60
    /// and registering senders for messages received for the circuit.
61
    AllocateCircuit {
62
        /// Channel to send the circuit's `CreateResponse` down.
63
        created_sender: oneshot::Sender<CreateResponse>,
64
        /// Channel to send other messages from this circuit down.
65
        sender: mpsc::Sender<ClientCircChanMsg>,
66
        /// Oneshot channel to send the new circuit's identifiers down.
67
        tx: ReactorResultChannel<(CircId, crate::circuit::UniqId)>,
68
    },
69
}
70

            
71
/// Object to handle incoming cells and background tasks on a channel.
72
///
73
/// This type is returned when you finish a channel; you need to spawn a
74
/// new task that calls `run()` on it.
75
#[must_use = "If you don't call run() on a reactor, the channel won't work."]
76
pub struct Reactor {
77
    /// A receiver for control messages from `Channel` objects.
78
    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
79
    /// A receiver for cells to be sent on this reactor's sink.
80
    ///
81
    /// `Channel` objects have a sender that can send cells here.
82
    pub(super) cells: mpsc::Receiver<ChanCell>,
83
    /// A Stream from which we can read `ChanCell`s.
84
    ///
85
    /// This should be backed by a TLS connection if you want it to be secure.
86
    pub(super) input: futures::stream::Fuse<BoxedChannelStream>,
87
    /// A Sink to which we can write `ChanCell`s.
88
    ///
89
    /// This should also be backed by a TLS connection if you want it to be secure.
90
    pub(super) output: BoxedChannelSink,
91
    /// A map from circuit ID to Sinks on which we can deliver cells.
92
    pub(super) circs: CircMap,
93
    /// Information shared with the frontend
94
    pub(super) details: Arc<ChannelDetails>,
95
    /// Context for allocating unique circuit log identifiers.
96
    pub(super) circ_unique_id_ctx: unique_id::CircUniqIdContext,
97
    /// What link protocol is the channel using?
98
    #[allow(dead_code)] // We don't support protocols where this would matter
99
    pub(super) link_protocol: u16,
100
}
101

            
102
/// Allows us to just say debug!("{}: Reactor did a thing", &self, ...)
103
///
104
/// There is no risk of confusion because no-one would try to print a
105
/// Reactor for some other reason.
106
impl fmt::Display for Reactor {
107
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
108
        fmt::Debug::fmt(&self.details.unique_id, f)
109
    }
110
}
111

            
112
impl Reactor {
113
    /// Launch the reactor, and run until the channel closes or we
114
    /// encounter an error.
115
    ///
116
    /// Once this function returns, the channel is dead, and can't be
117
    /// used again.
118
60
    pub async fn run(mut self) -> Result<()> {
119
49
        if self.details.closed.load(Ordering::SeqCst) {
120
            return Err(Error::ChannelClosed);
121
49
        }
122
        debug!("{}: Running reactor", &self);
123
36
        let result: Result<()> = loop {
124
2504
            match self.run_once().await {
125
2455
                Ok(()) => (),
126
17
                Err(ReactorError::Shutdown) => break Ok(()),
127
19
                Err(ReactorError::Err(e)) => break Err(e),
128
            }
129
        };
130
        debug!("{}: Reactor stopped: {:?}", &self, result);
131
36
        self.details.closed.store(true, Ordering::SeqCst);
132
36
        result
133
36
    }
134

            
135
    /// Helper for run(): handles only one action, and doesn't mark
136
    /// the channel closed on finish.
137
2680
    async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
138
2668
        // This is written this way (manually calling poll) for a bunch of reasons:
139
2668
        //
140
2668
        // - We can only send things onto self.output if poll_ready has returned Ready, so
141
2668
        //   we need some custom logic to implement that.
142
2668
        // - We probably want to call poll_flush on every reactor iteration, to ensure it continues
143
2668
        //   to make progress flushing.
144
2668
        // - We also need to do the equivalent of select! between self.cells, self.control, and
145
2668
        //   self.input, but with the extra logic bits added above.
146
2668
        //
147
2668
        // In Rust 2021, it would theoretically be possible to do this with a hybrid mix of select!
148
2668
        // and manually implemented poll_fn, but we aren't using that yet. (also, arguably doing
149
2668
        // it this way is both less confusing and more flexible).
150
3712
        let fut = futures::future::poll_fn(|cx| -> Poll<std::result::Result<_, ReactorError>> {
151
3712
            // We've potentially got three types of thing to deal with in this reactor iteration:
152
3712
            let mut cell_to_send = None;
153
3712
            let mut control_message = None;
154
3712
            let mut input = None;
155

            
156
            // See if the output sink can have cells written to it yet.
157
3712
            if let Poll::Ready(ret) = Pin::new(&mut self.output).poll_ready(cx) {
158
3712
                let _ = ret.map_err(codec_err_to_chan)?;
159
                // If it can, check whether we have any cells to send it from `Channel` senders.
160
3693
                if let Poll::Ready(msg) = Pin::new(&mut self.cells).poll_next(cx) {
161
2469
                    match msg {
162
2456
                        x @ Some(..) => cell_to_send = x,
163
                        None => {
164
                            // cells sender dropped, shut down the reactor!
165
13
                            return Poll::Ready(Err(ReactorError::Shutdown));
166
                        }
167
                    }
168
1224
                }
169
            }
170

            
171
            // Check whether we've got a control message pending.
172
3680
            if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) {
173
16
                match ret {
174
                    None | Some(CtrlMsg::Shutdown) => {
175
8
                        return Poll::Ready(Err(ReactorError::Shutdown))
176
                    }
177
8
                    x @ Some(..) => control_message = x,
178
                }
179
3664
            }
180

            
181
            // Check whether we've got any incoming cells.
182
3672
            if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) {
183
152
                match ret {
184
                    None => return Poll::Ready(Err(ReactorError::Shutdown)),
185
152
                    Some(r) => input = Some(r.map_err(codec_err_to_chan)?),
186
                }
187
3520
            }
188

            
189
            // Flush the output sink. We don't actually care about whether it's ready or not;
190
            // we just want to keep flushing it (hence the _).
191
3672
            let _ = Pin::new(&mut self.output)
192
3672
                .poll_flush(cx)
193
3672
                .map_err(codec_err_to_chan)?;
194

            
195
            // If all three values aren't present, return Pending and wait to get polled again
196
            // so that one of them is present.
197
3672
            if cell_to_send.is_none() && control_message.is_none() && input.is_none() {
198
1055
                return Poll::Pending;
199
2614
            }
200
2614
            // Otherwise, return the three Options, one of which is going to be Some.
201
2614
            Poll::Ready(Ok((cell_to_send, control_message, input)))
202
3709
        });
203
2668
        let (cell_to_send, control_message, input) = fut.await?;
204
2615
        if let Some(ctrl) = control_message {
205
8
            self.handle_control(ctrl).await?;
206
2607
        }
207
2615
        if let Some(item) = input {
208
151
            crate::note_incoming_traffic();
209
151
            self.handle_cell(item).await?;
210
2464
        }
211
2580
        if let Some(cts) = cell_to_send {
212
2456
            Pin::new(&mut self.output)
213
2456
                .start_send(cts)
214
2456
                .map_err(codec_err_to_chan)?;
215
            // Give the sink a little flush, to make sure it actually starts doing things.
216
2458
            futures::future::poll_fn(|cx| Pin::new(&mut self.output).poll_flush(cx))
217
2
                .await
218
2456
                .map_err(codec_err_to_chan)?;
219
124
        }
220
2580
        Ok(()) // Run again.
221
2656
    }
222

            
223
    /// Handle a CtrlMsg other than Shutdown.
224
8
    async fn handle_control(&mut self, msg: CtrlMsg) -> Result<()> {
225
        trace!("{}: reactor received {:?}", &self, msg);
226
8
        match msg {
227
            CtrlMsg::Shutdown => panic!(), // was handled in reactor loop.
228
4
            CtrlMsg::CloseCircuit(id) => self.outbound_destroy_circ(id).await?,
229
            CtrlMsg::AllocateCircuit {
230
4
                created_sender,
231
4
                sender,
232
4
                tx,
233
4
            } => {
234
4
                let mut rng = rand::thread_rng();
235
4
                let my_unique_id = self.details.unique_id;
236
4
                let circ_unique_id = self.circ_unique_id_ctx.next(my_unique_id);
237
4
                let ret: Result<_> = self
238
4
                    .circs
239
4
                    .add_ent(&mut rng, created_sender, sender)
240
4
                    .map(|id| (id, circ_unique_id));
241
4
                let _ = tx.send(ret); // don't care about other side going away
242
4
                self.update_disused_since();
243
4
            }
244
        }
245
8
        Ok(())
246
8
    }
247

            
248
    /// Helper: process a cell on a channel.  Most cell types get ignored
249
    /// or rejected; a few get delivered to circuits.
250
152
    async fn handle_cell(&mut self, cell: ChanCell) -> Result<()> {
251
151
        let (circid, msg) = cell.into_circid_and_msg();
252
151
        use ChanMsg::*;
253
151

            
254
151
        match msg {
255
119
            Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log.
256
            _ => trace!("{}: received {} for {}", &self, msg.cmd(), circid),
257
        }
258

            
259
151
        match msg {
260
            // These aren't allowed on clients.
261
4
            Create(_) | CreateFast(_) | Create2(_) | RelayEarly(_) | PaddingNegotiate(_) => Err(
262
4
                Error::ChanProto(format!("{} cell on client channel", msg.cmd())),
263
4
            ),
264

            
265
            // In theory this is allowed in clients, but we should never get
266
            // one, since we don't use TAP.
267
4
            Created(_) => Err(Error::ChanProto(format!(
268
4
                "{} cell received, but we never send CREATEs",
269
4
                msg.cmd()
270
4
            ))),
271

            
272
            // These aren't allowed after handshaking is done.
273
            Versions(_) | Certs(_) | Authorize(_) | Authenticate(_) | AuthChallenge(_)
274
4
            | Netinfo(_) => Err(Error::ChanProto(format!(
275
4
                "{} cell after handshake is done",
276
4
                msg.cmd()
277
4
            ))),
278

            
279
            // These are allowed, and need to be handled.
280
120
            Relay(_) => self.deliver_relay(circid, msg).await,
281

            
282
16
            Destroy(_) => self.deliver_destroy(circid, msg).await,
283

            
284
4
            CreatedFast(_) | Created2(_) => self.deliver_created(circid, msg).await,
285

            
286
            // These are always ignored.
287
            Padding(_) | VPadding(_) => Ok(()),
288

            
289
            // Unrecognized cell types should be safe to allow _on channels_,
290
            // since they can't propagate.
291
            Unrecognized(_) => Ok(()),
292

            
293
            // tor_cells knows about this type, but we don't.
294
            _ => Ok(()),
295
        }
296
150
    }
297

            
298
    /// Give the RELAY cell `msg` to the appropriate circuit.
299
120
    async fn deliver_relay(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
300
120
        let mut ent = self
301
120
            .circs
302
120
            .get_mut(circid)
303
120
            .ok_or_else(|| Error::ChanProto("Relay cell on nonexistent circuit".into()))?;
304

            
305
112
        match &mut *ent {
306
4
            CircEnt::Open(s) => {
307
4
                // There's an open circuit; we can give it the RELAY cell.
308
4
                if s.send(msg.try_into()?).await.is_err() {
309
                    drop(ent);
310
                    // The circuit's receiver went away, so we should destroy the circuit.
311
                    self.outbound_destroy_circ(circid).await?;
312
4
                }
313
4
                Ok(())
314
            }
315
4
            CircEnt::Opening(_, _) => Err(Error::ChanProto(
316
4
                "Relay cell on pending circuit before CREATED* received".into(),
317
4
            )),
318
104
            CircEnt::DestroySent(hs) => hs.receive_cell(),
319
        }
320
120
    }
321

            
322
    /// Handle a CREATED{,_FAST,2} cell by passing it on to the appropriate
323
    /// circuit, if that circuit is waiting for one.
324
4
    async fn deliver_created(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
325
4
        let target = self.circs.advance_from_opening(circid)?;
326
        let created = msg.try_into()?;
327
        // TODO(nickm) I think that this one actually means the other side
328
        // is closed. See arti#269.
329
        target.send(created).map_err(|_| {
330
            Error::from(internal!(
331
                "Circuit queue rejected created message. Is it closing?"
332
            ))
333
        })
334
4
    }
335

            
336
    /// Handle a DESTROY cell by removing the corresponding circuit
337
    /// from the map, and passing the destroy cell onward to the circuit.
338
16
    async fn deliver_destroy(&mut self, circid: CircId, msg: ChanMsg) -> Result<()> {
339
16
        // Remove the circuit from the map: nothing more can be done with it.
340
16
        let entry = self.circs.remove(circid);
341
16
        self.update_disused_since();
342
12
        match entry {
343
            // If the circuit is waiting for CREATED, tell it that it
344
            // won't get one.
345
4
            Some(CircEnt::Opening(oneshot, _)) => {
346
                trace!("{}: Passing destroy to pending circuit {}", &self, circid);
347
4
                oneshot
348
4
                    .send(msg.try_into()?)
349
                    // TODO(nickm) I think that this one actually means the other side
350
                    // is closed. See arti#269.
351
4
                    .map_err(|_| {
352
                        internal!("pending circuit wasn't interested in destroy cell?").into()
353
4
                    })
354
            }
355
            // It's an open circuit: tell it that it got a DESTROY cell.
356
4
            Some(CircEnt::Open(mut sink)) => {
357
                trace!("{}: Passing destroy to open circuit {}", &self, circid);
358
4
                sink.send(msg.try_into()?)
359
                    .await
360
                    // TODO(nickm) I think that this one actually means the other side
361
                    // is closed. See arti#269.
362
4
                    .map_err(|_| {
363
                        internal!("open circuit wasn't interested in destroy cell?").into()
364
4
                    })
365
            }
366
            // We've sent a destroy; we can leave this circuit removed.
367
4
            Some(CircEnt::DestroySent(_)) => Ok(()),
368
            // Got a DESTROY cell for a circuit we don't have.
369
            None => {
370
                trace!("{}: Destroy for nonexistent circuit {}", &self, circid);
371
4
                Err(Error::ChanProto("Destroy for nonexistent circuit".into()))
372
            }
373
        }
374
16
    }
375

            
376
    /// Helper: send a cell on the outbound sink.
377
4
    async fn send_cell(&mut self, cell: ChanCell) -> Result<()> {
378
4
        self.output.send(cell).await.map_err(codec_err_to_chan)?;
379
4
        Ok(())
380
4
    }
381

            
382
    /// Called when a circuit goes away: sends a DESTROY cell and removes
383
    /// the circuit.
384
4
    async fn outbound_destroy_circ(&mut self, id: CircId) -> Result<()> {
385
        trace!("{}: Circuit {} is gone; sending DESTROY", &self, id);
386
        // Remove the circuit's entry from the map: nothing more
387
        // can be done with it.
388
        // TODO: It would be great to have a tighter upper bound for
389
        // the number of relay cells we'll receive.
390
4
        self.circs.destroy_sent(id, HalfCirc::new(3000));
391
4
        self.update_disused_since();
392
4
        let destroy = Destroy::new(DestroyReason::NONE).into();
393
4
        let cell = ChanCell::new(id, destroy);
394
4
        self.send_cell(cell).await?;
395

            
396
4
        Ok(())
397
4
    }
398

            
399
    /// Update disused timestamp with current time if this channel is no longer used
400
24
    fn update_disused_since(&self) {
401
24
        if self.circs.open_ent_count() == 0 {
402
20
            // Update disused_since if it still indicates that the channel is in use
403
20
            self.details.unused_since.update_if_none();
404
20
        } else {
405
4
            // Mark this channel as in use
406
4
            self.details.unused_since.clear();
407
4
        }
408
24
    }
409
}
410

            
411
#[cfg(test)]
412
pub(crate) mod test {
413
    #![allow(clippy::unwrap_used)]
414
    use super::*;
415
    use crate::channel::UniqId;
416
    use crate::circuit::CircParameters;
417
    use futures::sink::SinkExt;
418
    use futures::stream::StreamExt;
419
    use futures::task::SpawnExt;
420

            
421
    type CodecResult = std::result::Result<ChanCell, CodecError>;
422

            
423
68
    pub(crate) fn new_reactor() -> (
424
68
        crate::channel::Channel,
425
68
        Reactor,
426
68
        mpsc::Receiver<ChanCell>,
427
68
        mpsc::Sender<CodecResult>,
428
68
    ) {
429
68
        let link_protocol = 4;
430
68
        let (send1, recv1) = mpsc::channel(32);
431
68
        let (send2, recv2) = mpsc::channel(32);
432
68
        let unique_id = UniqId::new();
433
68
        let ed_id = [6; 32].into();
434
68
        let rsa_id = [10; 20].into();
435
68
        let send1 = send1.sink_map_err(|e| {
436
19
            trace!("got sink error: {}", e);
437
19
            CodecError::Cell(tor_cell::Error::ChanProto("dummy message".into()))
438
68
        });
439
68
        let (chan, reactor) = crate::channel::Channel::new(
440
68
            link_protocol,
441
68
            Box::new(send1),
442
68
            Box::new(recv2),
443
68
            unique_id,
444
68
            ed_id,
445
68
            rsa_id,
446
68
        );
447
68
        (chan, reactor, recv1, send2)
448
68
    }
449

            
450
    // Terminating the channel first must make a single run_once() report Shutdown.
    #[test]
    fn shutdown() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            let (chan, mut reactor, _output, _input) = new_reactor();

            chan.terminate();
            let outcome = reactor.run_once().await;
            assert!(matches!(outcome, Err(ReactorError::Shutdown)));
        });
    }
461

            
462
    // Terminate the channel while the reactor's run() future is in flight.
    #[test]
    fn shutdown2() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            // TODO: Ask a rust person if this is how to do this.

            use futures::future::FutureExt;
            use futures::join;

            let (chan, reactor, _output, _input) = new_reactor();
            // Let's get the reactor running...
            let reactor_done = reactor.run().map(|res| res.is_ok()).shared();

            let watcher = reactor_done.clone();

            let terminate_channel = async {
                assert!(watcher.peek().is_none());
                // ... and terminate the channel while that's happening.
                chan.terminate();
            };

            let (finished_ok, _) = join!(reactor_done, terminate_channel);

            // Now let's see. The reactor should not _still_ be running.
            assert!(finished_ok);
        });
    }
489

            
490
1
    // Allocate a circuit, then drop it before it opens: the reactor should
    // mark it DestroySent and emit a DESTROY cell.
    #[test]
    fn new_circ_closed() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut reactor, mut output, _input) = new_reactor();
            assert!(chan.duration_unused().is_some()); // unused yet

            let (new_circ, step) = futures::join!(chan.new_circ(), reactor.run_once());
            let (pending, circ_reactor) = new_circ.unwrap();
            rt.spawn(async {
                let _ignore = circ_reactor.run().await;
            })
            .unwrap();
            assert!(step.is_ok());

            let id = pending.peek_circid();

            let opening = reactor.circs.get_mut(id);
            assert!(matches!(*opening.unwrap(), CircEnt::Opening(_, _)));
            assert!(chan.duration_unused().is_none()); // in use

            // Now drop the circuit; this should tell the reactor to remove
            // the circuit from the map.
            drop(pending);

            reactor.run_once().await.unwrap();
            let closed = reactor.circs.get_mut(id);
            assert!(matches!(*closed.unwrap(), CircEnt::DestroySent(_)));
            let sent = output.next().await.unwrap();
            assert_eq!(sent.circid(), id);
            assert!(matches!(sent.msg(), ChanMsg::Destroy(_)));
            assert!(chan.duration_unused().is_some()); // unused again
        });
    }
523

            
524
    // A CREATED_FAST cell that fails the handshake must be delivered to the
    // pending circuit, and the circuit must then be torn down.
    #[test]
    #[ignore] // See bug #244: re-enable this test once it passes reliably.
    fn new_circ_create_failure() {
        use std::time::Duration;
        use tor_rtcompat::SleepProvider;

        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            use tor_cell::chancell::msg;
            let (chan, mut reactor, mut output, mut input) = new_reactor();

            let (new_circ, step) = futures::join!(chan.new_circ(), reactor.run_once());
            let (pending, circ_reactor) = new_circ.unwrap();
            rt.spawn(async {
                let _ignore = circ_reactor.run().await;
            })
            .unwrap();
            assert!(step.is_ok());

            let params = CircParameters::default();

            let id = pending.peek_circid();

            let opening = reactor.circs.get_mut(id);
            assert!(matches!(*opening.unwrap(), CircEnt::Opening(_, _)));

            #[allow(clippy::clone_on_copy)]
            let sleeper = rt.clone();
            let reply_badly = async {
                sleeper.sleep(Duration::from_millis(100)).await;
                trace!("sending createdfast");
                // We'll get a bad handshake result from this createdfast cell.
                let created_cell = ChanCell::new(id, msg::CreatedFast::new(*b"x").into());
                input.send(Ok(created_cell)).await.unwrap();
                reactor.run_once().await.unwrap();
            };

            let (circ, _) =
                futures::join!(pending.create_firsthop_fast(&params), reply_badly);
            // Make sure statuses are as expected.
            assert!(matches!(circ.err().unwrap(), Error::BadCircHandshake));

            reactor.run_once().await.unwrap();

            // Make sure that the createfast cell got sent
            let cell_sent = output.next().await.unwrap();
            assert!(matches!(cell_sent.msg(), msg::ChanMsg::CreateFast(_)));

            // But the next run of the reactor will make the circuit get closed.
            let closed = reactor.circs.get_mut(id);
            assert!(matches!(*closed.unwrap(), CircEnt::DestroySent(_)));
        });
    }
577

            
578
    // Feed the reactor cells that must never show up on an open client
    // channel, and check the protocol-violation message for each.
    #[test]
    fn bad_cells() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            use tor_cell::chancell::msg;
            let (_chan, mut reactor, _output, mut input) = new_reactor();

            // We shouldn't get create cells, ever.
            let create2 = ChanCell::new(9.into(), msg::Create2::new(4, *b"hihi").into());
            input.send(Ok(create2)).await.unwrap();

            // shouldn't get created2 cells for nonexistent circuits
            let created2 = ChanCell::new(7.into(), msg::Created2::new(*b"hihi").into());
            input.send(Ok(created2)).await.unwrap();

            // Both cells are queued; each run_once() consumes one of them.
            let err = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", err),
                "channel protocol violation: CREATE2 cell on client channel"
            );

            let err = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", err),
                "channel protocol violation: Unexpected CREATED* cell not on opening circuit"
            );

            // Can't get a relay cell on a circuit we've never heard of.
            let relay = ChanCell::new(4.into(), msg::Relay::new(b"abc").into());
            input.send(Ok(relay)).await.unwrap();
            let err = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", err),
                "channel protocol violation: Relay cell on nonexistent circuit"
            );

            // Can't get handshaking cells while channel is open.
            let versions = ChanCell::new(0.into(), msg::Versions::new([3]).unwrap().into());
            input.send(Ok(versions)).await.unwrap();
            let err = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", err),
                "channel protocol violation: VERSIONS cell after handshake is done"
            );

            // We don't accept CREATED.
            let created = ChanCell::new(25.into(), msg::Created::new(&b"xyzzy"[..]).into());
            input.send(Ok(created)).await.unwrap();
            let err = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                format!("{}", err),
                "channel protocol violation: CREATED cell received, but we never send CREATEs"
            );
        });
    }
648

            
649
1
    /// Check how the reactor handles RELAY cells depending on the state of
    /// the circuit they are addressed to: open, still opening, nonexistent,
    /// or `DestroySent`.
    #[test]
    fn deliver_relay() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            use crate::circuit::celltypes::ClientCircChanMsg;
            use futures::channel::oneshot;
            use tor_cell::chancell::msg;

            let (_chan, mut reactor, _output, mut input) = new_reactor();

            // Register three circuits with the reactor: 7 is still opening,
            // 13 is open, and 23 is in the DestroySent state (its HalfCirc is
            // built with 25).  We keep the receiving ends of the per-circuit
            // cell streams for 7 and 13.
            let (_circ_stream_7, mut circ_stream_13) = {
                let (snd1, _rcv1) = oneshot::channel();
                let (snd2, rcv2) = mpsc::channel(64);
                reactor
                    .circs
                    .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));

                let (snd3, rcv3) = mpsc::channel(64);
                reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));

                reactor
                    .circs
                    .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
                (rcv2, rcv3)
            };

            // If a relay cell is sent on an open circuit, the correct circuit
            // should get it.
            let relaycell: ChanMsg = msg::Relay::new(b"do you suppose").into();
            input
                .send(Ok(ChanCell::new(13.into(), relaycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
            let got = circ_stream_13.next().await.unwrap();
            assert!(matches!(got, ClientCircChanMsg::Relay(_)));

            // If a relay cell is sent on an opening circuit, that's an error.
            input
                .send(Ok(ChanCell::new(7.into(), relaycell.clone())))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                e.to_string(),
                "channel protocol violation: Relay cell on pending circuit before CREATED* received"
            );

            // If a relay cell is sent on a nonexistent circuit, that's an error.
            input
                .send(Ok(ChanCell::new(101.into(), relaycell.clone())))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                e.to_string(),
                "channel protocol violation: Relay cell on nonexistent circuit"
            );

            // It's fine to get a relay cell on a DestroySent circuit: that
            // happens when the other side hasn't noticed the Destroy yet.

            // We can do this 25 more times according to our setup:
            for _ in 0..25 {
                input
                    .send(Ok(ChanCell::new(23.into(), relaycell.clone())))
                    .await
                    .unwrap();
                reactor.run_once().await.unwrap(); // should be fine.
            }

            // This one will fail.  (Last use of `relaycell`, so no clone.)
            input
                .send(Ok(ChanCell::new(23.into(), relaycell)))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                e.to_string(),
                "channel protocol violation: Too many cells received on destroyed circuit"
            );
        });
    }
731

            
732
1
    /// Check how the reactor handles DESTROY cells depending on the state of
    /// the circuit they are addressed to: opening, open, `DestroySent`, or
    /// nonexistent.
    #[test]
    fn deliver_destroy() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            use crate::circuit::celltypes::*;
            use futures::channel::oneshot;
            use tor_cell::chancell::msg;

            let (_chan, mut reactor, _output, mut input) = new_reactor();

            // Register three circuits with the reactor: 7 is still opening,
            // 13 is open, and 23 is in the DestroySent state.  We keep the
            // oneshot receiver for circuit 7 and the cell stream for
            // circuit 13 so we can observe what the reactor delivers.
            let (circ_oneshot_7, mut circ_stream_13) = {
                let (snd1, rcv1) = oneshot::channel();
                let (snd2, _rcv2) = mpsc::channel(64);
                reactor
                    .circs
                    .put_unchecked(7.into(), CircEnt::Opening(snd1, snd2));

                let (snd3, rcv3) = mpsc::channel(64);
                reactor.circs.put_unchecked(13.into(), CircEnt::Open(snd3));

                reactor
                    .circs
                    .put_unchecked(23.into(), CircEnt::DestroySent(HalfCirc::new(25)));
                (rcv1, rcv3)
            };

            // Destroying an opening circuit is fine: the destroy is reported
            // through the circuit's oneshot.
            let destroycell: ChanMsg = msg::Destroy::new(0.into()).into();
            input
                .send(Ok(ChanCell::new(7.into(), destroycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
            let msg = circ_oneshot_7.await;
            assert!(matches!(msg, Ok(CreateResponse::Destroy(_))));

            // Destroying an open circuit is fine: the destroy is delivered on
            // the circuit's cell stream.
            input
                .send(Ok(ChanCell::new(13.into(), destroycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();
            let msg = circ_stream_13.next().await.unwrap();
            assert!(matches!(msg, ClientCircChanMsg::Destroy(_)));

            // Destroying a DestroySent circuit is fine.
            input
                .send(Ok(ChanCell::new(23.into(), destroycell.clone())))
                .await
                .unwrap();
            reactor.run_once().await.unwrap();

            // Destroying a nonexistent circuit is an error.  (Last use of
            // `destroycell`, so no clone.)
            input
                .send(Ok(ChanCell::new(101.into(), destroycell)))
                .await
                .unwrap();
            let e = reactor.run_once().await.unwrap_err().unwrap_err();
            assert_eq!(
                e.to_string(),
                "channel protocol violation: Destroy for nonexistent circuit"
            );
        });
    }
795
}