1
1
//! `tor-dirclient`: Implements a minimal directory client for Tor.
2
//!
3
//! # Overview
4
//!
5
//! Tor makes its directory requests as HTTP/1.0 requests tunneled over
6
//! Tor circuits.  For most objects, Tor uses a one-hop tunnel.  Tor
7
//! also uses a few strange and ad-hoc HTTP headers to select
8
//! particular functionality, such as asking for diffs, compression,
9
//! or multiple documents.
10
//!
11
//! This crate provides an API for downloading Tor directory resources
12
//! over a Tor circuit.
13
//!
14
//! This crate is part of
15
//! [Arti](https://gitlab.torproject.org/tpo/core/arti/), a project to
16
//! implement [Tor](https://www.torproject.org/) in Rust.
17
//!
18
//! # Features
19
//!
20
//! `xz` -- enable XZ compression.  This can be expensive in RAM and CPU,
21
//! but it saves a lot of bandwidth.  (On by default.)
22
//!
23
//! `zstd` -- enable ZSTD compression.  (On by default.)
24
//!
25
//! `routerdesc` -- Add support for downloading router descriptors.
26

            
27
#![deny(missing_docs)]
28
#![warn(noop_method_call)]
29
#![deny(unreachable_pub)]
30
#![warn(clippy::all)]
31
#![deny(clippy::await_holding_lock)]
32
#![deny(clippy::cargo_common_metadata)]
33
#![deny(clippy::cast_lossless)]
34
#![deny(clippy::checked_conversions)]
35
#![warn(clippy::cognitive_complexity)]
36
#![deny(clippy::debug_assert_with_mut_call)]
37
#![deny(clippy::exhaustive_enums)]
38
#![deny(clippy::exhaustive_structs)]
39
#![deny(clippy::expl_impl_clone_on_copy)]
40
#![deny(clippy::fallible_impl_from)]
41
#![deny(clippy::implicit_clone)]
42
#![deny(clippy::large_stack_arrays)]
43
#![warn(clippy::manual_ok_or)]
44
#![deny(clippy::missing_docs_in_private_items)]
45
#![deny(clippy::missing_panics_doc)]
46
#![warn(clippy::needless_borrow)]
47
#![warn(clippy::needless_pass_by_value)]
48
#![warn(clippy::option_option)]
49
#![warn(clippy::rc_buffer)]
50
#![deny(clippy::ref_option_ref)]
51
#![warn(clippy::semicolon_if_nothing_returned)]
52
#![warn(clippy::trait_duplication_in_bounds)]
53
#![deny(clippy::unnecessary_wraps)]
54
#![warn(clippy::unseparated_literal_suffix)]
55
#![deny(clippy::unwrap_used)]
56

            
57
mod err;
58
pub mod request;
59
mod response;
60
mod util;
61

            
62
use tor_circmgr::{CircMgr, DirInfo};
63
use tor_rtcompat::{Runtime, SleepProvider, SleepProviderExt};
64

            
65
// Zlib is required; the others are optional.
66
#[cfg(feature = "xz")]
67
use async_compression::futures::bufread::XzDecoder;
68
use async_compression::futures::bufread::ZlibDecoder;
69
#[cfg(feature = "zstd")]
70
use async_compression::futures::bufread::ZstdDecoder;
71

            
72
use futures::io::{
73
    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
74
};
75
use futures::FutureExt;
76
use memchr::memchr;
77
use std::sync::Arc;
78
use std::time::Duration;
79
use tracing::info;
80

            
81
pub use err::Error;
82
pub use response::{DirResponse, SourceInfo};
83

            
84
/// Type for results returned in this crate.
///
/// The error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
86

            
87
/// Fetch the resource described by `req` over the Tor network.
88
///
89
/// Circuits are built or found using `circ_mgr`, using paths
90
/// constructed using `dirinfo`.
91
///
92
/// For more fine-grained control over the circuit and stream used,
93
/// construct them yourself, and then call [`download`] instead.
94
///
95
/// # TODO
96
///
97
/// This is the only function in this crate that knows about CircMgr and
98
/// DirInfo.  Perhaps this function should move up a level into DirMgr?
99
pub async fn get_resource<CR, R, SP>(
100
    req: &CR,
101
    dirinfo: DirInfo<'_>,
102
    runtime: &SP,
103
    circ_mgr: Arc<CircMgr<R>>,
104
) -> Result<DirResponse>
105
where
106
    CR: request::Requestable + ?Sized,
