1
//! A general interface for Tor client usage.
2
//!
3
//! To construct a client, run the [`TorClient::create_bootstrapped`] method.
4
//! Once the client is bootstrapped, you can make anonymous
5
//! connections ("streams") over the Tor network using
6
//! [`TorClient::connect`].
7
use crate::address::IntoTorAddr;
8

            
9
use crate::config::{ClientAddrConfig, StreamTimeoutConfig, TorClientConfig};
10
use tor_circmgr::{DirInfo, IsolationToken, StreamIsolationBuilder, TargetPort};
11
use tor_config::MutCfg;
12
use tor_dirmgr::DirEvent;
13
use tor_persist::{FsStateMgr, StateMgr};
14
use tor_proto::circuit::ClientCirc;
15
use tor_proto::stream::{DataStream, IpVersionPreference, StreamParameters};
16
use tor_rtcompat::{PreferredRuntime, Runtime, SleepProviderExt};
17

            
18
use educe::Educe;
19
use futures::lock::Mutex as AsyncMutex;
20
use futures::stream::StreamExt;
21
use futures::task::SpawnExt;
22
use std::convert::TryInto;
23
use std::net::IpAddr;
24
use std::result::Result as StdResult;
25
use std::sync::{Arc, Mutex, Weak};
26
use std::time::Duration;
27

            
28
use crate::err::ErrorDetail;
29
use crate::{status, util, TorClientBuilder};
30
use tracing::{debug, error, info, warn};
31

            
32
/// An active client session on the Tor network.
///
/// While it's running, it will fetch directory information, build
/// circuits, and make connections for you.
///
/// Cloning this object makes a new reference to the same underlying
/// handles: it's usually better to clone the `TorClient` than it is to
/// create a new one.
// TODO(nickm): This type now has 5 Arcs inside it, and 2 types that have
// implicit Arcs inside them! maybe it's time to replace much of the insides of
// this with an Arc<TorClientInner>?
#[derive(Clone)]
pub struct TorClient<R: Runtime> {
    /// Asynchronous runtime object.
    runtime: R,
    /// Default isolation token for streams through this client.
    ///
    /// This is eventually used for `owner_token` in `tor-circmgr/src/usage.rs`, and is orthogonal
    /// to the `stream_token` which comes from `connect_prefs` (or a passed-in `StreamPrefs`).
    /// (ie, both must be the same to share a circuit).
    client_isolation: IsolationToken,
    /// Connection preferences.  Starts out as `Default`,  Inherited by our clones.
    connect_prefs: StreamPrefs,
    /// Circuit manager for keeping our circuits up to date and building
    /// them on-demand.
    circmgr: Arc<tor_circmgr::CircMgr<R>>,
    /// Directory manager for keeping our directory material up to date.
    dirmgr: Arc<dyn tor_dirmgr::DirProvider + Send + Sync>,
    /// Location on disk where we store persistent data.
    statemgr: FsStateMgr,
    /// Client address configuration
    addrcfg: Arc<MutCfg<ClientAddrConfig>>,
    /// Client stream-timeout configuration.
    ///
    /// (Consulted by `connect_with_prefs` for its `connect_timeout`.)
    timeoutcfg: Arc<MutCfg<StreamTimeoutConfig>>,
    /// Mutex used to serialize concurrent attempts to reconfigure a TorClient.
    ///
    /// See [`TorClient::reconfigure`] for more information on its use.
    reconfigure_lock: Arc<Mutex<()>>,

    /// A stream of bootstrap messages that we can clone when a client asks for
    /// it.
    ///
    /// (We don't need to observe this stream ourselves, since it drops each
    /// unobserved status change when the next status change occurs.)
    status_receiver: status::BootstrapEvents,

    /// Mutex used to prevent two tasks from trying to bootstrap at once.
    bootstrap_in_progress: Arc<AsyncMutex<()>>,

