1
//! Implements a simple mock network for testing purposes.
2

            
3
// Note: There are lots of opportunities here for making the network
4
// more and more realistic, but please remember that this module only
5
// exists for writing unit tests.  Let's resist the temptation to add
6
// things we don't need.
7

            
8
use super::io::{stream_pair, LocalStream};
9
use super::MockNetRuntime;
10
use tor_rtcompat::tls::TlsConnector;
11
use tor_rtcompat::{CertifiedConn, Runtime, TcpListener, TcpProvider, TlsProvider};
12

            
13
use async_trait::async_trait;
14
use futures::channel::mpsc;
15
use futures::io::{AsyncRead, AsyncWrite};
16
use futures::lock::Mutex as AsyncMutex;
17
use futures::sink::SinkExt;
18
use futures::stream::{Stream, StreamExt};
19
use futures::FutureExt;
20
use std::collections::HashMap;
21
use std::io::{Error as IoError, ErrorKind, Result as IoResult};
22
use std::net::{IpAddr, SocketAddr};
23
use std::pin::Pin;
24
use std::sync::atomic::{AtomicU16, Ordering};
25
use std::sync::{Arc, Mutex};
26
use std::task::{Context, Poll};
27
use thiserror::Error;
28

            
29
/// A channel sender that we use to send incoming connections to
30
/// listeners.
31
type ConnSender = mpsc::Sender<(LocalStream, SocketAddr)>;
32
/// A channel receiver that listeners use to receive incoming connections.
33
type ConnReceiver = mpsc::Receiver<(LocalStream, SocketAddr)>;
34

            
35
/// A simulated Internet, for testing.
36
///
37
/// We simulate TCP streams only, and skip all the details. Connection
38
/// are implemented using [`LocalStream`]. The MockNetwork object is
39
/// shared by a large set of MockNetworkProviders, each of which has
40
/// its own view of its address(es) on the network.
41
pub struct MockNetwork {
42
    /// A map from address to the entries about listeners there.
43
    listening: Mutex<HashMap<SocketAddr, ListenerEntry>>,
44
}
45

            
46
/// The `MockNetwork`'s view of a listener.
47
24
#[derive(Clone)]
48
struct ListenerEntry {
49
    /// A sender that need to be informed about connection attempts
50
    /// there.
51
    send: ConnSender,
52

            
53
    /// A notional TLS certificate for this listener.  If absent, the
54
    /// listener isn't a TLS listener.
55
    tls_cert: Option<Vec<u8>>,
56
}
57

            
58
/// A view of a single host's access to a MockNetwork.
59
///
60
/// Each simulated host has its own addresses that it's allowed to listen on,
61
/// and a reference to the network.
62
///
63
/// This type implements [`TcpProvider`] so that it can be used as a
64
/// drop-in replacement for testing code that uses the network.
65
///
66
/// # Limitations
67
///
68
/// There's no randomness here, so we can't simulate the weirdness of
69
/// real networks.
70
///
71
/// So far, there's no support for DNS or UDP.
72
///
73
/// We don't handle localhost specially, and we don't simulate providers
74
/// that can connect to some addresses but not all.
75
///
76
/// We don't do the right thing (block) if there is a listener that
77
/// never calls accept.
78
///
79
/// We use a simple `u16` counter to decide what arbitrary port
80
/// numbers to use: Once that counter is exhausted, we will fail with
81
/// an assertion.  We don't do anything to prevent those arbitrary
82
/// ports from colliding with specified ports, other than declare that
83
/// you can't have two listeners on the same addr:port at the same
84
/// time.
85
///
86
/// We pretend to provide TLS, but there's no actual encryption or
87
/// authentication.
88
#[derive(Clone)]
89
pub struct MockNetProvider {
90
    /// Actual implementation of this host's view of the network.
91
    ///
92
    /// We have to use a separate type here and reference count it,
93
    /// since the `next_port` counter needs to be shared.
94
    inner: Arc<MockNetProviderInner>,
95
}
96

            
97
/// Shared part of a MockNetworkProvider.
98
///
99
/// This is separate because providers need to implement Clone, but
100
/// `next_port` can't be cloned.
101
struct MockNetProviderInner {
102
    /// List of public addresses
103
    addrs: Vec<IpAddr>,
104
    /// Shared reference to the network.
105
    net: Arc<MockNetwork>,
106
    /// Next port number to hand out when we're asked to listen on
107
    /// port 0.
108
    ///
109
    /// See discussion of limitations on `listen()` implementation.
110
    next_port: AtomicU16,
111
}
112

            
113
/// A [`TcpListener`] implementation returned by a [`MockNetProvider`].
114
///
115
/// Represents listening on a public address for incoming TCP connections.
116
pub struct MockNetListener {
117
    /// The address that we're listening on.
118
    addr: SocketAddr,
119
    /// The incoming channel that tells us about new connections.
120
    // TODO: I'm not thrilled to have to use an AsyncMutex and a
121
    // std Mutex in the same module.
122
    receiver: AsyncMutex<ConnReceiver>,
123
}
124

            
125
/// A builder object used to configure a [`MockNetProvider`]
126
///
127
/// Returned by [`MockNetwork::builder()`].
128
pub struct ProviderBuilder {
129
    /// List of public addresses.
130
    addrs: Vec<IpAddr>,
131
    /// Shared reference to the network.
132
    net: Arc<MockNetwork>,
133
}
134

            
135
impl MockNetwork {
136
    /// Make a new MockNetwork with no active listeners.
137
14
    pub fn new() -> Arc<Self> {
138
14
        Arc::new(MockNetwork {
139
14
            listening: Mutex::new(HashMap::new()),
140
14
        })
141
14
    }
142

            
143
    /// Return a [`ProviderBuilder`] for creating a [`MockNetProvider`]
144
    ///
145
    /// # Examples
146
    ///
147
    /// ```
148
    /// # use tor_rtmock::net::*;
149
    /// # let mock_network = MockNetwork::new();
150
    /// let client_net = mock_network.builder()
151
    ///       .add_address("198.51.100.6".parse().unwrap())
152
    ///       .add_address("2001:db8::7".parse().unwrap())
153
    ///       .provider();
154
    /// ```
155
27
    pub fn builder(self: &Arc<Self>) -> ProviderBuilder {
156
27
        ProviderBuilder {
157
27
            addrs: vec![],
158
27
            net: Arc::clone(self),
159
27
        }
160
27
    }
161

            
162
    /// Tell the listener at `target_addr` (if any) about an incoming
163
    /// connection from `source_addr` at `peer_stream`.
164
    ///
165
    /// If the listener is a TLS listener, returns its certificate.
166
    /// **Note:** Callers should check whether the presence or absence of a certificate
167
    /// matches their expectations.
168
    ///
169
    /// Returns an error if there isn't any such listener.
170
28
    async fn send_connection(
171
28
        &self,
172
28
        source_addr: SocketAddr,
173
28
        target_addr: SocketAddr,
174
28
        peer_stream: LocalStream,
175
28
    ) -> IoResult<Option<Vec<u8>>> {
176
28
        let entry = {
177
28
            let listener_map = self.listening.lock().expect("Poisoned lock for listener");
178
28
            listener_map.get(&target_addr).map(Clone::clone)
179
        };
180
28
        if let Some(mut entry) = entry {
181
24
            if entry.send.send((peer_stream, source_addr)).await.is_ok() {
182
24
                return Ok(entry.tls_cert);
183
            }
184
4
        }
185
4
        Err(err(ErrorKind::ConnectionRefused))
186
28
    }
187

            
188
    /// Register a listener at `addr` and return the ConnReceiver
189
    /// that it should use for connections.
190
    ///
191
    /// If tls_cert is provided, then the listener is a TLS listener
192
    /// and any only TLS connection attempts should succeed.
193
    ///
194
    /// Returns an error if the address is already in use.
195
13
    fn add_listener(&self, addr: SocketAddr, tls_cert: Option<Vec<u8>>) -> IoResult<ConnReceiver> {
196
13
        let mut listener_map = self.listening.lock().expect("Poisoned lock for listener");
197
13
        if listener_map.contains_key(&addr) {
198
            // TODO: Maybe this should ignore dangling Weak references?
199
            return Err(err(ErrorKind::AddrInUse));
200
13
        }
201
13

            
202
13
        let (send, recv) = mpsc::channel(16);
203
13

            
204
13
        let entry = ListenerEntry { send, tls_cert };
205
13

            
206
13
        listener_map.insert(addr, entry);
207
13

            
208
13
        Ok(recv)
209
13
    }
210
}
211

            
212
impl ProviderBuilder {
213
    /// Add `addr` as a new address for the provider we're building.
214
28
    pub fn add_address(&mut self, addr: IpAddr) -> &mut Self {
215
28
        self.addrs.push(addr);
216
28
        self
217
28
    }
218
    /// Use this builder to return a new [`MockNetRuntime`] wrapping
219
    /// an existing `runtime`.
220
2
    pub fn runtime<R: Runtime>(&self, runtime: R) -> super::MockNetRuntime<R> {
221
2
        MockNetRuntime::new(runtime, self.provider())
222
2
    }
223
    /// Use this builder to return a new [`MockNetProvider`]
224
27
    pub fn provider(&self) -> MockNetProvider {
225
27
        let inner = MockNetProviderInner {
226
27
            addrs: self.addrs.clone(),
227
27
            net: Arc::clone(&self.net),
228
27
            next_port: AtomicU16::new(1),
229
27
        };
230
27
        MockNetProvider {
231
27
            inner: Arc::new(inner),
232
27
        }
233
27
    }
234
}
235

            
236
#[async_trait]
237
impl TcpListener for MockNetListener {
238
    type TcpStream = LocalStream;
239

            
240
    type Incoming = Self;
241

            
242
12
    async fn accept(&self) -> IoResult<(Self::TcpStream, SocketAddr)> {
243
12
        let mut receiver = self.receiver.lock().await;
244
12
        receiver
245
12
            .next()
246
            .await
247
12
            .ok_or_else(|| err(ErrorKind::BrokenPipe))
248
24
    }
249

            
250
9
    fn local_addr(&self) -> IoResult<SocketAddr> {
251
9
        Ok(self.addr)
252
9
    }
253

            
254
4
    fn incoming(self) -> Self {
255
4
        self
256
4
    }
257
}
258

            
259
impl Stream for MockNetListener {
260
    type Item = IoResult<(LocalStream, SocketAddr)>;
261
12
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
262
12
        let mut recv = futures::ready!(self.receiver.lock().poll_unpin(cx));
263
12
        match recv.poll_next_unpin(cx) {
264
            Poll::Pending => Poll::Pending,
265
            Poll::Ready(None) => Poll::Ready(None),
266
12
            Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
267
        }
268
12
    }
