1
//! Code to handle incoming cells on a circuit.
2
use super::streammap::{ShouldSendEnd, StreamEnt};
3
use crate::circuit::celltypes::{ClientCircChanMsg, CreateResponse};
4
use crate::circuit::unique_id::UniqId;
5
use crate::circuit::{
6
    sendme, streammap, CircParameters, Create2Wrap, CreateFastWrap, CreateHandshakeWrap,
7
};
8
use crate::crypto::cell::{
9
    ClientLayer, CryptInit, HopNum, InboundClientCrypt, InboundClientLayer, OutboundClientCrypt,
10
    OutboundClientLayer, RelayCellBody, Tor1RelayCrypto,
11
};
12
use crate::util::err::ReactorError;
13
use crate::{Error, Result};
14
use std::collections::VecDeque;
15
use std::convert::TryFrom;
16
use std::marker::PhantomData;
17
use std::pin::Pin;
18
use tor_cell::chancell::msg::{ChanMsg, Relay};
19
use tor_cell::relaycell::msg::{End, RelayMsg, Sendme};
20
use tor_cell::relaycell::{RelayCell, RelayCmd, StreamId};
21

            
22
use futures::channel::{mpsc, oneshot};
23
use futures::Sink;
24
use futures::Stream;
25
use tor_error::internal;
26

            
27
use std::sync::atomic::{AtomicU8, Ordering};
28
use std::sync::Arc;
29
use std::task::{Context, Poll};
30

            
31
use crate::channel::Channel;
32
#[cfg(test)]
33
use crate::circuit::sendme::CircTag;
34
use crate::circuit::sendme::StreamSendWindow;
35
use crate::crypto::handshake::ntor::{NtorClient, NtorPublicKey};
36
use crate::crypto::handshake::{ClientHandshake, KeyGenerator};
37
use tor_cell::chancell;
38
use tor_cell::chancell::{ChanCell, CircId};
39
use tor_linkspec::LinkSpec;
40
use tor_llcrypto::pk;
41
use tracing::{debug, trace, warn};
42

            
43
/// Initial value for outbound flow-control window on streams.
44
pub(super) const SEND_WINDOW_INIT: u16 = 500;
45
/// Initial value for inbound flow-control window on streams.
46
pub(super) const RECV_WINDOW_INIT: u16 = 500;
47
/// Size of the buffer used between the reactor and a `StreamReader`.
48
///
49
/// FIXME(eta): We pick 2× the receive window, which is very conservative (we arguably shouldn't
50
///             get sent more than the receive window anyway!). We might do due to things that
51
///             don't count towards the window though.
52
pub(super) const STREAM_READER_BUFFER: usize = (2 * RECV_WINDOW_INIT) as usize;
53

            
54
/// The type of a oneshot channel used to inform reactor users of the result of an operation.
55
pub(super) type ReactorResultChannel<T> = oneshot::Sender<Result<T>>;
56

            
57
/// A handshake type, to be used when creating circuit hops.
58
#[derive(Clone, Debug)]
59
pub(super) enum CircuitHandshake {
60
    /// Use the CREATE_FAST handshake.
61
    CreateFast,
62
    /// Use the ntor handshake.
63
    Ntor {
64
        /// The public key of the relay.
65
        public_key: NtorPublicKey,
66
        /// The first hop's Ed25519 identity, which is verified against
67
        /// the identity held in the circuit's channel.
68
        ed_identity: pk::ed25519::Ed25519Identity,
69
    },
70
}
71

            
72
/// A message telling the reactor to do something.
73
#[derive(Debug)]
74
pub(super) enum CtrlMsg {
75
    /// Create the first hop of this circuit.
76
    Create {
77
        /// A oneshot channel on which we'll receive the creation response.
78
        recv_created: oneshot::Receiver<CreateResponse>,
79
        /// The handshake type to use for the first hop.
80
        handshake: CircuitHandshake,
81
        /// Whether the hop supports authenticated SENDME cells.
82
        /// (And therefore, whether we should require them.)
83
        require_sendme_auth: RequireSendmeAuth,
84
        /// Other parameters relevant for circuit creation.
85
        params: CircParameters,
86
        /// Oneshot channel to notify on completion.
87
        done: ReactorResultChannel<()>,
88
    },
89
    /// Extend a circuit by one hop, using the ntor handshake.
90
    ExtendNtor {
91
        /// The handshake type to use for this hop.
92
        public_key: NtorPublicKey,
93
        /// Information about how to connect to the relay we're extending to.
94
        linkspecs: Vec<LinkSpec>,
95
        /// Whether the hop supports authenticated SENDME cells.
96
        /// (And therefore, whether we should require them.)
97
        require_sendme_auth: RequireSendmeAuth,
98
        /// Other parameters relevant for circuit extension.
99
        params: CircParameters,
100
        /// Oneshot channel to notify on completion.
101
        done: ReactorResultChannel<()>,
102
    },
103
    /// Begin a stream with the provided hop in this circuit.
104
    ///
105
    /// Allocates a stream ID, and sends the provided message to that hop.
106
    BeginStream {
107
        /// The hop number to begin the stream with.
108
        hop_num: HopNum,
109
        /// The message to send.
110
        message: RelayMsg,
111
        /// A channel to send messages on this stream down.
112
        ///
113
        /// This sender shouldn't ever block, because we use congestion control and only send
114
        /// SENDME cells once we've read enough out of the other end. If it *does* block, we
115
        /// can assume someone is trying to send us more cells than they should, and abort
116
        /// the stream.
117
        sender: mpsc::Sender<RelayMsg>,
118
        /// A channel to receive messages to send on this stream from.
119
        rx: mpsc::Receiver<RelayMsg>,
120
        /// Oneshot channel to notify on completion, with the allocated stream ID.
121
        done: ReactorResultChannel<StreamId>,
122
    },
123
    /// Send a SENDME cell (used to ask for more data to be sent) on the given stream.
124
    SendSendme {
125
        /// The stream ID to send a SENDME for.
126
        stream_id: StreamId,
127
        /// The hop number the stream is on.
128
        hop_num: HopNum,
129
    },
130
    /// Shut down the reactor.
131
    Shutdown,
132
    /// (tests only) Add a hop to the list of hops on this circuit, with dummy cryptography.
133
    #[cfg(test)]
134
    AddFakeHop {
135
        supports_flowctrl_1: bool,
136
        fwd_lasthop: bool,
137
        rev_lasthop: bool,
138
        params: CircParameters,
139
        done: ReactorResultChannel<()>,
140
    },
141
    /// (tests only) Get the send window and expected tags for a given hop.
142
    #[cfg(test)]
143
    QuerySendWindow {
144
        hop: HopNum,
145
        done: ReactorResultChannel<(u16, Vec<CircTag>)>,
146
    },
147
    /// (tests only) Send a raw relay cell with send_relay_cell().
148
    #[cfg(test)]
149
    SendRelayCell {
150
        hop: HopNum,
151
        early: bool,
152
        cell: RelayCell,
153
    },
154
}
155
/// Represents the reactor's view of a single hop.
156
pub(super) struct CircHop {
157
    /// Map from stream IDs to streams.
158
    ///
159
    /// We store this with the reactor instead of the circuit, since the
160
    /// reactor needs it for every incoming cell on a stream, whereas
161
    /// the circuit only needs it when allocating new streams.
162
    map: streammap::StreamMap,
163
    /// Window used to say how many cells we can receive.
164
    recvwindow: sendme::CircRecvWindow,
165
    /// If true, this hop is using an older link protocol and we
166
    /// shouldn't expect good authenticated SENDMEs from it.
167
    auth_sendme_required: RequireSendmeAuth,
168
    /// Window used to say how many cells we can send.
169
    sendwindow: sendme::CircSendWindow,
170
    /// Buffer for messages we can't send to this hop yet due to congestion control.
171
    ///
172
    /// Contains the cell to send, and a boolean equivalent to the `early` parameter
173
    /// in `Reactor::send_relay_cell` (as in, whether to send the cell using `RELAY_EARLY`).
174
    ///
175
    /// This shouldn't grow unboundedly: we try and pop things off it first before
176
    /// doing things that would result in it growing (and stop before growing it
177
    /// if popping things off it can't be done).
178
    ///
179
    /// NOTE: Control messages could potentially add unboundedly to this, although that's
180
    ///       not likely to happen (and isn't triggereable from the network, either).
181
    outbound: VecDeque<(bool, RelayCell)>,
182
}
183

            
184
/// Enumeration to determine whether we require circuit-level SENDME cells to be
185
/// authenticated.
186
///
187
/// (This is an enumeration rather than a boolean to prevent accidental sense
188
/// inversion.)
189
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
190
pub(super) enum RequireSendmeAuth {
191
    /// Sendme authentication is expected from this hop, and therefore is
192
    /// required.
193
    Yes,
194
    /// Sendme authentication is not expected from this hop, and therefore not
195
    /// required.
196
    No,
197
}
198

            
199
impl RequireSendmeAuth {
200
    /// Create an appropriate [`RequireSendmeAuth`] for a given set of relay
201
    /// subprotocol versions.
202
    //
203
    // TODO(nickm): At some point in the future, once there are no 0.3.5 relays
204
    // on the Tor network, we can safely require authenticated SENDMEs from all
205
    // relays.
206
    //
207
    // At that point, if we have a relay implementation in Rust, it should look
208
    // at the network parameter `SendmeAcceptMinVersion` when deciding whether
209
    // to require authenticated SENDMEs.
210
24
    pub(super) fn from_protocols(protocols: &tor_protover::Protocols) -> Self {
211
24
        if protocols.supports_known_subver(tor_protover::ProtoKind::FlowCtrl, 1) {
212
            // The relay supports FlowCtrl=1, and therefore will authenticate.
213
24
            RequireSendmeAuth::Yes
214
        } else {
215
            RequireSendmeAuth::No
216
        }
217
24
    }
218
}
219

            
220
/// An indicator on what we should do when we receive a cell for a circuit.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum CellStatus {
    /// The circuit should stay open.
    Continue,
    /// Perform a clean shutdown on this circuit.
    CleanShutdown,
}
228

            
229
impl CircHop {
230
    /// Create a new hop.
231
119
    pub(super) fn new(auth_sendme_required: RequireSendmeAuth, initial_window: u16) -> Self {
232
119
        CircHop {
233
119
            map: streammap::StreamMap::new(),
234
119
            recvwindow: sendme::CircRecvWindow::new(1000),
235
119
            auth_sendme_required,
236
119
            sendwindow: sendme::CircSendWindow::new(initial_window),
237
119
            outbound: VecDeque::new(),
238
119
        }
239
119
    }
240
}
241

            
242
/// An object that's waiting for a meta cell (one not associated with a stream) in order to make
243
/// progress.
244
///
245
/// # Background
246
///
247
/// The `Reactor` can't have async functions that send and receive cells, because its job is to
248
/// send and receive cells: if one of its functions tried to do that, it would just hang forever.
249
///
250
/// To get around this problem, the reactor can send some cells, and then make one of these
251
/// `MetaCellHandler` objects, which will be run when the reply arrives.
252
pub(super) trait MetaCellHandler: Send {
253
    /// The hop we're expecting the message to come from. This is compared against the hop
254
    /// from which we actually receive messages, and an error is thrown if the two don't match.
255
    fn expected_hop(&self) -> HopNum;
256
    /// Called when the message we were waiting for arrives.
257
    ///
258
    /// Gets a copy of the `Reactor` in order to do anything it likes there.
259
    fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()>;
260
}
261

            
262
/// An object that can extend a circuit by one hop, using the `MetaCellHandler` trait.
263
///
264
/// Yes, I know having trait bounds on structs is bad, but in this case it's necessary
265
/// since we want to be able to use `H::KeyType`.
266
struct CircuitExtender<H, L, FWD, REV>
267
where
268
    H: ClientHandshake,
