1
//! Multi-hop paths over the Tor network.
2
//!
3
//! Right now, we only implement "client circuits" -- also sometimes
4
//! called "origin circuits".  A client circuit is one that is
5
//! constructed by this Tor instance, and used on its own behalf to
6
//! send data over the Tor network.
7
//!
8
//! Each circuit has multiple hops over the Tor network: each hop
9
//! knows only the hop before and the hop after.  The client shares a
10
//! separate set of keys with each hop.
11
//!
12
//! To build a circuit, first create a [crate::channel::Channel], then
13
//! call its [crate::channel::Channel::new_circ] method.  This yields
14
//! a [PendingClientCirc] object that won't become live until you call
15
//! one of the methods that extends it to its first hop.  After you've
16
//! done that, you can call [ClientCirc::extend_ntor] on the circuit to
17
//! build it into a multi-hop circuit.  Finally, you can use
18
//! [ClientCirc::begin_stream] to get a Stream object that can be used
19
//! for anonymized data.
20
//!
21
//! # Implementation
22
//!
23
//! Each open circuit has a corresponding Reactor object that runs in
24
//! an asynchronous task, and manages incoming cells from the
25
//! circuit's upstream channel.  These cells are either RELAY cells or
26
//! DESTROY cells.  DESTROY cells are handled immediately.
27
//! RELAY cells are either for a particular stream, in which case they
28
//! get forwarded to a RawCellStream object, or for no particular stream,
29
//! in which case they are considered "meta" cells (like EXTENDED2)
30
//! that should only get accepted if something is waiting for them.
31
//!
32
//! # Limitations
33
//!
34
//! This is client-only.
35
//!
36
//! There's one big mutex on the whole circuit: the reactor needs to hold
37
//! it to process a cell, and streams need to hold it to send.
38
//!
39
//! There is no flow-control or rate-limiting or fairness.
40

            
41
pub(crate) mod celltypes;
42
pub(crate) mod halfcirc;
43
mod halfstream;
44
pub(crate) mod reactor;
45
pub(crate) mod sendme;
46
mod streammap;
47
mod unique_id;
48

            
49
use crate::channel::Channel;
50
use crate::circuit::celltypes::*;
51
use crate::circuit::reactor::{
52
    CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
53
};
54
pub use crate::circuit::unique_id::UniqId;
55
use crate::crypto::cell::{HopNum, InboundClientCrypt, OutboundClientCrypt};
56
use crate::stream::{DataStream, ResolveStream, StreamParameters, StreamReader};
57
use crate::{Error, Result};
58
use tor_cell::{
59
    chancell::{self, msg::ChanMsg, CircId},
60
    relaycell::msg::{Begin, RelayMsg, Resolve, Resolved, ResolvedVal},
61
};
62

            
63
use tor_error::{bad_api_usage, internal};
64
use tor_linkspec::{CircTarget, LinkSpec};
65

            
66
use futures::channel::{mpsc, oneshot};
67

            
68
use crate::circuit::sendme::StreamRecvWindow;
69
use futures::SinkExt;
70
use std::net::IpAddr;
71
use std::sync::atomic::{AtomicU8, Ordering};
72
use std::sync::Arc;
73
use tor_cell::relaycell::StreamId;
74
// use std::time::Duration;
75

            
76
use crate::crypto::handshake::ntor::NtorPublicKey;
77

            
78
use self::reactor::RequireSendmeAuth;
79

            
80
/// The size of the buffer for communication between `ClientCirc` and its reactor.
81
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
82

            
83
32
#[derive(Clone, Debug)]
84
/// A circuit that we have constructed over the Tor network.
85
///
86
/// This struct is the interface used by the rest of the code, It is fairly
87
/// cheaply cloneable.  None of the public methods need mutable access, since
88
/// they all actually communicate with the Reactor which contains the primary
89
/// mutable state, and does the actual work.
90
//
91
// Effectively, this struct contains two Arcs: one for `hops` and one for
92
// `control` (which surely has something Arc-like in it).  We cannot unify
93
// these by putting a single Arc around the whole struct, and passing
94
// an Arc strong reference to the `Reactor`, because then `control` would
95
// not be dropped when the last user of the circuit goes away.  We could
96
// make the reactor have a weak reference but weak references are more
97
// expensive to dereference.
98
//
99
// Because of the above, cloning this struct is always going to involve
100
// two atomic refcount changes/checks.  Wrapping it in another Arc would
101
// be overkill.
102
pub struct ClientCirc {
103
    /// Number of hops on this circuit.
104
    ///
105
    /// This value is incremented after the circuit successfully completes extending to a new hop.
106
    hops: Arc<AtomicU8>,
107
    /// A unique identifier for this circuit.
108
    unique_id: UniqId,
109
    /// Channel to send control messages to the reactor.
110
    control: mpsc::UnboundedSender<CtrlMsg>,
111
    /// For testing purposes: the CircId, for use in peek_circid().
112
    #[cfg(test)]
113
    circid: CircId,
114
}
115

            
116
/// A ClientCirc that needs to send a create cell and receive a created* cell.
117
///
118
/// To use one of these, call create_firsthop_fast() or create_firsthop_ntor()
119
/// to negotiate the cryptographic handshake with the first hop.
120
pub struct PendingClientCirc {
121
    /// A oneshot receiver on which we'll receive a CREATED* cell,
122
    /// or a DESTROY cell.
123
    recvcreated: oneshot::Receiver<CreateResponse>,
124
    /// The ClientCirc object that we can expose on success.
125
    circ: ClientCirc,
126
}
127

            
128
/// Description of the network's current rules for building circuits.
#[derive(Clone, Debug)]
pub struct CircParameters {
    /// Initial value to use for our outbound circuit-level windows.
    initial_send_window: u16,
    /// Whether we should include ed25519 identities when we send
    /// EXTEND2 cells.
    extend_by_ed25519_id: bool,
}