269
}
270

            
271
impl MockNetProvider {
272
    /// If we have a local addresses that is in the same family as `other`,
273
    /// return it.
274
40
    fn get_addr_in_family(&self, other: &IpAddr) -> Option<IpAddr> {
275
40
        self.inner
276
40
            .addrs
277
40
            .iter()
278
41
            .find(|a| a.is_ipv4() == other.is_ipv4())
279
40
            .copied()
280
40
    }
281

            
282
    /// Return an arbitrary port number that we haven't returned from
283
    /// this function before.
284
    ///
285
    /// # Panics
286
    ///
287
    /// Panics if there are no remaining ports that this function hasn't
288
    /// returned before.
289
31
    fn arbitrary_port(&self) -> u16 {
290
31
        let next = self.inner.next_port.fetch_add(1, Ordering::Relaxed);
291
31
        assert!(next != 0);
292
31
        next
293
31
    }
294

            
295
    /// Helper for connecting: Picks the socketaddr to use
296
    /// when told to connect to `addr`.
297
    ///
298
    /// The IP is one of our own IPs with the same family as `addr`.
299
    /// The port is a port that we haven't used as an arbitrary port
300
    /// before.
301
28
    fn get_origin_addr_for(&self, addr: &SocketAddr) -> IoResult<SocketAddr> {
302
28
        let my_addr = self
303
28
            .get_addr_in_family(&addr.ip())
304
28
            .ok_or_else(|| err(ErrorKind::AddrNotAvailable))?;
305
28
        Ok(SocketAddr::new(my_addr, self.arbitrary_port()))
306
28
    }
307

            
308
    /// Helper for binding a listener: Picks the socketaddr to use
309
    /// when told to bind to `addr`.
310
    ///
311
    /// If addr is `0.0.0.0` or `[::]`, then we pick one of our own
312
    /// addresses with the same family. Otherwise we fail unless `addr` is
313
    /// one of our own addresses.
314
    ///
315
    /// If port is 0, we pick a new arbitrary port we haven't used as
316
    /// an arbitrary port before.
317
21
    fn get_listener_addr(&self, spec: &SocketAddr) -> IoResult<SocketAddr> {
318
19
        let ipaddr = {
319
21
            let ip = spec.ip();
320
21
            if ip.is_unspecified() {
321
12
                self.get_addr_in_family(&ip)
322
12
                    .ok_or_else(|| err(ErrorKind::AddrNotAvailable))?
323
12
            } else if self.inner.addrs.iter().any(|a| a == &ip) {
324
7
                ip
325
            } else {
326
2
                return Err(err(ErrorKind::AddrNotAvailable));
327
            }
328
        };
329
19
        let port = {
330
19
            if spec.port() == 0 {
331
3
                self.arbitrary_port()
332
            } else {
333
16
                spec.port()
334
            }
335
        };
336

            
337
19
        Ok(SocketAddr::new(ipaddr, port))
338
21
    }
339

            
340
    /// Create a mock TLS listener with provided certificate.
341
    ///
342
    /// Note that no encryption or authentication is actually
343
    /// performed!  Other parties are simply told that their connections
344
    /// succeeded and were authenticated against the given certificate.
345
5
    pub fn listen_tls(&self, addr: &SocketAddr, tls_cert: Vec<u8>) -> IoResult<MockNetListener> {
346
5
        let addr = self.get_listener_addr(addr)?;
347

            
348
5
        let receiver = AsyncMutex::new(self.inner.net.add_listener(addr, Some(tls_cert))?);
349

            
350
5
        Ok(MockNetListener { addr, receiver })
351
5
    }
352
}
353

            
354
#[async_trait]
355
impl TcpProvider for MockNetProvider {
356
    type TcpStream = LocalStream;
357
    type TcpListener = MockNetListener;
358

            
359
28
    async fn connect(&self, addr: &SocketAddr) -> IoResult<LocalStream> {
360
28
        let my_addr = self.get_origin_addr_for(addr)?;
361
28
        let (mut mine, theirs) = stream_pair();
362

            
363
28
        let cert = self
364
28
            .inner
365
28
            .net
366
28
            .send_connection(my_addr, *addr, theirs)
367
4
            .await?;
368

            
369
24
        mine.tls_cert = cert;
370
24

            
371
24
        Ok(mine)
372
56
    }
373

            
374
8
    async fn listen(&self, addr: &SocketAddr) -> IoResult<Self::TcpListener> {
375
8
        let addr = self.get_listener_addr(addr)?;
376

            
377
8
        let receiver = AsyncMutex::new(self.inner.net.add_listener(addr, None)?);
378

            
379
8
        Ok(MockNetListener { addr, receiver })
380
16
    }
381
}
382

            
383
#[async_trait]
384
impl TlsProvider<LocalStream> for MockNetProvider {
385
    type Connector = MockTlsConnector;
386
    type TlsStream = MockTlsStream;
387

            
388
8
    fn tls_connector(&self) -> MockTlsConnector {
389
8
        MockTlsConnector {}
390
8
    }
391
}
392

            
393
/// The pretend TLS connector that goes with `MockNetProvider`.
///
/// It never performs any TLS: a connection is simply told that it
/// succeeded with a given certificate.
#[derive(Clone)]
#[non_exhaustive]
pub struct MockTlsConnector;
400

            
401
/// Mock TLS connector for use with MockNetProvider.
402
///
403
/// Note that no TLS is actually performed here: connections are simply
404
/// told that they succeeded with a given certificate.
405
///
406
/// Note also that we only use this type for client-side connections
407
/// right now: Arti doesn't support being a real TLS Listener yet,
408
/// since we only handle Tor client operations.
409
pub struct MockTlsStream {
410
    /// The peer certificate that we are pretending our peer has.
411
    peer_cert: Option<Vec<u8>>,
412
    /// The underlying stream.
413
    stream: LocalStream,
414
}
415

            
416
#[async_trait]
417
impl TlsConnector<LocalStream> for MockTlsConnector {
418
    type Conn = MockTlsStream;
419

            
420
8
    async fn negotiate_unvalidated(
421
8
        &self,
422
8
        mut stream: LocalStream,
423
8
        _sni_hostname: &str,
424
8
    ) -> IoResult<MockTlsStream> {
425
8
        let peer_cert = stream.tls_cert.take();
426
8

            
427
8
        if peer_cert.is_none() {
428
            return Err(std::io::Error::new(
429
                std::io::ErrorKind::Other,
430
                "attempted to wrap non-TLS stream!",
431
            ));
432
8
        }
433
8

            
434
8
        Ok(MockTlsStream { peer_cert, stream })
435
16
    }
436
}
437

            
438
impl CertifiedConn for MockTlsStream {
439
8
    fn peer_certificate(&self) -> IoResult<Option<Vec<u8>>> {
440
8
        Ok(self.peer_cert.clone())
441
8
    }
442
}
443

            
444
impl AsyncRead for MockTlsStream {
445
72
    fn poll_read(
446
72
        mut self: Pin<&mut Self>,
447
72
        cx: &mut Context<'_>,
448
72
        buf: &mut [u8],
449
72
    ) -> Poll<IoResult<usize>> {
450
72
        Pin::new(&mut self.stream).poll_read(cx, buf)
451
72
    }
452
}
453
impl AsyncWrite for MockTlsStream {
454
20
    fn poll_write(
455
20
        mut self: Pin<&mut Self>,
456
20
        cx: &mut Context<'_>,
457
20
        buf: &[u8],
458
20
    ) -> Poll<IoResult<usize>> {
459
20
        Pin::new(&mut self.stream).poll_write(cx, buf)
460
20
    }
461
8
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
462
8
        Pin::new(&mut self.stream).poll_flush(cx)
463
8
    }