269
{
270
    /// Handshake state.
271
    state: Option<H::StateType>,
272
    /// Whether the hop supports authenticated SENDME cells.
273
    /// (And therefore, whether we require them.)
274
    require_sendme_auth: RequireSendmeAuth,
275
    /// Parameters used for this extension.
276
    params: CircParameters,
277
    /// An identifier for logging about this reactor's circuit.
278
    unique_id: UniqId,
279
    /// The hop we're expecting the EXTENDED2 cell to come back from.
280
    expected_hop: HopNum,
281
    /// `PhantomData` used to make the other type parameters required for a circuit extension
282
    /// part of the `struct`, instead of having them be provided during a function call.
283
    ///
284
    /// This is done this way so we can implement `MetaCellHandler` for this type, which
285
    /// doesn't include any generic type parameters; we need them to be part of the type
286
    /// so we know what they are for that `impl` block.
287
    phantom: PhantomData<(L, FWD, REV)>,
288
}
289
impl<H, L, FWD, REV> CircuitExtender<H, L, FWD, REV>
290
where
291
    H: ClientHandshake,
292
    H::KeyGen: KeyGenerator,
293
    L: CryptInit + ClientLayer<FWD, REV>,
294
    FWD: OutboundClientLayer + 'static + Send,
295
    REV: InboundClientLayer + 'static + Send,
296
{
297
    /// Start extending a circuit, sending the necessary EXTEND cell and returning a
298
    /// new `CircuitExtender` to be called when the reply arrives.
299
    ///
300
    /// The `handshake_id` is the numeric identifier for what kind of
301
    /// handshake we're doing.  The `key` is the relay's onion key that
302
    /// goes along with the handshake, and the `linkspecs` are the
303
    /// link specifiers to include in the EXTEND cell to tell the
304
    /// current last hop which relay to connect to.
305
20
    fn begin(
306
20
        cx: &mut Context<'_>,
307
20
        handshake_id: u16,
308
20
        key: &H::KeyType,
309
20
        linkspecs: Vec<LinkSpec>,
310
20
        require_sendme_auth: RequireSendmeAuth,
311
20
        params: CircParameters,
312
20
        reactor: &mut Reactor,
313
20
    ) -> Result<Self> {
314
20
        let mut rng = rand::thread_rng();
315
20
        let unique_id = reactor.unique_id;
316

            
317
        use tor_cell::relaycell::msg::{Body, Extend2};
318
        // Perform the first part of the cryptographic handshake
319
20
        let (state, msg) = H::client1(&mut rng, key)?;
320

            
321
20
        let n_hops = reactor.crypto_out.n_layers();
322
20
        let hop = ((n_hops - 1) as u8).into();
323
20

            
324
20
        debug!(
325
            "{}: Extending circuit to hop {} with {:?}",
326
            unique_id,
327
            n_hops + 1,
328
            linkspecs
329
        );
330

            
331
20
        let extend_msg = Extend2::new(linkspecs, handshake_id, msg);
332
20
        let cell = RelayCell::new(0.into(), extend_msg.into_message());
333
20

            
334
20
        // Send the message to the last hop...
335
20
        reactor.send_relay_cell(
336
20
            cx, hop, true, // use a RELAY_EARLY cell
337
20
            cell,
338
20
        )?;
339
20
        trace!("{}: waiting for EXTENDED2 cell", unique_id);
340
        // ... and now we wait for a response.
341

            
342
20
        Ok(Self {
343
20
            state: Some(state),
344
20
            require_sendme_auth,
345
20
            params,
346
20
            unique_id,
347
20
            expected_hop: hop,
348
20
            phantom: Default::default(),
349
20
        })
350
20
    }
351
}
352

            
353
impl<H, L, FWD, REV> MetaCellHandler for CircuitExtender<H, L, FWD, REV>
354
where
355
    H: ClientHandshake,
