1
//! Implement a simple SOCKS proxy that relays connections over Tor.
2
//!
3
//! A proxy is launched with [`run_socks_proxy()`], which listens for new
//! connections and then handles each one in a separate task, relaying its
//! traffic over Tor.
5

            
6
use futures::future::FutureExt;
7
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, Error as IoError};
8
use futures::stream::StreamExt;
9
use futures::task::SpawnExt;
10
use std::collections::HashMap;
11
use std::convert::TryInto;
12
use std::io::Result as IoResult;
13
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
14
use std::sync::{self, Arc};
15
use std::time::{Duration, Instant};
16
use tracing::{error, info, warn};
17

            
18
use arti_client::{ErrorKind, HasKind, IsolationToken, StreamPrefs, TorClient};
19
use tor_rtcompat::{Runtime, TcpListener};
20
use tor_socksproto::{SocksAddr, SocksAuth, SocksCmd, SocksRequest};
21

            
22
use anyhow::{anyhow, Context, Result};
23

            
24
/// Find out which kind of address family we can/should use for a
25
/// given `SocksRequest`.
26
pub fn stream_preference(req: &SocksRequest, addr: &str) -> StreamPrefs {
27
    let mut prefs = StreamPrefs::new();
28
    if addr.parse::<Ipv4Addr>().is_ok() {
29
        // If they asked for an IPv4 address correctly, nothing else will do.
30
        prefs.ipv4_only();
31
    } else if addr.parse::<Ipv6Addr>().is_ok() {
32
        // If they asked for an IPv6 address correctly, nothing else will do.
33
        prefs.ipv6_only();
34
    } else if req.version() == tor_socksproto::SocksVersion::V4 {
35
        // SOCKS4 and SOCKS4a only support IPv4
36
        prefs.ipv4_only();
37
    } else {
38
        // Otherwise, default to saying IPv4 is preferred.
39
        prefs.ipv4_preferred();
40
    }
41
    prefs
42
}
43

            
44
/// A Key used to isolate connections.
45
///
46
/// Composed of an usize (representing which listener socket accepted
47
/// the connection, the source IpAddr of the client, and the
48
/// authentication string provided by the client).
49
type IsolationKey = (usize, IpAddr, SocksAuth);
50

            
51
/// Shared and garbage-collected Map used to isolate connections.
52
struct IsolationMap {
53
    /// Inner map guarded by a Mutex
54
    inner: sync::Mutex<IsolationMapInner>,
55
}
56

            
57
/// Inner map, generally guarded by a Mutex
58
struct IsolationMapInner {
59
    /// Map storing isolation token and last time they where used
60
    map: HashMap<IsolationKey, (IsolationToken, Instant)>,
61
    /// Instant after which the garbage collector will be run again
62
    next_gc: Instant,
63
}
64

            
65
/// Minimum time between garbage-collection passes over the isolation map.
///
/// It is also the idle threshold: at each pass, entries that have not been
/// used for at least this long are discarded.
const ISOMAP_GC_INTERVAL: Duration = Duration::from_secs(30 * 60);
68

            
69
impl IsolationMap {
70
    /// Create a new, empty, IsolationMap
71
1
    fn new() -> Self {
72
1
        IsolationMap {
73
1
            inner: sync::Mutex::new(IsolationMapInner {
74
1
                map: HashMap::new(),
75
1
                next_gc: Instant::now() + ISOMAP_GC_INTERVAL,
76
1
            }),
77
1
        }
78
1
    }
79

            
80
    /// Get the IsolationToken corresponding to the given key-tuple, creating a new IsolationToken
81
    /// if none exists for this key.
82
    ///
83
    /// Every 30 minutes, on next call to this functions, entry older than 30 minutes are removed
84
5
    fn get_or_create(&self, key: IsolationKey, now: Instant) -> IsolationToken {
85
5
        let mut inner = self.inner.lock().expect("Poisoned lock on isolation map.");
86
5
        if inner.next_gc < now {
87
2
            inner.next_gc = now + ISOMAP_GC_INTERVAL;
88
2

            
89
2
            let old_limit = now - ISOMAP_GC_INTERVAL;
90
4
            inner.map.retain(|_, val| val.1 > old_limit);
91
3
        }
92
5
        let entry = inner
93
5
            .map
94
5
            .entry(key)
95
5
            .or_insert_with(|| (IsolationToken::new(), now));
96
5
        entry.1 = now;
97
5
        entry.0
98
5
    }
99
}
100

            
101
/// Given a just-received TCP connection `S` on a SOCKS port, handle the
102
/// SOCKS handshake and relay the connection over the Tor network.
103
///
104
/// Uses `isolation_map` to decide which circuits circuits this connection
105
/// may use.  Requires that `isolation_info` is a pair listing the listener
106
/// id and the source address for the socks request.
107
async fn handle_socks_conn<R, S>(
108
    runtime: R,
109
    tor_client: TorClient<R>,
110
    socks_stream: S,
111
    isolation_map: Arc<IsolationMap>,
112
    isolation_info: (usize, IpAddr),
113
) -> Result<()>
114
where
115
    R: Runtime,