464
4
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
465
4
        Pin::new(&mut self.stream).poll_close(cx)
466
4
    }
467
}
468

            
469
/// Inner error type returned when a `MockNetwork` operation fails.
470
#[derive(Clone, Error, Debug)]
471
#[non_exhaustive]
472
pub enum MockNetError {
473
    /// General-purpose error.  The real information is in `ErrorKind`.
474
    #[error("Invalid operation on mock network")]
475
    BadOp,
476
}
477

            
478
/// Wrap `k` in a new [`std::io::Error`].
479
6
fn err(k: ErrorKind) -> IoError {
480
6
    IoError::new(k, MockNetError::BadOp)
481
6
}
482

            
483
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use super::*;
    use futures::io::{AsyncReadExt, AsyncWriteExt};
    use tor_rtcompat::test_with_all_runtimes;

    /// Return two providers attached to one fresh network: the first
    /// at 192.0.2.55, the second at 198.51.100.7.
    fn client_pair() -> (MockNetProvider, MockNetProvider) {
        let net = MockNetwork::new();
        let provider_at = |ip: &str| net.builder().add_address(ip.parse().unwrap()).provider();

        (provider_at("192.0.2.55"), provider_at("198.51.100.7"))
    }

    /// One provider listens, the other connects and sends a message.
    #[test]
    fn end_to_end() {
        test_with_all_runtimes!(|_rt| async {
            let (dialer, host) = client_pair();
            let listener = host.listen(&"0.0.0.0:99".parse().unwrap()).await?;
            let listen_addr = listener.local_addr()?;

            let (dial_result, accept_result): (IoResult<()>, IoResult<()>) = futures::join!(
                async {
                    let mut stream = dialer.connect(&listen_addr).await?;
                    stream.write_all(b"This is totally a network.").await?;
                    stream.close().await?;

                    // Nobody listening here...
                    let unused_addr = "192.0.2.200:99".parse().unwrap();
                    let refused = dialer.connect(&unused_addr).await;
                    assert!(refused.is_err());
                    Ok(())
                },
                async {
                    let (mut stream, peer) = listener.accept().await?;
                    assert_eq!(peer.ip(), "192.0.2.55".parse::<IpAddr>().unwrap());
                    let mut received = Vec::new();
                    stream.read_to_end(&mut received).await?;
                    assert_eq!(&received[..], &b"This is totally a network."[..]);
                    Ok(())
                }
            );
            dial_result?;
            accept_result?;
            IoResult::Ok(())
        });
    }

    /// Check how listener addresses are chosen from bind requests.
    #[test]
    fn pick_listener_addr() -> IoResult<()> {
        let net = MockNetwork::new();
        let ip4: IpAddr = "192.0.2.55".parse().unwrap();
        let ip6: IpAddr = "2001:db8::7".parse().unwrap();
        let client = net.builder().add_address(ip4).add_address(ip6).provider();

        // Successful cases

        // Unspecified IPv4 address, explicit port.
        let any4_fixed = client.get_listener_addr(&"0.0.0.0:99".parse().unwrap())?;
        assert_eq!(any4_fixed.ip(), ip4);
        assert_eq!(any4_fixed.port(), 99);
        // One of our own IPv4 addresses, explicit port.
        let own4_fixed = client.get_listener_addr(&"192.0.2.55:100".parse().unwrap())?;
        assert_eq!(own4_fixed.ip(), ip4);
        assert_eq!(own4_fixed.port(), 100);
        // One of our own IPv4 addresses, arbitrary port.
        let own4_auto = client.get_listener_addr(&"192.0.2.55:0".parse().unwrap())?;
        assert_eq!(own4_auto.ip(), ip4);
        assert_ne!(own4_auto.port(), 0);
        // Unspecified IPv4 address, arbitrary port (a fresh one).
        let any4_auto = client.get_listener_addr(&"0.0.0.0:0".parse().unwrap())?;
        assert_eq!(any4_auto.ip(), ip4);
        assert_ne!(any4_auto.port(), 0);
        assert_ne!(any4_auto.port(), own4_auto.port());
        // Unspecified IPv6 address, explicit port.
        let any6_fixed = client.get_listener_addr(&"[::]:99".parse().unwrap())?;
        assert_eq!(any6_fixed.ip(), ip6);
        assert_eq!(any6_fixed.port(), 99);
        // One of our own IPv6 addresses, explicit port.
        let own6_fixed = client.get_listener_addr(&"[2001:db8::7]:100".parse().unwrap())?;
        assert_eq!(own6_fixed.ip(), ip6);
        assert_eq!(own6_fixed.port(), 100);

        // Failing cases: addresses that aren't ours.
        let foreign4 = client.get_listener_addr(&"192.0.2.56:0".parse().unwrap());
        let foreign6 = client.get_listener_addr(&"[2001:db8::8]:0".parse().unwrap());
        assert!(foreign4.is_err());
        assert!(foreign6.is_err());

        Ok(())
    }

    /// Use a listener as a stream of incoming connections.
    #[test]
    fn listener_stream() {
        test_with_all_runtimes!(|_rt| async {
            let (dialer, host) = client_pair();

            let listener = host.listen(&"0.0.0.0:99".parse().unwrap()).await?;
            let listen_addr = listener.local_addr()?;
            let mut incoming = listener.incoming();

            let (dial_result, accept_result): (IoResult<()>, IoResult<()>) = futures::join!(
                async {
                    for _ in 0..3_u8 {
                        let mut stream = dialer.connect(&listen_addr).await?;
                        stream.close().await?;
                    }
                    Ok(())
                },
                async {
                    for _ in 0..3_u8 {
                        let (mut stream, peer) = incoming.next().await.unwrap()?;
                        let mut received = Vec::new();
                        let _ = stream.read_to_end(&mut received).await?;
                        assert_eq!(peer.ip(), "192.0.2.55".parse::<IpAddr>().unwrap());
                    }
                    Ok(())
                }
            );
            dial_result?;
            accept_result?;
            IoResult::Ok(())
        });
    }

    /// Connect to a mock TLS listener and check the pretend certificate.
    #[test]
    fn tls_basics() {
        let (dialer, host) = client_pair();
        let cert = b"I am certified for something I assure you.";

        // The listener is created once, outside the per-runtime block.
        let listener = host
            .listen_tls(&"0.0.0.0:0".parse().unwrap(), cert[..].into())
            .unwrap();
        let listen_addr = listener.local_addr().unwrap();

        test_with_all_runtimes!(|_rt| async {
            let (dial_result, accept_result): (IoResult<()>, IoResult<()>) = futures::join!(
                async {
                    let connector = dialer.tls_connector();
                    let plain = dialer.connect(&listen_addr).await?;
                    let mut tls = connector
                        .negotiate_unvalidated(plain, "zombo.example.com")
                        .await?;
                    assert_eq!(&tls.peer_certificate()?.unwrap()[..], &cert[..]);
                    tls.write_all(b"This is totally encrypted.").await?;
                    let mut reply = Vec::new();
                    tls.read_to_end(&mut reply).await?;
                    tls.close().await?;
                    assert_eq!(reply[..], b"Yup, your secrets is safe"[..]);
                    Ok(())
                },
                async {
                    let (mut stream, peer) = listener.accept().await?;
                    assert_eq!(peer.ip(), "192.0.2.55".parse::<IpAddr>().unwrap());
                    let mut request = [0_u8; 26];
                    stream.read_exact(&mut request[..]).await?;
                    assert_eq!(&request[..], &b"This is totally encrypted."[..]);
                    stream.write_all(b"Yup, your secrets is safe").await?;
                    Ok(())
                }
            );
            dial_result?;
            accept_result?;
            IoResult::Ok(())
        });
    }
}