356
    H::StateType: Send,
357
    H::KeyGen: KeyGenerator,
358
    L: CryptInit + ClientLayer<FWD, REV> + Send,
359
    FWD: OutboundClientLayer + 'static + Send,
360
    REV: InboundClientLayer + 'static + Send,
361
{
362
16
    fn expected_hop(&self) -> HopNum {
363
16
        self.expected_hop
364
16
    }
365
12
    fn finish(&mut self, msg: RelayMsg, reactor: &mut Reactor) -> Result<()> {
366
12
        // Did we get the right response?
367
12
        if msg.cmd() != RelayCmd::EXTENDED2 {
368
4
            return Err(Error::CircProto(format!(
369
4
                "wanted EXTENDED2; got {}",
370
4
                msg.cmd(),
371
4
            )));
372
8
        }
373

            
374
        // ???? Do we need to shutdown the circuit for the remaining error
375
        // ???? cases in this function?
376

            
377
8
        let msg = match msg {
378
8
            RelayMsg::Extended2(e) => e,
379
            _ => {
380
                return Err(Error::from(internal!(
381
                    "Message body {:?} didn't match cmd {:?}",
382
                    msg,
383
                    msg.cmd()
384
                )))
385
            }
386
        };
387
8
        let relay_handshake = msg.into_body();
388
8

            
389
8
        trace!(
390
            "{}: Received EXTENDED2 cell; completing handshake.",
391
            self.unique_id
392
        );
393
        // Now perform the second part of the handshake, and see if it
394
        // succeeded.
395
8
        let keygen = H::client2(
396
8
            self.state
397
8
                .take()
398
8
                .expect("CircuitExtender::finish() called twice"),
399
8
            relay_handshake,
400
8
        )?;
401
4
        let layer = L::construct(keygen)?;
402

            
403
4
        debug!("{}: Handshake complete; circuit extended.", self.unique_id);
404

            
405
        // If we get here, it succeeded.  Add a new hop to the circuit.
406
4
        let (layer_fwd, layer_back) = layer.split();
407
4
        reactor.add_hop(
408
4
            self.require_sendme_auth,
409
4
            Box::new(layer_fwd),
410
4
            Box::new(layer_back),
411
4
            &self.params,
412
4
        );
413
4
        Ok(())
414
12
    }
415
}
416

            
417
/// Object to handle incoming cells and background tasks on a circuit
418
///
419
/// This type is returned when you finish a circuit; you need to spawn a
420
/// new task that calls `run()` on it.
421
#[must_use = "If you don't call run() on a reactor, the circuit won't work."]
422
pub struct Reactor {
423
    /// Receiver for control messages for this reactor, sent by `ClientCirc` objects.
424
    pub(super) control: mpsc::UnboundedReceiver<CtrlMsg>,
425
    /// Buffer for cells we can't send out the channel yet due to it being full.
426
    ///
427
    /// We try and dequeue off this first before doing anything else, ensuring that
428
    /// it cannot grow unboundedly (and if we start having to enqueue things on here after
429
    /// the channel shows backpressure, we stop pulling from receivers that could send here).
430
    ///
431
    /// NOTE: Control messages could potentially add unboundedly to this, although that's
432
    ///       not likely to happen (and isn't triggereable from the network, either).
433
    pub(super) outbound: VecDeque<ChanCell>,
434
    /// The channel this circuit is using to send cells through.
435
    pub(super) channel: Channel,
436
    /// Input stream, on which we receive ChanMsg objects from this circuit's
437
    /// channel.
438
    // TODO: could use a SPSC channel here instead.
439
    pub(super) input: mpsc::Receiver<ClientCircChanMsg>,
440
    /// The cryptographic state for this circuit for inbound cells.
441
    /// This object is divided into multiple layers, each of which is
442
    /// shared with one hop of the circuit.
443
    pub(super) crypto_in: InboundClientCrypt,
444
    /// The cryptographic state for this circuit for outbound cells.
445
    pub(super) crypto_out: OutboundClientCrypt,
446
    /// List of hops state objects used by the reactor
447
    pub(super) hops: Vec<CircHop>,
448
    /// Shared atomic for the number of hops this circuit has.
449
    pub(super) num_hops: Arc<AtomicU8>,
450
    /// An identifier for logging about this reactor's circuit.
451
    pub(super) unique_id: UniqId,
452
    /// This circuit's identifier on the upstream channel.
453
    pub(super) channel_id: CircId,
454
    /// A handler for a meta cell, together with a result channel to notify on completion.
455
    pub(super) meta_handler: Option<(Box<dyn MetaCellHandler>, ReactorResultChannel<()>)>,
456
}
457

            
458
impl Reactor {
459
    /// Launch the reactor, and run until the circuit closes or we
460
    /// encounter an error.
461
    ///
462
    /// Once this method returns, the circuit is dead and cannot be
463
    /// used again.
464
47
    pub async fn run(mut self) -> Result<()> {
465
        trace!("{}: Running circuit reactor", self.unique_id);
466
45
        let result: Result<()> = loop {
467
2656
            match self.run_once().await {
468
2609
                Ok(()) => (),
469
37
                Err(ReactorError::Shutdown) => break Ok(()),
470
8
                Err(ReactorError::Err(e)) => break Err(e),
471
            }
472
        };
473
        debug!("{}: Circuit reactor stopped: {:?}", self.unique_id, result);
474
45
        result
475
45
    }
476

            
477
    /// Helper for run: doesn't mark the circuit closed on finish.  Only
478
    /// processes one cell or control message.
479
2658
    pub(super) async fn run_once(&mut self) -> std::result::Result<(), ReactorError> {
480
2658
        #[allow(clippy::cognitive_complexity)]
481
2842
        let fut = futures::future::poll_fn(|cx| -> Poll<std::result::Result<_, ReactorError>> {
482
2842
            let mut create_message = None;
483
2842
            let mut did_things = false;
484

            
485
            // Check whether we've got a control message pending.
486
2842
            if let Poll::Ready(ret) = Pin::new(&mut self.control).poll_next(cx) {
487
159
                match ret {
488
                    None => {
489
29
                        trace!("{}: reactor shutdown due to control drop", self.unique_id);
490
29
                        return Poll::Ready(Err(ReactorError::Shutdown));
491
                    }
492
                    Some(CtrlMsg::Shutdown) => {
493
                        trace!(
494
                            "{}: reactor shutdown due to explicit request",
495
                            self.unique_id
496
                        );
497
                        return Poll::Ready(Err(ReactorError::Shutdown));
498
                    }
499
                    // This message requires actually blocking, so we can't handle it inside
500
                    // this nonblocking poll_fn.
501
8
                    Some(x @ CtrlMsg::Create { .. }) => create_message = Some(x),
502
152
                    Some(msg) => {
503
152
                        self.handle_control(cx, msg)?;
504
149
                        did_things = true;
505
                    }
506
                }
507
2654
            }
508

            
509
            // Check whether we've got an input message pending.
510
2811
            if let Poll::Ready(ret) = Pin::new(&mut self.input).poll_next(cx) {
511
56
                match ret {
512
                    None => {
513
4
                        trace!("{}: reactor shutdown due to input drop", self.unique_id);
514
4
                        return Poll::Ready(Err(ReactorError::Shutdown));
515
                    }
516
52
                    Some(cell) => {
517
52
                        if self.handle_cell(cx, cell)? == CellStatus::CleanShutdown {
518
4
                            trace!("{}: reactor shutdown due to handled cell", self.unique_id);
519
4
                            return Poll::Ready(Err(ReactorError::Shutdown));
520
40
                        }
521
40
                        did_things = true;
522
                    }
523
                }
524
2755
            }
525

            
526
            // Now for the tricky part. We want to grab some relay cells from all of our streams
527
            // and forward them on to the channel, but we need to pay attention to both whether
528
            // the channel can accept cells right now, and whether congestion control allows us
529
            // to send them.
530
            //
531
            // We also have to do somewhat cursed things and call start_send inside this poll_fn,
532
            // since we need to check whether the channel can still receive cells after each one
533
            // that we send.
534

            
535
2795
            let mut streams_to_close = vec![];
536
2795
            let mut stream_relaycells = vec![];
537
2795

            
538
2795
            // Is the channel ready to receive anything at all?
539
2795
            if self.channel.poll_ready(cx)? {
540
                // (using this as a named block for early returns; not actually a loop)
541
                #[allow(clippy::never_loop)]
542
                'outer: loop {
543
                    // First, drain our queue of things we tried to send earlier, but couldn't.
544
2788
                    while let Some(msg) = self.outbound.pop_front() {
545
                        trace!("{}: sending from enqueued: {:?}", self.unique_id, msg);
546
                        Pin::new(&mut self.channel).start_send(msg)?;
547

            
548
                        // `futures::Sink::start_send` dictates we need to call `poll_ready` before
549
                        // each `start_send` call.
550
                        if !self.channel.poll_ready(cx)? {
551
                            break 'outer;
552
                        }
553
                    }
554

            
555
                    // Let's look at our hops, and streams for each hop.
556
8170
                    for i in 0..self.hops.len() {
557
8170
                        let hop_num = HopNum::from(i as u8);
558
8170
                        // If we can, drain our queue of things we tried to send earlier, but
559
8170
                        // couldn't due to congestion control.
560
8170
                        if self.hops[i].sendwindow.window() > 0 {
561
                            'hop: while let Some((early, cell)) = self.hops[i].outbound.pop_front()
562
                            {
563
                                trace!(
564
                                    "{}: sending from hop-{}-enqueued: {:?}",
565
                                    self.unique_id,
566
                                    i,
567
                                    cell
568
                                );
569
                                self.send_relay_cell(cx, hop_num, early, cell)?;
570
                                if !self.channel.poll_ready(cx)? {
571
                                    break 'outer;
572
                                }
573
                                if self.hops[i].sendwindow.window() == 0 {
574
                                    break 'hop;
575
                                }
576
                            }
577
1
                        }
578
8171
                        let hop = &mut self.hops[i];
579
                        // Look at all of the streams on this hop.
580
8171
                        for (id, stream) in hop.map.inner().iter_mut() {
581
                            if let StreamEnt::Open {
582
2509
                                rx, send_window, ..
583
2517
                            } = stream
584
                            {
585
                                // Do the stream and hop send windows allow us to obtain and
586
                                // send something?
587
                                //
588
                                // FIXME(eta): not everything counts toward congestion control!
589
2509
                                if send_window.window() > 0 && hop.sendwindow.window() > 0 {
590
2509
                                    match Pin::new(rx).poll_next(cx) {
591
2412
                                        Poll::Ready(Some(m)) => {
592
2412
                                            stream_relaycells
593
2412
                                                .push((hop_num, RelayCell::new(*id, m)));
594
2412
                                        }
595
                                        Poll::Ready(None) => {
596
                                            // Stream receiver was dropped; close the stream.
597
                                            // We can't close it here though due to borrowck; that
598
                                            // will happen later.
599
                                            streams_to_close.push((hop_num, *id));
600
                                        }
601
97
                                        Poll::Pending => {}
602
                                    }
603
                                }
604
8
                            }
605
                        }
606
                    }
607

            
608
2789
                    break;
609
                }
610
8
            }
611

            
612
            // Close the streams we said we'd close.
613
2797
            for (hopn, id) in streams_to_close {
614
2
                self.close_stream(cx, hopn, id)?;
615
                did_things = true;
616
            }
617
            // Send messages we said we'd send.
618
5207
            for (hopn, rc) in stream_relaycells {
619
2413
                self.send_relay_cell(cx, hopn, false, rc)?;
620
2412
                did_things = true;
621
            }
622

            
623
2794
            let _ = Pin::new(&mut self.channel)
624
2794
                .poll_flush(cx)
625
2794
                .map_err(|_| Error::ChannelClosed)?;
626
2794
            if create_message.is_some() {
627
8
                Poll::Ready(Ok(create_message))
628
2786
            } else if did_things {
629
2603
                Poll::Ready(Ok(None))
630
            } else {
631
186
                Poll::Pending
632
            }
633
2842
        });
634
2658
        let create_message = fut.await?;
635
        if let Some(CtrlMsg::Create {
636
8
            recv_created,
637
8
            handshake,
638
8
            require_sendme_auth,
639
8
            params,
640
8
            done,
641
2611
        }) = create_message
642
        {
643
8
            let ret = match handshake {
644
                CircuitHandshake::CreateFast => {
645
4
                    self.create_firsthop_fast(recv_created, &params).await
646
                }
647
                CircuitHandshake::Ntor {
648
4
                    public_key,
649
4
                    ed_identity,
650
4
                } => {
651
4
                    self.create_firsthop_ntor(
652
4
                        recv_created,
653
4
                        ed_identity,
654
4
                        public_key,
655
4
                        require_sendme_auth,
656
4
                        &params,
657
4
                    )
658
4
                    .await
659
                }
660
            };
661
8
            let _ = done.send(ret); // don't care if sender goes away
662
8
            futures::future::poll_fn(|cx| -> Poll<Result<()>> {
663
8
                let _ = Pin::new(&mut self.channel)
664
8
                    .poll_flush(cx)
665
8
                    .map_err(|_| Error::ChannelClosed)?;
666
8
                Poll::Ready(Ok(()))
667
8
            })
668
            .await?;
669
2603
        }
670
2611
        Ok(())
671
2656
    }
