//! Code for talking directly (over a TLS connection) to a Tor client or relay.
//!
//! Channels form the basis of the rest of the Tor protocol: they are
//! the only way for two Tor instances to talk.
//!
//! Channels are not useful directly for application requests: after
//! making a channel, it needs to get used to build circuits, and the
//! circuits are used to anonymize streams.  The streams are the
//! objects corresponding to directory requests.
//!
//! In general, you shouldn't try to manage channels on your own;
//! however, there is no alternative in Arti today.  (A future
//! channel-manager library will probably fix that.)
//!
//! To launch a channel:
//!
//!  * Create a TLS connection as an object that implements AsyncRead
//!    + AsyncWrite, and pass it to a [ChannelBuilder].  This will
//!    yield an [handshake::OutboundClientHandshake] that represents
//!    the state of the handshake.
//!  * Call [handshake::OutboundClientHandshake::connect] on the result
//!    to negotiate the rest of the handshake.  This will verify
//!    syntactic correctness of the handshake, but not its cryptographic
//!    integrity.
//!  * Call [handshake::UnverifiedChannel::check] on the result.  This
//!    finishes the cryptographic checks.
//!  * Call [handshake::VerifiedChannel::finish] on the result. This
//!    completes the handshake and produces an open channel and Reactor.
//!  * Launch an asynchronous task to call the reactor's run() method.
//!
//! Once you have a running channel, you can create circuits on it with
//! its [Channel::new_circ] method.  See
//! [crate::circuit::PendingClientCirc] for information on how to
//! proceed from there.
//!
//! # Design
//!
//! For now, this code splits the channel into two pieces: a "Channel"
//! object that can be used by circuits to write cells onto the
//! channel, and a "Reactor" object that runs as a task in the
//! background, to read channel cells and pass them to circuits as
//! appropriate.
//!
//! I'm not at all sure that's the best way to do that, but it's what
//! I could think of.
//!
//! # Limitations
//!
//! This is client-only, and only supports link protocol version 4.
//!
//! TODO: There is no channel padding.
//!
//! TODO: There is no flow control, rate limiting, queueing, or
//! fairness.
55

            
56
/// The size of the channel buffer for communication between `Channel` and its reactor.
///
/// This value is passed as the buffer size of the bounded `mpsc` channel
/// that carries outgoing cells from a `Channel` to its reactor.
pub const CHANNEL_BUFFER_SIZE: usize = 128;
58

            
59
mod circmap;
60
mod codec;
61
mod handshake;
62
mod reactor;
63
mod unique_id;
64

            
65
use crate::channel::reactor::{BoxedChannelSink, BoxedChannelStream, CtrlMsg, Reactor};
66
pub use crate::channel::unique_id::UniqId;
67
use crate::circuit;
68
use crate::circuit::celltypes::CreateResponse;
69
use crate::util::ts::OptTimestamp;
70
use crate::{Error, Result};
71
use std::pin::Pin;
72
use tor_cell::chancell::{msg, ChanCell, CircId};
73
use tor_error::internal;
74
use tor_linkspec::ChanTarget;
75
use tor_llcrypto::pk::ed25519::Ed25519Identity;
76
use tor_llcrypto::pk::rsa::RsaIdentity;
77

            
78
use asynchronous_codec as futures_codec;
79
use futures::channel::{mpsc, oneshot};
80
use futures::io::{AsyncRead, AsyncWrite};
81

            
82
use futures::{Sink, SinkExt};
83
use std::sync::atomic::{AtomicBool, Ordering};
84
use std::sync::Arc;
85
use std::task::{Context, Poll};
86

            
87
use tracing::trace;
88

            
89
// reexport
90
use crate::channel::unique_id::CircUniqIdContext;
91
#[cfg(test)]
92
pub(crate) use codec::CodecError;
93
pub use handshake::{OutboundClientHandshake, UnverifiedChannel, VerifiedChannel};
94

            
95
/// Type alias: A Sink and Stream that transforms a TLS connection into
96
/// a cell-based communication mechanism.
97
type CellFrame<T> = futures_codec::Framed<T, crate::channel::codec::ChannelCodec>;
98

            
99
/// An open client channel, ready to send and receive Tor cells.
100
///
101
/// A channel is a direct connection to a Tor relay, implemented using TLS.
102
///
103
/// This struct is a frontend that can be used to send cells (using the `Sink<ChanCell>`
104
/// impl and otherwise control the channel.  The main state is in the Reactor object.
105
/// `Channel` is cheap to clone.
106
///
107
/// (Users need a mutable reference because of the types in `Sink`, and ultimately because
108
/// `cell_tx: mpsc::Sender` doesn't work without mut.
109
4
#[derive(Clone, Debug)]
110
pub struct Channel {
111
    /// A channel used to send control messages to the Reactor.
112
    control: mpsc::UnboundedSender<CtrlMsg>,
113
    /// A channel used to send cells to the Reactor.
114
    cell_tx: mpsc::Sender<ChanCell>,
115
    /// Information shared with the reactor
116
    details: Arc<ChannelDetails>,
117
}
118

            
119
/// This is information shared between the reactor and the frontend.
120
///
121
/// This exists to make `Channel` cheap to clone, which is desirable because every circuit wants
122
/// an owned mutable `Channel`.
123
///
124
/// `control` can't be here because we rely on it getting dropped when the last user goes away.
125
#[derive(Debug)]
126
pub(crate) struct ChannelDetails {
127
    /// A unique identifier for this channel.
128
    unique_id: UniqId,
129
    /// Validated Ed25519 identity for this peer.
130
    ed25519_id: Ed25519Identity,
131
    /// Validated RSA identity for this peer.
132
    rsa_id: RsaIdentity,
133
    /// If true, this channel is closing.
134
    closed: AtomicBool,
135
    /// Since when the channel became unused.
136
    ///
137
    /// If calling `time_since_update` returns None,
138
    /// this channel is still in use by at least one circuit.
139
    unused_since: OptTimestamp,
140
}
141

            
142
impl Sink<ChanCell> for Channel {
143
    type Error = Error;
144

            
145
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
146
        let this = self.get_mut();
147
        Pin::new(&mut this.cell_tx)
148
            .poll_ready(cx)
149
            .map_err(|_| Error::ChannelClosed)
150
    }