impl Default for CircParameters {
    /// Build the default parameter set: an initial send window of 1000,
    /// with ed25519 identities included when extending.
    fn default() -> Self {
        Self {
            extend_by_ed25519_id: true,
            initial_send_window: 1000,
        }
    }
}
146

            
147
impl CircParameters {
148
    /// Override the default initial send window for these parameters.
149
    /// Gives an error on any value above 1000.
150
    ///
151
    /// You should probably not call this.
152
53
    pub fn set_initial_send_window(&mut self, v: u16) -> Result<()> {
153
53
        if v <= 1000 {
154
52
            self.initial_send_window = v;
155
52
            Ok(())
156
        } else {
157
1
            Err(Error::from(bad_api_usage!(
158
1
                "Tried to set an initial send window over 1000"
159
1
            )))
160
        }
161
53
    }
162

            
163
    /// Return the initial send window as set in this parameter set.
164
173
    pub fn initial_send_window(&self) -> u16 {
165
173
        self.initial_send_window
166
173
    }
167

            
168
    /// Override the default decision about whether to use ed25519
169
    /// identities in outgoing EXTEND2 cells.
170
    ///
171
    /// You should probably not call this.
172
52
    pub fn set_extend_by_ed25519_id(&mut self, v: bool) {
173
52
        self.extend_by_ed25519_id = v;
174
52
    }
175

            
176
    /// Return true if we're configured to extend by ed25519 ID; false
177
    /// otherwise.
178
73
    pub fn extend_by_ed25519_id(&self) -> bool {
179
73
        self.extend_by_ed25519_id
180
73
    }
181
}
182

            
183
/// A stream on a particular circuit.
184
12
#[derive(Clone, Debug)]
185
pub(crate) struct StreamTarget {
186
    /// Which hop of the circuit this stream is with.
187
    hop_num: HopNum,
188
    /// Reactor ID for this stream.
189
    stream_id: StreamId,
190
    /// Channel to send cells down.
191
    tx: mpsc::Sender<RelayMsg>,
192
    /// Reference to the circuit that this stream is on.
193
    circ: ClientCirc,
194
}
195

            
196
impl ClientCirc {
197
    /// Extend the circuit via the ntor handshake to a new target last
198
    /// hop.
199
20
    pub async fn extend_ntor<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
200
20
    where
201
20
        Tg: CircTarget,
202
20
    {
203
20
        let key = NtorPublicKey {
204
20
            id: *target.rsa_identity(),
205
20
            pk: *target.ntor_onion_key(),
206
20
        };
207
20
        let mut linkspecs = target.linkspecs();
208
20
        if !params.extend_by_ed25519_id() {
209
            linkspecs.retain(|ls| !matches!(ls, LinkSpec::Ed25519Id(_)));
210
20
        }
211
        // FlowCtrl=1 means that this hop supports authenticated SENDMEs
212
20
        let require_sendme_auth = RequireSendmeAuth::from_protocols(target.protovers());
213
20

            
214
20
        let (tx, rx) = oneshot::channel();
215
20

            
216
20
        self.control
217
20
            .unbounded_send(CtrlMsg::ExtendNtor {
218
20
                public_key: key,
219
20
                linkspecs,
220
20
                require_sendme_auth,
221
20
                params: params.clone(),
222
20
                done: tx,
223
20
            })
224
20
            .map_err(|_| Error::CircuitClosed)?;
225

            
226
23
        rx.await.map_err(|_| Error::CircuitClosed)??;
227

            
228
4
        Ok(())
229
20
    }
230

            
231
    /// Helper, used to begin a stream.
232
    ///
233
    /// This function allocates a stream ID, and sends the message
234
    /// (like a BEGIN or RESOLVE), but doesn't wait for a response.
235
    ///
236
    /// The caller will typically want to see the first cell in response,
237
    /// to see whether it is e.g. an END or a CONNECTED.
238
12
    async fn begin_stream_impl(&self, begin_msg: RelayMsg) -> Result<(StreamReader, StreamTarget)> {
239
12
        // TODO: Possibly this should take a hop, rather than just
240
12
        // assuming it's the last hop.
241
12

            
242
12
        let num_hops = self.hops.load(Ordering::SeqCst);
243
12
        if num_hops == 0 {
244
            return Err(Error::from(internal!(
245
                "Can't begin a stream at the 0th hop"
246
            )));
247
12
        }
248
12
        let hop_num: HopNum = (num_hops - 1).into();
249
12
        let (sender, receiver) = mpsc::channel(STREAM_READER_BUFFER);
250
12
        let (tx, rx) = oneshot::channel();
251
12
        let (msg_tx, msg_rx) = mpsc::channel(CIRCUIT_BUFFER_SIZE);
252
12

            
253
12
        self.control
254
12
            .unbounded_send(CtrlMsg::BeginStream {
255
12
                hop_num,
256
12
                message: begin_msg,
257
12
                sender,
258
12
                rx: msg_rx,
259
12
                done: tx,
260
12
            })
261
12
            .map_err(|_| Error::CircuitClosed)?;
262

            
263
12
        let stream_id = rx.await.map_err(|_| Error::CircuitClosed)??;
264

            
265
12
        let target = StreamTarget {
266
12
            circ: self.clone(),
267
12
            tx: msg_tx,
268
12
            hop_num,
269
12
            stream_id,
270
12
        };
271
12

            
272
12
        let reader = StreamReader {
273
12
            target: target.clone(),
274
12
            receiver,
275
12
            recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
276
12
            ended: false,
277
12
        };
278
12

            
279
12
        Ok((reader, target))
280
12
    }
281

            
282
    /// Start a DataStream (anonymized connection) to the given
283
    /// address and port, using a BEGIN cell.
284
12
    async fn begin_data_stream(&self, msg: RelayMsg, optimistic: bool) -> Result<DataStream> {
285
12
        let (reader, target) = self.begin_stream_impl(msg).await?;
286
12
        let mut stream = DataStream::new(reader, target);
287
12
        if !optimistic {
288
14
            stream.wait_for_connection().await?;
289
4
        }
290
12
        Ok(stream)
291
12
    }
292

            
293
    /// Start a stream to the given address and port, using a BEGIN
294
    /// cell.
295
    ///
296
    /// The use of a string for the address is intentional: you should let
297
    /// the remote Tor relay do the hostname lookup for you.
298
8
    pub async fn begin_stream(
299
8
        &self,
300
8
        target: &str,
301
8
        port: u16,
302
8
        parameters: Option<StreamParameters>,
303
8
    ) -> Result<DataStream> {
304
8
        let parameters = parameters.unwrap_or_default();
305
8
        let begin_flags = parameters.begin_flags();
306
8
        let optimistic = parameters.is_optimistic();
307
8
        let beginmsg = Begin::new(target, port, begin_flags)?;
308
22
        self.begin_data_stream(beginmsg.into(), optimistic).await
309
8
    }
310

            
311
    /// Start a new stream to the last relay in the circuit, using
312
    /// a BEGIN_DIR cell.
313
4
    pub async fn begin_dir_stream(&self) -> Result<DataStream> {
314
        // Note that we always open begindir connections optimistically.
315
        // Since they are local to a relay that we've already authenticated
316
        // with and built a circuit to, there should be no additional checks
317
        // we need to perform to see whether the BEGINDIR will succeed.
318
4
        self.begin_data_stream(RelayMsg::BeginDir, true).await
319
4
    }
320

            
321
    /// Perform a DNS lookup, using a RESOLVE cell with the last relay
322
    /// in this circuit.
323
    ///
324
    /// Note that this function does not check for timeouts; that's
325
    /// the caller's responsibility.
326
    pub async fn resolve(&self, hostname: &str) -> Result<Vec<IpAddr>> {
327
        let resolve_msg = Resolve::new(hostname);
328

            
329
        let resolved_msg = self.try_resolve(resolve_msg).await?;
330

            
331
        resolved_msg
332
            .into_answers()
333
            .into_iter()
334
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
335
                Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
336
                Ok(_) => None,
337
                Err(e) => Some(Err(e)),
338
            })