672

            
673
    /// Helper: create the first hop of a circuit.
674
    ///
675
    /// This is parameterized not just on the RNG, but a wrapper object to
676
    /// build the right kind of create cell, a handshake object to perform
677
    /// the cryptographic handshake, and a layer type to
678
    /// handle relay crypto after this hop is built.
679
8
    async fn create_impl<L, FWD, REV, H, W>(
680
8
        &mut self,
681
8
        recvcreated: oneshot::Receiver<CreateResponse>,
682
8
        wrap: &W,
683
8
        key: &H::KeyType,
684
8
        require_sendme_auth: RequireSendmeAuth,
685
8
        params: &CircParameters,
686
8
    ) -> Result<()>
687
8
    where
688
8
        L: CryptInit + ClientLayer<FWD, REV> + 'static + Send,
689
8
        FWD: OutboundClientLayer + 'static + Send,
690
8
        REV: InboundClientLayer + 'static + Send,
691
8
        H: ClientHandshake,
692
8
        W: CreateHandshakeWrap,
693
8
        H::KeyGen: KeyGenerator,
694
8
    {
695
        // We don't need to shut down the circuit on failure here, since this
696
        // function consumes the PendingClientCirc and only returns
697
        // a ClientCirc on success.
698

            
699
8
        let (state, msg) = {
700
            // done like this because holding the RNG across an await boundary makes the future
701
            // non-Send
702
8
            let mut rng = rand::thread_rng();
703
8
            H::client1(&mut rng, key)?
704
        };
705
8
        let create_cell = wrap.to_chanmsg(msg);
706
        debug!(
707
            "{}: Extending to hop 1 with {}",
708
            self.unique_id,
709
            create_cell.cmd()
710
        );
711
8
        self.send_msg(create_cell).await?;
712

            
713
8
        let reply = recvcreated
714
8
            .await
715
8
            .map_err(|_| Error::CircProto("Circuit closed while waiting".into()))?;
716

            
717
8
        let relay_handshake = wrap.decode_chanmsg(reply)?;
718
8
        let keygen = H::client2(state, relay_handshake)?;
719

            
720
8
        let layer = L::construct(keygen)?;
721

            
722
        debug!("{}: Handshake complete; circuit created.", self.unique_id);
723

            
724
8
        let (layer_fwd, layer_back) = layer.split();
725
8
        self.add_hop(
726
8
            require_sendme_auth,
727
8
            Box::new(layer_fwd),
728
8
            Box::new(layer_back),
729
8
            params,
730
8
        );
731
8
        Ok(())
732
8
    }