107
    R: Runtime,
108
    SP: SleepProvider,
109
{
110
    let circuit = circ_mgr.get_or_launch_dir(dirinfo).await?;
111

            
112
    // TODO(nickm) This should be an option, and is too long.
113
    let begin_timeout = Duration::from_secs(5);
114
    let source = SourceInfo::new(circuit.unique_id());
115

            
116
    // Launch the stream.
117
    let mut stream = runtime
118
        .timeout(begin_timeout, circuit.begin_dir_stream())
119
        .await??; // TODO(nickm) handle fatalities here too
120

            
121
    // TODO: Perhaps we want separate timeouts for each phase of this.
122
    // For now, we just use higher-level timeouts in `dirmgr`.
123
    let r = download(runtime, req, &mut stream, Some(source.clone())).await;
124

            
125
    if should_retire_circ(&r) {
126
        retire_circ(&circ_mgr, &source, "Partial response");
127
    }
128

            
129
    r
130
}
131

            
132
/// Return true if `result` holds an error indicating that we should retire the
133
/// circuit used for the corresponding request.
134
2
fn should_retire_circ(result: &Result<DirResponse>) -> bool {
135
2
    match result {
136
1
        Err(e) => e.should_retire_circ(),
137
1
        Ok(dr) => dr.error().map(Error::should_retire_circ) == Some(true),
138
    }
139
2
}
140

            
141
/// Fetch a Tor directory object from a provided stream.
142
///
143
/// To do this, we send a simple HTTP/1.0 request for the described
144
/// object in `req` over `stream`, and then wait for a response.  In
145
/// log messages, we describe the origin of the data as coming from
146
/// `source`.
147
///
148
/// # Notes
149
///
150
/// It's kind of bogus to have a 'source' field here at all; we may
151
/// eventually want to remove it.
152
///
153
/// This function doesn't close the stream; you may want to do that
154
/// yourself.
155
7
pub async fn download<R, S, SP>(
156
7
    runtime: &SP,
157
7
    req: &R,
158
7
    stream: &mut S,
159
7
    source: Option<SourceInfo>,
160
7
) -> Result<DirResponse>
161
7
where
162
7
    R: request::Requestable + ?Sized,
163
7
    S: AsyncRead + AsyncWrite + Send + Unpin,
164
7
    SP: SleepProvider,