339
            .collect()
340
    }
341

            
342
    /// Perform a reverse DNS lookup, by sending a RESOLVE cell with
343
    /// the last relay on this circuit.
344
    ///
345
    /// Note that this function does not check for timeouts; that's
346
    /// the caller's responsibility.
347
    pub async fn resolve_ptr(&self, addr: IpAddr) -> Result<Vec<String>> {
348
        let resolve_ptr_msg = Resolve::new_reverse(&addr);
349

            
350
        let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
351

            
352
        resolved_msg
353
            .into_answers()
354
            .into_iter()
355
            .filter_map(|(val, _)| match resolvedval_to_result(val) {
356
                Ok(ResolvedVal::Hostname(v)) => Some(
357
                    String::from_utf8(v)
358
                        .map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
359
                ),
360
                Ok(_) => None,
361
                Err(e) => Some(Err(e)),
362
            })
363
            .collect()
364
    }
365

            
366
    /// Helper: Send the resolve message, and read resolved message from
367
    /// resolve stream.
368
    async fn try_resolve(&self, msg: Resolve) -> Result<Resolved> {
369
        let (reader, _) = self.begin_stream_impl(msg.into()).await?;
370
        let mut resolve_stream = ResolveStream::new(reader);
371
        resolve_stream.read_msg().await
372
    }
373

            
374
    /// Shut down this circuit, along with all streams that are using it.
375
    /// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
376
    /// immediately after this function returns!).
377
    ///
378
    /// Note that other references to this circuit may exist.  If they
379
    /// do, they will stop working after you call this function.
380
    ///
381
    /// It's not necessary to call this method if you're just done
382
    /// with a circuit: the channel should close on its own once nothing
383
    /// is using it any more.
384
    pub fn terminate(&self) {
385
        let _ = self.control.unbounded_send(CtrlMsg::Shutdown);
386
    }
387

            
388
    /// Called when a circuit-level protocol error has occurred and the
389
    /// circuit needs to shut down.
390
    ///
391
    /// This is a separate function because we may eventually want to have
392
    /// it do more than just shut down.
393
    ///
394
    /// As with `terminate`, this function is asynchronous.
395
    pub(crate) fn protocol_error(&self) {
396
        self.terminate();
397
    }
398

            
399
    /// Return true if this circuit is closed and therefore unusable.
400
    pub fn is_closing(&self) -> bool {
401
        self.control.is_closed()
402
    }
403

            
404
    /// Return a process-unique identifier for this circuit.
405
    pub fn unique_id(&self) -> UniqId {
406
        self.unique_id
407
    }
408

            
409
    #[cfg(test)]
410
19
    pub fn n_hops(&self) -> u8 {
411
19
        self.hops.load(Ordering::SeqCst)
412
19
    }