    /// Whether or not we should call `bootstrap` before doing things that require
    /// bootstrapping. If this is `Manual`, we will just call `wait_for_bootstrap`
    /// instead.
    should_bootstrap: BootstrapBehavior,
}
86

            
87
/// Preferences for whether a [`TorClient`] should bootstrap on its own or not.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Educe)]
#[educe(Default)]
#[non_exhaustive]
pub enum BootstrapBehavior {
    /// Bootstrap the client automatically when requests are made that require the client to be
    /// bootstrapped.
    ///
    /// (This is the default.)
    #[educe(Default)]
    OnDemand,
    /// Make no attempts to automatically bootstrap. [`TorClient::bootstrap`] must be manually
    /// invoked in order for the [`TorClient`] to become useful.
    ///
    /// Attempts to use the client (e.g. by creating connections or resolving hosts over the Tor
    /// network) before calling [`bootstrap`](TorClient::bootstrap) will fail, and
    /// return an error that has kind [`ErrorKind::BootstrapRequired`](crate::ErrorKind::BootstrapRequired).
    Manual,
}
104

            
105
/// Preferences for how to route a stream over the Tor network.
///
/// Passed to [`TorClient::connect_with_prefs`] (and friends), or set as a
/// client-wide default via `clone_with_prefs`.
#[derive(Debug, Clone, Default)]
pub struct StreamPrefs {
    /// What kind of IPv6/IPv4 we'd prefer, and how strongly.
    ip_ver_pref: IpVersionPreference,
    /// How should we isolate connection(s) ?
    isolation: StreamIsolationPreference,
    /// Whether to return the stream optimistically (i.e. before the exit has
    /// confirmed the connection; see [`StreamPrefs::optimistic`]).
    optimistic_stream: bool,
}
115

            
116
/// Record of how we are isolating connections
#[derive(Debug, Clone, Educe)]
#[educe(Default)]
enum StreamIsolationPreference {
    /// No additional isolation (the default).
    #[educe(Default)]
    None,
    /// Id of the isolation group the connection should be part of
    Explicit(IsolationToken),
    /// Isolate every connection!  (A fresh token is minted per stream.)
    EveryStream,
}
128

            
129
impl StreamPrefs {
130
    /// Construct a new StreamPrefs.
131
    pub fn new() -> Self {
132
        Self::default()
133
    }
134

            
135
    /// Indicate that a stream may be made over IPv4 or IPv6, but that
136
    /// we'd prefer IPv6.
137
    pub fn ipv6_preferred(&mut self) -> &mut Self {
138
        self.ip_ver_pref = IpVersionPreference::Ipv6Preferred;
139
        self
140
    }
141

            
142
    /// Indicate that a stream may only be made over IPv6.
143
    ///
144
    /// When this option is set, we will only pick exit relays that
145
    /// support IPv6, and we will tell them to only give us IPv6
146
    /// connections.
147
    pub fn ipv6_only(&mut self) -> &mut Self {
148
        self.ip_ver_pref = IpVersionPreference::Ipv6Only;
149
        self
150
    }
151

            
152
    /// Indicate that a stream may be made over IPv4 or IPv6, but that
153
    /// we'd prefer IPv4.
154
    ///
155
    /// This is the default.
156
    pub fn ipv4_preferred(&mut self) -> &mut Self {
157
        self.ip_ver_pref = IpVersionPreference::Ipv4Preferred;
158
        self
159
    }
160

            
161
    /// Indicate that a stream may only be made over IPv4.
162
    ///
163
    /// When this option is set, we will only pick exit relays that
164
    /// support IPv4, and we will tell them to only give us IPv4
165
    /// connections.
166
    pub fn ipv4_only(&mut self) -> &mut Self {
167
        self.ip_ver_pref = IpVersionPreference::Ipv4Only;
168
        self
169
    }
170

            
171
    /// Indicate that the stream should be opened "optimistically".
172
    ///
173
    /// By default, streams are not "optimistic". When you call
174
    /// [`TorClient::connect()`], it won't give you a stream until the
175
    /// exit node has confirmed that it has successfully opened a
176
    /// connection to your target address.  It's safer to wait in this
177
    /// way, but it is slower: it takes an entire round trip to get
178
    /// your confirmation.
179
    ///
180
    /// If a stream _is_ configured to be "optimistic", on the other
181
    /// hand, then `TorClient::connect()` will return the stream
182
    /// immediately, without waiting for an answer from the exit.  You
183
    /// can start sending data on the stream right away, though of
184
    /// course this data will be lost if the connection is not
185
    /// actually successful.
186
    pub fn optimistic(&mut self) -> &mut Self {
187
        self.optimistic_stream = true;
188
        self
189
    }
190

            
191
    /// Return a TargetPort to describe what kind of exit policy our
192
    /// target circuit needs to support.
193
1
    fn wrap_target_port(&self, port: u16) -> TargetPort {
194
1
        match self.ip_ver_pref {
195
            IpVersionPreference::Ipv6Only => TargetPort::ipv6(port),
196
1
            _ => TargetPort::ipv4(port),
197
        }
198
1
    }
199

            
200
    /// Return a new StreamParameters based on this configuration.
201
    fn stream_parameters(&self) -> StreamParameters {
202
        let mut params = StreamParameters::default();
203
        params
204
            .ip_version(self.ip_ver_pref)
205
            .optimistic(self.optimistic_stream);
206
        params
207
    }
208

            
209
    /// Indicate which other connections might use the same circuit
210
    /// as this one.
211
    ///
212
    /// By default all connections made on all clones of a `TorClient` may share connections.
213
    /// Connections made with a particular `isolation_group` may share circuits with each other.
214
    ///
215
    /// This connection preference is orthogonal to isolation established by
216
    /// [`TorClient::isolated_client`].  Connections made with an `isolated_client` (and its
217
    /// clones) will not share circuits with the original client, even if the same
218
    /// `isolation_group` is specified via the `ConnectionPrefs` in force.
219
    pub fn set_isolation_group(&mut self, isolation_group: IsolationToken) -> &mut Self {
220
        self.isolation = StreamIsolationPreference::Explicit(isolation_group);
221
        self
222
    }
223

            
224
    /// Indicate that connections with these preferences should have their own isolation group
225
    ///
226
    /// This is a convenience method which creates a fresh [`IsolationToken`]
227
    /// and sets it for these preferences.
228
    ///
229
    /// This connection preference is orthogonal to isolation established by
230
    /// [`TorClient::isolated_client`].  Connections made with an `isolated_client` (and its
231
    /// clones) will not share circuits with the original client, even if the same
232
    /// `isolation_group` is specified via the `ConnectionPrefs` in force.
233
    pub fn new_isolation_group(&mut self) -> &mut Self {
234
        self.isolation = StreamIsolationPreference::Explicit(IsolationToken::new());
235
        self
236
    }
237

            
238
    /// Indicate that no connection should share a circuit with any other.
239
    ///
240
    /// **Use with care:** This is likely to have poor performance, and imposes a much greater load
241
    /// on the Tor network.  Use this option only to make small numbers of connections each of
242
    /// which needs to be isolated from all other connections.
243
    ///
244
    /// (Don't just use this as a "get more privacy!!" method: the circuits
245
    /// that it put connections on will have no more privacy than any other
246
    /// circuits.  The only benefit is that these circuits will not be shared
247
    /// by multiple streams.)
248
    ///
249
    /// This can be undone by calling `set_isolation_group` or `new_isolation_group` on these
250
    /// preferences.
251
    pub fn isolate_every_stream(&mut self) -> &mut Self {
252
        self.isolation = StreamIsolationPreference::EveryStream;
253
        self
254
    }
255

            
256
    /// Return a token to describe which connections might use
257
    /// the same circuit as this one.
258
    fn isolation_group(&self) -> Option<IsolationToken> {
259
        use StreamIsolationPreference as SIP;
260
        match self.isolation {
261
            SIP::None => None,
262
            SIP::Explicit(ig) => Some(ig),
263
            SIP::EveryStream => Some(IsolationToken::new()),
264
        }
265
    }
266

            
267
    // TODO: Add some way to be IPFlexible, and require exit to support both.
268
}
269

            
270
impl TorClient<PreferredRuntime> {
271
    /// Bootstrap a connection to the Tor network, using the provided `config`.
272
    ///
273
    /// Returns a client once there is enough directory material to
274
    /// connect safely over the Tor network.
275
    ///
276
    /// Consider using [`TorClient::builder`] for more fine-grained control.
277
    ///
278
    /// # Panics
279
    ///
280
    /// If Tokio is being used (the default), panics if created outside the context of a currently
281
    /// running Tokio runtime. See the documentation for [`PreferredRuntime::current`] for
282
    /// more information.
283
    ///
284
    /// If using `async-std`, either take care to ensure Arti is not compiled with Tokio support,
285
    /// or manually create an `async-std` runtime using [`tor_rtcompat`] and use it with
286
    /// [`TorClient::with_runtime`].
287
    pub async fn create_bootstrapped(config: TorClientConfig) -> crate::Result<Self> {
288
        let runtime = PreferredRuntime::current()
289
            .expect("TorClient could not get an asynchronous runtime; are you running in the right context?");
290

            
291
        Self::with_runtime(runtime)
292
            .config(config)
293
            .create_bootstrapped()
294
            .await
295
    }
296

            
297
    /// Return a new builder for creating TorClient objects.
298
    ///
299
    /// If you want to make a [`TorClient`] synchronously, this is what you want; call
300
    /// `TorClientBuilder::create_unbootstrapped` on the returned builder.
301
    ///
302
    /// # Panics
303
    ///
304
    /// If Tokio is being used (the default), panics if created outside the context of a currently
305
    /// running Tokio runtime. See the documentation for `tokio::runtime::Handle::current` for
306
    /// more information.
307
    ///
308
    /// If using `async-std`, either take care to ensure Arti is not compiled with Tokio support,
309
    /// or manually create an `async-std` runtime using [`tor_rtcompat`] and use it with
310
    /// [`TorClient::with_runtime`].
311
    pub fn builder() -> TorClientBuilder<PreferredRuntime> {
312
        let runtime = PreferredRuntime::current()
313
            .expect("TorClient could not get an asynchronous runtime; are you running in the right context?");
314

            
315
        TorClientBuilder::new(runtime)
316
    }
317
}
318

            
319
impl<R: Runtime> TorClient<R> {
320
    /// Return a new builder for creating TorClient objects, with a custom provided [`Runtime`].
    ///
    /// See the [`tor_rtcompat`] crate for more information on custom runtimes.
    pub fn with_runtime(runtime: R) -> TorClientBuilder<R> {
        // Nothing client-specific to set up here: the builder owns the runtime
        // until `create_unbootstrapped`/`create_bootstrapped` is called.
        TorClientBuilder::new(runtime)
    }
326

            
327
    /// Implementation of `create_unbootstrapped`, split out in order to avoid manually specifying
    /// double error conversions.
    ///
    /// Builds every component of a client (state manager, channel/circuit/directory
    /// managers), spawns the background daemon tasks, and assembles the final
    /// `TorClient`.  No bootstrapping happens here; that is deferred to
    /// `bootstrap`/`wait_for_bootstrap`, governed by `autobootstrap`.
    pub(crate) fn create_inner(
        runtime: R,
        config: TorClientConfig,
        autobootstrap: BootstrapBehavior,
        dirmgr_builder: &dyn crate::builder::DirProviderBuilder<R>,
    ) -> StdResult<Self, ErrorDetail> {
        // Derive each subsystem's configuration from the overall client config.
        let circ_cfg = config.get_circmgr_config()?;
        let dir_cfg = config.get_dirmgr_config()?;
        let statemgr = FsStateMgr::from_path(config.storage.expand_state_dir()?)?;
        let addr_cfg = config.address_filter.clone();
        let timeout_cfg = config.stream_timeouts;

        // Watch channel over which bootstrap-status updates are published to
        // observers (see `status_receiver` field).
        let (status_sender, status_receiver) = postage::watch::channel();
        let status_receiver = status::BootstrapEvents {
            inner: status_receiver,
        };
        let chanmgr = Arc::new(tor_chanmgr::ChanMgr::new(runtime.clone()));
        let circmgr =
            tor_circmgr::CircMgr::new(circ_cfg, statemgr.clone(), &runtime, Arc::clone(&chanmgr))
                .map_err(ErrorDetail::CircMgrSetup)?;

        let dirmgr = dirmgr_builder
            .build(runtime.clone(), Arc::clone(&circmgr), dir_cfg)
            .map_err(crate::Error::into_detail)?;

        // Merge channel-level and directory-level bootstrap progress into the
        // single status stream we hand out to clients.
        let conn_status = chanmgr.bootstrap_events();
        let dir_status = dirmgr.bootstrap_events();
        runtime
            .spawn(status::report_status(
                status_sender,
                conn_status,
                dir_status,
            ))
            .map_err(|e| ErrorDetail::from_spawn("top-level status reporter", e))?;

        // NOTE: each daemon task below receives only a Weak reference to its
        // manager(s), so these tasks do not keep the client's internals alive
        // after the last TorClient handle is dropped.
        runtime
            .spawn(continually_expire_channels(
                runtime.clone(),
                Arc::downgrade(&chanmgr),
            ))
            .map_err(|e| ErrorDetail::from_spawn("channel expiration task", e))?;

        // Launch a daemon task to inform the circmgr about new
        // network parameters.
        runtime
            .spawn(keep_circmgr_params_updated(
                dirmgr.events(),
                Arc::downgrade(&circmgr),
                Arc::downgrade(&dirmgr),
            ))
            .map_err(|e| ErrorDetail::from_spawn("circmgr parameter updater", e))?;

        runtime
            .spawn(update_persistent_state(
                runtime.clone(),
                Arc::downgrade(&circmgr),
                statemgr.clone(),
            ))
            .map_err(|e| ErrorDetail::from_spawn("persistent state updater", e))?;

        runtime
            .spawn(continually_launch_timeout_testing_circuits(
                runtime.clone(),
                Arc::downgrade(&circmgr),
                Arc::downgrade(&dirmgr),
            ))
            .map_err(|e| ErrorDetail::from_spawn("timeout-probe circuit launcher", e))?;

        runtime
            .spawn(continually_preemptively_build_circuits(
                runtime.clone(),
                Arc::downgrade(&circmgr),
                Arc::downgrade(&dirmgr),
            ))
            .map_err(|e| ErrorDetail::from_spawn("preemptive circuit launcher", e))?;

        // Fresh token: streams from this client never share circuits with
        // streams from a different (e.g. isolated) client.
        let client_isolation = IsolationToken::new();

        Ok(TorClient {
            runtime,
            client_isolation,
            connect_prefs: Default::default(),
            circmgr,
            dirmgr,
            statemgr,
            addrcfg: Arc::new(addr_cfg.into()),
            timeoutcfg: Arc::new(timeout_cfg.into()),
            reconfigure_lock: Arc::new(Mutex::new(())),
            status_receiver,
            bootstrap_in_progress: Arc::new(AsyncMutex::new(())),
            should_bootstrap: autobootstrap,
        })
    }
422

            
423
    /// Bootstrap a connection to the Tor network, with a client created by `create_unbootstrapped`.
424
    ///
425
    /// Since cloned copies of a `TorClient` share internal state, you can bootstrap a client by
426
    /// cloning it and running this function in a background task (or similar). This function
427
    /// only needs to be called on one client in order to bootstrap all of its clones.
428
    ///
429
    /// Returns once there is enough directory material to connect safely over the Tor network.
430
    /// If the client or one of its clones has already been bootstrapped, returns immediately with
431
    /// success. If a bootstrap is in progress, waits for it to finish, then retries it if it
432
    /// failed (returning success if it succeeded).
433
    ///
434
    /// Bootstrap progress can be tracked by listening to the event receiver returned by
435
    /// [`bootstrap_events`](TorClient::bootstrap_events).
436
    ///
437
    /// # Failures
438
    ///
439
    /// If the bootstrapping process fails, returns an error. This function can safely be called
440
    /// again later to attempt to bootstrap another time.
441
    pub async fn bootstrap(&self) -> crate::Result<()> {
442
        self.bootstrap_inner().await.map_err(ErrorDetail::into)
443
    }
444

            
445
    /// Implementation of `bootstrap`, split out in order to avoid manually specifying
    /// double error conversions.
    async fn bootstrap_inner(&self) -> StdResult<(), ErrorDetail> {
        // Wait for an existing bootstrap attempt to finish first.
        //
        // This is a futures::lock::Mutex, so it's okay to await while we hold it.
        let _bootstrap_lock = self.bootstrap_in_progress.lock().await;

        // Try to take the on-disk lock on our state files.  If another process
        // already holds it, we proceed anyway (read-only) rather than failing.
        if self.statemgr.try_lock()?.held() {
            debug!("It appears we have the lock on our state files.");
        } else {
            info!(
                "Another process has the lock on our state files. We'll proceed in read-only mode."
            );
        }

        // If we fail to bootstrap (i.e. we return before the disarm() point below), attempt to
        // unlock the state files.
        let unlock_guard = util::StateMgrUnlockGuard::new(&self.statemgr);

        // Fetch enough directory material to operate safely.
        self.dirmgr.bootstrap().await?;

        // Push the freshly fetched network parameters into the circuit manager.
        self.circmgr.update_network_parameters(
            self.dirmgr
                .latest_netdir()
                .ok_or(ErrorDetail::DirMgr(tor_dirmgr::Error::DirectoryNotPresent))?
                .params(),
        );

        // Since we succeeded, disarm the unlock guard.
        unlock_guard.disarm();

        Ok(())
    }
479

            
480
    /// ## For `BootstrapBehavior::Ondemand` clients
481
    ///
482
    /// Initiate a bootstrap by calling `bootstrap` (which is idempotent, so attempts to
483
    /// bootstrap twice will just do nothing).
484
    ///
485
    /// ## For `BootstrapBehavior::Manual` clients
486
    ///
487
    /// Check whether a bootstrap is in progress; if one is, wait until it finishes
488
    /// and then return. (Otherwise, return immediately.)
489
1
    async fn wait_for_bootstrap(&self) -> StdResult<(), ErrorDetail> {
490
1
        match self.should_bootstrap {
491
            BootstrapBehavior::OnDemand => {
492
                self.bootstrap_inner().await?;
493
            }
494
            BootstrapBehavior::Manual => {
495
                // Grab the lock, and immediately release it.  That will ensure that nobody else is trying to bootstrap.
496
1
                self.bootstrap_in_progress.lock().await;
497
            }
498
        }
499
1
        Ok(())
500
1
    }
501

            
502
    /// Change the configuration of this TorClient to `new_config`.
503
    ///
504
    /// The `how` describes whether to perform an all-or-nothing
505
    /// reconfiguration: either all of the configuration changes will be
506
    /// applied, or none will. If you have disabled all-or-nothing changes, then
507
    /// only fatal errors will be reported in this function's return value.
508
    ///
509
    /// This function applies its changes to **all** TorClient instances derived
510
    /// from the same call to `TorClient::create_*`: even ones whose circuits
511
    /// are isolated from this handle.
512
    ///
513
    /// # Limitations
514
    ///
515
    /// Although most options are reconfigurable, there are some whose values
516
    /// can't be changed on an a running TorClient.  Those options (or their
517
    /// sections) are explicitly documented not to be changeable.
518
    ///
519
    /// Changing some options do not take effect immediately on all open streams
520
    /// and circuits, but rather affect only future streams and circuits.  Those
521
    /// are also explicitly documented.
522
    pub fn reconfigure(
        &self,
        new_config: &TorClientConfig,
        how: tor_config::Reconfigure,
    ) -> crate::Result<()> {
        // We need to hold this lock while we're reconfiguring the client: even
        // though the individual fields have their own synchronization, we can't
        // safely let two threads change them at once.  If we did, then we'd
        // introduce time-of-check/time-of-use bugs in checking our configuration,
        // deciding how to change it, then applying the changes.
        let _guard = self.reconfigure_lock.lock().expect("Poisoned lock");

        match how {
            tor_config::Reconfigure::AllOrNothing => {
                // We have to check before we make any changes.
                // (Recursive call with CheckAllOrNothing: validates only, applies nothing.)
                self.reconfigure(new_config, tor_config::Reconfigure::CheckAllOrNothing)?;
            }
            tor_config::Reconfigure::CheckAllOrNothing => {}
            tor_config::Reconfigure::WarnOnFailures => {}
            _ => {}
        }

        // Build the new sub-configurations before touching anything.
        let circ_cfg = new_config.get_circmgr_config().map_err(wrap_err)?;
        let dir_cfg = new_config.get_dirmgr_config().map_err(wrap_err)?;
        let state_cfg = new_config.storage.expand_state_dir().map_err(wrap_err)?;
        let addr_cfg = &new_config.address_filter;
        let timeout_cfg = &new_config.stream_timeouts;

        // The state directory cannot be changed on a running client.
        if state_cfg != self.statemgr.path() {
            how.cannot_change("storage.state_dir").map_err(wrap_err)?;
        }

        self.circmgr.reconfigure(&circ_cfg, how).map_err(wrap_err)?;
        self.dirmgr.reconfigure(&dir_cfg, how).map_err(wrap_err)?;

        // If we were only checking, stop before applying the remaining changes.
        if how == tor_config::Reconfigure::CheckAllOrNothing {
            return Ok(());
        }

        self.addrcfg.replace(addr_cfg.clone());
        self.timeoutcfg.replace(timeout_cfg.clone());

        Ok(())
    }
566

            
567
    /// Return a new isolated `TorClient` handle.
568
    ///
569
    /// The two `TorClient`s will share internal state and configuration, but
570
    /// their streams will never share circuits with one another.
571
    ///
572
    /// Use this function when you want separate parts of your program to
573
    /// each have a TorClient handle, but where you don't want their
574
    /// activities to be linkable to one another over the Tor network.
575
    ///
576
    /// Calling this function is usually preferable to creating a
577
    /// completely separate TorClient instance, since it can share its
578
    /// internals with the existing `TorClient`.
579
    ///
580
    /// (Connections made with clones of the returned `TorClient` may
581
    /// share circuits with each other.)
582
    #[must_use]
583
    pub fn isolated_client(&self) -> TorClient<R> {
584
        let mut result = self.clone();
585
        result.client_isolation = IsolationToken::new();
586
        result
587
    }
588

            
589
    /// Launch an anonymized connection to the provided address and port over
590
    /// the Tor network.
591
    ///
592
    /// Note that because Tor prefers to do DNS resolution on the remote side of
593
    /// the network, this function takes its address as a string:
594
    ///
595
    /// ```no_run
596
    /// # use arti_client::*;use tor_rtcompat::Runtime;
597
    /// # async fn ex<R:Runtime>(tor_client: TorClient<R>) -> Result<()> {
598
    /// // The most usual way to connect is via an address-port tuple.
599
    /// let socket = tor_client.connect(("www.example.com", 443)).await?;
600
    ///
601
    /// // You can also specify an address and port as a colon-separated string.
602
    /// let socket = tor_client.connect("www.example.com:443").await?;
603
    /// # Ok(())
604
    /// # }
605
    /// ```
606
    ///
607
    /// Hostnames are _strongly_ preferred here: if this function allowed the
608
    /// caller here to provide an [`IpAddr`] or
    /// [`SocketAddr`](std::net::SocketAddr) address, then
610
    ///
611
    /// ```no_run
612
    /// # use arti_client::*; use tor_rtcompat::Runtime;
613
    /// # async fn ex<R:Runtime>(tor_client: TorClient<R>) -> Result<()> {
614
    /// # use std::net::ToSocketAddrs;
615
    /// // BAD: We're about to leak our target address to the local resolver!
616
    /// let address = "www.example.com:443".to_socket_addrs().unwrap().next().unwrap();
617
    /// // 🤯 Oh no! Now any eavesdropper can tell where we're about to connect! 🤯
618
    ///
619
    /// // Fortunately, this won't compile, since SocketAddr doesn't implement IntoTorAddr.
620
    /// // let socket = tor_client.connect(address).await?;
621
    /// //                                 ^^^^^^^ the trait `IntoTorAddr` is not implemented for `std::net::SocketAddr`
622
    /// # Ok(())
623
    /// # }
624
    /// ```
625
    ///
626
    /// If you really do need to connect to an IP address rather than a
627
    /// hostname, and if you're **sure** that the IP address came from a safe
628
    /// location, there are a few ways to do so.
629
    ///
630
    /// ```no_run
631
    /// # use arti_client::{TorClient,Result};use tor_rtcompat::Runtime;
632
    /// # use std::net::{SocketAddr,IpAddr};
633
    /// # async fn ex<R:Runtime>(tor_client: TorClient<R>) -> Result<()> {
634
    /// # use std::net::ToSocketAddrs;
635
    /// // ⚠️This is risky code!⚠️
636
    /// // (Make sure your addresses came from somewhere safe...)
637
    ///
638
    /// // If we have a fixed address, we can just provide it as a string.
639
    /// let socket = tor_client.connect("192.0.2.22:443").await?;
640
    /// let socket = tor_client.connect(("192.0.2.22", 443)).await?;
641
    ///
642
    /// // If we have a SocketAddr or an IpAddr, we can use the
643
    /// // DangerouslyIntoTorAddr trait.
644
    /// use arti_client::DangerouslyIntoTorAddr;
645
    /// let sockaddr = SocketAddr::from(([192, 0, 2, 22], 443));
646
    /// let ipaddr = IpAddr::from([192, 0, 2, 22]);
647
    /// let socket = tor_client.connect(sockaddr.into_tor_addr_dangerously().unwrap()).await?;
648
    /// let socket = tor_client.connect((ipaddr, 443).into_tor_addr_dangerously().unwrap()).await?;
649
    /// # Ok(())
650
    /// # }
651
    /// ```
652
1
    pub async fn connect<A: IntoTorAddr>(&self, target: A) -> crate::Result<DataStream> {
        // Delegate, using this client's default stream preferences.
        let prefs = &self.connect_prefs;
        self.connect_with_prefs(target, prefs).await
    }
655

            
656
    /// Launch an anonymized connection to the provided address and
    /// port over the Tor network, with explicit connection preferences.
    ///
    /// Note that because Tor prefers to do DNS resolution on the remote
    /// side of the network, this function takes its address as a string.
    /// (See [`TorClient::connect()`] for more information.)
    pub async fn connect_with_prefs<A: IntoTorAddr>(
        &self,
        target: A,
        prefs: &StreamPrefs,
    ) -> crate::Result<DataStream> {
        // Validate the target and apply the client's address-filter config.
        let addr = target.into_tor_addr().map_err(wrap_err)?;
        addr.enforce_config(&self.addrcfg.get())?;
        let (addr, port) = addr.into_string_and_port();

        // Find (or build) a circuit whose exit policy permits this port.
        let exit_ports = [prefs.wrap_target_port(port)];
        let circ = self
            .get_or_launch_exit_circ(&exit_ports, prefs)
            .await
            .map_err(wrap_err)?;
        // NOTE(review): this logs the target address at info level; confirm
        // that recording destinations in logs is intended here.
        info!("Got a circuit for {}:{}", addr, port);

        let stream_future = circ.begin_stream(&addr, port, Some(prefs.stream_parameters()));
        // This timeout is needless but harmless for optimistic streams.
        let stream = self
            .runtime
            .timeout(self.timeoutcfg.get().connect_timeout, stream_future)
            .await
            .map_err(|_| ErrorDetail::ExitTimeout)?
            .map_err(wrap_err)?;

        Ok(stream)
    }
689

            
690
    /// Sets the default preferences for future connections made with this client.
    ///
    /// The preferences set with this function will be inherited by clones of this client, but
    /// updates to the preferences in those clones will not propagate back to the original.  I.e.,
    /// the preferences are copied by `clone`.
    ///
    /// Connection preferences always override configuration, even configuration set later
    /// (e.g., by a config reload).
    //
    // This function is private just because we're not sure we want to provide this API.
    // https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/250#note_2771238
    fn set_stream_prefs(&mut self, connect_prefs: StreamPrefs) {
        self.connect_prefs = connect_prefs;
    }
704

            
705
    /// Provides a new handle on this client, but with adjusted default preferences.
706
    ///
707
    /// Connections made with e.g. [`connect`](TorClient::connect) on the returned handle will use
708
    /// `connect_prefs`.  This is a convenience wrapper for `clone` and `set_connect_prefs`.
709
    #[must_use]
710
    pub fn clone_with_prefs(&self, connect_prefs: StreamPrefs) -> Self {
711
        let mut result = self.clone();
712
        result.set_stream_prefs(connect_prefs);
713
        result
714
    }
715

            
716
    /// On success, return a list of IP addresses.
717
    pub async fn resolve(&self, hostname: &str) -> crate::Result<Vec<IpAddr>> {
718
        self.resolve_with_prefs(hostname, &self.connect_prefs).await
719
    }
720

            
721
    /// On success, return a list of IP addresses, but use prefs.
722
    pub async fn resolve_with_prefs(
723
        &self,
724
        hostname: &str,
725
        prefs: &StreamPrefs,
726
    ) -> crate::Result<Vec<IpAddr>> {
727
        let addr = (hostname, 0).into_tor_addr().map_err(wrap_err)?;
728
        addr.enforce_config(&self.addrcfg.get()).map_err(wrap_err)?;
729

            
730
        let circ = self.get_or_launch_exit_circ(&[], prefs).await?;
731

            
732
        let resolve_future = circ.resolve(hostname);
733
        let addrs = self
734
            .runtime
735
            .timeout(self.timeoutcfg.get().resolve_timeout, resolve_future)
736
            .await
737
            .map_err(|_| ErrorDetail::ExitTimeout)?
738
            .map_err(wrap_err)?;
739

            
740
        Ok(addrs)
741
    }
742

            
743
    /// Perform a remote DNS reverse lookup with the provided IP address.
744
    ///
745
    /// On success, return a list of hostnames.
746
    pub async fn resolve_ptr(&self, addr: IpAddr) -> crate::Result<Vec<String>> {
747
        self.resolve_ptr_with_prefs(addr, &self.connect_prefs).await
748
    }
749

            
750
    /// Perform a remote DNS reverse lookup with the provided IP address.
751
    ///
752
    /// On success, return a list of hostnames.
753
    pub async fn resolve_ptr_with_prefs(
754
        &self,
755
        addr: IpAddr,
756
        prefs: &StreamPrefs,
757
    ) -> crate::Result<Vec<String>> {
758
        let circ = self.get_or_launch_exit_circ(&[], prefs).await?;
759

            
760
        let resolve_ptr_future = circ.resolve_ptr(addr);
761
        let hostnames = self
762
            .runtime
763
            .timeout(
764
                self.timeoutcfg.get().resolve_ptr_timeout,
765
                resolve_ptr_future,
766
            )
767
            .await
768
            .map_err(|_| ErrorDetail::ExitTimeout)?
769
            .map_err(wrap_err)?;
770

            
771
        Ok(hostnames)
772
    }
773

            
774
    /// Return a reference to this client's directory manager.
    ///
    /// This function is unstable. It is only enabled if the crate was
    /// built with the `experimental-api` feature.
    #[cfg(feature = "experimental-api")]
    pub fn dirmgr(&self) -> &Arc<dyn tor_dirmgr::DirProvider + Send + Sync> {
        &self.dirmgr
    }
782

            
783
    /// Return a reference to this client's circuit manager.
    ///
    /// This function is unstable. It is only enabled if the crate was
    /// built with the `experimental-api` feature.
    #[cfg(feature = "experimental-api")]
    pub fn circmgr(&self) -> &Arc<tor_circmgr::CircMgr<R>> {
        &self.circmgr
    }
791

            
792
    /// Return a reference to the runtime being used by this client.
    //
    // This API is not a hostage to fortune since we already require that R: Clone,
    // and necessarily a TorClient must have a clone of it.
    //
    // We provide it simply to save callers who have a TorClient from
    // having to separately keep their own handle.
    pub fn runtime(&self) -> &R {
        &self.runtime
    }
802

            
803
    /// Get or launch an exit-suitable circuit with a given set of
804
    /// exit ports.
805
1
    async fn get_or_launch_exit_circ(
806
1
        &self,
807
1
        exit_ports: &[TargetPort],
808
1
        prefs: &StreamPrefs,
809
1
    ) -> StdResult<ClientCirc, ErrorDetail> {
810
1
        self.wait_for_bootstrap().await?;
811
1
        let dir = self
812
1
            .dirmgr
813
1
            .latest_netdir()
814
1
            .ok_or(ErrorDetail::BootstrapRequired {
815
1
                action: "launch a circuit",
816
1
            })?;
817

            
818
        let isolation = {
819
            let mut b = StreamIsolationBuilder::new();
820
            // Always consider our client_isolation.
821
            b.owner_token(self.client_isolation);
822
            // Consider stream isolation too, if it's set.
823
            if let Some(tok) = prefs.isolation_group() {
824
                b.stream_token(tok);
825
            }
826
            // Failure should be impossible with this builder.
827
            b.build().expect("Failed to construct StreamIsolation")
828
        };
829

            
830
        let circ = self
831
            .circmgr
832
            .get_or_launch_exit(dir.as_ref().into(), exit_ports, isolation)
833
            .await
834
            .map_err(|cause| ErrorDetail::ObtainExitCircuit {
835
                cause,
836
                exit_ports: exit_ports.into(),
837
            })?;
838
        drop(dir); // This decreases the refcount on the netdir.
839

            
840
        Ok(circ)
841
1
    }
842

            
843
    /// Return a current [`status::BootstrapStatus`] describing how close this client
    /// is to being ready for user traffic.
    pub fn bootstrap_status(&self) -> status::BootstrapStatus {
        // Clone the most recently published status out of the receiver.
        self.status_receiver.inner.borrow().clone()
    }
848

            
849
    /// Return a stream of [`status::BootstrapStatus`] events that will be updated
    /// whenever the client's status changes.
    ///
    /// The receiver might not receive every update sent to this stream, though
    /// when it does poll the stream it should get the most recent one.
    //
    // TODO(nickm): will this also need to implement Send and 'static?
    pub fn bootstrap_events(&self) -> status::BootstrapEvents {
        // Hand out another handle on the shared status receiver.
        self.status_receiver.clone()
    }
859
}
860

            
861
/// Alias for TorError::from(Error)
862
1
pub(crate) fn wrap_err<T>(err: T) -> crate::Error
863
1
where
864
1
    ErrorDetail: From<T>,