733

            
734
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
735
    /// first hop of this circuit.
736
    ///
737
    /// There's no authentication in CREATE_FAST,
738
    /// so we don't need to know whom we're connecting to: we're just
739
    /// connecting to whichever relay the channel is for.
740
4
    async fn create_firsthop_fast(
741
4
        &mut self,
742
4
        recvcreated: oneshot::Receiver<CreateResponse>,
743
4
        params: &CircParameters,
744
4
    ) -> Result<()> {
745
4
        use crate::crypto::handshake::fast::CreateFastClient;
746
4
        let wrap = CreateFastWrap;
747
4
        self.create_impl::<Tor1RelayCrypto, _, _, CreateFastClient, _>(
748
4
            recvcreated,
749
4
            &wrap,
750
4
            &(),
751
4
            RequireSendmeAuth::No,
752
4
            params,
753
4
        )
754
4
        .await
755
4
    }
756

            
757
    /// Use the ntor handshake to connect to the first hop of this circuit.
758
    ///
759
    /// Note that the provided 'target' must match the channel's target,
760
    /// or the handshake will fail.
761
4
    async fn create_firsthop_ntor(
762
4
        &mut self,
763
4
        recvcreated: oneshot::Receiver<CreateResponse>,
764
4
        ed_identity: pk::ed25519::Ed25519Identity,
765
4
        pubkey: NtorPublicKey,
766
4
        require_sendme_auth: RequireSendmeAuth,
767
4
        params: &CircParameters,
768
4
    ) -> Result<()> {
769
4
        // Exit now if we have an Ed25519 or RSA identity mismatch.
770
4
        // FIXME(eta): this is copypasta from Channel::check_match!
771
4
        if self.channel.peer_rsa_id() != &pubkey.id {
772
            return Err(Error::ChanMismatch(format!(
773
                "Identity {} does not match target {}",
774
                self.channel.peer_rsa_id(),
775
                pubkey.id,
776
            )));
777
4
        }
778
4
        if self.channel.peer_ed25519_id() != &ed_identity {
779
            return Err(Error::ChanMismatch(format!(
780
                "Identity {} does not match target {}",
781
                self.channel.peer_ed25519_id(),
782
                ed_identity
783
            )));
784
4
        }
785
4

            
786
4
        let wrap = Create2Wrap {
787
4
            handshake_type: 0x0002, // ntor
788
4
        };
789
4
        self.create_impl::<Tor1RelayCrypto, _, _, NtorClient, _>(
790
4
            recvcreated,
791
4
            &wrap,
792
4
            &pubkey,
793
4
            require_sendme_auth,
794
4
            params,
795
4
        )
796
4
        .await
797
4
    }
798

            
799
    /// Add a hop to the end of this circuit.
800
120
    fn add_hop(
801
120
        &mut self,
802
120
        require_sendme_auth: RequireSendmeAuth,
803
120
        fwd: Box<dyn OutboundClientLayer + 'static + Send>,
804
120
        rev: Box<dyn InboundClientLayer + 'static + Send>,
805
120
        params: &CircParameters,
806
120
    ) {
807
120
        let hop = crate::circuit::reactor::CircHop::new(
808
120
            require_sendme_auth,
809
120
            params.initial_send_window(),
810
120
        );
811
120
        self.hops.push(hop);
812
120
        self.crypto_in.add_layer(rev);
813
120
        self.crypto_out.add_layer(fwd);
814
120
        self.num_hops.fetch_add(1, Ordering::SeqCst);
815
120
    }
816

            
817
    /// Handle a RELAY cell on this circuit with stream ID 0.