413
}
414

            
415
impl PendingClientCirc {
416
    /// Instantiate a new circuit object: used from Channel::new_circ().
417
    ///
418
    /// Does not send a CREATE* cell on its own.
419
    ///
420
    ///
421
48
    pub(crate) fn new(
422
48
        id: CircId,
423
48
        channel: Channel,
424
48
        createdreceiver: oneshot::Receiver<CreateResponse>,
425
48
        input: mpsc::Receiver<ClientCircChanMsg>,
426
48
        unique_id: UniqId,
427
48
    ) -> (PendingClientCirc, reactor::Reactor) {
428
48
        let crypto_out = OutboundClientCrypt::new();
429
48
        let (control_tx, control_rx) = mpsc::unbounded();
430
48
        let num_hops = Arc::new(AtomicU8::new(0));
431
48

            
432
48
        let reactor = Reactor {
433
48
            control: control_rx,
434
48
            outbound: Default::default(),
435
48
            channel,
436
48
            input,
437
48
            crypto_in: InboundClientCrypt::new(),
438
48
            hops: vec![],
439
48
            unique_id,
440
48
            channel_id: id,
441
48
            crypto_out,
442
48
            meta_handler: None,
443
48
            num_hops: Arc::clone(&num_hops),
444
48
        };
445
48

            
446
48
        let circuit = ClientCirc {
447
48
            hops: num_hops,
448
48
            unique_id,
449
48
            control: control_tx,
450
48
            #[cfg(test)]
451
48
            circid: id,
452
48
        };
453
48

            
454
48
        let pending = PendingClientCirc {
455
48
            recvcreated: createdreceiver,
456
48
            circ: circuit,
457
48
        };
458
48
        (pending, reactor)
459
48
    }
460

            
461
    /// Testing only: Extract the circuit ID for this pending circuit.
462
    #[cfg(test)]
463
4
    pub(crate) fn peek_circid(&self) -> CircId {
464
4
        self.circ.circid
465
4
    }
466

            
467
    /// Use the (questionable!) CREATE_FAST handshake to connect to the
468
    /// first hop of this circuit.
469
    ///
470
    /// There's no authentication in CRATE_FAST,
471
    /// so we don't need to know whom we're connecting to: we're just
472
    /// connecting to whichever relay the channel is for.
473
4
    pub async fn create_firsthop_fast(self, params: &CircParameters) -> Result<ClientCirc> {
474
4
        let (tx, rx) = oneshot::channel();
475
4
        self.circ
476
4
            .control
477
4
            .unbounded_send(CtrlMsg::Create {
478
4
                recv_created: self.recvcreated,
479
4
                handshake: CircuitHandshake::CreateFast,
480
4
                require_sendme_auth: RequireSendmeAuth::No,
481
4
                params: params.clone(),
482
4
                done: tx,
483
4
            })
484
4
            .map_err(|_| Error::CircuitClosed)?;
485

            
486
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
487

            
488
4
        Ok(self.circ)
489
4
    }
490

            
491
    /// Use the ntor handshake to connect to the first hop of this circuit.
492
    ///
493
    /// Note that the provided 'target' must match the channel's target,
494
    /// or the handshake will fail.
495
4
    pub async fn create_firsthop_ntor<Tg>(
496
4
        self,
497
4
        target: &Tg,
498
4
        params: CircParameters,
499
4
    ) -> Result<ClientCirc>
500
4
    where
501
4
        Tg: tor_linkspec::CircTarget,
502
4
    {
503
4
        let (tx, rx) = oneshot::channel();
504
4
        let require_sendme_auth = RequireSendmeAuth::from_protocols(target.protovers());
505
4

            
506
4
        self.circ
507
4
            .control
508
4
            .unbounded_send(CtrlMsg::Create {
509
4
                recv_created: self.recvcreated,
510
4
                handshake: CircuitHandshake::Ntor {
511
4
                    public_key: NtorPublicKey {
512
4
                        id: *target.rsa_identity(),
513
4
                        pk: *target.ntor_onion_key(),
514
4
                    },
515
4
                    ed_identity: *target.ed_identity(),
516
4
                },
517
4
                require_sendme_auth,
518
4
                params: params.clone(),
519
4
                done: tx,
520
4
            })
521
4
            .map_err(|_| Error::CircuitClosed)?;
522

            
523
8
        rx.await.map_err(|_| Error::CircuitClosed)??;
524

            
525
4
        Ok(self.circ)
526
4
    }
527
}
528

            
529
/// An object that can put a given handshake into a ChanMsg for a CREATE*
530
/// cell, and unwrap a CREATED* cell.
531
trait CreateHandshakeWrap {
532
    /// Construct an appropriate ChanMsg to hold this kind of handshake.
533
    fn to_chanmsg(&self, bytes: Vec<u8>) -> ChanMsg;
534
    /// Decode a ChanMsg to an appropriate handshake value, checking
535
    /// its type.
536
    fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>>;
537
}
538

            
539
/// A CreateHandshakeWrap that generates CREATE_FAST and handles CREATED_FAST.
540
struct CreateFastWrap;
541

            
542
impl CreateHandshakeWrap for CreateFastWrap {
543
4
    fn to_chanmsg(&self, bytes: Vec<u8>) -> ChanMsg {
544
4
        chancell::msg::CreateFast::new(bytes).into()
545
4
    }
546
4
    fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>> {
547
4
        use CreateResponse::*;
548
4
        match msg {
549
4
            CreatedFast(m) => Ok(m.into_body()),
550
            Destroy(_) => Err(Error::CircRefused(
551
                "Relay replied to CREATE_FAST with DESTROY.",
552
            )),
553
            _ => Err(Error::CircProto(format!(
554
                "Relay replied to CREATE_FAST with unexpected cell: {:?}",
555
                msg
556
            ))),
557
        }
558
4
    }
559
}
560

            
561
/// A CreateHandshakeWrap that generates CREATE2 and handles CREATED2
562
struct Create2Wrap {
563
    /// The handshake type to put in the CREATE2 cell.
564
    handshake_type: u16,
565
}
566
impl CreateHandshakeWrap for Create2Wrap {
567
4
    fn to_chanmsg(&self, bytes: Vec<u8>) -> ChanMsg {
568
4
        chancell::msg::Create2::new(self.handshake_type, bytes).into()
569
4
    }
570
4
    fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>> {
571
4
        use CreateResponse::*;
572
4
        match msg {
573
4
            Created2(m) => Ok(m.into_body()),
574
            Destroy(_) => Err(Error::CircRefused("Relay replied to CREATE2 with DESTROY.")),
575
            _ => Err(Error::CircProto(format!(
576
                "Relay replied to CREATE2 with unexpected cell {:?}",
577
                msg
578
            ))),
579
        }
580
4
    }
581
}
582

            
583
impl StreamTarget {
584
    /// Deliver a relay message for the stream that owns this StreamTarget.
585
    ///
586
    /// The StreamTarget will set the correct stream ID and pick the
587
    /// right hop, but will not validate that the message is well-formed
588
    /// or meaningful in context.
589
2412
    pub(crate) async fn send(&mut self, msg: RelayMsg) -> Result<()> {
590
2412
        self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
591
2412
        Ok(())
592
2412
    }
593

            
594
    /// Called when a circuit-level protocol error has occurred and the
595
    /// circuit needs to shut down.
596
    pub(crate) fn protocol_error(&mut self) {
597
        self.circ.protocol_error();
598
    }
599

            
600
    /// Send a SENDME cell for this stream.
601
    pub(crate) fn send_sendme(&mut self) -> Result<()> {
602
        self.circ
603
            .control
604
            .unbounded_send(CtrlMsg::SendSendme {
605
                stream_id: self.stream_id,
606
                hop_num: self.hop_num,
607
            })
608
            .map_err(|_| Error::CircuitClosed)?;
609
        Ok(())
610
    }
611
}
612

            
613
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
614
/// it represents an error.
615
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
616
    match val {
617
        ResolvedVal::TransientError => Err(Error::ResolveError(
618
            "Received retriable transient error".into(),
619
        )),
620
        ResolvedVal::NontransientError => {
621
            Err(Error::ResolveError("Received not retriable error.".into()))
622
        }
623
        _ => Ok(val),
624
    }
625
}
626

            
627
#[cfg(test)]
628
mod test {
629
    #![allow(clippy::unwrap_used)]
630

            
631
    use super::*;
632
    use crate::channel::{test::new_reactor, CodecError};
633
    use crate::crypto::cell::RelayCellBody;
634
    use chanmsg::{ChanMsg, Created2, CreatedFast};
635
    use futures::channel::mpsc::{Receiver, Sender};
636
    use futures::io::{AsyncReadExt, AsyncWriteExt};
637
    use futures::sink::SinkExt;
638
    use futures::stream::StreamExt;
639
    use futures::task::SpawnExt;
640
    use hex_literal::hex;
641
    use rand::thread_rng;
642
    use std::time::Duration;
643
    use tor_cell::chancell::{msg as chanmsg, ChanCell};
644
    use tor_cell::relaycell::{msg as relaymsg, RelayCell, StreamId};
645
    use tor_llcrypto::pk;
646
    use tor_rtcompat::{Runtime, SleepProvider};
647
    use tracing::trace;
648

            
649
    fn rmsg_to_ccmsg<ID>(id: ID, msg: relaymsg::RelayMsg) -> ClientCircChanMsg
650
    where
651
        ID: Into<StreamId>,
652
    {
653
        let body: RelayCellBody = RelayCell::new(id.into(), msg)
654
            .encode(&mut thread_rng())
655
            .unwrap()
656
            .into();
657
        let chanmsg = chanmsg::Relay::from_raw(body.into());
658
        ClientCircChanMsg::Relay(chanmsg)
659
    }
660

            
661
    struct ExampleTarget {
662
        ntor_key: pk::curve25519::PublicKey,
663
        protovers: tor_protover::Protocols,
664
        ed_id: pk::ed25519::Ed25519Identity,
665
        rsa_id: pk::rsa::RsaIdentity,
666
    }
667
    impl tor_linkspec::ChanTarget for ExampleTarget {
668
        fn addrs(&self) -> &[std::net::SocketAddr] {
669
            &[]
670
        }
671
        fn ed_identity(&self) -> &pk::ed25519::Ed25519Identity {
672
            &self.ed_id
673
        }
674
        fn rsa_identity(&self) -> &pk::rsa::RsaIdentity {
675
            &self.rsa_id
676
        }
677
    }
678
    impl tor_linkspec::CircTarget for ExampleTarget {
679
        fn ntor_onion_key(&self) -> &pk::curve25519::PublicKey {
680
            &self.ntor_key
681
        }
682
        fn protovers(&self) -> &tor_protover::Protocols {
683
            &self.protovers
684
        }
685
    }
686
    /// return an ExampleTarget that can get used for an ntor handshake.
687
    fn example_target() -> ExampleTarget {
688
        ExampleTarget {
689
            ntor_key: hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253")
690
                .into(),
691
            protovers: "FlowCtrl=1".parse().unwrap(),
692
            ed_id: [6_u8; 32].into(),
693
            rsa_id: [10_u8; 20].into(),
694
        }
695
    }
696
    fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
697
        crate::crypto::handshake::ntor::NtorSecretKey::new(
698
            hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803").into(),
699
            hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253").into(),
700
            [10_u8; 20].into(),
701
        )
702
    }