865
1
{
866
1
    ErrorDetail::from(err).into()
867
1
}
868

            
869
/// Whenever a [`DirEvent::NewConsensus`] arrives on `events`, update
870
/// `circmgr` with the consensus parameters from `dirmgr`.
871
///
872
/// Exit when `events` is closed, or one of `circmgr` or `dirmgr` becomes
873
/// dangling.
874
///
875
/// This is a daemon task: it runs indefinitely in the background.
876
2
async fn keep_circmgr_params_updated<R: Runtime>(
877
2
    mut events: impl futures::Stream<Item = DirEvent> + Unpin,
878
2
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
879
2
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
880
2
) {
881
    use DirEvent::*;
882
2
    while let Some(event) = events.next().await {
883
        match event {
884
            NewConsensus => {
885
                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
886
                    let netdir = dm
887
                        .latest_netdir()
888
                        .expect("got new consensus event, without a netdir?");
889
                    cm.update_network_parameters(netdir.params());
890
                    cm.update_network(&netdir);
891
                } else {
892
                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
893
                    break;
894
                }
895
            }
896
            NewDescriptors => {
897
                if let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
898
                    let netdir = dm
899
                        .latest_netdir()
900
                        .expect("got new descriptors event, without a netdir?");
901
                    cm.update_network(&netdir);
902
                } else {
903
                    debug!("Circmgr or dirmgr has disappeared; task exiting.");
904
                    break;
905
                }
906
            }
907
            _ => {
908
                // Nothing we recognize.
909
            }
910
        }