818
    fn handle_meta_cell(&mut self, hopnum: HopNum, msg: RelayMsg) -> Result<CellStatus> {
819
        // SENDME cells and TRUNCATED get handled internally by the circuit.
820
24
        if let RelayMsg::Sendme(s) = msg {
821
8
            return self.handle_sendme(hopnum, s);
822
16
        }
823
16
        if let RelayMsg::Truncated(t) = msg {
824
            let reason = t.reason();
825
            debug!(
826
                "{}: Truncated from hop {}. Reason: {} [{}]",
827
                self.unique_id,
828
                hopnum,
829
                reason.human_str(),
830
                reason
831
            );
832

            
833
            return Ok(CellStatus::CleanShutdown);
834
16
        }
835
16

            
836
16
        trace!("{}: Received meta-cell {:?}", self.unique_id, msg);
837

            
838
        // For all other command types, we'll only get them in response
839
        // to another command, which should have registered a responder.
840
        //
841
        // TODO: that means that service-introduction circuits will need
842
        // a different implementation, but that should be okay. We'll work
843
        // something out.
844
16
        if let Some((mut handler, done)) = self.meta_handler.take() {
845
16
            if handler.expected_hop() == hopnum {
846
                // Somebody was waiting for a message -- maybe this message
847
12
                let ret = handler.finish(msg, self);
848
12
                trace!(
849
                    "{}: meta handler completed with result: {:?}",
850
                    self.unique_id,
851
                    ret
852
                );
853
12
                let _ = done.send(ret); // don't care if sender goes away
854
12
                Ok(CellStatus::Continue)
855
            } else {
856
                // Somebody wanted a message from a different hop!  Put this
857
                // one back.
858
4
                self.meta_handler = Some((handler, done));
859
4
                Err(Error::CircProto(format!(
860
4
                    "Unexpected {} cell from hop {} on client circuit",
861
4
                    msg.cmd(),
862
4
                    hopnum,
863
4
                )))
864
            }
865
        } else {
866
            // No need to call shutdown here, since this error will
867
            // propagate to the reactor shut it down.
868
            Err(Error::CircProto(format!(
869
                "Unexpected {} cell on client circuit",
870
                msg.cmd()
871
            )))
872
        }
873
24
    }
874

            
875
    /// Handle a RELAY_SENDME cell on this circuit with stream ID 0.
876
8
    fn handle_sendme(&mut self, hopnum: HopNum, msg: Sendme) -> Result<CellStatus> {
877
        // No need to call "shutdown" on errors in this function;
878
        // it's called from the reactor task and errors will propagate there.
879
8
        let hop = self
880
8
            .hop_mut(hopnum)
881
8
            .ok_or_else(|| Error::CircProto(format!("Couldn't find {} hop", hopnum)))?;
882

            
883
8
        let auth: Option<[u8; 20]> = match msg.into_tag() {
884
8
            Some(v) => {
885
8
                if let Ok(tag) = <[u8; 20]>::try_from(v) {
886
8
                    Some(tag)
887
                } else {
888
                    return Err(Error::CircProto("malformed tag on circuit sendme".into()));
889
                }
890
            }
891
            None => {
892
                if hop.auth_sendme_required == RequireSendmeAuth::Yes {
893
                    return Err(Error::CircProto("missing tag on circuit sendme".into()));
894
                } else {
895
                    None
896
                }
897
            }
898
        };
899
8
        hop.sendwindow.put(auth)?;
900
4
        Ok(CellStatus::Continue)
901
8
    }
902

            
903
    /// Send a message onto the circuit's channel (to be called with a `Context`)
904
    ///
905
    /// If the channel is ready to accept messages, it will be sent immediately. If not, the message
906
    /// will be enqueued for sending at a later iteration of the reactor loop.
907
    ///
908
    /// # Note
909
    ///
910
    /// Making use of the enqueuing capabilities of this function is discouraged! You should first
911
    /// check whether the channel is ready to receive messages (`self.channel.poll_ready`), and
912
    /// ideally use this to implement backpressure (such that you do not read from other sources
913
    /// that would send here while you know you're unable to forward the messages on).
914
2456
    fn send_msg_direct(&mut self, cx: &mut Context<'_>, msg: ChanMsg) -> Result<()> {
915
2456
        let cell = ChanCell::new(self.channel_id, msg);
916
2456
        // NOTE(eta): We need to check whether the outbound queue is empty before trying to send:
917
2456
        //            if we just checked whether the channel was ready, it'd be possible for
918
2456
        //            cells to be sent out of order, since it could transition from not ready to
919
2456
        //            ready during one cycle of the reactor!
920
2456
        //            (This manifests as a protocol violation.)
921
2456
        if self.outbound.is_empty() && self.channel.poll_ready(cx)? {
922
2456
            Pin::new(&mut self.channel).start_send(cell)?;
923
        } else {
924
            // This has been observed to happen in code that doesn't have bugs in it, simply due
925
            // to the way `Channel`'s `poll_ready` implementation works (it can change due to
926
            // the actions of another thread in between callers of this function checking it,
927
            // and this function checking it).
928
            //
929
            // However, if it's happening a lot more than it used to, that probably indicates
930
            // some caller that's not checking whether the channel is full before calling
931
            // this function.
932

            
933
            debug!(
934
                "{}: having to enqueue cell due to backpressure: {:?}",
935
                self.unique_id, cell
936
            );
937
            self.outbound.push_back(cell);
938

            
939
            // Ensure we absolutely get scheduled again to clear `self.outbound`.
940
            cx.waker().wake_by_ref();
941
        }
942
2456
        Ok(())
943
2456
    }
944

            
945
    /// Wrapper around `send_msg_direct` that uses `futures::future::poll_fn` to get a `Context`.
946
8
    async fn send_msg(&mut self, msg: ChanMsg) -> Result<()> {
947
8
        // HACK(eta): technically the closure passed to `poll_fn` is a `FnMut` closure, since it
948
8
        //            can be polled multiple times.
949
8
        //            We're going to return Ready immediately since we're only using `poll_fn` to
950
8
        //            get a `Context`, but the compiler doesn't know that, so use an `Option`
951
8
        //            which we can `take()` in order to move out of it.
952
8
        //            (if we do get polled again this'll panic, but that shouldn't happen!)
953
8
        let mut msg = Some(msg);
954
8
        futures::future::poll_fn(|cx| -> Poll<Result<()>> {
955
8
            self.send_msg_direct(cx, msg.take().expect("poll_fn called twice?"))?;
956
8
            Poll::Ready(Ok(()))
957
8
        })
958
        .await?;
959
8
        Ok(())
960
8
    }
961

            
962
    /// Encode the relay cell `cell`, encrypt it, and send it to the 'hop'th hop.
963
    ///
964
    /// Does not check whether the cell is well-formed or reasonable.