151

            
152
2456
    fn start_send(self: Pin<&mut Self>, cell: ChanCell) -> Result<()> {
153
2456
        let this = self.get_mut();
154
2456
        if this.details.closed.load(Ordering::SeqCst) {
155
            return Err(Error::ChannelClosed);
156
2456
        }
157
2456
        this.check_cell(&cell)?;
158
        {
159
            use msg::ChanMsg::*;
160
2456
            match cell.msg() {
161
2428
                Relay(_) | Padding(_) | VPadding(_) => {} // too frequent to log.
162
28
                _ => trace!(
163
                    "{}: Sending {} for {}",
164
                    this.details.unique_id,
165
                    cell.msg().cmd(),
166
                    cell.circid()
167
                ),
168
            }
169
        }
170

            
171
2456
        Pin::new(&mut this.cell_tx)
172
2456
            .start_send(cell)
173
2456
            .map_err(|_| Error::ChannelClosed)
174
2456
    }
175

            
176
2804
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
177
2804
        let this = self.get_mut();
178
2804
        Pin::new(&mut this.cell_tx)
179
2804
            .poll_flush(cx)
180
2804
            .map_err(|_| Error::ChannelClosed)
181
2804
    }
182

            
183
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
184
        let this = self.get_mut();
185
        Pin::new(&mut this.cell_tx)
186
            .poll_close(cx)
187
            .map_err(|_| Error::ChannelClosed)
188
    }