703

            
704
    fn working_fake_channel<R: Runtime>(
705
        rt: &R,
706
    ) -> (
707
        Channel,
708
        Receiver<ChanCell>,
709
        Sender<std::result::Result<ChanCell, CodecError>>,
710
    ) {
711
        let (channel, chan_reactor, rx, tx) = new_reactor();
712
        rt.spawn(async {
713
            let _ignore = chan_reactor.run().await;
714
        })
715
        .unwrap();
716
        (channel, rx, tx)
717
    }
718

            
719
    async fn test_create<R: Runtime>(rt: &R, fast: bool) {
720
        // We want to try progressing from a pending circuit to a circuit
721
        // via a crate_fast handshake.
722

            
723
        use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
724

            
725
        let (chan, mut rx, _sink) = working_fake_channel(rt);
726
        let circid = 128.into();
727
        let (created_send, created_recv) = oneshot::channel();
728
        let (_circmsg_send, circmsg_recv) = mpsc::channel(64);
729
        let unique_id = UniqId::new(23, 17);
730

            
731
        let (pending, reactor) =
732
            PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
733

            
734
        rt.spawn(async {
735
            let _ignore = reactor.run().await;
736
        })
737
        .unwrap();
738

            
739
        // Future to pretend to be a relay on the other end of the circuit.
740
        let simulate_relay_fut = async move {
741
            let mut rng = rand::thread_rng();
742
            let create_cell = rx.next().await.unwrap();
743
            assert_eq!(create_cell.circid(), 128.into());
744
            let reply = if fast {
745
                let cf = match create_cell.msg() {
746
                    ChanMsg::CreateFast(cf) => cf,
747
                    _ => panic!(),
748
                };
749
                let (_, rep) = CreateFastServer::server(&mut rng, &[()], cf.body()).unwrap();
750
                CreateResponse::CreatedFast(CreatedFast::new(rep))
751
            } else {
752
                let c2 = match create_cell.msg() {
753
                    ChanMsg::Create2(c2) => c2,
754
                    _ => panic!(),
755
                };
756
                let (_, rep) =
757
                    NtorServer::server(&mut rng, &[example_ntor_key()], c2.body()).unwrap();
758
                CreateResponse::Created2(Created2::new(rep))
759
            };
760
            created_send.send(reply).unwrap();
761
        };
762
        // Future to pretend to be a client.
763
        let client_fut = async move {
764
            let target = example_target();
765
            let params = CircParameters::default();
766
            let ret = if fast {
767
                trace!("doing fast create");
768
                pending.create_firsthop_fast(&params).await
769
            } else {
770
                trace!("doing ntor create");
771
                pending.create_firsthop_ntor(&target, params).await
772
            };
773
            trace!("create done: result {:?}", ret);
774
            ret
775
        };
776

            
777
        let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
778

            
779
        let _circ = circ.unwrap();
780

            
781
        // Phew!  We've built a circuit!  Let's make sure it has one hop.
782
        /* TODO: reinstate this.
783
        let inner = Arc::get_mut(&mut circuit).unwrap().c.into_inner();
784
        assert_eq!(inner.hops.len(), 1);
785
         */
786
    }
787

            
788
    /// Build a one-hop circuit with the CREATE_FAST handshake, once for each
    /// runtime that `test_with_all_runtimes!` exercises.
    #[test]
    fn test_create_fast() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            // `true` selects the fast-create branch of `test_create`.
            test_create(&rt, true).await;
        });
    }
794
    /// Build a one-hop circuit with the ntor (CREATE2) handshake, once for
    /// each runtime that `test_with_all_runtimes!` exercises.
    #[test]
    fn test_create_ntor() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            // `false` selects the ntor branch of `test_create`.
            test_create(&rt, false).await;
        });
    }
800

            
801
    /// An encryption layer that doesn't do any crypto.
    ///
    /// Can be used as inbound or outbound, but not both at once.
    pub(crate) struct DummyCrypto {
        /// Buffer handed out by `next_tag`; its first four bytes carry the
        /// counter value in little-endian order.
        counter_tag: [u8; 20],
        /// Value that the next call to `next_tag` will encode.
        counter: u32,
        /// When true, `decrypt_inbound` yields a tag; otherwise it returns
        /// `None`.
        lasthop: bool,
    }
    impl DummyCrypto {
        /// Encode the current counter into the first four bytes of the tag
        /// buffer (little-endian), advance the counter by one, and return
        /// the buffer.
        fn next_tag(&mut self) -> &[u8; 20] {
            self.counter_tag[..4].copy_from_slice(&self.counter.to_le_bytes());
            self.counter += 1;
            &self.counter_tag
        }
    }
819

            
820
    impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
821
        fn originate_for(&mut self, _cell: &mut RelayCellBody) -> &[u8] {
822
            self.next_tag()
823
        }
824
        fn encrypt_outbound(&mut self, _cell: &mut RelayCellBody) {}
825
    }
826
    impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
827
        fn decrypt_inbound(&mut self, _cell: &mut RelayCellBody) -> Option<&[u8]> {
828
            if self.lasthop {
829
                Some(self.next_tag())
830
            } else {
831
                None
832
            }
833
        }
834
    }
835
    impl DummyCrypto {
836
        pub(crate) fn new(lasthop: bool) -> Self {
837
            DummyCrypto {
838
                counter_tag: [0; 20],
839
                counter: 0,
840
                lasthop,
841
            }
842
        }
843
    }
844

            
845
    // Helper: set up a 3-hop circuit with no encryption, where the
846
    // next inbound message seems to come from hop next_msg_from
847
    async fn newcirc_ext<R: Runtime>(
848
        rt: &R,
849
        chan: Channel,
850
        next_msg_from: HopNum,
851
    ) -> (ClientCirc, mpsc::Sender<ClientCircChanMsg>) {
852
        let circid = 128.into();
853
        let (_created_send, created_recv) = oneshot::channel();
854
        let (circmsg_send, circmsg_recv) = mpsc::channel(64);
855
        let unique_id = UniqId::new(23, 17);
856

            
857
        let (pending, reactor) =
858
            PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
859

            
860
        rt.spawn(async {
861
            let _ignore = reactor.run().await;
862
        })
863
        .unwrap();
864

            
865
        let PendingClientCirc {
866
            circ,
867
            recvcreated: _,
868
        } = pending;
869

            
870
        for idx in 0_u8..3 {
871
            let params = CircParameters::default();
872
            let (tx, rx) = oneshot::channel();
873
            circ.control
874
                .unbounded_send(CtrlMsg::AddFakeHop {
875
                    supports_flowctrl_1: true,
876
                    fwd_lasthop: idx == 2,
877
                    rev_lasthop: idx == next_msg_from.into(),
878
                    params,
879
                    done: tx,
880
                })
881
                .unwrap();
882
            rx.await.unwrap().unwrap();
883
        }
884

            
885
        (circ, circmsg_send)
886
    }