116
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
117
{
118
    // Part 1: Perform the SOCKS handshake, to learn where we are
119
    // being asked to connect, and what we're being asked to do once
120
    // we connect there.
121
    //
122
    // The SOCKS handshake can require multiple round trips (SOCKS5
123
    // always does) so we we need to run this part of the process in a
124
    // loop.
125
    let mut handshake = tor_socksproto::SocksHandshake::new();
126

            
127
    let (mut socks_r, mut socks_w) = socks_stream.split();
128
    let mut inbuf = [0_u8; 1024];
129
    let mut n_read = 0;
130
    let request = loop {
131
        // Read some more stuff.
132
        n_read += socks_r
133
            .read(&mut inbuf[n_read..])
134
            .await
135
            .context("Error while reading SOCKS handshake")?;
136

            
137
        // try to advance the handshake to the next state.
138
        let action = match handshake.handshake(&inbuf[..n_read]) {
139
            Err(_) => continue, // Message truncated.
140
            Ok(Err(e)) => {
141
                if let tor_socksproto::Error::BadProtocol(version) = e {
142
                    // check for HTTP methods: CONNECT, DELETE, GET, HEAD, OPTION, PUT, POST, PATCH and
143
                    // TRACE.
144
                    // To do so, check the first byte of the connection, which happen to be placed
145
                    // where SOCKs version field is.
146
                    if [b'C', b'D', b'G', b'H', b'O', b'P', b'T'].contains(&version) {
147
                        let payload = br#"HTTP/1.0 501 Tor is not an HTTP Proxy
148
Content-Type: text/html; charset=utf-8
149

            
150
<!DOCTYPE html>
151
<html>
152
<head>
153
<title>This is a SOCKS Proxy, Not An HTTP Proxy</title>
154
</head>
155
<body>
156
<h1>This is a SOCKs proxy, not an HTTP proxy.</h1>
157
<p>
158
It appears you have configured your web browser to use this Tor port as
159
an HTTP proxy.
160
</p><p>
161
This is not correct: This port is configured as a SOCKS proxy, not
162
an HTTP proxy. If you need an HTTP proxy tunnel, wait for Arti to
163
add support for it in place of, or in addition to, socks_port.
164
Please configure your client accordingly.
165
</p>
166
<p>
167
See <a href="https://gitlab.torproject.org/tpo/core/arti/#todo-need-to-change-when-arti-get-a-user-documentation">https://gitlab.torproject.org/tpo/core/arti</a> for more information.
168
</p>
169
</body>
170
</html>"#;
171
                        socks_w.write_all(payload).await?;
172
                    }
173
                }
174
                return Err(e.into());
175
            }
176
            Ok(Ok(action)) => action,
177
        };
178

            
179
        // reply if needed.
180
        if action.drain > 0 {
181
            inbuf.copy_within(action.drain..action.drain + n_read, 0);
182
            n_read -= action.drain;
183
        }
184
        if !action.reply.is_empty() {
185
            write_all_and_flush(&mut socks_w, &action.reply).await?;
186
        }
187
        if action.finished {
188
            break handshake.into_request();
189
        }
190
    };
191
    let request = match request {
192
        Some(r) => r,
193
        None => {
194
            warn!("SOCKS handshake succeeded, but couldn't convert into a request.");
195
            return Ok(());
196
        }
197
    };
198

            
199
    // Unpack the socks request and find out where we're connecting to.
200
    let addr = request.addr().to_string();
201
    let port = request.port();
202
    info!(
203
        "Got a socks request: {} {}:{}",
204
        request.command(),
205
        addr,
206
        port
207
    );
208

            
209
    // Use the source address, SOCKS authentication, and listener ID
210
    // to determine the stream's isolation properties.  (Our current
211
    // rule is that two streams may only share a circuit if they have
212
    // the same values for all of these properties.)
213
    let auth = request.auth().clone();
214
    let (source_address, ip) = isolation_info;
215
    let isolation_token = isolation_map.get_or_create((source_address, ip, auth), Instant::now());
216

            
217
    // Determine whether we want to ask for IPv4/IPv6 addresses.
218
    let mut prefs = stream_preference(&request, &addr);
219
    prefs.set_isolation_group(isolation_token);
220

            
221
    match request.command() {
222
        SocksCmd::CONNECT => {
223
            // The SOCKS request wants us to connect to a given address.
224
            // So, launch a connection over Tor.
225
            let tor_stream = tor_client
226
                .connect_with_prefs((addr.clone(), port), &prefs)
227
                .await;
228
            let tor_stream = match tor_stream {
229
                Ok(s) => s,
230
                // In the case of a stream timeout, send the right SOCKS reply.
231
                Err(e) => {
232
                    // The connect attempt has failed.  We need to
233
                    // send an error.  See what kind it is.
234
                    //
235
                    let reply = match e.kind() {
236
                        ErrorKind::RemoteNetworkTimeout => {
237
                            request.reply(tor_socksproto::SocksStatus::TTL_EXPIRED, None)
238
                        }
239
                        _ => request.reply(tor_socksproto::SocksStatus::GENERAL_FAILURE, None),
240
                    };
241
                    write_all_and_close(&mut socks_w, &reply[..]).await?;
242
                    return Err(anyhow!(e));
243
                }
244
            };
245
            // Okay, great! We have a connection over the Tor network.
246
            info!("Got a stream for {}:{}", addr, port);
247
            // TODO: Should send a SOCKS reply if something fails. See #258.
248

            
249
            // Send back a SOCKS response, telling the client that it
250
            // successfully connected.
251
            let reply = request.reply(tor_socksproto::SocksStatus::SUCCEEDED, None);
252
            write_all_and_flush(&mut socks_w, &reply[..]).await?;
253

            
254
            let (tor_r, tor_w) = tor_stream.split();
255

            
256
            // Finally, spawn two background tasks to relay traffic between
257
            // the socks stream and the tor stream.
258
            runtime.spawn(copy_interactive(socks_r, tor_w).map(|_| ()))?;
259
            runtime.spawn(copy_interactive(tor_r, socks_w).map(|_| ()))?;
260
        }
261
        SocksCmd::RESOLVE => {
262
            // We've been asked to perform a regular hostname lookup.
263
            // (This is a tor-specific SOCKS extension.)
264
            let addrs = tor_client.resolve_with_prefs(&addr, &prefs).await?;
265
            if let Some(addr) = addrs.first() {
266
                let reply = request.reply(
267
                    tor_socksproto::SocksStatus::SUCCEEDED,
268
                    Some(&SocksAddr::Ip(*addr)),
269
                );
270
                write_all_and_flush(&mut socks_w, &reply[..]).await?;
271
            }
272
        }
273
        SocksCmd::RESOLVE_PTR => {
274
            // We've been asked to perform a reverse hostname lookup.
275
            // (This is a tor-specific SOCKS extension.)
276
            let addr: IpAddr = match addr.parse() {
277
                Ok(ip) => ip,
278
                Err(e) => {
279
                    let reply =
280
                        request.reply(tor_socksproto::SocksStatus::ADDRTYPE_NOT_SUPPORTED, None);
281
                    write_all_and_close(&mut socks_w, &reply[..]).await?;
282
                    return Err(anyhow!(e));
283
                }
284
            };
285
            let hosts = tor_client.resolve_ptr_with_prefs(addr, &prefs).await?;
286
            if let Some(host) = hosts.into_iter().next() {
287
                let reply = request.reply(
288
                    tor_socksproto::SocksStatus::SUCCEEDED,
289
                    Some(&SocksAddr::Hostname(host.try_into()?)),
290
                );
291
                write_all_and_flush(&mut socks_w, &reply[..]).await?;
292
            }
293
        }
294
        _ => {
295
            // We don't support this SOCKS command.
296
            warn!("Dropping request; {:?} is unsupported", request.command());
297
            let reply = request.reply(tor_socksproto::SocksStatus::COMMAND_NOT_SUPPORTED, None);
298
            write_all_and_close(&mut socks_w, &reply[..]).await?;
299
        }
300
    };
301

            
302
    // TODO: we should close the TCP stream if either task fails. Do we?
303
    // See #211 and #190.
304

            
305
    Ok(())
306
}
307

            
308
/// write_all the data to the writer & flush the writer if write_all is successful.
309
async fn write_all_and_flush<W>(writer: &mut W, buf: &[u8]) -> Result<()>
310
where
311
    W: AsyncWrite + Unpin,
