88.49 %
18.84 %
100 %
//! Implement a concrete type to build channels.
use std::io;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use crate::{event::ChanMgrEventSender, Error};
use std::time::Duration;
use tor_error::{bad_api_usage, internal};
use tor_linkspec::{ChanTarget, OwnedChanTarget};
use tor_llcrypto::pk;
use tor_rtcompat::{tls::TlsConnector, Runtime, TcpProvider, TlsProvider};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
use futures::task::SpawnExt;
use futures::StreamExt;
use futures::{FutureExt, TryFutureExt};
/// Time to wait between starting parallel connections to the same relay.
static CONNECTION_DELAY: Duration = Duration::from_millis(150);
/// TLS-based channel builder.
/// This is a separate type so that we can keep our channel management
/// code network-agnostic.
pub(crate) struct ChanBuilder<R: Runtime> {
/// Asynchronous runtime for TLS, TCP, spawning, and timeouts.
runtime: R,
/// Used to update our bootstrap reporting status.
event_sender: Mutex<ChanMgrEventSender>,
/// Object to build TLS connections.
tls_connector: <R as TlsProvider<R::TcpStream>>::Connector,
impl<R: Runtime> ChanBuilder<R> {
/// Construct a new ChanBuilder.
pub(crate) fn new(runtime: R, event_sender: ChanMgrEventSender) -> Self {
let tls_connector = runtime.tls_connector();
ChanBuilder {
event_sender: Mutex::new(event_sender),
impl<R: Runtime> crate::mgr::ChannelFactory for ChanBuilder<R> {
type Channel = tor_proto::channel::Channel;
type BuildSpec = OwnedChanTarget;
async fn build_channel(&self, target: &Self::BuildSpec) -> crate::Result<Self::Channel> {
use tor_rtcompat::SleepProviderExt;
// TODO: make this an option. And make a better value.
let five_seconds = std::time::Duration::new(5, 0);
.timeout(five_seconds, self.build_channel_notimeout(target))
/// Connect to one of the addresses in `addrs` by running connections in parallel until one works.
/// This implements a basic version of RFC 8305 "happy eyeballs".
async fn connect_to_one<R: Runtime>(
rt: &R,
addrs: &[SocketAddr],
) -> crate::Result<(<R as TcpProvider>::TcpStream, SocketAddr)> {
// We need *some* addresses to connect to.
if addrs.is_empty() {
return Err(Error::UnusableTarget(bad_api_usage!(
"No addresses for chosen relay"
// Turn each address into a future that waits (i * CONNECTION_DELAY), then
// attempts to connect to the address using the runtime (where i is the
// array index). Shove all of these into a `FuturesUnordered`, polling them
// simultaneously and returning the results in completion order.
// This is basically the concurrent-connection stuff from RFC 8305, ish.
// TODO(eta): sort the addresses first?
let mut connections = addrs
.map(|(i, a)| {
let delay = rt.sleep(CONNECTION_DELAY * i as u32);
delay.then(move |_| {
tracing::info!("Connecting to {}", a);
.map_ok(move |stream| (stream, *a))
.map_err(move |e| (e, *a))
let mut ret = None;
let mut errors = vec![];
while let Some(result) = {
match result {
Ok(s) => {
// We got a stream (and address).
ret = Some(s);
Err((e, a)) => {
// We got a failure on one of the streams. Store the error.
// TODO(eta): ideally we'd start the next connection attempt immediately.
tracing::warn!("Connection to {} failed: {}", a, e);
errors.push((e, a));
// Ensure we don't continue trying to make connections.
ret.ok_or_else(|| Error::ChannelBuild {
addresses: errors.into_iter().map(|(e, a)| (a, Arc::new(e))).collect(),
/// As build_channel, but don't include a timeout.
async fn build_channel_notimeout(
target: &OwnedChanTarget,
) -> crate::Result<tor_proto::channel::Channel> {
use tor_proto::channel::ChannelBuilder;
use tor_rtcompat::tls::CertifiedConn;
// 1. Negotiate the TLS connection.
.expect("Lock poisoned")
let (stream, addr) = connect_to_one(&self.runtime, target.addrs()).await?;
let map_ioe = |action: &'static str| {
move |ioe: io::Error| Error::Io {
peer: addr,
source: ioe.into(),
// TODO: add a random hostname here if it will be used for SNI?
let tls = self
.negotiate_unvalidated(stream, "ignored")
.map_err(map_ioe("TLS negotiation"))?;
let peer_cert = tls
.map_err(map_ioe("TLS certs"))?
.ok_or_else(|| Error::Internal(internal!("TLS connection with no peer certificate")))?;
// 2. Set up the channel.
let mut builder = ChannelBuilder::new();
let chan = builder.launch(tls).connect().await?;
let now = self.runtime.wallclock();
let chan = chan.check(target, &peer_cert, Some(now))?;
let (chan, reactor) = chan.finish().await?;
// 3. Launch a task to run the channel reactor.
.spawn(async {
let _ =;
.map_err(|e| Error::from_spawn("channel reactor", e))?;
impl crate::mgr::AbstractChannel for tor_proto::channel::Channel {
type Ident = pk::ed25519::Ed25519Identity;
fn ident(&self) -> &Self::Ident {
fn is_usable(&self) -> bool {
fn duration_unused(&self) -> Option<Duration> {
mod test {
use super::*;
use crate::{
mgr::{AbstractChannel, ChannelFactory},
use pk::ed25519::Ed25519Identity;
use pk::rsa::RsaIdentity;
use std::time::{Duration, SystemTime};
use tor_proto::channel::Channel;
use tor_rtcompat::{test_with_one_runtime, TcpListener};
use tor_rtmock::{io::LocalStream, net::MockNetwork, MockSleepRuntime};
// Make sure that the builder can build a real channel. To test
// this out, we set up a listener that pretends to have the right
// IP, fake the current time, and use a canned response from
// [`testing::msgs`] crate.
fn build_ok() -> Result<()> {
use crate::testing::msgs;
let orport: SocketAddr = msgs::ADDR.parse().unwrap();
let ed: Ed25519Identity = msgs::ED_ID.into();
let rsa: RsaIdentity = msgs::RSA_ID.into();
let client_addr = "".parse().unwrap();
let tls_cert = msgs::X509_CERT.into();
let target = OwnedChanTarget::new(vec![orport], ed, rsa);
let now = SystemTime::UNIX_EPOCH + Duration::new(msgs::NOW, 0);
test_with_one_runtime!(|rt| async move {
// Stub out the internet so that this connection can work.
let network = MockNetwork::new();
// Set up a client runtime with a given IP
let client_rt = network
// Mock the current time too
let client_rt = MockSleepRuntime::new(client_rt);
// Set up a relay runtime with a different IP
let relay_rt = network
// open a fake TLS listener and be ready to handle a request.
let lis = relay_rt.mock_net().listen_tls(&orport, tls_cert).unwrap();
// Tell the client to believe in a different timestamp.
// Create the channelbuilder that we want to test.
let (snd, _rcv) = crate::event::channel();
let builder = ChanBuilder::new(client_rt, snd);
let (r1, r2): (Result<Channel>, Result<LocalStream>) = futures::join!(
async {
// client-side: build a channel!
// relay-side: accept the channel
// (and pretend to know what we're doing).
let (mut con, addr) = lis.accept().await.expect("accept failed");
assert_eq!(client_addr, addr.ip());
crate::testing::answer_channel_req(&mut con)
.expect("answer failed");
let chan = r1.unwrap();
assert_eq!(chan.ident(), &ed);
// TODO: Write tests for timeout logic, once there is smarter logic.