165
7
{
166
7
    let partial_ok = req.partial_docs_ok();
167
7
    let maxlen = req.max_response_len();
168
7
    let req = req.make_request()?;
169
7
    let encoded = util::encode_request(&req);
170
7

            
171
7
    // Write the request.
172
7
    stream.write_all(encoded.as_bytes()).await?;
173
6
    stream.flush().await?;
174

            
175
6
    let mut buffered = BufReader::new(stream);
176

            
177
    // Handle the response
178
    // TODO: should there be a separate timeout here?
179
26
    let header = read_headers(&mut buffered).await?;
180
4
    if header.status != Some(200) {
181
1
        return Ok(DirResponse::new(
182
1
            header.status.unwrap_or(0),
183
1
            None,
184
1
            vec![],
185
1
            source,
186
1
        ));
187
3
    }
188

            
189
3
    let mut decoder = get_decoder(buffered, header.encoding.as_deref())?;
190

            
191
3
    let mut result = Vec::new();
192
6
    let ok = read_and_decompress(runtime, &mut decoder, maxlen, &mut result).await;
193

            
194
3
    let ok = match (partial_ok, ok, result.len()) {
195
1
        (true, Err(e), n) if n > 0 => {
196
1
            // Note that we _don't_ return here: we want the partial response.
197
1
            Err(e)
198
        }
199
1
        (_, Err(e), _) => {
200
1
            return Err(e);
201
        }
202
1
        (_, Ok(()), _) => Ok(()),
203
    };
204

            
205
2
    Ok(DirResponse::new(200, ok.err(), result, source))
206
7
}
207

            
208
/// Read and parse HTTP/1 headers from `stream`.
///
/// Accumulates header bytes one line at a time, re-running `httparse`
/// on the whole buffer after each line, until the response is complete
/// (or an error/limit is hit).
async fn read_headers<S>(stream: &mut S) -> Result<HeaderStatus>
where
    S: AsyncBufRead + Unpin,
{
    // All header bytes read so far; re-parsed from scratch on each loop.
    let mut buf = Vec::with_capacity(1024);

    loop {
        // TODO: it's inefficient to do this a line at a time; it would
        // probably be better to read until the CRLF CRLF ending of the
        // response.  But this should be fast enough.
        let n = read_until_limited(stream, b'\n', 2048, &mut buf).await?;

        // TODO(nickm): Better maximum and/or let this expand.
        let mut headers = [httparse::EMPTY_HEADER; 32];
        let mut response = httparse::Response::new(&mut headers);

        match response.parse(&buf[..])? {
            httparse::Status::Partial => {
                // We didn't get a whole response; we may need to try again.

                if n == 0 {
                    // We hit an EOF; no more progress can be made.
                    return Err(Error::TruncatedHeaders);
                }

                // TODO(nickm): Pick a better maximum
                if buf.len() >= 16384 {
                    return Err(httparse::Error::TooManyHeaders.into());
                }
            }
            httparse::Status::Complete(n_parsed) => {
                // On non-200 responses we don't bother extracting the
                // Content-Encoding: the caller won't read a body.
                if response.code != Some(200) {
                    return Ok(HeaderStatus {
                        status: response.code,
                        encoding: None,
                    });
                }
                let encoding = if let Some(enc) = response
                    .headers
                    .iter()
                    .find(|h| h.name == "Content-Encoding")
                {
                    Some(String::from_utf8(enc.value.to_vec())?)
                } else {
                    None
                };
                /*
                if let Some(clen) = response.headers.iter().find(|h| h.name == "Content-Length") {
                    let clen = std::str::from_utf8(clen.value)?;
                    length = Some(clen.parse()?);
                }
                 */
                // Since we read up to (and including) each '\n', a complete
                // response should consume exactly the bytes we buffered.
                assert!(n_parsed == buf.len());
                return Ok(HeaderStatus {
                    status: Some(200),
                    encoding,
                });
            }
        }
        // NOTE(review): this check looks unreachable: the `Partial` arm above
        // already returns when `n == 0`, and `Complete` always returns.
        // Kept as a defensive guard.
        if n == 0 {
            return Err(Error::TruncatedHeaders);
        }
    }
}
273

            
274
/// Return value from read_headers
#[derive(Debug, Clone)]
struct HeaderStatus {
    /// HTTP status code.
    ///
    /// `Some(200)` on success; on other responses this is whatever code
    /// httparse reported for the status line.
    status: Option<u16>,
    /// The Content-Encoding header, if any.
    ///
    /// Only extracted for 200 responses; `None` otherwise.
    encoding: Option<String>,
}
282

            
283
/// Helper: download directory information from `stream` and
/// decompress it into a result buffer.  Assumes that `buf` is empty.
///
/// If we get more than maxlen bytes after decompression, give an error.
///
/// Returns the status of our download attempt, stores any data that
/// we were able to download into `result`.  Existing contents of
/// `result` are overwritten.
async fn read_and_decompress<S, SP>(
    runtime: &SP,
    mut stream: S,
    maxlen: usize,
    result: &mut Vec<u8>,
) -> Result<()>
where
    S: AsyncRead + Unpin,
    SP: SleepProvider,
{
    // How many bytes we try to read on each iteration of the loop.
    let buffer_window_size = 1024;
    // Total bytes of real data written into `result` so far.
    let mut written_total: usize = 0;
    // TODO(nickm): This should be an option, and is maybe too long.
    // Though for some users it may be too short?
    let read_timeout = Duration::from_secs(10);
    // Note: this is a single deadline for the whole download, not a
    // per-read timeout; the timer keeps running across iterations.
    let timer = runtime.sleep(read_timeout).fuse();
    futures::pin_mut!(timer);

    loop {
        // allocate buffer for next read
        result.resize(written_total + buffer_window_size, 0);
        let buf: &mut [u8] = &mut result[written_total..written_total + buffer_window_size];

        // Race the next read against the overall deadline.
        let status = futures::select! {
            status = stream.read(buf).fuse() => status,
            _ = timer => {
                result.resize(written_total, 0); // truncate as needed
                return Err(Error::DirTimeout);
            }
        };
        let written_in_this_loop = match status {
            Ok(n) => n,
            Err(other) => {
                result.resize(written_total, 0); // truncate as needed
                return Err(other.into());
            }
        };

        written_total += written_in_this_loop;

        // exit conditions below

        if written_in_this_loop == 0 {
            /*
            in case we read less than `buffer_window_size` in last `read`
            we need to shrink result because otherwise we'll return those
            un-read 0s
            */
            if written_total < result.len() {
                result.resize(written_total, 0);
            }
            return Ok(());
        }

        // TODO: It would be good to detect compression bombs, but
        // that would require access to the internal stream, which
        // would in turn require some tricky programming.  For now, we
        // use the maximum length here to prevent an attacker from
        // filling our RAM.
        if written_total > maxlen {
            result.resize(maxlen, 0);
            return Err(Error::ResponseTooLong(written_total));
        }
    }
}
356

            
357
/// Retire a directory circuit because of an error we've encountered on it.
358
fn retire_circ<R, E>(circ_mgr: &Arc<CircMgr<R>>, source_info: &SourceInfo, error: &E)
359
where
360
    R: Runtime,
