1
//! Declare DataStream, a type that wraps RawCellStream so as to be useful
2
//! for byte-oriented communication.
3

            
4
use crate::{Error, Result};
5
use tor_cell::relaycell::msg::EndReason;
6

            
7
use futures::io::{AsyncRead, AsyncWrite};
8
use futures::task::{Context, Poll};
9
use futures::Future;
10

            
11
#[cfg(feature = "tokio")]
12
use tokio_crate::io::ReadBuf;
13
#[cfg(feature = "tokio")]
14
use tokio_crate::io::{AsyncRead as TokioAsyncRead, AsyncWrite as TokioAsyncWrite};
15
#[cfg(feature = "tokio")]
16
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
17

            
18
use std::fmt::Debug;
19
use std::io::Result as IoResult;
20
use std::pin::Pin;
21

            
22
use educe::Educe;
23

            
24
use crate::circuit::StreamTarget;
25
use crate::stream::StreamReader;
26
use tor_basic_utils::skip_fmt;
27
use tor_cell::relaycell::msg::{Data, RelayMsg};
28
use tor_error::internal;
29

            
30
/// An anonymized stream over the Tor network.
31
///
32
/// For most purposes, you can think of this type as an anonymized
33
/// TCP stream: it can read and write data, and get closed when it's done.
34
///
35
/// [`DataStream`] implements [`futures::io::AsyncRead`] and
36
/// [`futures::io::AsyncWrite`], so you can use it anywhere that those
37
/// traits are expected.
38
///
39
/// # Examples
40
///
41
/// Connecting to an HTTP server and sending a request, using
42
/// [`AsyncWriteExt::write_all`](futures::io::AsyncWriteExt::write_all):
43
///
44
/// ```ignore
45
/// let mut stream = tor_client.connect(("icanhazip.com", 80), None).await?;
46
///
47
/// use futures::io::AsyncWriteExt;
48
///
49
/// stream
50
///     .write_all(b"GET / HTTP/1.1\r\nHost: icanhazip.com\r\nConnection: close\r\n\r\n")
51
///     .await?;
52
///
53
/// // Flushing the stream is important; see below!
54
/// stream.flush().await?;
55
/// ```
56
///
57
/// Reading the result, using [`AsyncReadExt::read_to_end`](futures::io::AsyncReadExt::read_to_end):
58
///
59
/// ```ignore
60
/// use futures::io::AsyncReadExt;
61
///
62
/// let mut buf = Vec::new();
63
/// stream.read_to_end(&mut buf).await?;
64
///
65
/// println!("{}", String::from_utf8_lossy(&buf));
66
/// ```
67
///
68
/// # Usage with Tokio
69
///
70
/// If the `tokio` crate feature is enabled, this type also implements
71
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) and
72
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
73
/// with code that expects those traits.
74
///
75
/// # Remember to call `flush`!
76
///
77
/// DataStream buffers data internally, in order to write as few cells
78
/// as possible onto the network.  In order to make sure that your
79
/// data has actually been sent, you need to make sure that
80
/// [`AsyncWrite::poll_flush`] runs to completion: probably via
81
/// [`AsyncWriteExt::flush`](futures::io::AsyncWriteExt::flush).
82
///
83
/// # Splitting the type
84
///
85
/// This type is internally composed of a [`DataReader`] and a [`DataWriter`]; the
86
/// `DataStream::split` method can be used to split it into those two parts, for more
87
/// convenient usage with e.g. stream combinators.
88
// # Semver note
89
//
90
// Note that this type is re-exported as a part of the public API of
91
// the `arti-client` crate.  Any changes to its API here in
92
// `tor-proto` need to be reflected above.
93
#[derive(Debug)]
94
pub struct DataStream {
95
    /// Underlying writer for this stream
96
    w: DataWriter,
97
    /// Underlying reader for this stream
98
    r: DataReader,
99
}
100

            
101
/// The write half of a [`DataStream`], implementing [`futures::io::AsyncWrite`].
102
///
103
/// See the [`DataStream`] docs for more information. In particular, note
104
/// that this writer requires `poll_flush` to complete in order to guarantee that
105
/// all data has been written.
106
///
107
/// # Usage with Tokio
108
///
109
/// If the `tokio` crate feature is enabled, this type also implements
110
/// [`tokio::io::AsyncWrite`](tokio_crate::io::AsyncWrite) for easier integration
111
/// with code that expects that trait.
112
// # Semver note
113
//
114
// Note that this type is re-exported as a part of the public API of
115
// the `arti-client` crate.  Any changes to its API here in
116
// `tor-proto` need to be reflected above.
117
#[derive(Debug)]
118
pub struct DataWriter {
119
    /// Internal state for this writer
120
    ///
121
    /// This is stored in an Option so that we can mutate it in the
122
    /// AsyncWrite functions.  It might be possible to do better here,
123
    /// and we should refactor if so.
124
    state: Option<DataWriterState>,
125
}
126

            
127
/// The read half of a [`DataStream`], implementing [`futures::io::AsyncRead`].
128
///
129
/// See the [`DataStream`] docs for more information.
130
///
131
/// # Usage with Tokio
132
///
133
/// If the `tokio` crate feature is enabled, this type also implements
134
/// [`tokio::io::AsyncRead`](tokio_crate::io::AsyncRead) for easier integration
135
/// with code that expects that trait.
136
//
137
// # Semver note
138
//
139
// Note that this type is re-exported as a part of the public API of
140
// the `arti-client` crate.  Any changes to its API here in
141
// `tor-proto` need to be reflected above.
142
#[derive(Debug)]
143
pub struct DataReader {
144
    /// Internal state for this reader.
145
    ///
146
    /// This is stored in an Option so that we can mutate it in
147
    /// poll_read().  It might be possible to do better here, and we
148
    /// should refactor if so.
149
    state: Option<DataReaderState>,
150
}
151

            
152
impl DataStream {
153
    /// Wrap raw stream reader and target parts as a DataStream.
154
    ///
155
    /// For non-optimistic stream, function `wait_for_connection`
156
    /// must be called after to make sure CONNECTED is received.
157
12
    pub(crate) fn new(reader: StreamReader, target: StreamTarget) -> Self {
158
12
        let r = DataReader {
159
12
            state: Some(DataReaderState::Ready(DataReaderImpl {
160
12
                s: reader,
161
12
                pending: Vec::new(),
162
12
                offset: 0,
163
12
                connected: false,
164
12
            })),
165
12
        };
166
12
        let w = DataWriter {
167
12
            state: Some(DataWriterState::Ready(DataWriterImpl {
168
12
                s: target,
169
12
                buf: Box::new([0; Data::MAXLEN]),
170
12
                n_pending: 0,
171
12
            })),
172
12
        };
173
12
        DataStream { w, r }
174
12
    }
175

            
176
    /// Divide this DataStream into its constituent parts.
177
    pub fn split(self) -> (DataReader, DataWriter) {
178
        (self.r, self.w)
179
    }
180

            
181
    /// Wait until a CONNECTED cell is received, or some other cell
182
    /// is received to indicate an error.
183
    ///
184
    /// Does nothing if this stream is already connected.
185
8
    pub(crate) async fn wait_for_connection(&mut self) -> Result<()> {
186
8
        // We must put state back before returning
187
8
        let state = self.r.state.take().expect("Missing state in DataReader");
188

            
189
8
        if let DataReaderState::Ready(imp) = state {
190
8
            let (imp, result) = if imp.connected {
191
                (imp, Ok(()))
192
            } else {
193
                // This succeeds if the cell is CONNECTED, and fails otherwise.
194
14
                imp.read_cell().await
195
            };
196
8
            self.r.state = Some(DataReaderState::Ready(imp));
197
8
            result
198
        } else {
199
            Err(Error::from(internal!(
200
                "Expected ready state, got {:?}",
201
                state
202
            )))
203
        }
204
8
    }
205
}
206

            
207
impl AsyncRead for DataStream {
208
25
    fn poll_read(
209
25
        mut self: Pin<&mut Self>,
210
25
        cx: &mut Context<'_>,
211
25
        buf: &mut [u8],
212
25
    ) -> Poll<IoResult<usize>> {
213
25
        AsyncRead::poll_read(Pin::new(&mut self.r), cx, buf)
214
25
    }
215
}
216

            
217
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataStream {
    /// Tokio-flavoured read: wrap the pinned stream in a compatibility
    /// adapter and let the adapter drive the `futures` implementation.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
227

            
228
impl AsyncWrite for DataStream {
229
4943
    fn poll_write(
230
4943
        mut self: Pin<&mut Self>,
231
4943
        cx: &mut Context<'_>,
232
4943
        buf: &[u8],
233
4943
    ) -> Poll<IoResult<usize>> {
234
4943
        AsyncWrite::poll_write(Pin::new(&mut self.w), cx, buf)
235
4943
    }
236
19
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
237
19
        AsyncWrite::poll_flush(Pin::new(&mut self.w), cx)
238
19
    }
239
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
240
        AsyncWrite::poll_close(Pin::new(&mut self.w), cx)
241
    }