189
}
190

            
191
/// Structure for building and launching a Tor channel.
///
/// The default value has no declared target address.
#[derive(Default)]
pub struct ChannelBuilder {
    /// If present, a description of the address we're trying to connect to,
    /// to be used in log messages.
    ///
    /// TODO: at some point, check this against the addresses in the
    /// netinfo cell too.
    target: Option<std::net::SocketAddr>,
}
201

            
202
impl ChannelBuilder {
203
    /// Construct a new ChannelBuilder.
204
17
    pub fn new() -> Self {
205
17
        ChannelBuilder::default()
206
17
    }
207

            
208
    /// Set the declared target address of this channel.
209
    ///
210
    /// Note that nothing enforces the correctness of this address: it
211
    /// doesn't have to match the real address target of the TLS
212
    /// stream.  For now it is only used for logging.
213
18
    pub fn set_declared_addr(&mut self, target: std::net::SocketAddr) {
214
18
        self.target = Some(target);
215
18
    }
216

            
217
    /// Launch a new client handshake over a TLS stream.
218
    ///
219
    /// After calling this function, you'll need to call `connect()` on
220
    /// the result to start the handshake.  If that succeeds, you'll have
221
    /// authentication info from the relay: call `check()` on the result
222
    /// to check that.  Finally, to finish the handshake, call `finish()`
223
    /// on the result of _that_.
224
2
    pub fn launch<T>(self, tls: T) -> OutboundClientHandshake<T>
225
2
    where
226
2
        T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
227
2
    {
228
2
        handshake::OutboundClientHandshake::new(tls, self.target)
229
2
    }
230
}
231

            
232
impl Channel {
233
    /// Construct a channel and reactor.
234
    ///
235
    /// Internal method, called to finalize the channel when we've
236
    /// sent our netinfo cell, received the peer's netinfo cell, and
237
    /// we're finally ready to create circuits.
238
86
    fn new(
239
86
        link_protocol: u16,
240
86
        sink: BoxedChannelSink,
241
86
        stream: BoxedChannelStream,
242
86
        unique_id: UniqId,
243
86
        ed25519_id: Ed25519Identity,
244
86
        rsa_id: RsaIdentity,
245
86
    ) -> (Self, reactor::Reactor) {
246
86
        use circmap::{CircIdRange, CircMap};
247
86
        let circmap = CircMap::new(CircIdRange::High);
248
86

            
249
86
        let (control_tx, control_rx) = mpsc::unbounded();
250
86
        let (cell_tx, cell_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
251
86
        let closed = AtomicBool::new(false);
252
86
        let unused_since = OptTimestamp::new();
253
86
        unused_since.update();
254
86

            
255
86
        let details = ChannelDetails {
256
86
            unique_id,
257
86
            ed25519_id,
258
86
            rsa_id,
259
86
            closed,
260
86
            unused_since,
261
86
        };
262
86
        let details = Arc::new(details);
263
86

            
264
86
        let channel = Channel {
265
86
            control: control_tx,
266
86
            cell_tx,
267
86
            details: Arc::clone(&details),
268
86
        };
269
86

            
270
86
        let reactor = Reactor {
271
86
            control: control_rx,
272
86
            cells: cell_rx,
273
86
            input: futures::StreamExt::fuse(stream),
274
86
            output: sink,
275
86
            circs: circmap,
276
86
            circ_unique_id_ctx: CircUniqIdContext::new(),
277
86
            link_protocol,
278
86
            details,
279
86
        };
280
86

            
281
86
        (channel, reactor)
282
86
    }
283

            
284
    /// Return a process-unique identifier for this channel.
285
2
    pub fn unique_id(&self) -> UniqId {
286
2
        self.details.unique_id
287
2
    }
288

            
289
    /// Return the Ed25519 identity for the peer of this channel.
290
26
    pub fn peer_ed25519_id(&self) -> &Ed25519Identity {
291
26
        &self.details.ed25519_id
292
26
    }
293

            
294
    /// Return the (legacy) RSA identity for the peer of this channel.
295
5
    pub fn peer_rsa_id(&self) -> &RsaIdentity {
296
5
        &self.details.rsa_id
297
5
    }
298

            
299
    /// Return an error if this channel is somehow mismatched with the
300
    /// given target.
301
3
    pub fn check_match<T: ChanTarget + ?Sized>(&self, target: &T) -> Result<()> {
302
3
        if self.peer_ed25519_id() != target.ed_identity() {
303
2
            return Err(Error::ChanMismatch(format!(
304
2
                "Identity {} does not match target {}",
305
2
                self.peer_ed25519_id(),
306
2
                target.ed_identity()
307
2
            )));
308
1
        }
309
1

            
310
1
        if self.peer_rsa_id() != target.rsa_identity() {
311
            return Err(Error::ChanMismatch(format!(
312
                "Identity {} does not match target {}",
313
                self.peer_rsa_id(),
314
                target.rsa_identity()
315
            )));
316
1
        }
317
1

            
318
1
        Ok(())
319
3
    }
320

            
321
    /// Return true if this channel is closed and therefore unusable.
322
21
    pub fn is_closing(&self) -> bool {
323
21
        self.details.closed.load(Ordering::SeqCst)
324
21
    }
325

            
326
    /// If the channel is not in use, return the amount of time
327
    /// it has had with no circuits.
328
    ///
329
    /// Return `None` if the channel is currently in use.
330
13
    pub fn duration_unused(&self) -> Option<std::time::Duration> {
331
13
        self.details
332
13
            .unused_since
333
13
            .time_since_update()
334
13
            .map(Into::into)
335
13
    }
336

            
337
    /// Check whether a cell type is permissible to be _sent_ on an
338
    /// open client channel.
339
2468
    fn check_cell(&self, cell: &ChanCell) -> Result<()> {
340
2468
        use msg::ChanMsg::*;
341
2468
        let msg = cell.msg();
342
2468
        match msg {
343
4
            Created(_) | Created2(_) | CreatedFast(_) => Err(Error::from(internal!(
344
4
                "Can't send {} cell on client channel",
345
4
                msg.cmd()
346
4
            ))),
347
            Certs(_) | Versions(_) | Authenticate(_) | Authorize(_) | AuthChallenge(_)
348
4
            | Netinfo(_) => Err(Error::from(internal!(
349
4
                "Can't send {} cell after handshake is done",
350
4
                msg.cmd()
351
4
            ))),
352
2460
            _ => Ok(()),
353
        }
354
2468
    }
355

            
356
    /// Like `futures::Sink::poll_ready`.
357
5250
    pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool> {
358
5250
        Ok(match Pin::new(&mut self.cell_tx).poll_ready(cx) {
359
5244
            Poll::Ready(Ok(_)) => true,
360
            Poll::Ready(Err(_)) => return Err(Error::CircuitClosed),
361
8
            Poll::Pending => false,
362
        })
363
5252
    }
364

            
365
    /// Transmit a single cell on a channel.
366
    pub async fn send_cell(&mut self, cell: ChanCell) -> Result<()> {
367
        self.send(cell).await?;
368

            
369
        Ok(())
370
    }
371

            
372
    /// Return a newly allocated PendingClientCirc object with
373
    /// a corresponding circuit reactor. A circuit ID is allocated, but no
374
    /// messages are sent, and no cryptography is done.
375
    ///
376
    /// To use the results of this method, call Reactor::run() in a
377
    /// new task, then use the methods of
378
    /// [crate::circuit::PendingClientCirc] to build the circuit.
379
4
    pub async fn new_circ(
380
4
        &self,
381
4
    ) -> Result<(circuit::PendingClientCirc, circuit::reactor::Reactor)> {
382
4
        if self.is_closing() {
383
            return Err(Error::ChannelClosed);
384
4
        }
385
4

            
386
4
        // TODO: blocking is risky, but so is unbounded.
387
4
        let (sender, receiver) = mpsc::channel(128);
388
4
        let (createdsender, createdreceiver) = oneshot::channel::<CreateResponse>();
389
4

            
390
4
        let (tx, rx) = oneshot::channel();
391
4
        self.control
392
4
            .unbounded_send(CtrlMsg::AllocateCircuit {
393
4
                created_sender: createdsender,
394
4
                sender,
395
4
                tx,
396
4
            })
397
4
            .map_err(|_| Error::ChannelClosed)?;
398
4
        let (id, circ_unique_id) = rx.await.map_err(|_| Error::ChannelClosed)??;
399

            
400
        trace!("{}: Allocated CircId {}", circ_unique_id, id);
401

            
402
4
        Ok(circuit::PendingClientCirc::new(
403
4
            id,
404
4
            self.clone(),
405
4
            createdreceiver,
406
4
            receiver,
407
4
            circ_unique_id,
408
4
        ))
409
4
    }
410

            
411
    /// Shut down this channel immediately, along with all circuits that
412
    /// are using it.
413
    ///
414
    /// Note that other references to this channel may exist.  If they
415
    /// do, they will stop working after you call this function.
416
    ///
417
    /// It's not necessary to call this method if you're just done
418
    /// with a channel: the channel should close on its own once nothing
419
    /// is using it any more.
420
8
    pub fn terminate(&self) {
421
8
        let _ = self.control.unbounded_send(CtrlMsg::Shutdown);
422
8
    }
423

            
424
    /// Tell the reactor that the circuit with the given ID has gone away.
425
    pub fn close_circuit(&self, circid: CircId) -> Result<()> {
426
48
        self.control
427
48
            .unbounded_send(CtrlMsg::CloseCircuit(circid))
428
48
            .map_err(|_| Error::ChannelClosed)?;
429
43
        Ok(())
430
48
    }
431
}
432

            
433
#[cfg(test)]
pub(crate) mod test {
    // Most of this module is tested via tests that also check on the
    // reactor code; there are just a few more cases to examine here.
    #![allow(clippy::unwrap_used)]
    use super::*;
    use crate::channel::codec::test::MsgBuf;
    pub(crate) use crate::channel::reactor::test::new_reactor;
    use tor_cell::chancell::{msg, ChanCell};