887

            
888
    // Helper: set up a 3-hop circuit with no encryption, where the
889
    // next inbound message seems to come from hop next_msg_from
890
    async fn newcirc<R: Runtime>(
891
        rt: &R,
892
        chan: Channel,
893
    ) -> (ClientCirc, mpsc::Sender<ClientCircChanMsg>) {
894
        newcirc_ext(rt, chan, 2.into()).await
895
    }
896

            
897
    /// Try sending a relay cell through the `CtrlMsg::SendRelayCell`
    /// control message and check what shows up on the fake channel.
    #[test]
    fn send_simple() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, _send) = newcirc(&rt, chan).await;
            let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir);
            circ.control
                .unbounded_send(CtrlMsg::SendRelayCell {
                    hop: 2.into(),
                    early: false,
                    cell: begindir,
                })
                .unwrap();

            // Here's what we tried to put on the TLS channel.  Note that
            // we're using dummy relay crypto for testing convenience.
            let rcvd = rx.next().await.unwrap();
            assert_eq!(rcvd.circid(), 128.into());
            let m = match rcvd.into_circid_and_msg().1 {
                ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
                _ => panic!(),
            };
            assert!(matches!(m.msg(), RelayMsg::BeginDir));
        });
    }
923

            
924
    // NOTE(eta): this test is commented out because it basically tested implementation details
925
    //            of the old code which are hard to port to the reactor version, and the behaviour
926
    //            is covered by the extend tests anyway, so I don't think it's worth it.
927

            
928
    /*
929
    // Try getting a "meta-cell", which is what we're calling those not
    // for a specific stream.
931
    #[async_test]
932
    async fn recv_meta() {
933
        let (chan, _, _sink) = working_fake_channel();
934
        let (circ, mut reactor, mut sink) = newcirc(chan).await;
935

            
936
        // 1: Try doing it via handle_meta_cell directly.
937
        let meta_receiver = circ.register_meta_handler(2.into()).await.unwrap();
938
        let extended: RelayMsg = relaymsg::Extended2::new((*b"123").into()).into();
939
        {
940
            circ.c
941
                .lock()
942
                .await
943
                .handle_meta_cell(2.into(), extended.clone())
944
                .await
945
                .unwrap();
946
        }
947
        let msg = meta_receiver.await.unwrap().unwrap();
948
        assert!(matches!(msg, RelayMsg::Extended2(_)));
949

            
950
        // 2: Try doing it via the reactor.
951
        let meta_receiver = circ.register_meta_handler(2.into()).await.unwrap();
952
        sink.send(rmsg_to_ccmsg(0, extended.clone())).await.unwrap();
953
        reactor.run_once().await.unwrap();
954
        let msg = meta_receiver.await.unwrap().unwrap();
955
        assert!(matches!(msg, RelayMsg::Extended2(_)));
956

            
957
        // 3: Try getting a meta cell that we didn't want.
958
        let e = {
959
            circ.c
960
                .lock()
961
                .await
962
                .handle_meta_cell(2.into(), extended.clone())
963
                .await
964
                .err()
965
                .unwrap()
966
        };
967
        assert_eq!(
968
            format!("{}", e),
969
            "circuit protocol violation: Unexpected EXTENDED2 cell on client circuit"
970
        );
971

            
972
        // 3: Try getting a meta from a hop that we didn't want.
973
        let _receiver = circ.register_meta_handler(2.into()).await.unwrap();
974
        let e = {
975
            circ.c
976
                .lock()
977
                .await
978
                .handle_meta_cell(1.into(), extended.clone())
979
                .await
980
                .err()
981
                .unwrap()
982
        };
983
        assert_eq!(
984
            format!("{}", e),
985
            "circuit protocol violation: Unexpected EXTENDED2 cell from hop 1 on client circuit"
986
        );
987
    }
988
     */