242
}
243

            
244
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataStream {
    /// Tokio-flavoured write, routed through a compatibility adapter around
    /// the `futures` implementation.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    /// Tokio-flavoured flush, routed through a compatibility adapter.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    /// Tokio-flavoured shutdown, routed through a compatibility adapter.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
258

            
259
/// An enumeration for the state of a DataWriter.
260
///
261
/// We have to use an enum here because, for as long as we're waiting
262
/// for a flush operation to complete, the future returned by
263
/// `flush_cell()` owns the DataWriterImpl.
264
#[derive(Educe)]
265
#[educe(Debug)]
266
enum DataWriterState {
267
    /// The writer has closed or gotten an error: nothing more to do.
268
    Closed,
269
    /// The writer is not currently flushing; more data can get queued
270
    /// immediately.
271
    Ready(DataWriterImpl),
272
    /// The writer is flushing a cell.
273
    Flushing(
274
        #[educe(Debug(method = "skip_fmt"))]
275
        Pin<Box<dyn Future<Output = (DataWriterImpl, Result<()>)> + Send>>,
276
    ),
277
}
278

            
279
/// Internal: the write part of a DataStream
280
#[derive(Educe)]
281
#[educe(Debug)]
282
struct DataWriterImpl {
283
    /// The underlying StreamTarget object.
284
    s: StreamTarget,
285

            
286
    /// Buffered data to send over the connection.
287
    // TODO: this buffer is probably smaller than we want, but it's good
288
    // enough for now.  If we _do_ make it bigger, we'll have to change
289
    // our use of Data::split_from to handle the case where we can't fit
290
    // all the data.
291
    #[educe(Debug(method = "skip_fmt"))]
292
    buf: Box<[u8; Data::MAXLEN]>,
293

            
294
    /// Number of unflushed bytes in buf.
295
    n_pending: usize,
296
}
297

            
298
impl DataWriter {
299
    /// Helper for poll_flush() and poll_close(): Performs a flush, then
300
    /// closes the stream if should_close is true.
301
19
    fn poll_flush_impl(
302
19
        mut self: Pin<&mut Self>,
303
19
        cx: &mut Context<'_>,
304
19
        should_close: bool,
305
19
    ) -> Poll<IoResult<()>> {
306
19
        let state = self.state.take().expect("Missing state in DataWriter");
307

            
308
        // TODO: this whole function is a bit copy-pasted.
309

            
310
19
        let mut future = match state {
311
12
            DataWriterState::Ready(imp) => {
312
12
                if imp.n_pending == 0 {
313
                    // Nothing to flush!
314
                    self.state = Some(DataWriterState::Ready(imp));
315
                    return Poll::Ready(Ok(()));
316
12
                }
317
12

            
318
12
                Box::pin(imp.flush_buf())
319
            }
320
7
            DataWriterState::Flushing(fut) => fut,
321
            DataWriterState::Closed => {
322
                self.state = Some(DataWriterState::Closed);
323
                return Poll::Ready(Err(Error::NotConnected.into()));
324
            }
325
        };
326

            
327
19
        match future.as_mut().poll(cx) {
328
            Poll::Ready((_imp, Err(e))) => {
329
                self.state = Some(DataWriterState::Closed);
330
                Poll::Ready(Err(e.into()))
331
            }
332
12
            Poll::Ready((imp, Ok(()))) => {
333
12
                if should_close {
334
                    self.state = Some(DataWriterState::Closed);
335
12
                } else {
336
12
                    self.state = Some(DataWriterState::Ready(imp));
337
12
                }
338
12
                Poll::Ready(Ok(()))
339
            }
340
            Poll::Pending => {
341
7
                self.state = Some(DataWriterState::Flushing(future));
342
7
                Poll::Pending
343
            }
344
        }
345
19
    }
346
}
347

            
348
impl AsyncWrite for DataWriter {
349
4943
    fn poll_write(
350
4943
        mut self: Pin<&mut Self>,
351
4943
        cx: &mut Context<'_>,
352
4943
        buf: &[u8],
353
4943
    ) -> Poll<IoResult<usize>> {
354
4943
        if buf.is_empty() {
355
            return Poll::Ready(Ok(0));
356
4943
        }
357
4943

            
358
4943
        let state = self.state.take().expect("Missing state in DataWriter");
359

            
360
4943
        let mut future = match state {
361
3572
            DataWriterState::Ready(mut imp) => {
362
3572
                let n_queued = imp.queue_bytes(buf);
363
3572
                if n_queued != 0 {
364
1172
                    self.state = Some(DataWriterState::Ready(imp));
365
1172
                    return Poll::Ready(Ok(n_queued));
366
2400
                }
367
2400
                // we couldn't queue anything, so the current cell must be full.
368
2400
                Box::pin(imp.flush_buf())
369
            }
370
1371
            DataWriterState::Flushing(fut) => fut,
371
            DataWriterState::Closed => {
372
                self.state = Some(DataWriterState::Closed);
373
                return Poll::Ready(Err(Error::NotConnected.into()));
374
            }
375
        };
376

            
377
3771
        match future.as_mut().poll(cx) {
378
            Poll::Ready((_imp, Err(e))) => {
379
                self.state = Some(DataWriterState::Closed);
380
                Poll::Ready(Err(e.into()))
381
            }
382
2400
            Poll::Ready((mut imp, Ok(()))) => {
383
2400
                // Great!  We're done flushing.  Queue as much as we can of this
384
2400
                // cell.
385
2400
                let n_queued = imp.queue_bytes(buf);
386
2400
                self.state = Some(DataWriterState::Ready(imp));
387
2400
                Poll::Ready(Ok(n_queued))
388
            }
389
            Poll::Pending => {
390
1371
                self.state = Some(DataWriterState::Flushing(future));
391
1371
                Poll::Pending
392
            }
393
        }
394
4943
    }
395

            
396
19
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
397
19
        self.poll_flush_impl(cx, false)
398
19
    }
399

            
400
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
401
        self.poll_flush_impl(cx, true)
402
    }