    /// Make a new fake reactor-less channel.  For testing only, obviously.
    pub(crate) fn fake_channel(details: Arc<ChannelDetails>) -> Channel {
        Channel {
            control: mpsc::unbounded().0,
            cell_tx: mpsc::channel(CHANNEL_BUFFER_SIZE).0,
            details,
        }
    }

    /// Make `ChannelDetails` with a fresh unique ID, fixed dummy identities
    /// (Ed25519 = `[6; 32]`, RSA = `[10; 20]`), and the closed flag unset.
    fn fake_channel_details() -> Arc<ChannelDetails> {
        let unique_id = UniqId::new();
        let unused_since = OptTimestamp::new();

        Arc::new(ChannelDetails {
            unique_id,
            ed25519_id: [6_u8; 32].into(),
            rsa_id: [10_u8; 20].into(),
            closed: AtomicBool::new(false),
            unused_since,
        })
    }

    /// `check_cell` rejects cells a client must not send and accepts CREATE2.
    #[test]
    fn send_bad() {
        tor_rtcompat::test_with_all_runtimes!(|_rt| async move {
            let chan = fake_channel(fake_channel_details());

            let cell = ChanCell::new(7.into(), msg::Created2::new(&b"hihi"[..]).into());
            let e = chan.check_cell(&cell);
            assert!(e.is_err());
            assert!(format!("{}", e.unwrap_err())
                .contains("Can't send CREATED2 cell on client channel"));
            let cell = ChanCell::new(0.into(), msg::Certs::new_empty().into());
            let e = chan.check_cell(&cell);
            assert!(e.is_err());
            assert!(format!("{}", e.unwrap_err())
                .contains("Can't send CERTS cell after handshake is done"));

            let cell = ChanCell::new(5.into(), msg::Create2::new(2, &b"abc"[..]).into());
            let e = chan.check_cell(&cell);
            assert!(e.is_ok());
            // FIXME(eta): more difficult to test that sending works now that it has to go via reactor
            // let got = output.next().await.unwrap();
            // assert!(matches!(got.msg(), ChanMsg::Create2(_)));
        });
    }