361
    E: std::fmt::Display + ?Sized,
362
{
363
    let id = source_info.unique_circ_id();
364
    info!(
365
        "{}: Retiring circuit because of directory failure: {}",
366
        &id, &error
367
    );
368
    circ_mgr.retire_circ(id);
369
}
370

            
371
/// As AsyncBufReadExt::read_until, but stops after reading `max` bytes.
372
///
373
/// Note that this function might not actually read any byte of value
374
/// `byte`, since EOF might occur, or we might fill the buffer.
375
///
376
/// A return value of 0 indicates an end-of-file.
377
33
async fn read_until_limited<S>(
378
33
    stream: &mut S,
379
33
    byte: u8,
380
33
    max: usize,
381
33
    buf: &mut Vec<u8>,
382
33
) -> std::io::Result<usize>
383
33
where
384
33
    S: AsyncBufRead + Unpin,
385
34
{
386
34
    let mut n_added = 0;
387
    loop {
388
113
        let data = stream.fill_buf().await?;
389
112
        if data.is_empty() {
390
            // End-of-file has been reached.
391
6
            return Ok(n_added);
392
106
        }
393
106
        debug_assert!(n_added < max);
394
106
        let remaining_space = max - n_added;
395
106
        let (available, found_byte) = match memchr(byte, data) {
396
21
            Some(idx) => (idx + 1, true),
397
85
            None => (data.len(), false),
398
        };
399
107
        debug_assert!(available >= 1);
400
106
        let n_to_copy = std::cmp::min(remaining_space, available);
401
106
        buf.extend(&data[..n_to_copy]);
402
106
        stream.consume_unpin(n_to_copy);
403
106
        n_added += n_to_copy;
404
106
        if found_byte || n_added == max {
405
28
            return Ok(n_added);
406
79
        }
407
    }
408
34
}
409

            
410
/// Helper: Return a boxed decoder object that wraps the stream  $s.
macro_rules! decoder {
    ($dec:ident, $s:expr) => {{
        let mut decoder = $dec::new($s);
        // A response body may concatenate several compressed members;
        // tell the decoder to keep decoding past the end of the first one.
        decoder.multiple_members(true);
        Ok(Box::new(decoder))
    }};
}
418

            
419
/// Wrap `stream` in an appropriate type to undo the content encoding
/// as described in `encoding`.
///
/// Returns `Error::ContentEncoding` for any encoding we don't support
/// (including those disabled by compile-time features).
fn get_decoder<'a, S: AsyncBufRead + Unpin + Send + 'a>(
    stream: S,
    encoding: Option<&str>,
) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>> {
    match encoding {
        // No encoding (or the trivial one): pass the stream through as-is.
        None | Some("identity") => Ok(Box::new(stream)),
        Some("deflate") => decoder!(ZlibDecoder, stream),
        // "x-tor-lzma" and "x-zstd" are Tor's nonstandard encoding names.
        #[cfg(feature = "xz")]
        Some("x-tor-lzma") => decoder!(XzDecoder, stream),
        #[cfg(feature = "zstd")]
        Some("x-zstd") => decoder!(ZstdDecoder, stream),
        Some(other) => Err(Error::ContentEncoding(other.into())),
    }
}
435

            
436
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use tor_rtmock::{io::stream_pair, time::MockSleepProvider};

    use futures_await_test::async_test;

    #[async_test]
    async fn test_read_until_limited() -> Result<()> {
        let mut out = Vec::new();
        let bytes = b"This line eventually ends\nthen comes another\n";

        // Case 1: find a whole line.
        let mut s = &bytes[..];
        let res = read_until_limited(&mut s, b'\n', 100, &mut out).await;
        assert_eq!(res?, 26);
        assert_eq!(&out[..], b"This line eventually ends\n");

        // Case 2: reach the limit.
        let mut s = &bytes[..];
        out.clear();
        let res = read_until_limited(&mut s, b'\n', 10, &mut out).await;
        assert_eq!(res?, 10);
        assert_eq!(&out[..], b"This line ");

        // Case 3: reach EOF.
        let mut s = &bytes[..];
        out.clear();
        let res = read_until_limited(&mut s, b'Z', 100, &mut out).await;
        assert_eq!(res?, 45);
        assert_eq!(&out[..], &bytes[..]);

        Ok(())
    }

    // Basic decompression wrapper.
    //
    // Runs `read_and_decompress` over `data` with the given declared
    // `encoding`, and returns both the status and whatever bytes were
    // produced.
    async fn decomp_basic(
        encoding: Option<&str>,
        data: &[u8],
        maxlen: usize,
    ) -> (Result<()>, Vec<u8>) {
        // We don't need to do anything fancy here, since we aren't simulating
        // a timeout.
        let mock_time = MockSleepProvider::new(std::time::SystemTime::now());

        let mut output = Vec::new();
        let mut stream = match get_decoder(data, encoding) {
            Ok(s) => s,
            Err(e) => return (Err(e), output),
        };

        let r = read_and_decompress(&mock_time, &mut stream, maxlen, &mut output).await;

        (r, output)
    }

    #[async_test]
    async fn decompress_identity() -> Result<()> {
        let mut text = Vec::new();
        for _ in 0..1000 {
            text.extend(b"This is a string with a nontrivial length that we'll use to make sure that the loop is executed more than once.");
        }

        let limit = 10 << 20;
        let (s, r) = decomp_basic(None, &text[..], limit).await;
        s?;
        assert_eq!(r, text);

        let (s, r) = decomp_basic(Some("identity"), &text[..], limit).await;
        s?;
        assert_eq!(r, text);

        // Try truncated result
        let limit = 100;
        let (s, r) = decomp_basic(Some("identity"), &text[..], limit).await;
        assert!(s.is_err());
        assert_eq!(r, &text[..100]);

        Ok(())
    }

    #[async_test]
    async fn decomp_zlib() -> Result<()> {
        let compressed =
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap();

        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("deflate"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish");

        Ok(())
    }

    #[cfg(feature = "zstd")]
    #[async_test]
    async fn decomp_zstd() -> Result<()> {
        let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap();
        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("x-zstd"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish\n");

        Ok(())
    }

    #[cfg(feature = "xz")]
    #[async_test]
    async fn decomp_xz2() -> Result<()> {
        // Not so good at tiny files...
        let compressed = hex::decode("fd377a585a000004e6d6b446020021011c00000010cf58cce00024001d5d00279b88a202ca8612cfb3c19c87c34248a570451e4851d3323d34ab8000000000000901af64854c91f600013925d6ec06651fb6f37d010000000004595a").unwrap();
        let limit = 10 << 20;
        let (s, r) = decomp_basic(Some("x-tor-lzma"), &compressed, limit).await;
        s?;
        assert_eq!(r, b"One fish Two fish Red fish Blue fish\n");

        Ok(())
    }

    #[async_test]
    async fn decomp_unknown() {
        let compressed = hex::decode("28b52ffd24250d0100c84f6e6520666973682054776f526564426c756520666973680a0200600c0e2509478352cb").unwrap();
        let limit = 10 << 20;
        let (s, _r) = decomp_basic(Some("x-proprietary-rle"), &compressed, limit).await;

        assert!(matches!(s, Err(Error::ContentEncoding(_))));
    }

    #[async_test]
    async fn decomp_bad_data() {
        let compressed = b"This is not good zlib data";
        let limit = 10 << 20;
        let (s, _r) = decomp_basic(Some("deflate"), compressed, limit).await;

        // This should possibly be a different type in the future.
        assert!(matches!(s, Err(Error::IoError(_))));
    }

    #[async_test]
    async fn headers_ok() -> Result<()> {
        let text = b"HTTP/1.0 200 OK\r\nDate: ignored\r\nContent-Encoding: Waffles\r\n\r\n";

        let mut s = &text[..];
        let h = read_headers(&mut s).await?;

        assert_eq!(h.status, Some(200));
        assert_eq!(h.encoding.as_deref(), Some("Waffles"));

        // now try truncated
        let mut s = &text[..15];
        let h = read_headers(&mut s).await;
        assert!(matches!(h, Err(Error::TruncatedHeaders)));

        // now try with no encoding.
        let text = b"HTTP/1.0 404 Not found\r\n\r\n";
        let mut s = &text[..];
        let h = read_headers(&mut s).await?;

        assert_eq!(h.status, Some(404));
        assert!(h.encoding.is_none());

        Ok(())
    }

    #[async_test]
    async fn headers_bogus() -> Result<()> {
        let text = b"HTTP/999.0 WHAT EVEN\r\n\r\n";
        let mut s = &text[..];
        let h = read_headers(&mut s).await;

        assert!(h.is_err());
        assert!(matches!(h, Err(Error::HttparseError(_))));
        Ok(())
    }

    /// Run a trivial download example with a response provided as a binary
    /// string.
    ///
    /// Return the directory response (if any) and the request as encoded (if
    /// any.)
    fn run_download_test<Req: request::Requestable>(
        req: Req,
        response: &[u8],
    ) -> (Result<DirResponse>, Result<Vec<u8>>) {
        let (mut s1, s2) = stream_pair();
        let (mut s2_r, mut s2_w) = s2.split();

        tor_rtcompat::test_with_one_runtime!(|rt| async move {
            let rt2 = rt.clone();
            // Three concurrent tasks: the client download, the "server" side
            // reading the request, and the "server" side sending `response`.
            let (v1, v2, v3): (Result<DirResponse>, Result<Vec<u8>>, Result<()>) = futures::join!(
                async {
                    // Run the download function.
                    let r = download(&rt, &req, &mut s1, None).await;
                    s1.close().await?;
                    r
                },
                async {
                    // Take the request from the client, and return it in "v2"
                    let mut v = Vec::new();
                    s2_r.read_to_end(&mut v).await?;
                    Ok(v)
                },
                async {
                    // Send back a response.
                    s2_w.write_all(response).await?;
                    // We wait a moment to give the other side time to notice it
                    // has data.
                    //
                    // (Tentative diagnosis: The `async-compress` crate seems to
                    // be behave differently depending on whether the "close"
                    // comes right after the incomplete data or whether it comes
                    // after a delay.  If there's a delay, it notices the
                    // truncated data and tells us about it. But when there's
                    // _no_delay, it treats the data as an error and doesn't
                    // tell our code.)

                    // TODO: sleeping in tests is not great.
                    rt2.sleep(Duration::from_millis(50)).await;
                    s2_w.close().await?;
                    Ok(())
                }
            );

            assert!(v3.is_ok());

            (v1, v2)
        })
    }

    #[test]
    fn test_download() -> Result<()> {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();

        let (response, request) = run_download_test(
            req,
            b"HTTP/1.0 200 OK\r\n\r\nThis is where the descs would go.",
        );

        let request = request?;
        assert!(request[..].starts_with(
            b"GET /tor/micro/d/CQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQkJCQk.z HTTP/1.0\r\n"
        ));

        assert!(!should_retire_circ(&response));
        let response = response?;
        assert_eq!(response.status_code(), 200);
        assert!(!response.is_partial());
        assert!(response.error().is_none());
        assert!(response.source().is_none());
        let out_ref = response.output();
        assert_eq!(out_ref, b"This is where the descs would go.");
        let out = response.into_output();
        assert_eq!(&out, b"This is where the descs would go.");

        Ok(())
    }

    #[test]
    fn test_download_truncated() -> Result<()> {
        // Request only one md, so "partial ok" will not be set.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let mut response_text: Vec<u8> =
            (*b"HTTP/1.0 200 OK\r\nContent-Encoding: deflate\r\n\r\n").into();
        // "One fish two fish" as above twice, but truncated the second time
        response_text.extend(
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5970c88").unwrap(),
        );
        response_text.extend(
            hex::decode("789cf3cf4b5548cb2cce500829cf8730825253200ca79c52881c00e5").unwrap(),
        );
        let (response, request) = run_download_test(req, &response_text);
        assert!(request.is_ok());
        assert!(response.is_err()); // The whole download should fail, since partial_ok wasn't set.

        // request two microdescs, so "partial_ok" will be set.
        let req: request::MicrodescRequest = vec![[9; 32]; 2].into_iter().collect();

        let (response, request) = run_download_test(req, &response_text);
        assert!(request.is_ok());

        let response = response?;
        assert_eq!(response.status_code(), 200);
        assert!(response.error().is_some());
        assert!(response.is_partial());
        assert!(response.output().len() < 37 * 2);
        assert!(response.output().starts_with(b"One fish"));

        Ok(())
    }

    #[test]
    fn test_404() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"HTTP/1.0 418 I'm a teapot\r\n\r\n";
        let (response, _request) = run_download_test(req, response_text);

        assert_eq!(response.unwrap().status_code(), 418);
    }

    #[test]
    fn test_headers_truncated() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"HTTP/1.0 404 truncation happens here\r\n";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(response, Err(Error::TruncatedHeaders)));

        // Try a completely empty response.
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let response_text = b"";
        let (response, _request) = run_download_test(req, response_text);

        assert!(matches!(response, Err(Error::TruncatedHeaders)));
    }

    #[test]
    fn test_headers_too_long() {
        let req: request::MicrodescRequest = vec![[9; 32]].into_iter().collect();
        let mut response_text: Vec<u8> = (*b"HTTP/1.0 418 I'm a teapot\r\nX-Too-Many-As: ").into();
        response_text.resize(16384, b'A');
        let (response, _request) = run_download_test(req, &response_text);

        assert!(should_retire_circ(&response));
        assert!(matches!(response, Err(Error::HttparseError(_))));
    }

    // TODO: test with bad utf-8
}