403
}
404

            
405
#[cfg(feature = "tokio")]
impl TokioAsyncWrite for DataWriter {
    /// Tokio-flavoured write, routed through a write-compatibility adapter
    /// around the `futures` implementation.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_write(Pin::new(&mut adapter), cx, buf)
    }

    /// Tokio-flavoured flush, routed through a write-compatibility adapter.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_flush(Pin::new(&mut adapter), cx)
    }

    /// Tokio-flavoured shutdown, routed through a write-compatibility adapter.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
        let mut adapter = self.compat_write();
        TokioAsyncWrite::poll_shutdown(Pin::new(&mut adapter), cx)
    }
}
419

            
420
impl DataWriterImpl {
421
    /// Try to flush the current buffer contents as a data cell.
422
2412
    async fn flush_buf(mut self) -> (Self, Result<()>) {
423
2412
        let result = if self.n_pending != 0 {
424
2412
            let (cell, remainder) = Data::split_from(&self.buf[..self.n_pending]);
425
2412
            // TODO: Eventually we may want a larger buffer; if we do,
426
2412
            // this invariant will become false.
427
2412
            assert!(remainder.is_empty());
428
2412
            self.n_pending = 0;
429
2412
            self.s.send(cell.into()).await
430
        } else {
431
            Ok(())
432
        };
433

            
434
2412
        (self, result)
435
2412
    }
436

            
437
    /// Add as many bytes as possible from `b` to our internal buffer;
438
    /// return the number we were able to add.
439
5972
    fn queue_bytes(&mut self, b: &[u8]) -> usize {
440
5972
        let empty_space = &mut self.buf[self.n_pending..];
441
5972
        if empty_space.is_empty() {
442
            // that is, len == 0
443
2400
            return 0;
444
3572
        }
445
3572

            
446
3572
        let n_to_copy = std::cmp::min(b.len(), empty_space.len());
447
3572
        empty_space[..n_to_copy].copy_from_slice(&b[..n_to_copy]);
448
3572
        self.n_pending += n_to_copy;
449
3572
        n_to_copy
450
5972
    }
451
}
452

            
453
/// An enumeration for the state of a DataReader.
454
///
455
/// We have to use an enum here because, when we're waiting for
456
/// ReadingCell to complete, the future returned by `read_cell()` owns the
457
/// DataReaderImpl.  If we wanted to store the future and the cell at the
458
/// same time, we'd need to make a self-referential structure, which isn't
459
/// possible in safe Rust AIUI.
460
#[derive(Educe)]
461
#[educe(Debug)]
462
enum DataReaderState {
463
    /// In this state we have received an end cell or an error.
464
    Closed,
465
    /// In this state the reader is not currently fetching a cell; it
466
    /// either has data or not.
467
    Ready(DataReaderImpl),
468
    /// The reader is currently fetching a cell: this future is the
469
    /// progress it is making.
470
    ReadingCell(
471
        #[educe(Debug(method = "skip_fmt"))]
472
        Pin<Box<dyn Future<Output = (DataReaderImpl, Result<()>)> + Send>>,
473
    ),
474
}
475

            
476
/// Wrapper for the read part of a DataStream
477
#[derive(Educe)]
478
#[educe(Debug)]
479
struct DataReaderImpl {
480
    /// The underlying StreamReader object.
481
    #[educe(Debug(method = "skip_fmt"))]
482
    s: StreamReader,
483

            
484
    /// If present, data that we received on this stream but have not
485
    /// been able to send to the caller yet.
486
    // TODO: This data structure is probably not what we want, but
487
    // it's good enough for now.
488
    #[educe(Debug(method = "skip_fmt"))]
489
    pending: Vec<u8>,
490

            
491
    /// Index into pending to show what we've already read.
492
    offset: usize,
493

            
494
    /// If true, we have received a CONNECTED cell on this stream.
495
    connected: bool,
496
}
497

            
498
impl AsyncRead for DataReader {
499
25
    fn poll_read(
500
25
        mut self: Pin<&mut Self>,
501
25
        cx: &mut Context<'_>,
502
25
        buf: &mut [u8],
503
25
    ) -> Poll<IoResult<usize>> {
504
25
        // We're pulling the state object out of the reader.  We MUST
505
25
        // put it back before this function returns.
506
25
        let mut state = self.state.take().expect("Missing state in DataReader");
507

            
508
        loop {
509
33
            let mut future = match state {
510
16
                DataReaderState::Ready(mut imp) => {
511
16
                    // There may be data to read already.
512
16
                    let n_copied = imp.extract_bytes(buf);
513
16
                    if n_copied != 0 {
514
                        // We read data into the buffer.  Tell the caller.
515
4
                        self.state = Some(DataReaderState::Ready(imp));
516
4
                        return Poll::Ready(Ok(n_copied));
517
12
                    }
518
12

            
519
12
                    // No data available!  We have to launch a read.
520
12
                    Box::pin(imp.read_cell())
521
                }
522
17
                DataReaderState::ReadingCell(fut) => fut,
523
                DataReaderState::Closed => {
524
                    self.state = Some(DataReaderState::Closed);
525
                    return Poll::Ready(Err(Error::NotConnected.into()));
526
                }
527
            };
528

            
529
            // We have a future that represents an in-progress read.
530
            // See if it can make progress.
531
29
            match future.as_mut().poll(cx) {
532
4
                Poll::Ready((_imp, Err(e))) => {
533
4
                    // There aren't any survivable errors in the current
534
4
                    // design.
535
4
                    self.state = Some(DataReaderState::Closed);
536
4
                    let result = if matches!(e, Error::EndReceived(EndReason::DONE)) {
537
4
                        Ok(0)
538
                    } else {
539
                        Err(e.into())
540
                    };
541
4
                    return Poll::Ready(result);
542
                }
543
8
                Poll::Ready((imp, Ok(()))) => {
544
8
                    // It read a cell!  Continue the loop.
545
8
                    state = DataReaderState::Ready(imp);
546
8
                }
547
                Poll::Pending => {
548
                    // The future is pending; store it and tell the
549
                    // caller to get back to us later.
550
17
                    self.state = Some(DataReaderState::ReadingCell(future));
551
17
                    return Poll::Pending;
552
                }
553
            }
554
        }
555
25
    }
556
}
557

            
558
#[cfg(feature = "tokio")]
impl TokioAsyncRead for DataReader {
    /// Tokio-flavoured read: wrap the pinned reader in a compatibility
    /// adapter and let the adapter drive the `futures` implementation.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<IoResult<()>> {
        let mut adapter = self.compat();
        TokioAsyncRead::poll_read(Pin::new(&mut adapter), cx, buf)
    }
}
568

            
569
impl DataReaderImpl {
570
    /// Pull as many bytes as we can off of self.pending, and return that
571
    /// number of bytes.
572
16
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
573
16
        let remainder = &self.pending[self.offset..];
574
16
        let n_to_copy = std::cmp::min(buf.len(), remainder.len());
575
16
        buf[..n_to_copy].copy_from_slice(&remainder[..n_to_copy]);
576
16
        self.offset += n_to_copy;
577
16

            
578
16
        n_to_copy
579
16
    }