    /// A builder with a declared address can launch a handshake object.
    #[test]
    fn chanbuilder() {
        let mut builder = ChannelBuilder::default();
        builder.set_declared_addr("127.0.0.1:9001".parse().unwrap());
        let tls = MsgBuf::new(&b""[..]);
        let _outbound = builder.launch(tls);
    }

    /// `check_match` accepts only a target whose identities both match.
    #[test]
    fn check_match() {
        use std::net::SocketAddr;
        let chan = fake_channel(fake_channel_details());

        struct ChanT {
            ed_id: Ed25519Identity,
            rsa_id: RsaIdentity,
        }

        impl ChanTarget for ChanT {
            fn ed_identity(&self) -> &Ed25519Identity {
                &self.ed_id
            }
            fn rsa_identity(&self) -> &RsaIdentity {
                &self.rsa_id
            }
            fn addrs(&self) -> &[SocketAddr] {
                &[]
            }
        }

        // Same identities as `fake_channel_details`.
        let t1 = ChanT {
            ed_id: [6; 32].into(),
            rsa_id: [10; 20].into(),
        };
        let t2 = ChanT {
            ed_id: [0x1; 32].into(),
            rsa_id: [0x3; 20].into(),
        };
        let t3 = ChanT {
            ed_id: [0x3; 32].into(),
            rsa_id: [0x2; 20].into(),
        };

        assert!(chan.check_match(&t1).is_ok());
        assert!(chan.check_match(&t2).is_err());
        assert!(chan.check_match(&t3).is_err());
    }

    /// Two separately created channels get different unique IDs.
    #[test]
    fn unique_id() {
        let ch1 = fake_channel(fake_channel_details());
        let ch2 = fake_channel(fake_channel_details());
        assert_ne!(ch1.unique_id(), ch2.unique_id());
    }

    /// After `unused_since` is updated, `duration_unused` reports a value.
    #[test]
    fn duration_unused_at() {
        let details = fake_channel_details();
        let ch = fake_channel(Arc::clone(&details));
        details.unused_since.update();
        assert!(ch.duration_unused().is_some());
    }
}