911
    }
912
}
913

            
914
/// Run forever, periodically telling `circmgr` to update its persistent
/// state.
///
/// Exit when we notice that `circmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
async fn update_persistent_state<R: Runtime>(
    runtime: R,
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
    statemgr: FsStateMgr,
) {
    // TODO: Consider moving this function into tor-circmgr after we have more
    // experience with the state system.

    loop {
        if let Some(circmgr) = Weak::upgrade(&circmgr) {
            use tor_persist::LockStatus::*;

            match statemgr.try_lock() {
                Err(e) => {
                    // We can't even check the lock file; give up on
                    // persistent state updates entirely.
                    error!("Problem with state lock file: {}", e);
                    break;
                }
                Ok(NewlyAcquired) => {
                    // We just became the owner of the state files: have
                    // the circuit manager start writing its state to disk.
                    info!("We now own the lock on our state files.");
                    if let Err(e) = circmgr.upgrade_to_owned_persistent_state() {
                        error!("Unable to upgrade to owned state files: {}", e);
                        break;
                    }
                }
                Ok(AlreadyHeld) => {
                    // We already own the lock: flush current state to disk.
                    if let Err(e) = circmgr.store_persistent_state() {
                        error!("Unable to flush circmgr state: {}", e);
                        break;
                    }
                }
                Ok(NoLock) => {
                    // Some other process holds the lock: we may only read
                    // the state, so pick up whatever they have written.
                    if let Err(e) = circmgr.reload_persistent_state() {
                        error!("Unable to reload circmgr state: {}", e);
                        break;
                    }
                }
            }
        } else {
            debug!("Circmgr has disappeared; task exiting.");
            return;
        }
        // TODO(nickm): This delay is probably too small.
        //
        // Also, we probably don't even want a fixed delay here.  Instead,
        // we should be updating more frequently when the data is volatile
        // or has important info to save, and not at all when there are no
        // changes.
        runtime.sleep(Duration::from_secs(60)).await;
    }

    // Reached only via the `break`s above (error cases); the normal
    // shutdown path `return`s when `circmgr` is gone.
    error!("State update task is exiting prematurely.");
}
972

            
973
/// Run indefinitely, launching circuits as needed to get a good
974
/// estimate for our circuit build timeouts.
975
///
976
/// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
977
///
978
/// This is a daemon task: it runs indefinitely in the background.
979
///
980
/// # Note
981
///
982
/// I'd prefer this to be handled entirely within the tor-circmgr crate;
983
/// see [`tor_circmgr::CircMgr::launch_timeout_testing_circuit_if_appropriate`]
984
/// for more information.
985
2
async fn continually_launch_timeout_testing_circuits<R: Runtime>(
986
2
    rt: R,
987
2
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
988
2
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
989
2
) {
990
2
    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
991
2
        if let Some(netdir) = dm.latest_netdir() {
992
            if let Err(e) = cm.launch_timeout_testing_circuit_if_appropriate(&netdir) {
993
                warn!("Problem launching a timeout testing circuit: {}", e);
994
            }
995
            let delay = netdir
996
                .params()
997
                .cbt_testing_delay
998
                .try_into()
999
                .expect("Out-of-bounds value from BoundedInt32");

            
            drop((cm, dm));
            rt.sleep(delay).await;
        } else {
            // TODO(eta): ideally, this should wait until we successfully bootstrap using
            //            the bootstrap status API
2
            rt.sleep(Duration::from_secs(10)).await;
        }
    }
}

            
/// Run indefinitely, launching circuits where the preemptive circuit
/// predictor thinks it'd be a good idea to have them.
///
/// Exit when we notice that `circmgr` or `dirmgr` has been dropped.
///
/// This is a daemon task: it runs indefinitely in the background.
///
/// # Note
///
/// This would be better handled entirely within `tor-circmgr`, like
/// other daemon tasks.
2
async fn continually_preemptively_build_circuits<R: Runtime>(
2
    rt: R,
2
    circmgr: Weak<tor_circmgr::CircMgr<R>>,
2
    dirmgr: Weak<dyn tor_dirmgr::DirProvider + Send + Sync>,
2
) {
2
    while let (Some(cm), Some(dm)) = (Weak::upgrade(&circmgr), Weak::upgrade(&dirmgr)) {
2
        if let Some(netdir) = dm.latest_netdir() {
            cm.launch_circuits_preemptively(DirInfo::Directory(&netdir))
                .await;
            rt.sleep(Duration::from_secs(10)).await;
        } else {
            // TODO(eta): ideally, this should wait until we successfully bootstrap using
            //            the bootstrap status API
2
            rt.sleep(Duration::from_secs(10)).await;
        }
    }
}
/// Periodically expire any channels that have been unused beyond
/// the maximum duration allowed.
///
/// Exist when we find that `chanmgr` is dropped
///
/// This is a daemon task that runs indefinitely in the background
2
async fn continually_expire_channels<R: Runtime>(rt: R, chanmgr: Weak<tor_chanmgr::ChanMgr<R>>) {
    loop {
2
        let delay = if let Some(cm) = Weak::upgrade(&chanmgr) {
2
            cm.expire_channels()
        } else {
            // channel manager is closed.
            return;
        };
        // This will sometimes be an underestimate, but it's no big deal; we just sleep some more.
2
        rt.sleep(Duration::from_secs(delay.as_secs())).await;
    }
}

            
#[cfg(test)]
mod test {
    // Tests may unwrap freely: a panic here is simply a test failure.
    #![allow(clippy::unwrap_used)]