580

            
581
    /// Return true iff there are no buffered bytes here to yield
582
4
    fn buf_is_empty(&self) -> bool {
583
4
        self.pending.len() == self.offset
584
4
    }
585

            
586
    /// Load self.pending with the contents of a new data cell.
587
    ///
588
    /// This function takes ownership of self so that we can avoid
589
    /// self-referential lifetimes.
590
20
    async fn read_cell(mut self) -> (Self, Result<()>) {
591
31
        let cell = self.s.recv().await;
592

            
593
20
        let result = match cell {
594
12
            Ok(RelayMsg::Connected(_)) if !self.connected => {
595
12
                self.connected = true;
596
12
                Ok(())
597
            }
598
4
            Ok(RelayMsg::Data(d)) if self.connected => {
599
4
                self.add_data(d.into());
600
4
                Ok(())
601
            }
602
4
            Ok(RelayMsg::End(e)) => Err(Error::EndReceived(e.reason())),
603
            Err(e) => Err(e),
604
            Ok(m) => {
605
                self.s.protocol_error();
606
                Err(Error::StreamProto(format!(
607
                    "Unexpected {} cell on stream",
608
                    m.cmd()
609
                )))
610
            }
611
        };
612

            
613
20
        (self, result)
614
20
    }
615

            
616
    /// Add the data from `d` to the end of our pending bytes.
617
4
    fn add_data(&mut self, mut d: Vec<u8>) {
618
4
        if self.buf_is_empty() {
619
4
            // No data pending?  Just take d as the new pending.
620
4
            self.pending = d;
621
4
            self.offset = 0;
622
4
        } else {
623
            // TODO(nickm) This has potential to grow `pending` without bound.
624
            // Fortunately, we don't currently read cells or call this
625
            // `add_data` method when pending is nonempty—but if we do in the
626
            // future, we'll have to be careful here.
627
            self.pending.append(&mut d);
628
        }
629
4
    }
630
}