965
2447
    fn send_relay_cell(
966
2447
        &mut self,
967
2447
        cx: &mut Context<'_>,
968
2447
        hop: HopNum,
969
2447
        early: bool,
970
2447
        cell: RelayCell,
971
2447
    ) -> Result<()> {
972
2447
        let c_t_w = sendme::cell_counts_towards_windows(&cell);
973
2447
        let stream_id = cell.stream_id();
974
2447
        // Check whether the hop send window is empty, if this cell counts towards windows.
975
2447
        // NOTE(eta): It is imperative this happens *before* calling encrypt() below, otherwise
976
2447
        //            we'll have cells rejected due to a protocol violation! (Cells have to be
977
2447
        //            sent out in the order they were passed to encrypt().)
978
2447
        if c_t_w {
979
2412
            let hop_num = Into::<usize>::into(hop);
980
2412
            let hop = &mut self.hops[hop_num];
981
2412
            if hop.sendwindow.window() == 0 {
982
                // Send window is empty! Push this cell onto the hop's outbound queue, and it'll
983
                // get sent later.
984
                trace!(
985
                    "{}: having to use onto hop {} queue for cell: {:?}",
986
                    self.unique_id,
987
                    hop_num,
988
                    cell
989
                );
990
                hop.outbound.push_back((early, cell));
991
                return Ok(());
992
2412
            }
993
35
        }
994
2447
        let mut body: RelayCellBody = cell.encode(&mut rand::thread_rng())?.into();
995
2447
        let tag = self.crypto_out.encrypt(&mut body, hop)?;
996
        // NOTE(eta): Now that we've encrypted the cell, we *must* either send it or abort
997
        //            the whole circuit (e.g. by returning an error).
998
2447
        let msg = chancell::msg::Relay::from_raw(body.into());
999
2447
        let msg = if early {
20
            ChanMsg::RelayEarly(msg)
        } else {
2427
            ChanMsg::Relay(msg)
        };
        // If the cell counted towards our sendme window, decrement
        // that window, and maybe remember the authentication tag.
2447
        if c_t_w {
2411
            let hop_num = Into::<usize>::into(hop);
2411
            let hop = &mut self.hops[hop_num];
2411
            // checked by earlier conditional, so this shouldn't fail
2411
            hop.sendwindow.take(tag)?;
2411
            if !stream_id.is_zero() {
                // We need to decrement the stream-level sendme window.
                // Stream data cells should only be dequeued and fed into this function if
                // the window is above zero, so we don't need to worry about enqueuing things.
2411
                if let Some(window) = hop.map.get_mut(stream_id).and_then(StreamEnt::send_window) {
2411
                    window.take(&())?;
                } else {
                    warn!(
                        "{}: sending a relay cell for non-existent or non-open stream with ID {}!",
                        self.unique_id, stream_id
                    );
                    return Err(Error::CircProto(format!(
                        "tried to send a relay cell on non-open stream {}",
                        stream_id
                    )));
                }
            }
36
        }
2448
        self.send_msg_direct(cx, msg)
2448
    }

            
    /// Try to install a given meta-cell handler to receive any unusual cells on
    /// this circuit, along with a result channel to notify on completion.
20
    fn set_meta_handler(
20
        &mut self,
20
        handler: Box<dyn MetaCellHandler>,
20
        done: ReactorResultChannel<()>,
20
    ) -> Result<()> {
20
        if self.meta_handler.is_none() {
20
            self.meta_handler = Some((handler, done));
20
            Ok(())
        } else {
            Err(Error::from(internal!(
                "Tried to install a meta-cell handler before the old one was gone."
            )))
        }
20
    }

            
    /// Handle a CtrlMsg other than Shutdown.
151
    fn handle_control(&mut self, cx: &mut Context<'_>, msg: CtrlMsg) -> Result<()> {
151
        trace!("{}: reactor received {:?}", self.unique_id, msg);
152
        match msg {
            // This is handled earlier, since it requires blocking.
            CtrlMsg::Create { .. } => panic!("got a CtrlMsg::Create in handle_control"),
            // This is handled earlier, since it requires generating a ReactorError.
            CtrlMsg::Shutdown => panic!("got a CtrlMsg::Shutdown in handle_control"),
            CtrlMsg::ExtendNtor {
20
                public_key,
20
                linkspecs,
20
                require_sendme_auth,
20
                params,
20
                done,
20
            } => {
20
                match CircuitExtender::<NtorClient, Tor1RelayCrypto, _, _>::begin(
20
                    cx,
20
                    0x02,
20
                    &public_key,
20
                    linkspecs,
20
                    require_sendme_auth,
20
                    params,
20
                    self,
20
                ) {
20
                    Ok(e) => {
20
                        self.set_meta_handler(Box::new(e), done)?;
                    }
                    Err(e) => {
                        let _ = done.send(Err(e));
                    }
                };
            }
            CtrlMsg::BeginStream {
12
                hop_num,
12
                message,
12
                sender,
12
                rx,
12
                done,
12
            } => {
12
                let ret = self.begin_stream(cx, hop_num, message, sender, rx);
12
                let _ = done.send(ret); // don't care if sender goes away
12
            }
            CtrlMsg::SendSendme { stream_id, hop_num } => {
                let sendme = Sendme::new_empty();
                let cell = RelayCell::new(stream_id, sendme.into());
                self.send_relay_cell(cx, hop_num, false, cell)?;
            }
            #[cfg(test)]
            CtrlMsg::AddFakeHop {
108
                supports_flowctrl_1,
108
                fwd_lasthop,
108
                rev_lasthop,
108
                params,
108
                done,
107
            } => {
107
                use crate::circuit::test::DummyCrypto;
107

            
107
                // This kinds of conversion is okay for testing, but just for testing.
108
                let require_sendme_auth = if supports_flowctrl_1 {
107
                    RequireSendmeAuth::Yes
                } else {
                    RequireSendmeAuth::No
                };

            
107
                let fwd = Box::new(DummyCrypto::new(fwd_lasthop));
107
                let rev = Box::new(DummyCrypto::new(rev_lasthop));
107
                self.add_hop(require_sendme_auth, fwd, rev, &params);
107
                let _ = done.send(Ok(()));
            }
            #[cfg(test)]
8
            CtrlMsg::QuerySendWindow { hop, done } => {
8
                let _ = done.send(if let Some(hop) = self.hop_mut(hop) {
8
                    Ok(hop.sendwindow.window_and_expected_tags())
                } else {
                    Err(Error::from(internal!(
                        "received QuerySendWindow for unknown hop {:?}",
                        hop
                    )))
                });
            }
            #[cfg(test)]
4
            CtrlMsg::SendRelayCell { hop, early, cell } => {
4
                self.send_relay_cell(cx, hop, early, cell)?;
            }
        }
151
        Ok(())
151
    }

            
    /// Start a stream. Creates an entry in the stream map with the given channels, and sends the
    /// `message` to the provided hop.
12
    fn begin_stream(
12
        &mut self,
12
        cx: &mut Context<'_>,
12
        hopnum: HopNum,
12
        message: RelayMsg,
12
        sender: mpsc::Sender<RelayMsg>,
12
        rx: mpsc::Receiver<RelayMsg>,
12
    ) -> Result<StreamId> {
12
        let hop = self
12
            .hop_mut(hopnum)
12
            .ok_or_else(|| Error::from(internal!("No such hop {:?}", hopnum)))?;
12
        let send_window = StreamSendWindow::new(SEND_WINDOW_INIT);
12
        let r = hop.map.add_ent(sender, rx, send_window)?;
12
        let cell = RelayCell::new(r, message);
12
        self.send_relay_cell(cx, hopnum, false, cell)?;
12
        Ok(r)
12
    }

            
    /// Close the stream associated with `id` because the stream was
    /// dropped.
    ///
    /// If we have not already received an END cell on this stream, send one.
    fn close_stream(&mut self, cx: &mut Context<'_>, hopnum: HopNum, id: StreamId) -> Result<()> {
        // Mark the stream as closing.
        let hop = self.hop_mut(hopnum).ok_or_else(|| {
            Error::from(internal!(
                "Tried to close a stream on a hop {:?} that wasn't there?",
                hopnum
            ))
        })?;

            
        let should_send_end = hop.map.terminate(id)?;
        trace!(
            "{}: Ending stream {}; should_send_end={:?}",
            self.unique_id,
            id,
            should_send_end
        );
        // TODO: I am about 80% sure that we only send an END cell if
        // we didn't already get an END cell.  But I should double-check!
        if should_send_end == ShouldSendEnd::Send {
            let end_cell = RelayCell::new(id, End::new_misc().into());
            self.send_relay_cell(cx, hopnum, false, end_cell)?;
        }
        Ok(())
    }

            
    /// Helper: process a cell on a channel.  Most cells get ignored
    /// or rejected; a few get delivered to circuits.
    ///
    /// Return true if we should exit.
