1
//! Stream utilities to help implement
2
//! [`AbstractCircMgr`](`super::AbstractCircMgr.`)
3

            
4
use futures::stream::{Fuse, FusedStream, Stream, StreamExt};
5
use futures::task::{Context, Poll};
6
use pin_project::pin_project;
7
use std::pin::Pin;
8

            
9
/// Enumeration to indicate which of two streams provided a result.
10
53
#[derive(Debug, Clone, Eq, PartialEq)]
11
pub(super) enum Source {
12
    /// Indicates a result coming from the left (preferred) stream.
13
    Left,
14
    /// Indicates a result coming from the right (secondary) stream.
15
    Right,
16
}
17

            
18
/// A stream returned by [`select_biased`]
19
///
20
/// See that function for more documentation.
21
313
#[pin_project]
22
pub(super) struct SelectBiased<S, T> {
23
    /// Preferred underlying stream.
24
    ///
25
    /// When results are available from both streams, we always yield them
26
    /// from this one.  When this stream is exhausted, the `SelectBiased`
27
    /// is exhausted too.
28
    #[pin]
29
    left: Fuse<S>,
30
    /// Secondary underlying stream.
31
    #[pin]
32
    right: Fuse<T>,
33
}
34

            
35
/// Combine two instances of [`Stream`] into one.
36
///
37
/// This function is similar to [`futures::stream::select`], but differs
38
/// in that it treats the two underlying streams asymmetrically.  Specifically:
39
///
40
///  * Each result is labeled with [`Source::Left`] or
41
///    [`Source::Right`], depending on which of the two streams it came
42
///    from.
43
///  * If both the "left" and the "right" stream are ready, we always
44
///    prefer the left stream.
45
///  * We stop iterating over this stream when there are no more
46
///    results on the left stream, regardless whether the right stream
47
///    is exhausted or not.
48
///
49
/// # Future plans
50
///
51
/// This might need a better name, especially if we use it anywhere
52
/// else.
53
///
54
/// If we do expose this function, we might want to split up the ways in
55
/// which it differs from `select`.
56
61
pub(super) fn select_biased<S, T>(left: S, right: T) -> SelectBiased<S, T>
57
61
where
58
61
    S: Stream,
59
61
    T: Stream<Item = S::Item>,
60
61
{
61
61
    SelectBiased {
62
61
        left: left.fuse(),
63
61
        right: right.fuse(),
64
61
    }
65
61
}
66

            
67
impl<S, T> FusedStream for SelectBiased<S, T>
68
where
69
    S: Stream,
70
    T: Stream<Item = S::Item>,
71
{
72
2
    fn is_terminated(&self) -> bool {
73
2
        // We're done if the left stream is done, whether the right stream
74
2
        // is done or not.
75
2
        self.left.is_terminated()
76
2
    }
77
}
78

            
79
impl<S, T> Stream for SelectBiased<S, T>
80
where
81
    S: Stream,
82
    T: Stream<Item = S::Item>,
83
{
84
    type Item = (Source, S::Item);
85

            
86
313
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87
313
        let this = self.project();
88
313
        // We always check the left stream first.
89
313
        match this.left.poll_next(cx) {
90
62
            Poll::Ready(Some(val)) => {
91
62
                // The left stream has an item: yield it.
92
62
                return Poll::Ready(Some((Source::Left, val)));
93
            }
94
            Poll::Ready(None) => {
95
                // The left stream is exhausted: don't even check the right.
96
42
                return Poll::Ready(None);
97
            }
98
208
            Poll::Pending => {}
99
208
        }
100
208

            
101
208
        // The left stream is pending: see whether the right stream has
102
208
        // anything to say.
103
208
        match this.right.poll_next(cx) {
104
7
            Poll::Ready(Some(val)) => {
105
7
                // The right stream has an item: yield it.
106
7
                Poll::Ready(Some((Source::Right, val)))
107
            }
108
            _ => {
109
                // The right stream is exhausted or pending: in either case,
110
                // we need to wait.
111
202
                Poll::Pending
112
            }
113
        }
114
313
    }
115
}
116

            
117
#[cfg(test)]
118
mod test {
119
    #![allow(clippy::unwrap_used)]
120
    use super::*;
121
    use futures_await_test::async_test;
122

            
123
    // Tests where only elements from the left stream should be yielded.
124
    #[async_test]
125
    async fn left_only() {
126
        use futures::stream::iter;
127
        use Source::Left as L;
128
        // If there's nothing in the right stream, we just yield the left.
129
        let left = vec![1_usize, 2, 3];
130
        let right = vec![];
131

            
132
        let s = select_biased(iter(left), iter(right));
133
        let result: Vec<_> = s.collect().await;
134
        assert_eq!(result, vec![(L, 1_usize), (L, 2), (L, 3)]);
135

            
136
        // If the left runs out (which this will), we don't yield anything
137
        // from the right.
138
        let left = vec![1_usize, 2, 3];
139
        let right = vec![4, 5, 6];
140
        let s = select_biased(iter(left), iter(right));
141
        let result: Vec<_> = s.collect().await;
142
        assert_eq!(result, vec![(L, 1_usize), (L, 2), (L, 3)]);
143

            
144
        // The same thing happens if the left stream is completely empty!
145
        let left = vec![];
146
        let right = vec![4_usize, 5, 6];
147
        let s = select_biased(iter(left), iter(right));
148
        let result: Vec<_> = s.collect().await;
149
        assert_eq!(result, vec![]);
150
    }
151

            
152
    // Tests where only elements from the right stream should be yielded.
153
    #[async_test]
154
    async fn right_only() {
155
        use futures::stream::{iter, pending};
156
        use Source::Right as R;
157

            
158
        // Try a forever-pending stream for the left hand side.
159
        let left = pending();
160
        let right = vec![4_usize, 5, 6];
161
        let mut s = select_biased(left, iter(right));
162
        assert_eq!(s.next().await, Some((R, 4)));
163
        assert_eq!(s.next().await, Some((R, 5)));
164
        assert_eq!(s.next().await, Some((R, 6)));
165
    }
166

            
167
    // Tests where we can find elements from both streams.
168
    #[async_test]
169
    async fn multiplex() {
170
        use futures::SinkExt;
171
        use Source::{Left as L, Right as R};
172

            
173
        let (mut snd_l, rcv_l) = futures::channel::mpsc::channel(5);
174
        let (mut snd_r, rcv_r) = futures::channel::mpsc::channel(5);
175
        let mut s = select_biased(rcv_l, rcv_r);
176

            
177
        snd_l.send(1_usize).await.unwrap();
178
        snd_r.send(4_usize).await.unwrap();
179
        snd_l.send(2_usize).await.unwrap();
180

            
181
        assert_eq!(s.next().await, Some((L, 1)));
182
        assert_eq!(s.next().await, Some((L, 2)));
183
        assert_eq!(s.next().await, Some((R, 4)));
184

            
185
        snd_r.send(5_usize).await.unwrap();
186
        snd_l.send(3_usize).await.unwrap();
187

            
188
        assert!(!s.is_terminated());
189
        drop(snd_r);
190

            
191
        assert_eq!(s.next().await, Some((L, 3)));
192
        assert_eq!(s.next().await, Some((R, 5)));
193

            
194
        drop(snd_l);
195
        assert_eq!(s.next().await, None);
196

            
197
        assert!(s.is_terminated());
198
    }
199
}