    use super::*;
    use crate::config::TorClientConfigBuilder;
    use crate::{ErrorKind, HasKind};

    /// A client with `BootstrapBehavior::Manual` can be created without
    /// bootstrapping, using temporary state and cache directories.
    #[test]
    fn create_unbootstrapped() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let state_dir = tempfile::tempdir().unwrap();
            let cache_dir = tempfile::tempdir().unwrap();
            let cfg = TorClientConfigBuilder::from_directories(state_dir, cache_dir)
                .build()
                .unwrap();
            let _ = TorClient::with_runtime(rt)
                .config(cfg)
                .bootstrap_behavior(BootstrapBehavior::Manual)
                .create_unbootstrapped()
                .unwrap();
        });
    }

    /// An unbootstrapped client must refuse to open streams: `connect`
    /// should fail with `ErrorKind::BootstrapRequired`.
    #[test]
    fn unbootstrapped_client_unusable() {
        tor_rtcompat::test_with_one_runtime!(|rt| async {
            let state_dir = tempfile::tempdir().unwrap();
            let cache_dir = tempfile::tempdir().unwrap();
            let cfg = TorClientConfigBuilder::from_directories(state_dir, cache_dir)
                .build()
                .unwrap();
            let client = TorClient::with_runtime(rt)
                .config(cfg)
                .bootstrap_behavior(BootstrapBehavior::Manual)
                .create_unbootstrapped()
                .unwrap();
            let result = client.connect("example.com:80").await;
            assert!(result.is_err());
            assert_eq!(result.err().unwrap().kind(), ErrorKind::BootstrapRequired);
        });
    }
}