989

            
990
    /// Extend a fake 3-hop circuit by one hop: the client side calls
    /// `extend_ntor`, while a simulated relay answers the EXTEND2 request
    /// with a genuine ntor server reply.
    #[test]
    fn extend() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};

            let (chan, mut rx, _sink) = working_fake_channel(&rt);
            let (circ, mut sink) = newcirc(&rt, chan).await;
            let params = CircParameters::default();

            let extend_fut = async move {
                let target = example_target();
                circ.extend_ntor(&target, &params).await.unwrap();
                circ // gotta keep the circ alive, or the reactor would exit.
            };
            let reply_fut = async move {
                // We've disabled encryption on this circuit, so we can just
                // read the extend2 cell.
                let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(id, 128.into());
                // The request is expected inside a RELAY_EARLY cell.
                let rmsg = match chmsg {
                    ChanMsg::RelayEarly(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
                    _ => panic!(),
                };
                let e2 = match rmsg.msg() {
                    RelayMsg::Extend2(e2) => e2,
                    _ => panic!(),
                };
                let mut rng = thread_rng();
                let (_, reply) =
                    NtorServer::server(&mut rng, &[example_ntor_key()], e2.handshake()).unwrap();
                let extended2 = relaymsg::Extended2::new(reply).into();
                sink.send(rmsg_to_ccmsg(0, extended2)).await.unwrap();
                sink // gotta keep the sink alive, or the reactor will exit.
            };

            let (circ, _) = futures::join!(extend_fut, reply_fut);

            // Did we really add another hop?
            assert_eq!(circ.n_hops(), 4);
        });
    }

            
    /// Shared driver for the "bad extend" tests.
    ///
    /// Builds a fake 3-hop circuit whose next inbound message appears to
    /// come from `reply_hop`, starts an ntor extend, and delivers
    /// `bad_reply` about 100ms later.  Checks that the circuit still has
    /// three hops and that the extend failed, then returns the error.
    async fn bad_extend_test_impl<R: Runtime>(
        rt: &R,
        reply_hop: HopNum,
        bad_reply: ClientCircChanMsg,
    ) -> Error {
        let (channel, _chan_rx, _chan_sink) = working_fake_channel(rt);
        let (circuit, mut inbound) = newcirc_ext(rt, channel, reply_hop).await;
        let params = CircParameters::default();

        let target = example_target();
        #[allow(clippy::clone_on_copy)]
        let sleeper = rt.clone();
        // Deliver the bogus reply from a separate task, after a short delay,
        // while the extend attempt below is in progress.
        let delayed_reply = async move {
            sleeper.sleep(Duration::from_millis(100)).await;
            inbound.send(bad_reply).await.unwrap();
            inbound
        };
        let reply_task = rt.spawn_with_handle(delayed_reply).unwrap();
        let outcome = circuit.extend_ntor(&target, &params).await;
        // Hold on to the returned sender until the checks below are done.
        let _inbound = reply_task.await;

        assert_eq!(circuit.n_hops(), 3);
        assert!(outcome.is_err());
        outcome.unwrap_err()
    }

            
    /// An EXTENDED2 reply that appears to come from hop 1 must make the
    /// extend attempt fail with `Error::CircuitClosed`.
    #[test]
    fn bad_extend_wronghop() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let bogus_reply = rmsg_to_ccmsg(0, relaymsg::Extended2::new(vec![]).into());

            let err = bad_extend_test_impl(&rt, 1.into(), bogus_reply).await;
            // This case shows up as a CircuitClosed error, since a message
            // sent from the wrong hop won't even be delivered to the extend
            // code's meta-handler.  Instead the unexpected message will
            // cause the circuit to get torn down.
            if !matches!(err, Error::CircuitClosed) {
                panic!("got other error: {}", err);
            }
        });
    }

            
    /// An EXTENDED (rather than EXTENDED2) reply from the expected hop must
    /// make the extend attempt fail with a circuit-protocol error.
    #[test]
    fn bad_extend_wrongtype() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let extended = relaymsg::Extended::new(vec![7; 200]).into();
            let cc = rmsg_to_ccmsg(0, extended);

            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::CircProto(s) => {
                    assert_eq!(s, "wanted EXTENDED2; got EXTENDED");
                }
                // Report what we actually got, as bad_extend_wronghop does.
                x => panic!("got other error: {}", x),
            }
        });
    }

            
    /// A DESTROY cell arriving while an extend is pending must make the
    /// extend attempt fail with `Error::CircuitClosed`.
    #[test]
    fn bad_extend_destroy() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
            let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
            match error {
                Error::CircuitClosed => {}
                // Report what we actually got, as bad_extend_wronghop does.
                x => panic!("got other error: {}", x),
            }
        });
    }

            
    /// An EXTENDED2 reply carrying a garbage handshake body must make the
    /// extend attempt fail with `Error::BadCircHandshake`.
    #[test]
    fn bad_extend_crypto() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            // 256 bytes of 99 stand in for the server's handshake reply.
            let garbage_reply =
                rmsg_to_ccmsg(0, relaymsg::Extended2::new(vec![99; 256]).into());
            let error = bad_extend_test_impl(&rt, 2.into(), garbage_reply).await;
            assert!(matches!(error, Error::BadCircHandshake));
        });
    }

            
    /// Open a directory stream on a fake circuit and run a small
    /// request/response exchange against a simulated relay.
    #[test]
    fn begindir() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (chan, mut chan_rx, _chan_sink) = working_fake_channel(&rt);
            let (circ, mut circ_sink) = newcirc(&rt, chan).await;

            // Client side: open the stream, send a request, read the reply,
            // then expect end-of-stream.
            let client_fut = async move {
                let mut stream = circ.begin_dir_stream().await.unwrap();
                stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
                stream.flush().await.unwrap();

                let mut buf = [0_u8; 1024];
                let n_reply = stream.read(&mut buf).await.unwrap();
                assert_eq!(&buf[..n_reply], b"HTTP/1.0 404 Not found\r\n");
                let n_eof = stream.read(&mut buf).await.unwrap();
                assert_eq!(n_eof, 0);
                stream
            };

            // Relay side: since encryption is disabled on this circuit, the
            // cells can be read straight off the channel.
            let relay_fut = async move {
                // First comes the BEGINDIR cell.
                let (circid, chanmsg) = chan_rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(circid, 128.into()); // hardcoded circid.
                let begin_cell = if let ChanMsg::Relay(r) = chanmsg {
                    RelayCell::decode(r.into_relay_body()).unwrap()
                } else {
                    panic!()
                };
                let (streamid, begin_msg) = begin_cell.into_streamid_and_msg();
                assert!(matches!(begin_msg, RelayMsg::BeginDir));

                // Answer with CONNECTED to indicate success.
                circ_sink
                    .send(rmsg_to_ccmsg(
                        streamid,
                        relaymsg::Connected::new_empty().into(),
                    ))
                    .await
                    .unwrap();

                // Next comes a DATA cell on the same stream...
                let (circid, chanmsg) = chan_rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(circid, 128.into());
                let data_cell = if let ChanMsg::Relay(r) = chanmsg {
                    RelayCell::decode(r.into_relay_body()).unwrap()
                } else {
                    panic!()
                };
                let (data_streamid, data_msg) = data_cell.into_streamid_and_msg();
                assert_eq!(data_streamid, streamid);
                match data_msg {
                    RelayMsg::Data(d) => assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]),
                    _ => panic!(),
                }

                // ...which we answer with a DATA cell of our own...
                let response = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
                    .unwrap()
                    .into();
                circ_sink
                    .send(rmsg_to_ccmsg(streamid, response))
                    .await
                    .unwrap();

                // ...followed by an END cell to say the conversation is over.
                let end_msg = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
                circ_sink
                    .send(rmsg_to_ccmsg(streamid, end_msg))
                    .await
                    .unwrap();

                (chan_rx, circ_sink) // gotta keep these alive, or the reactor will exit.
            };

            let (_stream, (_chan_rx, _circ_sink)) = futures::join!(client_fut, relay_fut);
        });
    }

            
    /// Set up a circuit and stream that expects some incoming SENDMEs.
    ///
    /// Opens a stream on a fake 3-hop circuit, writes `n_to_send` zero bytes
    /// into it, and has a simulated relay count the DATA cells that arrive.
    /// Returns the circuit, the stream, the sender for inbound circuit
    /// messages, the stream ID, the number of DATA cells seen, and both ends
    /// of the fake channel, so the caller can keep everything alive.
    async fn setup_incoming_sendme_case<R: Runtime>(
        rt: &R,
        n_to_send: usize,
    ) -> (
        ClientCirc,
        DataStream,
        mpsc::Sender<ClientCircChanMsg>,
        StreamId,
        usize,
        Receiver<ChanCell>,
        Sender<std::result::Result<ChanCell, CodecError>>,
    ) {
        let (chan, mut chan_rx, chan_sink) = working_fake_channel(rt);
        let (circ, mut circ_sink) = newcirc(rt, chan).await;

        let stream_circ = circ.clone();
        // Client side: open a stream and push `n_to_send` bytes through it,
        // at most 1024 at a time.
        let writer_fut = async move {
            let mut stream = stream_circ
                .begin_stream("www.example.com", 443, None)
                .await
                .unwrap();
            let zeros = [0_u8; 1024];
            let mut left = n_to_send;
            while left > 0 {
                let chunk = left.min(zeros.len());
                stream.write_all(&zeros[..chunk]).await.unwrap();
                left -= chunk;
            }
            stream.flush().await.unwrap();
            stream
        };

        // Relay side: accept the stream, then count DATA cells until every
        // byte has arrived.
        let counter_fut = async move {
            // Read the BEGIN cell.
            let (_circid, chanmsg) = chan_rx.next().await.unwrap().into_circid_and_msg();
            let begin_cell = if let ChanMsg::Relay(r) = chanmsg {
                RelayCell::decode(r.into_relay_body()).unwrap()
            } else {
                panic!()
            };
            let (streamid, begin_msg) = begin_cell.into_streamid_and_msg();
            assert!(matches!(begin_msg, RelayMsg::Begin(_)));
            // Reply with a CONNECTED cell...
            circ_sink
                .send(rmsg_to_ccmsg(
                    streamid,
                    relaymsg::Connected::new_empty().into(),
                ))
                .await
                .unwrap();
            // ...then read from the channel until we have all the bytes.
            let mut total_bytes = 0_usize;
            let mut total_cells = 0_usize;
            while total_bytes < n_to_send {
                // Read a data cell, and remember how much we got.
                let (circid, chanmsg) = chan_rx.next().await.unwrap().into_circid_and_msg();
                assert_eq!(circid, 128.into());

                let data_cell = if let ChanMsg::Relay(r) = chanmsg {
                    RelayCell::decode(r.into_relay_body()).unwrap()
                } else {
                    panic!()
                };
                let (data_streamid, data_msg) = data_cell.into_streamid_and_msg();
                assert_eq!(data_streamid, streamid);
                match data_msg {
                    RelayMsg::Data(payload) => {
                        total_cells += 1;
                        total_bytes += payload.as_ref().len();
                    }
                    _ => panic!(),
                }
            }

            (circ_sink, streamid, total_cells, chan_rx)
        };

        let (stream, (circ_sink, streamid, cells_received, chan_rx)) =
            futures::join!(writer_fut, counter_fut);

        (
            circ,
            stream,
            circ_sink,
            streamid,
            cells_received,
            chan_rx,
            chan_sink,
        )
    }

            
    /// Send enough data to produce 301 DATA cells, then check that a
    /// circuit-level SENDME carrying the first expected tag is accepted and
    /// raises the send window by 100.
    #[test]
    fn accept_valid_sendme() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (circ, _stream, mut circ_sink, streamid, n_cells, _chan_rx, _chan_sink) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;

            assert_eq!(n_cells, 301);

            // Make sure that the circuit is indeed expecting the right sendmes
            {
                let (reply_tx, reply_rx) = oneshot::channel();
                let query = CtrlMsg::QuerySendWindow {
                    hop: 2.into(),
                    done: reply_tx,
                };
                circ.control.unbounded_send(query).unwrap();
                let (window, tags) = reply_rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 301);
                assert_eq!(tags.len(), 3);
                // The three pending tags encode the counter values 100, 200
                // and 300 (little-endian, as produced by the dummy crypto).
                let tag_100 =
                    sendme::CircTag::from(hex!("6400000000000000000000000000000000000000"));
                let tag_200 =
                    sendme::CircTag::from(hex!("c800000000000000000000000000000000000000"));
                let tag_300 =
                    sendme::CircTag::from(hex!("2c01000000000000000000000000000000000000"));
                assert_eq!(tags[0], tag_100);
                assert_eq!(tags[1], tag_200);
                assert_eq!(tags[2], tag_300);
            }

            let send_sendmes_fut = async move {
                // make and send a circuit-level sendme.
                let circ_sendme =
                    relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
                        .into();
                circ_sink
                    .send(rmsg_to_ccmsg(0_u16, circ_sendme))
                    .await
                    .unwrap();

                // Make and send a stream-level sendme.
                let stream_sendme = relaymsg::Sendme::new_empty().into();
                circ_sink
                    .send(rmsg_to_ccmsg(streamid, stream_sendme))
                    .await
                    .unwrap();

                circ_sink
            };

            let _circ_sink = send_sendmes_fut.await;

            // FIXME(eta): this is a hacky way of waiting for the reactor to run before doing the below
            //             query; should find some way to properly synchronize to avoid flakiness
            rt.sleep(Duration::from_millis(100)).await;
            // Now make sure that the circuit is still happy, and its
            // window is updated.
            {
                let (reply_tx, reply_rx) = oneshot::channel();
                let query = CtrlMsg::QuerySendWindow {
                    hop: 2.into(),
                    done: reply_tx,
                };
                circ.control.unbounded_send(query).unwrap();
                let (window, _tags) = reply_rx.await.unwrap().unwrap();
                assert_eq!(window, 1000 - 201);
            }
        });
    }

            
    /// Same setup as `accept_valid_sendme`, but deliver a circuit-level
    /// SENDME with the wrong tag and check that the circuit's control
    /// channel closes within about a second.
    #[test]
    fn invalid_circ_sendme() {
        tor_rtcompat::test_with_all_runtimes!(|rt| async move {
            let (circ, _stream, mut circ_sink, _streamid, _n_cells, _chan_rx, _chan_sink) =
                setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;

            let send_bad_sendme_fut = async move {
                // make and send a circuit-level sendme with a bad tag.
                let bad_sendme =
                    relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
                        .into();
                circ_sink
                    .send(rmsg_to_ccmsg(0_u16, bad_sendme))
                    .await
                    .unwrap();
                circ_sink
            };

            let _circ_sink = send_bad_sendme_fut.await;

            // FIXME(eta): we aren't testing the error message like we used to; however, we can at least
            //             check whether the reactor dies as a result of receiving invalid data.
            let mut naps = 0;
            loop {
                if circ.control.is_closed() {
                    break;
                }
                // TODO: Don't sleep in tests.
                rt.sleep(Duration::from_millis(100)).await;
                naps += 1;
                if naps > 10 {
                    panic!("reactor continued running after invalid sendme");
                }
            }

            // TODO: check that the circuit is shut down too
        });
    }

            
    /// Exercise the getters and setters of `CircParameters`: the defaults,
    /// a valid update, and a rejected out-of-range send window.
    #[test]
    fn basic_params() {
        use super::CircParameters;

        // Defaults.
        let mut params = CircParameters::default();
        assert_eq!(params.initial_send_window(), 1000);
        assert!(params.extend_by_ed25519_id());

        // Both setters take effect.
        let accepted = params.set_initial_send_window(500);
        assert!(accepted.is_ok());
        params.set_extend_by_ed25519_id(false);
        assert_eq!(params.initial_send_window(), 500);
        assert!(!params.extend_by_ed25519_id());

        // A window of 9000 is refused and leaves the old value in place.
        let rejected = params.set_initial_send_window(9000);
        assert!(rejected.is_err());
        assert_eq!(params.initial_send_window(), 500);
    }
}