Lines
73.2 %
Functions
63.35 %
Branches
100 %
//! Multi-hop paths over the Tor network.
//!
//! Right now, we only implement "client circuits" -- also sometimes
//! called "origin circuits". A client circuit is one that is
//! constructed by this Tor instance, and used on its own behalf to
//! send data over the Tor network.
//! Each circuit has multiple hops over the Tor network: each hop
//! knows only the hop before and the hop after. The client shares a
//! separate set of keys with each hop.
//! To build a circuit, first create a [crate::channel::Channel], then
//! call its [crate::channel::Channel::new_circ] method. This yields
//! a [PendingClientCirc] object that won't become live until you call
//! one of the methods that extends it to its first hop. After you've
//! done that, you can call [ClientCirc::extend_ntor] on the circuit to
//! build it into a multi-hop circuit. Finally, you can use
//! [ClientCirc::begin_stream] to get a Stream object that can be used
//! for anonymized data.
//! # Implementation
//! Each open circuit has a corresponding Reactor object that runs in
//! an asynchronous task, and manages incoming cells from the
//! circuit's upstream channel. These cells are either RELAY cells or
//! DESTROY cells. DESTROY cells are handled immediately.
//! RELAY cells are either for a particular stream, in which case they
//! get forwarded to a RawCellStream object, or for no particular stream,
//! in which case they are considered "meta" cells (like EXTENDED2)
//! that should only get accepted if something is waiting for them.
//! # Limitations
//! This is client-only.
//! There's one big mutex on the whole circuit: the reactor needs to hold
//! it to process a cell, and streams need to hold it to send.
//! There is no flow-control or rate-limiting or fairness.
pub(crate) mod celltypes;
pub(crate) mod halfcirc;
mod halfstream;
pub(crate) mod reactor;
pub(crate) mod sendme;
mod streammap;
mod unique_id;
use crate::channel::Channel;
use crate::circuit::celltypes::*;
use crate::circuit::reactor::{
CircuitHandshake, CtrlMsg, Reactor, RECV_WINDOW_INIT, STREAM_READER_BUFFER,
};
pub use crate::circuit::unique_id::UniqId;
use crate::crypto::cell::{HopNum, InboundClientCrypt, OutboundClientCrypt};
use crate::stream::{DataStream, ResolveStream, StreamParameters, StreamReader};
use crate::{Error, Result};
use tor_cell::{
chancell::{self, msg::ChanMsg, CircId},
relaycell::msg::{Begin, RelayMsg, Resolve, Resolved, ResolvedVal},
use tor_error::{bad_api_usage, internal};
use tor_linkspec::{CircTarget, LinkSpec};
use futures::channel::{mpsc, oneshot};
use crate::circuit::sendme::StreamRecvWindow;
use futures::SinkExt;
use std::net::IpAddr;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
use tor_cell::relaycell::StreamId;
// use std::time::Duration;
use crate::crypto::handshake::ntor::NtorPublicKey;
use self::reactor::RequireSendmeAuth;
/// The size of the buffer for communication between `ClientCirc` and its reactor.
///
/// Within this module it is used as the capacity of the bounded per-stream
/// message channel created in `begin_stream_impl`; the control channel to the
/// reactor is created with `mpsc::unbounded()` and is not limited by it.
pub const CIRCUIT_BUFFER_SIZE: usize = 128;
#[derive(Clone, Debug)]
/// A circuit that we have constructed over the Tor network.
///
/// This struct is the interface used by the rest of the code. It is fairly
/// cheaply cloneable. None of the public methods need mutable access, since
/// they all actually communicate with the Reactor which contains the primary
/// mutable state, and does the actual work.
//
// Effectively, this struct contains two Arcs: one for `hops` and one for
// `control` (which surely has something Arc-like in it). We cannot unify
// these by putting a single Arc around the whole struct, and passing
// an Arc strong reference to the `Reactor`, because then `control` would
// not be dropped when the last user of the circuit goes away. We could
// make the reactor have a weak reference but weak references are more
// expensive to dereference.
// Because of the above, cloning this struct is always going to involve
// two atomic refcount changes/checks. Wrapping it in another Arc would
// be overkill.
pub struct ClientCirc {
/// Number of hops on this circuit.
/// This value is incremented after the circuit successfully completes extending to a new hop.
/// The same counter is shared with the reactor (see `PendingClientCirc::new`).
hops: Arc<AtomicU8>,
/// A unique identifier for this circuit.
unique_id: UniqId,
/// Channel to send control messages to the reactor.
/// This is an unbounded channel; the reactor holds the receiving end.
control: mpsc::UnboundedSender<CtrlMsg>,
/// For testing purposes: the CircId, for use in peek_circid().
#[cfg(test)]
circid: CircId,
}
/// A ClientCirc that needs to send a create cell and receive a created* cell.
/// To use one of these, call create_firsthop_fast() or create_firsthop_ntor()
/// to negotiate the cryptographic handshake with the first hop.
///
/// Both of those methods consume the `PendingClientCirc` and, on success,
/// return the `ClientCirc` it wraps.
pub struct PendingClientCirc {
/// A oneshot receiver on which we'll receive a CREATED* cell,
/// or a DESTROY cell.
/// It is handed to the reactor inside the `CtrlMsg::Create` message.
recvcreated: oneshot::Receiver<CreateResponse>,
/// The ClientCirc object that we can expose on success.
circ: ClientCirc,
/// Description of the network's current rules for building circuits.
///
/// The `Default` value uses an initial send window of 1000 and enables
/// extending by ed25519 identity.
pub struct CircParameters {
/// Initial value to use for our outbound circuit-level windows.
/// `set_initial_send_window` refuses values above 1000.
initial_send_window: u16,
/// Whether we should include ed25519 identities when we send
/// EXTEND2 cells.
extend_by_ed25519_id: bool,
impl Default for CircParameters {
/// Return the default parameters: an initial send window of 1000, with
/// ed25519 identities included when extending.
fn default() -> CircParameters {
CircParameters {
initial_send_window: 1000,
extend_by_ed25519_id: true,
impl CircParameters {
/// Override the default initial send window for these parameters.
/// Gives an error on any value above 1000.
/// You should probably not call this.
///
/// # Errors
///
/// Returns an error built with `bad_api_usage!` when `v` is greater than
/// 1000; in that case the stored window is left unchanged.
pub fn set_initial_send_window(&mut self, v: u16) -> Result<()> {
if v <= 1000 {
self.initial_send_window = v;
Ok(())
} else {
Err(Error::from(bad_api_usage!(
"Tried to set an initial send window over 1000"
)))
/// Return the initial send window as set in this parameter set.
pub fn initial_send_window(&self) -> u16 {
self.initial_send_window
/// Override the default decision about whether to use ed25519
/// identities in outgoing EXTEND2 cells.
pub fn set_extend_by_ed25519_id(&mut self, v: bool) {
self.extend_by_ed25519_id = v;
/// Return true if we're configured to extend by ed25519 ID; false
/// otherwise.
pub fn extend_by_ed25519_id(&self) -> bool {
self.extend_by_ed25519_id
/// A stream on a particular circuit.
///
/// This is the sending half of a stream: it carries the information needed
/// to address the stream (hop and stream ID) and the channel used to hand
/// outgoing relay messages over for delivery.
pub(crate) struct StreamTarget {
/// Which hop of the circuit this stream is with.
hop_num: HopNum,
/// Reactor ID for this stream.
stream_id: StreamId,
/// Channel to send cells down.
/// The receiving end is passed to the reactor in `CtrlMsg::BeginStream`.
tx: mpsc::Sender<RelayMsg>,
/// Reference to the circuit that this stream is on.
impl ClientCirc {
/// Extend the circuit via the ntor handshake to a new target last
/// hop.
///
/// The work is delegated to the reactor: this method sends a
/// `CtrlMsg::ExtendNtor` request and then waits for the reactor's reply.
///
/// # Errors
///
/// Returns `Error::CircuitClosed` if the request cannot be sent to the
/// reactor or if the reply channel is dropped before an answer arrives;
/// otherwise any error reported by the reactor is returned as-is.
pub async fn extend_ntor<Tg>(&self, target: &Tg, params: &CircParameters) -> Result<()>
where
Tg: CircTarget,
{
// The ntor public key is made of the target's RSA identity and its
// ntor onion key.
let key = NtorPublicKey {
id: *target.rsa_identity(),
pk: *target.ntor_onion_key(),
let mut linkspecs = target.linkspecs();
// When configured not to extend by ed25519 ID, strip those link
// specifiers from what we send.
if !params.extend_by_ed25519_id() {
linkspecs.retain(|ls| !matches!(ls, LinkSpec::Ed25519Id(_)));
// FlowCtrl=1 means that this hop supports authenticated SENDMEs
let require_sendme_auth = RequireSendmeAuth::from_protocols(target.protovers());
// `tx` travels to the reactor; the outcome comes back on `rx`.
let (tx, rx) = oneshot::channel();
self.control
.unbounded_send(CtrlMsg::ExtendNtor {
public_key: key,
linkspecs,
require_sendme_auth,
params: params.clone(),
done: tx,
})
.map_err(|_| Error::CircuitClosed)?;
// First `?`: the reply channel was dropped. Second `?`: the reactor
// reported a failure.
rx.await.map_err(|_| Error::CircuitClosed)??;
/// Helper, used to begin a stream.
/// This function allocates a stream ID, and sends the message
/// (like a BEGIN or RESOLVE), but doesn't wait for a response.
/// The caller will typically want to see the first cell in response,
/// to see whether it is e.g. an END or a CONNECTED.
///
/// On success, returns the `StreamReader` (incoming half) and the
/// `StreamTarget` (outgoing half) for the new stream.
///
/// # Errors
///
/// Returns an internal error if the circuit has no hops yet, and
/// `Error::CircuitClosed` if the reactor's reply channel is dropped.
async fn begin_stream_impl(&self, begin_msg: RelayMsg) -> Result<(StreamReader, StreamTarget)> {
// TODO: Possibly this should take a hop, rather than just
// assuming it's the last hop.
let num_hops = self.hops.load(Ordering::SeqCst);
if num_hops == 0 {
return Err(Error::from(internal!(
"Can't begin a stream at the 0th hop"
)));
// Hops are numbered from zero, so the last hop is `num_hops - 1`; the
// check above guarantees this cannot underflow.
let hop_num: HopNum = (num_hops - 1).into();
// `sender`/`receiver`: messages arriving for this stream.
let (sender, receiver) = mpsc::channel(STREAM_READER_BUFFER);
// `msg_tx`/`msg_rx`: messages this stream wants to send.
let (msg_tx, msg_rx) = mpsc::channel(CIRCUIT_BUFFER_SIZE);
.unbounded_send(CtrlMsg::BeginStream {
hop_num,
message: begin_msg,
sender,
rx: msg_rx,
// The reactor answers with the stream ID it allocated.
let stream_id = rx.await.map_err(|_| Error::CircuitClosed)??;
let target = StreamTarget {
circ: self.clone(),
tx: msg_tx,
stream_id,
let reader = StreamReader {
target: target.clone(),
receiver,
recv_window: StreamRecvWindow::new(RECV_WINDOW_INIT),
ended: false,
Ok((reader, target))
/// Start a DataStream (anonymized connection) to the given
/// address and port, using a BEGIN cell.
///
/// `msg` is the message that opens the stream; callers in this file pass
/// either a BEGIN or a BEGIN_DIR message. When `optimistic` is false, this
/// waits for the connection to be established before returning the stream;
/// when it is true, the stream is returned without waiting.
async fn begin_data_stream(&self, msg: RelayMsg, optimistic: bool) -> Result<DataStream> {
let (reader, target) = self.begin_stream_impl(msg).await?;
let mut stream = DataStream::new(reader, target);
if !optimistic {
stream.wait_for_connection().await?;
Ok(stream)
/// Start a stream to the given address and port, using a BEGIN
/// cell.
/// The use of a string for the address is intentional: you should let
/// the remote Tor relay do the hostname lookup for you.
///
/// If `parameters` is `None`, the default `StreamParameters` are used.
/// The parameters supply the BEGIN flags and decide whether the stream is
/// opened optimistically.
///
/// # Errors
///
/// Fails if the BEGIN message cannot be constructed from `target` and
/// `port`, or if opening the stream fails.
pub async fn begin_stream(
&self,
target: &str,
port: u16,
parameters: Option<StreamParameters>,
) -> Result<DataStream> {
let parameters = parameters.unwrap_or_default();
let begin_flags = parameters.begin_flags();
let optimistic = parameters.is_optimistic();
let beginmsg = Begin::new(target, port, begin_flags)?;
self.begin_data_stream(beginmsg.into(), optimistic).await
/// Start a new stream to the last relay in the circuit, using
/// a BEGIN_DIR cell.
///
/// The stream is always opened optimistically, i.e. without waiting for
/// the relay to confirm the connection.
pub async fn begin_dir_stream(&self) -> Result<DataStream> {
// Note that we always open begindir connections optimistically.
// Since they are local to a relay that we've already authenticated
// with and built a circuit to, there should be no additional checks
// we need to perform to see whether the BEGINDIR will succeed.
self.begin_data_stream(RelayMsg::BeginDir, true).await
/// Perform a DNS lookup, using a RESOLVE cell with the last relay
/// in this circuit.
/// Note that this function does not check for timeouts; that's
/// the caller's responsibility.
///
/// Only IP-address answers are returned; other non-error answer kinds are
/// skipped. If any answer is an error, the whole call fails with it.
pub async fn resolve(&self, hostname: &str) -> Result<Vec<IpAddr>> {
let resolve_msg = Resolve::new(hostname);
let resolved_msg = self.try_resolve(resolve_msg).await?;
resolved_msg
.into_answers()
.into_iter()
// The second tuple element of each answer is ignored here.
.filter_map(|(val, _)| match resolvedval_to_result(val) {
Ok(ResolvedVal::Ip(ip)) => Some(Ok(ip)),
Ok(_) => None,
Err(e) => Some(Err(e)),
// Collecting into `Result<Vec<_>>` stops at the first error.
.collect()
/// Perform a reverse DNS lookup, by sending a RESOLVE cell with
/// the last relay on this circuit.
///
/// Hostname answers are converted to `String`; a hostname that is not
/// valid UTF-8 produces an `Error::StreamProto`.
pub async fn resolve_ptr(&self, addr: IpAddr) -> Result<Vec<String>> {
let resolve_ptr_msg = Resolve::new_reverse(&addr);
let resolved_msg = self.try_resolve(resolve_ptr_msg).await?;
Ok(ResolvedVal::Hostname(v)) => Some(
String::from_utf8(v)
.map_err(|_| Error::StreamProto("Resolved Hostname was not utf-8".into())),
),
/// Helper: Send the resolve message, and read resolved message from
/// resolve stream.
///
/// Only the reading half of the new stream is kept; the `StreamTarget`
/// returned by `begin_stream_impl` is discarded.
async fn try_resolve(&self, msg: Resolve) -> Result<Resolved> {
let (reader, _) = self.begin_stream_impl(msg.into()).await?;
let mut resolve_stream = ResolveStream::new(reader);
resolve_stream.read_msg().await
/// Shut down this circuit, along with all streams that are using it.
/// Happens asynchronously (i.e. the circuit won't necessarily be done shutting down
/// immediately after this function returns!).
/// Note that other references to this circuit may exist. If they
/// do, they will stop working after you call this function.
/// It's not necessary to call this method if you're just done
/// with a circuit: the circuit should close on its own once nothing
/// is using it any more.
pub fn terminate(&self) {
// A send failure is deliberately ignored: it means the receiving end of
// the control channel is already gone.
let _ = self.control.unbounded_send(CtrlMsg::Shutdown);
/// Called when a circuit-level protocol error has occurred and the
/// circuit needs to shut down.
/// This is a separate function because we may eventually want to have
/// it do more than just shut down.
/// As with `terminate`, this function is asynchronous.
///
/// Currently this does nothing beyond calling `terminate`.
pub(crate) fn protocol_error(&self) {
self.terminate();
/// Return true if this circuit is closed and therefore unusable.
///
/// This is determined by checking whether the control channel to the
/// reactor has been closed.
pub fn is_closing(&self) -> bool {
self.control.is_closed()
/// Return a process-unique identifier for this circuit.
///
/// The identifier is returned by value.
pub fn unique_id(&self) -> UniqId {
self.unique_id
/// Return the number of hops currently recorded for this circuit.
///
/// This reads the shared hop counter, which is incremented after each
/// successful extension.
pub fn n_hops(&self) -> u8 {
self.hops.load(Ordering::SeqCst)
impl PendingClientCirc {
/// Instantiate a new circuit object: used from Channel::new_circ().
/// Does not send a CREATE* cell on its own.
///
/// Returns the pending circuit together with the `Reactor` that will drive
/// it. The two are linked by an unbounded control channel (the circuit
/// holds the sender, the reactor the receiver) and by a shared hop counter
/// that starts at zero.
pub(crate) fn new(
id: CircId,
channel: Channel,
createdreceiver: oneshot::Receiver<CreateResponse>,
input: mpsc::Receiver<ClientCircChanMsg>,
) -> (PendingClientCirc, reactor::Reactor) {
let crypto_out = OutboundClientCrypt::new();
let (control_tx, control_rx) = mpsc::unbounded();
// Shared between the reactor and the `ClientCirc` handle.
let num_hops = Arc::new(AtomicU8::new(0));
let reactor = Reactor {
control: control_rx,
outbound: Default::default(),
channel,
input,
crypto_in: InboundClientCrypt::new(),
// No hops exist until a handshake completes.
hops: vec![],
unique_id,
channel_id: id,
crypto_out,
meta_handler: None,
num_hops: Arc::clone(&num_hops),
let circuit = ClientCirc {
hops: num_hops,
control: control_tx,
circid: id,
let pending = PendingClientCirc {
recvcreated: createdreceiver,
circ: circuit,
(pending, reactor)
/// Testing only: Extract the circuit ID for this pending circuit.
///
/// Reads the `circid` field of the wrapped `ClientCirc`, which only exists
/// in test builds.
pub(crate) fn peek_circid(&self) -> CircId {
self.circ.circid
/// Use the (questionable!) CREATE_FAST handshake to connect to the
/// first hop of this circuit.
/// There's no authentication in CREATE_FAST,
/// so we don't need to know whom we're connecting to: we're just
/// connecting to whichever relay the channel is for.
///
/// Consumes the pending circuit and returns the usable `ClientCirc` on
/// success. The hop is created without requiring authenticated SENDMEs.
pub async fn create_firsthop_fast(self, params: &CircParameters) -> Result<ClientCirc> {
self.circ
.control
.unbounded_send(CtrlMsg::Create {
// The reactor takes over waiting for the CREATED*/DESTROY reply.
recv_created: self.recvcreated,
handshake: CircuitHandshake::CreateFast,
require_sendme_auth: RequireSendmeAuth::No,
Ok(self.circ)
/// Use the ntor handshake to connect to the first hop of this circuit.
/// Note that the provided 'target' must match the channel's target,
/// or the handshake will fail.
///
/// Consumes the pending circuit. The handshake request carries the
/// target's ntor public key and its ed25519 identity.
pub async fn create_firsthop_ntor<Tg>(
self,
target: &Tg,
params: CircParameters,
) -> Result<ClientCirc>
Tg: tor_linkspec::CircTarget,
handshake: CircuitHandshake::Ntor {
public_key: NtorPublicKey {
},
ed_identity: *target.ed_identity(),
/// An object that can put a given handshake into a ChanMsg for a CREATE*
/// cell, and unwrap a CREATED* cell.
///
/// Implemented in this file by `CreateFastWrap` (CREATE_FAST) and
/// `Create2Wrap` (CREATE2).
trait CreateHandshakeWrap {
/// Construct an appropriate ChanMsg to hold this kind of handshake.
///
/// `bytes` is the already-encoded handshake body.
fn to_chanmsg(&self, bytes: Vec<u8>) -> ChanMsg;
/// Decode a ChanMsg to an appropriate handshake value, checking
/// its type.
///
/// Returns the body of the reply, or an error if the reply is not of the
/// expected kind.
fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>>;
/// A CreateHandshakeWrap that generates CREATE_FAST and handles CREATED_FAST.
struct CreateFastWrap;
impl CreateHandshakeWrap for CreateFastWrap {
/// Wrap `bytes` in a CREATE_FAST message.
fn to_chanmsg(&self, bytes: Vec<u8>) -> ChanMsg {
chancell::msg::CreateFast::new(bytes).into()
/// Accept only a CREATED_FAST reply and return its body.
///
/// A DESTROY reply yields `Error::CircRefused`; any other reply yields
/// `Error::CircProto`.
fn decode_chanmsg(&self, msg: CreateResponse) -> Result<Vec<u8>> {
use CreateResponse::*;
match msg {
CreatedFast(m) => Ok(m.into_body()),
Destroy(_) => Err(Error::CircRefused(
"Relay replied to CREATE_FAST with DESTROY.",
)),
_ => Err(Error::CircProto(format!(
"Relay replied to CREATE_FAST with unexpected cell: {:?}",
msg
))),
/// A CreateHandshakeWrap that generates CREATE2 and handles CREATED2
struct Create2Wrap {
/// The handshake type to put in the CREATE2 cell.
handshake_type: u16,
impl CreateHandshakeWrap for Create2Wrap {
// Wrap the handshake bytes in a CREATE2 message tagged with our
// handshake type.
chancell::msg::Create2::new(self.handshake_type, bytes).into()
// Accept only a CREATED2 reply; a DESTROY reply means the relay refused
// the circuit.
Created2(m) => Ok(m.into_body()),
Destroy(_) => Err(Error::CircRefused("Relay replied to CREATE2 with DESTROY.")),
"Relay replied to CREATE2 with unexpected cell {:?}",
impl StreamTarget {
/// Deliver a relay message for the stream that owns this StreamTarget.
/// The StreamTarget will set the correct stream ID and pick the
/// right hop, but will not validate that the message is well-formed
/// or meaningful in context.
///
/// Returns `Error::CircuitClosed` if the message cannot be queued on the
/// stream's outgoing channel.
pub(crate) async fn send(&mut self, msg: RelayMsg) -> Result<()> {
self.tx.send(msg).await.map_err(|_| Error::CircuitClosed)?;
/// Report a protocol error on the circuit that this stream is on, by
/// calling `ClientCirc::protocol_error`.
pub(crate) fn protocol_error(&mut self) {
self.circ.protocol_error();
/// Send a SENDME cell for this stream.
///
/// The request is passed to the reactor as a `CtrlMsg::SendSendme`
/// carrying this stream's ID and hop number.
pub(crate) fn send_sendme(&mut self) -> Result<()> {
.unbounded_send(CtrlMsg::SendSendme {
stream_id: self.stream_id,
hop_num: self.hop_num,
/// Convert a [`ResolvedVal`] into a Result, based on whether or not
/// it represents an error.
///
/// `TransientError` and `NontransientError` become `Error::ResolveError`;
/// every other value is passed through unchanged as `Ok`.
fn resolvedval_to_result(val: ResolvedVal) -> Result<ResolvedVal> {
match val {
ResolvedVal::TransientError => Err(Error::ResolveError(
"Received retriable transient error".into(),
ResolvedVal::NontransientError => {
Err(Error::ResolveError("Received not retriable error.".into()))
_ => Ok(val),
mod test {
#![allow(clippy::unwrap_used)]
use super::*;
use crate::channel::{test::new_reactor, CodecError};
use crate::crypto::cell::RelayCellBody;
use chanmsg::{ChanMsg, Created2, CreatedFast};
use futures::channel::mpsc::{Receiver, Sender};
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use futures::task::SpawnExt;
use hex_literal::hex;
use rand::thread_rng;
use std::time::Duration;
use tor_cell::chancell::{msg as chanmsg, ChanCell};
use tor_cell::relaycell::{msg as relaymsg, RelayCell, StreamId};
use tor_llcrypto::pk;
use tor_rtcompat::{Runtime, SleepProvider};
use tracing::trace;
// Test helper: wrap a relay message addressed to stream `id` into a
// `ClientCircChanMsg::Relay`, as if it had arrived from the channel.
// The relay cell is encoded but not encrypted here.
fn rmsg_to_ccmsg<ID>(id: ID, msg: relaymsg::RelayMsg) -> ClientCircChanMsg
ID: Into<StreamId>,
let body: RelayCellBody = RelayCell::new(id.into(), msg)
.encode(&mut thread_rng())
.unwrap()
.into();
let chanmsg = chanmsg::Relay::from_raw(body.into());
ClientCircChanMsg::Relay(chanmsg)
// A minimal relay description for tests, implementing `ChanTarget` and
// `CircTarget` by returning its stored fields.
struct ExampleTarget {
// ntor onion key returned from `ntor_onion_key()`.
ntor_key: pk::curve25519::PublicKey,
// Protocol versions returned from `protovers()`.
protovers: tor_protover::Protocols,
// Ed25519 identity returned from `ed_identity()`.
ed_id: pk::ed25519::Ed25519Identity,
// RSA identity returned from `rsa_identity()`.
rsa_id: pk::rsa::RsaIdentity,
impl tor_linkspec::ChanTarget for ExampleTarget {
// The example target advertises no addresses.
fn addrs(&self) -> &[std::net::SocketAddr] {
&[]
fn ed_identity(&self) -> &pk::ed25519::Ed25519Identity {
&self.ed_id
fn rsa_identity(&self) -> &pk::rsa::RsaIdentity {
&self.rsa_id
impl tor_linkspec::CircTarget for ExampleTarget {
fn ntor_onion_key(&self) -> &pk::curve25519::PublicKey {
&self.ntor_key
fn protovers(&self) -> &tor_protover::Protocols {
&self.protovers
/// Return an ExampleTarget that can get used for an ntor handshake.
///
/// Its ntor key and RSA identity match the public half of the secret key
/// returned by `example_ntor_key()`, and it advertises `FlowCtrl=1`.
fn example_target() -> ExampleTarget {
ExampleTarget {
ntor_key: hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253")
.into(),
protovers: "FlowCtrl=1".parse().unwrap(),
ed_id: [6_u8; 32].into(),
rsa_id: [10_u8; 20].into(),
// Return the ntor secret key used by the simulated relay in these tests.
// The second and third arguments repeat the ntor key and RSA identity
// that `example_target()` advertises.
fn example_ntor_key() -> crate::crypto::handshake::ntor::NtorSecretKey {
crate::crypto::handshake::ntor::NtorSecretKey::new(
hex!("7789d92a89711a7e2874c61ea495452cfd48627b3ca2ea9546aafa5bf7b55803").into(),
hex!("395cb26b83b3cd4b91dba9913e562ae87d21ecdd56843da7ca939a6a69001253").into(),
[10_u8; 20].into(),
)
// Build a fake channel whose reactor is already running on `rt`.
//
// Returns the channel plus the receiver and sender produced by
// `new_reactor()`: tests read `ChanCell`s from the receiver (see the
// `rx.next()` calls below). The sender is kept alive by the tests but not
// otherwise used in this file.
fn working_fake_channel<R: Runtime>(
rt: &R,
) -> (
Channel,
Receiver<ChanCell>,
Sender<std::result::Result<ChanCell, CodecError>>,
) {
let (channel, chan_reactor, rx, tx) = new_reactor();
rt.spawn(async {
// The reactor's exit status is irrelevant to the tests.
let _ignore = chan_reactor.run().await;
.unwrap();
(channel, rx, tx)
async fn test_create<R: Runtime>(rt: &R, fast: bool) {
// We want to try progressing from a pending circuit to a circuit
// via a crate_fast handshake.
use crate::crypto::handshake::{fast::CreateFastServer, ntor::NtorServer, ServerHandshake};
let (chan, mut rx, _sink) = working_fake_channel(rt);
let circid = 128.into();
let (created_send, created_recv) = oneshot::channel();
let (_circmsg_send, circmsg_recv) = mpsc::channel(64);
let unique_id = UniqId::new(23, 17);
let (pending, reactor) =
PendingClientCirc::new(circid, chan, created_recv, circmsg_recv, unique_id);
let _ignore = reactor.run().await;
// Future to pretend to be a relay on the other end of the circuit.
let simulate_relay_fut = async move {
let mut rng = rand::thread_rng();
let create_cell = rx.next().await.unwrap();
assert_eq!(create_cell.circid(), 128.into());
let reply = if fast {
let cf = match create_cell.msg() {
ChanMsg::CreateFast(cf) => cf,
_ => panic!(),
let (_, rep) = CreateFastServer::server(&mut rng, &[()], cf.body()).unwrap();
CreateResponse::CreatedFast(CreatedFast::new(rep))
let c2 = match create_cell.msg() {
ChanMsg::Create2(c2) => c2,
let (_, rep) =
NtorServer::server(&mut rng, &[example_ntor_key()], c2.body()).unwrap();
CreateResponse::Created2(Created2::new(rep))
created_send.send(reply).unwrap();
// Future to pretend to be a client.
let client_fut = async move {
let target = example_target();
let params = CircParameters::default();
let ret = if fast {
trace!("doing fast create");
pending.create_firsthop_fast(¶ms).await
trace!("doing ntor create");
pending.create_firsthop_ntor(&target, params).await
trace!("create done: result {:?}", ret);
ret
let (circ, _) = futures::join!(client_fut, simulate_relay_fut);
let _circ = circ.unwrap();
// pfew! We've build a circuit! Let's make sure it has one hop.
/* TODO: reinstate this.
let inner = Arc::get_mut(&mut circuit).unwrap().c.into_inner();
assert_eq!(inner.hops.len(), 1);
*/
// Run `test_create` with the CREATE_FAST handshake on every runtime.
#[test]
fn test_create_fast() {
tor_rtcompat::test_with_all_runtimes!(|rt| async move {
test_create(&rt, true).await;
});
// Run `test_create` with the ntor handshake.
fn test_create_ntor() {
test_create(&rt, false).await;
// An encryption layer that doesn't do any crypto. Can be used
// as inbound or outbound, but not both at once.
//
// Instead of real authentication tags it hands out a running counter, so
// tests can predict the tag attached to the N-th cell.
pub(crate) struct DummyCrypto {
    // Buffer holding the most recently generated tag.
    counter_tag: [u8; 20],
    // Number of tags handed out so far.
    counter: u32,
    // Whether inbound cells are treated as coming from this layer's hop
    // (see `decrypt_inbound`).
    lasthop: bool,
}

impl DummyCrypto {
    // Produce the next tag: the current counter value, little-endian, in the
    // first four bytes of the buffer (the remaining bytes are left as they
    // were). The counter is then advanced by one.
    fn next_tag(&mut self) -> &[u8; 20] {
        self.counter_tag[..4].copy_from_slice(&self.counter.to_le_bytes());
        self.counter += 1;
        &self.counter_tag
    }
}
// Outbound use: cells are left untouched; originating a cell just yields the
// next counter-based tag.
impl crate::crypto::cell::OutboundClientLayer for DummyCrypto {
fn originate_for(&mut self, _cell: &mut RelayCellBody) -> &[u8] {
self.next_tag()
// No encryption is performed.
fn encrypt_outbound(&mut self, _cell: &mut RelayCellBody) {}
// Inbound use: cells are left untouched. A tag is returned only when this
// layer was built with `lasthop` set; otherwise `None` is returned.
impl crate::crypto::cell::InboundClientLayer for DummyCrypto {
fn decrypt_inbound(&mut self, _cell: &mut RelayCellBody) -> Option<&[u8]> {
if self.lasthop {
Some(self.next_tag())
None
// Create a dummy layer with its tag buffer and counter zeroed. `lasthop`
// decides whether `decrypt_inbound` reports a tag for inbound cells.
pub(crate) fn new(lasthop: bool) -> Self {
DummyCrypto {
counter_tag: [0; 20],
counter: 0,
lasthop,
// Helper: set up a 3-hop circuit with no encryption, where the
// next inbound message seems to come from hop next_msg_from
//
// Returns the circuit and the sender used to inject inbound circuit
// messages. The hops are installed by sending `CtrlMsg::AddFakeHop` to the
// reactor three times, waiting for each to be acknowledged.
async fn newcirc_ext<R: Runtime>(
chan: Channel,
next_msg_from: HopNum,
) -> (ClientCirc, mpsc::Sender<ClientCircChanMsg>) {
// No CREATED* reply is ever sent in this helper; the sender is unused.
let (_created_send, created_recv) = oneshot::channel();
let (circmsg_send, circmsg_recv) = mpsc::channel(64);
// Take the circuit out of the pending wrapper without doing a handshake.
let PendingClientCirc {
circ,
recvcreated: _,
} = pending;
for idx in 0_u8..3 {
circ.control
.unbounded_send(CtrlMsg::AddFakeHop {
supports_flowctrl_1: true,
// Outbound cells are always originated for the third hop.
fwd_lasthop: idx == 2,
// Inbound cells are recognized at the requested hop.
rev_lasthop: idx == next_msg_from.into(),
params,
rx.await.unwrap().unwrap();
(circ, circmsg_send)
// Helper: like `newcirc_ext`, with inbound messages appearing to come from
// the last hop (hop 2).
async fn newcirc<R: Runtime>(
newcirc_ext(rt, chan, 2.into()).await
// Try sending a cell via a `CtrlMsg::SendRelayCell` control message, and
// check that the same BEGIN_DIR message shows up on the channel.
fn send_simple() {
let (chan, mut rx, _sink) = working_fake_channel(&rt);
let (circ, _send) = newcirc(&rt, chan).await;
let begindir = RelayCell::new(0.into(), RelayMsg::BeginDir);
.unbounded_send(CtrlMsg::SendRelayCell {
hop: 2.into(),
early: false,
cell: begindir,
// Here's what we tried to put on the TLS channel. Note that
// we're using dummy relay crypto for testing convenience.
let rcvd = rx.next().await.unwrap();
assert_eq!(rcvd.circid(), 128.into());
// With `early: false` the cell must be a plain RELAY cell.
let m = match rcvd.into_circid_and_msg().1 {
ChanMsg::Relay(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
assert!(matches!(m.msg(), RelayMsg::BeginDir));
// NOTE(eta): this test is commented out because it basically tested implementation details
// of the old code which are hard to port to the reactor version, and the behaviour
// is covered by the extend tests anyway, so I don't think it's worth it.
/*
// Try getting a "meta-cell", which is what we're calling those not
// for a specific stream.
#[async_test]
async fn recv_meta() {
let (chan, _, _sink) = working_fake_channel();
let (circ, mut reactor, mut sink) = newcirc(chan).await;
// 1: Try doing it via handle_meta_cell directly.
let meta_receiver = circ.register_meta_handler(2.into()).await.unwrap();
let extended: RelayMsg = relaymsg::Extended2::new((*b"123").into()).into();
circ.c
.lock()
.await
.handle_meta_cell(2.into(), extended.clone())
let msg = meta_receiver.await.unwrap().unwrap();
assert!(matches!(msg, RelayMsg::Extended2(_)));
// 2: Try doing it via the reactor.
sink.send(rmsg_to_ccmsg(0, extended.clone())).await.unwrap();
reactor.run_once().await.unwrap();
// 3: Try getting a meta cell that we didn't want.
let e = {
.err()
assert_eq!(
format!("{}", e),
"circuit protocol violation: Unexpected EXTENDED2 cell on client circuit"
);
// 3: Try getting a meta from a hop that we didn't want.
let _receiver = circ.register_meta_handler(2.into()).await.unwrap();
.handle_meta_cell(1.into(), extended.clone())
"circuit protocol violation: Unexpected EXTENDED2 cell from hop 1 on client circuit"
// Extend a 3-hop dummy circuit by one hop via ntor, with a simulated relay
// answering the EXTEND2 request, and check that the hop count becomes 4.
#[test]
fn extend() {
    tor_rtcompat::test_with_all_runtimes!(|rt| async move {
        use crate::crypto::handshake::{ntor::NtorServer, ServerHandshake};

        let (chan, mut rx, _sink) = working_fake_channel(&rt);
        let (circ, mut sink) = newcirc(&rt, chan).await;
        let params = CircParameters::default();

        let extend_fut = async move {
            let target = example_target();
            circ.extend_ntor(&target, &params).await.unwrap();
            circ // gotta keep the circ alive, or the reactor would exit.
        };
        let reply_fut = async move {
            // We've disabled encryption on this circuit, so we can just
            // read the extend2 cell.
            let (id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
            assert_eq!(id, 128.into());
            // EXTEND2 requests are expected inside RELAY_EARLY cells.
            let rmsg = match chmsg {
                ChanMsg::RelayEarly(r) => RelayCell::decode(r.into_relay_body()).unwrap(),
                _ => panic!(),
            };
            let e2 = match rmsg.msg() {
                RelayMsg::Extend2(e2) => e2,
                _ => panic!(),
            };
            // Play the relay's side of the ntor handshake and send back the
            // resulting EXTENDED2 message.
            let mut rng = thread_rng();
            let (_, reply) =
                NtorServer::server(&mut rng, &[example_ntor_key()], e2.handshake()).unwrap();
            let extended2 = relaymsg::Extended2::new(reply).into();
            sink.send(rmsg_to_ccmsg(0, extended2)).await.unwrap();
            sink // gotta keep the sink alive, or the reactor will exit.
        };

        let (circ, _) = futures::join!(extend_fut, reply_fut);

        // Did we really add another hop?
        assert_eq!(circ.n_hops(), 4);
    });
}
async fn bad_extend_test_impl<R: Runtime>(
reply_hop: HopNum,
bad_reply: ClientCircChanMsg,
) -> Error {
let (chan, _rx, _sink) = working_fake_channel(rt);
let (circ, mut sink) = newcirc_ext(rt, chan, reply_hop).await;
#[allow(clippy::clone_on_copy)]
let rtc = rt.clone();
let sink_handle = rt
.spawn_with_handle(async move {
rtc.sleep(Duration::from_millis(100)).await;
sink.send(bad_reply).await.unwrap();
sink
let outcome = circ.extend_ntor(&target, ¶ms).await;
let _sink = sink_handle.await;
assert_eq!(circ.n_hops(), 3);
assert!(outcome.is_err());
outcome.unwrap_err()
// An EXTENDED2 reply that appears to come from hop 1 instead of the last hop.
fn bad_extend_wronghop() {
let extended2 = relaymsg::Extended2::new(vec![]).into();
let cc = rmsg_to_ccmsg(0, extended2);
let error = bad_extend_test_impl(&rt, 1.into(), cc).await;
// This case shows up as a CircuitClosed error, since a message sent
// from the wrong hop won't even be delivered to the extend
// code's meta-handler. Instead the unexpected message will cause
// the circuit to get torn down.
match error {
Error::CircuitClosed => {}
x => panic!("got other error: {}", x),
// An EXTENDED (not EXTENDED2) reply from the right hop must be rejected
// with a circuit-protocol error naming both message types.
fn bad_extend_wrongtype() {
let extended = relaymsg::Extended::new(vec![7; 200]).into();
let cc = rmsg_to_ccmsg(0, extended);
let error = bad_extend_test_impl(&rt, 2.into(), cc).await;
Error::CircProto(s) => {
assert_eq!(s, "wanted EXTENDED2; got EXTENDED");
// A DESTROY cell (reason code 4) arriving while the extend is in progress.
fn bad_extend_destroy() {
let cc = ClientCircChanMsg::Destroy(chanmsg::Destroy::new(4.into()));
// An EXTENDED2 reply whose handshake body is 256 bytes of filler must fail
// with `Error::BadCircHandshake`.
fn bad_extend_crypto() {
let extended2 = relaymsg::Extended2::new(vec![99; 256]).into();
assert!(matches!(error, Error::BadCircHandshake));
// End-to-end BEGIN_DIR exchange over a dummy circuit: the client writes a
// request and reads the reply, while a simulated relay checks each cell it
// receives and answers with CONNECTED, DATA and END.
fn begindir() {
let begin_and_send_fut = async move {
// Here we'll say we've got a circuit, and we want to
// make a simple BEGINDIR request with it.
let mut stream = circ.begin_dir_stream().await.unwrap();
stream.write_all(b"HTTP/1.0 GET /\r\n").await.unwrap();
stream.flush().await.unwrap();
let mut buf = [0_u8; 1024];
let n = stream.read(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"HTTP/1.0 404 Not found\r\n");
// A zero-length read signals that the stream has ended.
assert_eq!(n, 0);
stream
// read the begindir cell.
assert_eq!(id, 128.into()); // hardcoded circid.
let (streamid, rmsg) = rmsg.into_streamid_and_msg();
assert!(matches!(rmsg, RelayMsg::BeginDir));
// Reply with a Connected cell to indicate success.
let connected = relaymsg::Connected::new_empty().into();
sink.send(rmsg_to_ccmsg(streamid, connected)).await.unwrap();
// Now read a DATA cell...
let (streamid_2, rmsg) = rmsg.into_streamid_and_msg();
// ...which must be on the stream that the BEGIN_DIR opened.
assert_eq!(streamid_2, streamid);
if let RelayMsg::Data(d) = rmsg {
assert_eq!(d.as_ref(), &b"HTTP/1.0 GET /\r\n"[..]);
panic!();
// Write another data cell in reply!
let data = relaymsg::Data::new(b"HTTP/1.0 404 Not found\r\n")
sink.send(rmsg_to_ccmsg(streamid, data)).await.unwrap();
// Send an END cell to say that the conversation is over.
let end = relaymsg::End::new_with_reason(relaymsg::EndReason::DONE).into();
sink.send(rmsg_to_ccmsg(streamid, end)).await.unwrap();
(rx, sink) // gotta keep these alive, or the reactor will exit.
let (_stream, (_rx, _sink)) = futures::join!(begin_and_send_fut, reply_fut);
// Set up a circuit and stream that expects some incoming SENDMEs.
//
// Opens a stream to www.example.com:443 on a dummy circuit and writes
// `n_to_send` zero bytes to it, while a simulated relay accepts the stream
// and counts the DATA cells it receives. Returns the circuit, the stream,
// the inbound-message sender, the stream ID, the number of DATA cells seen,
// and the channel ends (which callers keep alive).
async fn setup_incoming_sendme_case<R: Runtime>(
n_to_send: usize,
ClientCirc,
DataStream,
mpsc::Sender<ClientCircChanMsg>,
StreamId,
usize,
let (chan, mut rx, sink2) = working_fake_channel(rt);
let (circ, mut sink) = newcirc(rt, chan).await;
let circ_clone = circ.clone();
// Take our circuit and make a stream on it.
let mut stream = circ_clone
.begin_stream("www.example.com", 443, None)
// Write the payload in chunks of at most 1024 bytes.
let junk = [0_u8; 1024];
let mut remaining = n_to_send;
while remaining > 0 {
let n = std::cmp::min(remaining, junk.len());
stream.write_all(&junk[..n]).await.unwrap();
remaining -= n;
let receive_fut = async move {
// Read the BEGIN cell.
let (_id, chmsg) = rx.next().await.unwrap().into_circid_and_msg();
assert!(matches!(rmsg, RelayMsg::Begin(_)));
// Reply with a connected cell...
// Now read bytes from the stream until we have them all.
let mut bytes_received = 0_usize;
let mut cells_received = 0_usize;
while bytes_received < n_to_send {
// Read a data cell, and remember how much we got.
let (streamid2, rmsg) = rmsg.into_streamid_and_msg();
assert_eq!(streamid2, streamid);
if let RelayMsg::Data(dat) = rmsg {
cells_received += 1;
bytes_received += dat.as_ref().len();
(sink, streamid, cells_received, rx)
let (stream, (sink, streamid, cells_received, rx)) =
futures::join!(begin_and_send_fut, receive_fut);
(circ, stream, sink, streamid, cells_received, rx, sink2)
// Send enough data to need three circuit-level SENDMEs, then deliver one
// valid circuit-level SENDME and one stream-level SENDME, and check that
// the circuit send window grows by 100.
fn accept_valid_sendme() {
// 300 * 498 + 3 bytes are expected to arrive as 301 DATA cells
// (presumably 498 bytes is the largest DATA payload -- confirm).
let (circ, _stream, mut sink, streamid, cells_received, _rx, _sink2) =
setup_incoming_sendme_case(&rt, 300 * 498 + 3).await;
assert_eq!(cells_received, 301);
// Make sure that the circuit is indeed expecting the right sendmes
.unbounded_send(CtrlMsg::QuerySendWindow {
let (window, tags) = rx.await.unwrap().unwrap();
// Every DATA cell sent used up one slot of the initial window of 1000.
assert_eq!(window, 1000 - 301);
assert_eq!(tags.len(), 3);
// The expected tags are the DummyCrypto counter values 100, 200 and 300,
// encoded little-endian in the first bytes of the tag.
// 100
tags[0],
sendme::CircTag::from(hex!("6400000000000000000000000000000000000000"))
// 200
tags[1],
sendme::CircTag::from(hex!("c800000000000000000000000000000000000000"))
// 300
tags[2],
sendme::CircTag::from(hex!("2c01000000000000000000000000000000000000"))
let reply_with_sendme_fut = async move {
// make and send a circuit-level sendme.
let c_sendme =
relaymsg::Sendme::new_tag(hex!("6400000000000000000000000000000000000000"))
sink.send(rmsg_to_ccmsg(0_u16, c_sendme)).await.unwrap();
// Make and send a stream-level sendme.
let s_sendme = relaymsg::Sendme::new_empty().into();
sink.send(rmsg_to_ccmsg(streamid, s_sendme)).await.unwrap();
let _sink = reply_with_sendme_fut.await;
// FIXME(eta): this is a hacky way of waiting for the reactor to run before doing the below
// query; should find some way to properly synchronize to avoid flakiness
rt.sleep(Duration::from_millis(100)).await;
// Now make sure that the circuit is still happy, and its
// window is updated.
let (window, _tags) = rx.await.unwrap().unwrap();
// The circuit-level SENDME gave back 100 slots.
assert_eq!(window, 1000 - 201);
// A circuit-level SENDME carrying a tag we never sent must make the reactor
// shut down, which is observed through the control channel closing.
fn invalid_circ_sendme() {
// Same setup as accept_valid_sendme() test above but try giving
// a sendme with the wrong tag.
let (circ, _stream, mut sink, _streamid, _cells_received, _rx, _sink2) =
// make and send a circuit-level sendme with a bad tag.
relaymsg::Sendme::new_tag(hex!("FFFF0000000000000000000000000000000000FF"))
let mut tries = 0;
// FIXME(eta): we aren't testing the error message like we used to; however, we can at least
// check whether the reactor dies as a result of receiving invalid data.
while !circ.control.is_closed() {
// TODO: Don't sleep in tests.
tries += 1;
// Give up after more than ten polls.
if tries > 10 {
panic!("reactor continued running after invalid sendme");
// TODO: check that the circuit is shut down too
// Check the defaults, setters and getters of `CircParameters`, including
// the rejection of a send window above 1000.
fn basic_params() {
use super::CircParameters;
let mut p = CircParameters::default();
assert_eq!(p.initial_send_window(), 1000);
assert!(p.extend_by_ed25519_id());
assert!(p.set_initial_send_window(500).is_ok());
p.set_extend_by_ed25519_id(false);
assert_eq!(p.initial_send_window(), 500);
assert!(!p.extend_by_ed25519_id());
// 9000 is over the limit, so the setter must refuse it.
assert!(p.set_initial_send_window(9000).is_err());