52
    fn handle_cell(&mut self, cx: &mut Context<'_>, cell: ClientCircChanMsg) -> Result<CellStatus> {
52
        trace!("{}: handling cell: {:?}", self.unique_id, cell);
        use ClientCircChanMsg::*;
52
        match cell {
48
            Relay(r) => Ok(self.handle_relay_cell(cx, r)?),
4
            Destroy(d) => {
4
                let reason = d.reason();
4
                debug!(
                    "{}: Received DESTROY cell. Reason: {} [{}]",
                    self.unique_id,
                    reason.human_str(),
                    reason
                );

            
4
                self.handle_destroy_cell()?;
4
                Ok(CellStatus::CleanShutdown)
            }
        }
52
    }

            
    /// React to a Relay or RelayEarly cell.
48
    fn handle_relay_cell(&mut self, cx: &mut Context<'_>, cell: Relay) -> Result<CellStatus> {
48
        let mut body = cell.into_relay_body().into();

            
        // Decrypt the cell. If it's recognized, then find the
        // corresponding hop.
48
        let (hopnum, tag) = self.crypto_in.decrypt(&mut body)?;
        // Make a copy of the authentication tag. TODO: I'd rather not
        // copy it, but I don't see a way around it right now.
48
        let tag = {
48
            let mut tag_copy = [0_u8; 20];
48
            // TODO(nickm): This could crash if the tag length changes.  We'll
48
            // have to refactor it then.
48
            tag_copy.copy_from_slice(tag);
48
            tag_copy
        };
        // Decode the cell.
48
        let msg = RelayCell::decode(body.into())?;

            
48
        let c_t_w = sendme::cell_counts_towards_windows(&msg);

            
        // Decrement the circuit sendme windows, and see if we need to
        // send a sendme cell.
48
        let send_circ_sendme = if c_t_w {
4
            let hop = self
4
                .hop_mut(hopnum)
4
                .ok_or_else(|| Error::CircProto("Sendme from nonexistent hop".into()))?;
4
            hop.recvwindow.take()?
        } else {
44
            false
        };
        // If we do need to send a circuit-level SENDME cell, do so.
48
        if send_circ_sendme {
            // This always sends a V1 (tagged) sendme cell, and thereby assumes
            // that SendmeEmitMinVersion is no more than 1.  If the authorities
            // every increase that parameter to a higher number, this will
            // become incorrect.  (Higher numbers are not currently defined.)
            let sendme = Sendme::new_tag(tag);
            let cell = RelayCell::new(0.into(), sendme.into());
            self.send_relay_cell(cx, hopnum, false, cell)?;
            self.hop_mut(hopnum)
                .ok_or_else(|| {
                    Error::from(internal!(
                        "Trying to send SENDME to nonexistent hop {:?}",
                        hopnum
                    ))
                })?
                .recvwindow
                .put();
48
        }

            
        // Break the message apart into its streamID and message.
48
        let (streamid, msg) = msg.into_streamid_and_msg();
48

            
48
        // If this cell wants/refuses to have a Stream ID, does it
48
        // have/not have one?
48
        if !msg.cmd().accepts_streamid_val(streamid) {
            return Err(Error::CircProto(format!(
                "Invalid stream ID {} for relay command {}",
                streamid,
                msg.cmd()
            )));
48
        }
48

            
48
        // If this has a reasonable streamID value of 0, it's a meta cell,
48
        // not meant for a particular stream.
48
        if streamid.is_zero() {
24
            return self.handle_meta_cell(hopnum, msg);
24
        }

            
24
        let hop = self
24
            .hop_mut(hopnum)
24
            .ok_or_else(|| Error::CircProto("Cell from nonexistent hop!".into()))?;
24
        match hop.map.get_mut(streamid) {
            Some(StreamEnt::Open {
24
                sink,
24
                send_window,
24
                dropped,
24
                ref mut received_connected,
24
                ..
24
            }) => {
24
                // The stream for this message exists, and is open.
24

            
24
                if let RelayMsg::Sendme(_) = msg {
                    // We need to handle sendmes here, not in the stream's
                    // recv() method, or else we'd never notice them if the
                    // stream isn't reading.
4
                    send_window.put(Some(()))?;
4
                    return Ok(CellStatus::Continue);
20
                }

            
20
                if matches!(msg, RelayMsg::Connected(_)) {
12
                    // Remember that we've received a Connected cell, and can't get another,
12
                    // even if we become a HalfStream.  (This rule is enforced separately at
12
                    // DataStreamReader.)
12
                    *received_connected = true;
12
                }

            
                // Remember whether this was an end cell: if so we should
                // close the stream.
20
                let is_end_cell = matches!(msg, RelayMsg::End(_));

            
                // TODO: Add a wrapper type here to reject cells that should
                // never go to a client, like BEGIN.
20
                if let Err(e) = sink.try_send(msg) {
                    if e.is_full() {
                        // If we get here, we either have a logic bug (!), or an attacker
                        // is sending us more cells than we asked for via congestion control.
                        return Err(Error::CircProto(format!(
                            "Stream sink would block; received too many cells on stream ID {}",
                            streamid,
                        )));
                    }
                    if e.is_disconnected() && c_t_w {
                        // the other side of the stream has gone away; remember
                        // that we received a cell that we couldn't queue for it.
                        //
                        // Later this value will be recorded in a half-stream.
                        *dropped += 1;
                    }
20
                }
20
                if is_end_cell {
4
                    hop.map.end_received(streamid)?;
16
                }
            }
            Some(StreamEnt::EndSent(halfstream)) => {
                // We sent an end but maybe the other side hasn't heard.

            
                if matches!(msg, RelayMsg::End(_)) {
                    hop.map.end_received(streamid)?;
                } else {
                    halfstream.handle_msg(&msg)?;
                }
            }
            _ => {
                // No stream wants this message.
                return Err(Error::CircProto(
                    "Cell received on nonexistent stream!?".into(),
                ));
            }
        }
20
        Ok(CellStatus::Continue)
48
    }

            
    /// Helper: process a destroy cell.
    ///
    /// Currently does nothing and always returns `Ok(())`. The caller,
    /// `handle_cell`, is what reports `CellStatus::CleanShutdown` afterwards.
    // The `Result` return type is kept even though no error is produced, hence
    // the clippy allowance below.
    #[allow(clippy::unnecessary_wraps)]
    fn handle_destroy_cell(&mut self) -> Result<()> {
        // I think there is nothing more to do here.
        Ok(())
    }

            
    /// Return the hop corresponding to `hopnum`, if there is one.
56
    fn hop_mut(&mut self, hopnum: HopNum) -> Option<&mut CircHop> {
56
        self.hops.get_mut(Into::<usize>::into(hopnum))
56
    }
}

            
/// When a `Reactor` is dropped, it asks its channel to close the circuit it
/// was serving.
impl Drop for Reactor {
    /// Request closure of the circuit identified by `self.channel_id`.
    fn drop(&mut self) {
        // The outcome of `close_circuit` is deliberately ignored.
        let _ = self.channel.close_circuit(self.channel_id);
    }
}

            
// NOTE(review): the test module body is empty in this view of the file.
#[cfg(test)]
mod test {}