312
{
313
    writer
314
        .write_all(buf)
315
        .await
316
        .context("Error while writing SOCKS reply")?;
317
    writer
318
        .flush()
319
        .await
320
        .context("Error while flushing SOCKS stream")
321
}
322

            
323
/// write_all the data to the writer & close the writer if write_all is successful.
324
async fn write_all_and_close<W>(writer: &mut W, buf: &[u8]) -> Result<()>
325
where
326
    W: AsyncWrite + Unpin,
327
{
328
    writer
329
        .write_all(buf)
330
        .await
331
        .context("Error while writing SOCKS reply")?;
332
    writer
333
        .close()
334
        .await
335
        .context("Error while closing SOCKS stream")
336
}
337

            
338
/// Copy all the data from `reader` into `writer` until we encounter an EOF or
339
/// an error.
340
///
341
/// Unlike as futures::io::copy(), this function is meant for use with
342
/// interactive readers and writers, where the reader might pause for
343
/// a while, but where we want to send data on the writer as soon as
344
/// it is available.
345
///
346
/// This function assumes that the writer might need to be flushed for
347
/// any buffered data to be sent.  It tries to minimize the number of
348
/// flushes, however, by only flushing the writer when the reader has no data.
349
async fn copy_interactive<R, W>(mut reader: R, mut writer: W) -> IoResult<()>
350
where
351
    R: AsyncRead + Unpin,
352
    W: AsyncWrite + Unpin,
353
{
354
    use futures::{poll, task::Poll};
355

            
356
    let mut buf = [0_u8; 1024];
357

            
358
    // At this point we could just loop, calling read().await,
359
    // write_all().await, and flush().await.  But we want to be more
360
    // clever than that: we only want to flush when the reader is
361
    // stalled.  That way we can pack our data into as few cells as
362
    // possible, but flush it immediately whenever there's no more
363
    // data coming.
364
    let loop_result: IoResult<()> = loop {
365
        let mut read_future = reader.read(&mut buf[..]);
366
        match poll!(&mut read_future) {
367
            Poll::Ready(Err(e)) => break Err(e),
368
            Poll::Ready(Ok(0)) => break Ok(()), // EOF
369
            Poll::Ready(Ok(n)) => {
370
                writer.write_all(&buf[..n]).await?;
371
                continue;
372
            }
373
            Poll::Pending => writer.flush().await?,
374
        }
375

            
376
        // The read future is pending, so we should wait on it.
377
        match read_future.await {
378
            Err(e) => break Err(e),
379
            Ok(0) => break Ok(()),
380
            Ok(n) => writer.write_all(&buf[..n]).await?,
381
        }
382
    };
383

            
384
    // Make sure that we flush any lingering data if we can.
385
    //
386
    // If there is a difference between closing and dropping, then we
387
    // only want to do a "proper" close if the reader closed cleanly.
388
    let flush_result = if loop_result.is_ok() {
389
        writer.close().await
390
    } else {
391
        writer.flush().await
392
    };
393

            
394
    loop_result.or(flush_result)
395
}
396

            
397
/// Return true if a given IoError, when received from accept, is a fatal
398
/// error.
399
fn accept_err_is_fatal(err: &IoError) -> bool {
400
    #![allow(clippy::match_like_matches_macro)]
401

            
402
    /// Re-declaration of WSAEMFILE with the right type to match
403
    /// `raw_os_error()`.
404
    #[cfg(windows)]
405
    const WSAEMFILE: i32 = winapi::shared::winerror::WSAEMFILE as i32;
406

            
407
    // Currently, EMFILE and ENFILE aren't distinguished by ErrorKind;
408
    // we need to use OS-specific errors. :P
409
    match err.raw_os_error() {
410
        #[cfg(unix)]
411
        Some(libc::EMFILE) | Some(libc::ENFILE) => false,
412
        #[cfg(windows)]
413
        Some(WSAEMFILE) => false,
414
        _ => true,
415
    }
416
}
417

            
418
/// Launch a SOCKS proxy to listen on a given localhost port, and run
419
/// indefinitely.
420
///
421
/// Requires a `runtime` to use for launching tasks and handling
422
/// timeouts, and a `tor_client` to use in connecting over the Tor
423
/// network.
424
pub async fn run_socks_proxy<R: Runtime>(
425
    runtime: R,
426
    tor_client: TorClient<R>,
427
    socks_port: u16,
428
) -> Result<()> {
429
    let mut listeners = Vec::new();
430

            
431
    // We actually listen on two ports: one for ipv4 and one for ipv6.
432
    let localhosts: [IpAddr; 2] = [Ipv4Addr::LOCALHOST.into(), Ipv6Addr::LOCALHOST.into()];
433

            
434
    // Try to bind to the SOCKS ports.
435
    for localhost in &localhosts {
436
        let addr: SocketAddr = (*localhost, socks_port).into();
437
        match runtime.listen(&addr).await {
438
            Ok(listener) => {
439
                info!("Listening on {:?}.", addr);
440
                listeners.push(listener);
441
            }
442
            Err(e) => warn!("Can't listen on {:?}: {}", addr, e),
443
        }
444
    }
445
    // We weren't able to bind any ports: There's nothing to do.
446
    if listeners.is_empty() {
447
        error!("Couldn't open any listeners.");
448
        return Err(anyhow!("Couldn't open listeners"));
449
    }
450

            
451
    // Create a stream of (incoming socket, listener_id) pairs, selected
452
    // across all the listeners.
453
    let mut incoming = futures::stream::select_all(
454
        listeners
455
            .into_iter()
456
            .map(TcpListener::incoming)
457
            .enumerate()
458
            .map(|(listener_id, incoming_conns)| {
459
                incoming_conns.map(move |socket| (socket, listener_id))
460
            }),
461
    );
462

            
463
    // Make a new IsolationMap; We'll use this to register which incoming
464
    // connections can and cannot share a circuit.
465
    let isolation_map = Arc::new(IsolationMap::new());
466

            
467
    // Loop over all incoming connections.  For each one, call
468
    // handle_socks_conn() in a new task.
469
    while let Some((stream, sock_id)) = incoming.next().await {
470
        let (stream, addr) = match stream {
471
            Ok((s, a)) => (s, a),
472
            Err(err) => {
473
                if accept_err_is_fatal(&err) {
474
                    return Err(err).context("Failed to receive incoming stream on SOCKS port");
475
                } else {
476
                    warn!("Incoming stream failed: {}", err);
477
                    continue;
478
                }
479
            }
480
        };
481
        let client_ref = tor_client.clone();
482
        let runtime_copy = runtime.clone();
483
        let isolation_map_ref = Arc::clone(&isolation_map);
484
        runtime.spawn(async move {
485
            let res = handle_socks_conn(
486
                runtime_copy,
487
                client_ref,
488
                stream,
489
                isolation_map_ref,
490
                (sock_id, addr.ip()),
491
            )
492
            .await;
493
            if let Err(e) = res {
494
                warn!("connection exited with error: {}", e);
495
            }
496
        })?;
497
    }
498

            
499
    Ok(())
500
}
501

            
502
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;

    #[test]
    fn test_isomap() {
        let isomap = IsolationMap::new();
        let client_ip: IpAddr = "10.0.0.1".parse().unwrap();

        // Two keys that differ only in their SOCKS authentication.
        let key_noauth = (6, client_ip, SocksAuth::NoAuth);
        let key_socks4 = (6, client_ip, SocksAuth::Socks4(vec![1, 2, 3]));

        // Halfway to the first scheduled GC: no collection happens yet.
        let t1 = Instant::now() + ISOMAP_GC_INTERVAL / 2;

        let tok1 = isomap.get_or_create(key_noauth.clone(), t1);
        let tok2 = isomap.get_or_create(key_socks4, t1);
        assert_ne!(tok1, tok2);
        assert_eq!(tok1, isomap.get_or_create(key_noauth.clone(), t1));

        // Past the scheduled GC, so a collection pass runs, but both
        // entries were used recently enough to survive it.
        let t2 = t1 + (ISOMAP_GC_INTERVAL * 3) / 4;
        assert_eq!(tok1, isomap.get_or_create(key_noauth.clone(), t2));

        // Another collection pass runs, and this time every entry is too
        // old, so the same key now yields a brand-new token.
        let t3 = t2 + ISOMAP_GC_INTERVAL * 2;
        let tok3 = isomap.get_or_create(key_noauth, t3);
        assert_ne!(tok3, tok2);
        assert_ne!(tok3, tok1);
    }
}