1
1
//! Tools for generating a stream of structured events, similar to C tor's `ControlPort`.
2

            
3
#![deny(missing_docs)]
4
#![warn(noop_method_call)]
5
#![deny(unreachable_pub)]
6
#![warn(clippy::all)]
7
#![deny(clippy::await_holding_lock)]
8
#![deny(clippy::cargo_common_metadata)]
9
#![deny(clippy::cast_lossless)]
10
#![deny(clippy::checked_conversions)]
11
#![warn(clippy::cognitive_complexity)]
12
#![deny(clippy::debug_assert_with_mut_call)]
13
#![deny(clippy::exhaustive_enums)]
14
#![deny(clippy::exhaustive_structs)]
15
#![deny(clippy::expl_impl_clone_on_copy)]
16
#![deny(clippy::fallible_impl_from)]
17
#![deny(clippy::implicit_clone)]
18
#![deny(clippy::large_stack_arrays)]
19
#![warn(clippy::manual_ok_or)]
20
#![deny(clippy::missing_docs_in_private_items)]
21
#![deny(clippy::missing_panics_doc)]
22
#![warn(clippy::needless_borrow)]
23
#![warn(clippy::needless_pass_by_value)]
24
#![warn(clippy::option_option)]
25
#![warn(clippy::rc_buffer)]
26
#![deny(clippy::ref_option_ref)]
27
#![warn(clippy::semicolon_if_nothing_returned)]
28
#![warn(clippy::trait_duplication_in_bounds)]
29
#![deny(clippy::unnecessary_wraps)]
30
#![warn(clippy::unseparated_literal_suffix)]
31
#![deny(clippy::unwrap_used)]
32

            
33
pub mod events;
34

            
35
use crate::events::{TorEvent, TorEventKind};
36
use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
37
use futures::channel::mpsc;
38
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
39
use futures::future::Either;
40
use futures::StreamExt;
41
use once_cell::sync::OnceCell;
42
use std::pin::Pin;
43
use std::sync::atomic::{AtomicUsize, Ordering};
44
use std::task::{Context, Poll};
45
use thiserror::Error;
46
use tracing::{error, warn};
47

            
48
/// Pointer to an `UnboundedSender`, used to send events into the `EventReactor`.
49
static EVENT_SENDER: OnceCell<UnboundedSender<TorEvent>> = OnceCell::new();
50
/// An inactive receiver for the currently active broadcast channel, if there is one.
51
static CURRENT_RECEIVER: OnceCell<InactiveReceiver<TorEvent>> = OnceCell::new();
52
/// The number of `TorEventKind`s there are.
53
const EVENT_KIND_COUNT: usize = 1;
54
/// An array containing one `AtomicUsize` for each `TorEventKind`, used to track subscriptions.
55
///
56
/// When a `TorEventReceiver` subscribes to a `TorEventKind`, it uses its `usize` value to index
57
/// into this array and increment the associated `AtomicUsize` (and decrements it to unsubscribe).
58
/// This lets event emitters check whether there are any subscribers, and avoid emitting events
59
/// if there aren't.
60
static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] = [AtomicUsize::new(0); EVENT_KIND_COUNT];
61

            
62
/// The size of the internal broadcast channel used to implement event subscription.
63
pub static BROADCAST_CAPACITY: usize = 512;
64

            
65
/// A reactor used to forward events to make the event reporting system work.
66
///
67
/// # Note
68
///
69
/// Currently, this type is a singleton; there is one event reporting system used for the entire
70
/// program. This is not stable, and may change in future.
71
pub struct EventReactor {
72
    /// A receiver that the reactor uses to learn about incoming events.
73
    ///
74
    /// This is unbounded so that event publication doesn't have to be async.
75
    receiver: UnboundedReceiver<TorEvent>,
76
    /// A sender that the reactor uses to publish events.
77
    ///
78
    /// Events are only sent here if at least one subscriber currently wants them.
79
    broadcast: Sender<TorEvent>,
80
}
81

            
82
impl EventReactor {
83
    /// Initialize the event reporting system, returning a reactor that must be run for it to work,
84
    /// and a `TorEventReceiver` that can be used to extract events from the system. If the system
85
    /// has already been initialized, returns `None` instead of a reactor.
86
    ///
87
    /// # Warnings
88
    ///
89
    /// The returned reactor *must* be run with `EventReactor::run`, in a background async task.
90
    /// If it is not, the event system might consume unbounded amounts of memory.
91
4
    pub fn new() -> Option<Self> {
92
4
        let (tx, rx) = mpsc::unbounded();
93
4
        if EVENT_SENDER.set(tx).is_ok() {
94
1
            let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
95
1
            CURRENT_RECEIVER
96
1
                .set(brx.deactivate())
97
1
                .expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
98
1
            Some(Self {
99
1
                receiver: rx,
100
1
                broadcast: btx,
101
1
            })
102
        } else {
103
3
            None
104
        }
105
4
    }
106
    /// Get a `TorEventReceiver` to receive events from, assuming an `EventReactor` is already
107
    /// running somewhere. (If it isn't, returns `None`.)
108
    ///
109
    /// As noted in the type-level documentation, this function might not always work this way.
110
4
    pub fn receiver() -> Option<TorEventReceiver> {
111
4
        CURRENT_RECEIVER
112
4
            .get()
113
4
            .map(|rx| TorEventReceiver::wrap(rx.clone()))
114
4
    }
115
    /// Run the event forwarding reactor.
116
    ///
117
    /// You *must* call this function once a reactor is created.
118
1
    pub async fn run(mut self) {
119
2
        while let Some(event) = self.receiver.next().await {
120
1
            match self.broadcast.try_broadcast(event) {
121
1
                Ok(_) => {}
122
                Err(TrySendError::Closed(_)) => break,
123
                Err(TrySendError::Full(event)) => {
124
                    // If the channel is full, do a blocking broadcast to wait for it to be
125
                    // not full, and log a warning about receivers lagging behind.
126
                    warn!("TorEventReceivers aren't receiving events fast enough!");
127
                    if self.broadcast.broadcast(event).await.is_err() {
128
                        break;
129
                    }
130
                }
131
                Err(TrySendError::Inactive(_)) => {
132
                    // no active receivers, so just drop the event on the floor.
133
                }
134
            }
135
        }
136
        // It shouldn't be possible to get here, since we have globals keeping the channels
137
        // open. Still, if we somehow do, log an error about it.
138
        error!("event reactor shutting down; this shouldn't ever happen");
139
    }
140
}
141

            
142
/// An error encountered when trying to receive a `TorEvent`.
143
#[derive(Clone, Debug, Error)]
144
#[non_exhaustive]
145
pub enum ReceiverError {
146
    /// The receiver isn't subscribed to anything, so wouldn't ever return any events.
147
    #[error("no event subscriptions")]
148
    NoSubscriptions,
149
    /// The internal broadcast channel was closed, which shouldn't ever happen.
150
    #[error("internal event broadcast channel closed")]
151
    ChannelClosed,
152
}
153

            
154
/// A receiver for `TorEvent`s emitted by other users of this crate.
155
///
156
/// To use this type, first subscribe to some kinds of event by calling
157
/// `TorEventReceiver::subscribe`. Then, consume events using the implementation of
158
/// `futures::stream::Stream`.
159
///
160
/// # Warning
161
///
162
/// Once interest in events has been signalled with `subscribe`, events must be continuously
163
/// read from the receiver in order to avoid excessive memory consumption.
164
#[derive(Clone, Debug)]
165
pub struct TorEventReceiver {
166
    /// If no events have been subscribed to yet, this is an `InactiveReceiver`; otherwise,
167
    /// it's a `Receiver`.
168
    inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
169
    /// Whether we're subscribed to each event kind (if `subscribed[kind]` is true, we're
170
    /// subscribed to `kind`).
171
    subscribed: [bool; EVENT_KIND_COUNT],
172
}
173

            
174
impl futures::stream::Stream for TorEventReceiver {
175
    type Item = TorEvent;
176

            
177
5
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
178
5
        let this = self.get_mut();
179
5
        match this.inner {
180
4
            Either::Left(ref mut active) => loop {
181
4
                match Pin::new(&mut *active).poll_next(cx) {
182
1
                    Poll::Ready(Some(e)) => {
183
1
                        if this.subscribed[e.kind() as usize] {
184
1
                            return Poll::Ready(Some(e));
185
                        }
186
                        // loop, since we weren't subscribed to that event
187
                    }
188
3
                    x => return x,
189
                }
190
            },
191
            Either::Right(_) => {
192
1
                warn!("TorEventReceiver::poll_next() called without subscriptions!");
193
1
                Poll::Ready(None)
194
            }
195
        }
196
5
    }
197
}
198

            
199
impl TorEventReceiver {
200
    /// Create a `TorEventReceiver` from an `InactiveReceiver` handle.
201
4
    pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
202
4
        Self {
203
4
            inner: Either::Right(rx),
204
4
            subscribed: [false; EVENT_KIND_COUNT],
205
4
        }
206
4
    }
207
    /// Subscribe to a given kind of `TorEvent`.
208
    ///
209
    /// After calling this function, `TorEventReceiver::recv` will emit events of that kind.
210
    /// This function is idempotent (subscribing twice has the same effect as doing so once).
211
7
    pub fn subscribe(&mut self, kind: TorEventKind) {
212
7
        if !self.subscribed[kind as usize] {
213
5
            EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
214
5
            self.subscribed[kind as usize] = true;
215
5
        }
216
        // FIXME(eta): cloning is ungood, but hard to avoid
217
7
        if let Either::Right(inactive) = self.inner.clone() {
218
5
            self.inner = Either::Left(inactive.activate());
219
5
        }
220
7
    }
221
    /// Unsubscribe from a given kind of `TorEvent`.
222
    ///
223
    /// After calling this function, `TorEventReceiver::recv` will no longer emit events of that
224
    /// kind.
225
    /// This function is idempotent (unsubscribing twice has the same effect as doing so once).
226
2
    pub fn unsubscribe(&mut self, kind: TorEventKind) {
227
2
        if self.subscribed[kind as usize] {
228
2
            EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
229
2
            self.subscribed[kind as usize] = false;
230
2
        }
231
        // If we're now not subscribed to anything, deactivate our channel.
232
2
        if self.subscribed.iter().all(|x| !*x) {
233
            // FIXME(eta): cloning is ungood, but hard to avoid
234
2
            if let Either::Left(active) = self.inner.clone() {
235
2
                self.inner = Either::Right(active.deactivate());
236
2
            }
237
        }
238
2
    }
239
}
240

            
241
impl Drop for TorEventReceiver {
242
4
    fn drop(&mut self) {
243
4
        for (i, subscribed) in self.subscribed.iter().enumerate() {
244
            // FIXME(eta): duplicates logic from Self::unsubscribe, because it's not possible
245
            //             to go from a `usize` to a `TorEventKind`
246
4
            if *subscribed {
247
3
                EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
248
3
            }
249
        }
250
4
    }
251
}
252

            
253
/// Returns a boolean indicating whether the event `kind` has any subscribers (as in,
254
/// whether `TorEventReceiver::subscribe` has been called with that event kind).
255
///
256
/// This is useful to avoid doing work to generate events that might be computationally expensive
257
/// to generate.
258
10
pub fn event_has_subscribers(kind: TorEventKind) -> bool {
259
10
    EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
260
10
}
261

            
262
/// Broadcast the given `TorEvent` to any interested subscribers.
263
///
264
/// As an optimization, does nothing if the event has no subscribers (`event_has_subscribers`
265
/// returns false). (also does nothing if the event subsystem hasn't been initialized yet)
266
///
267
/// This function isn't intended for use outside Arti crates (as in, library consumers of Arti
268
/// shouldn't broadcast events!).
269
2
pub fn broadcast(event: TorEvent) {
270
2
    if !event_has_subscribers(event.kind()) {
271
1
        return;
272
1
    }
273
1
    if let Some(sender) = EVENT_SENDER.get() {
274
1
        // If this fails, there isn't much we can really do about it!
275
1
        let _ = sender.unbounded_send(event);
276
1
    }
277
2
}
278

            
279
#[cfg(test)]
280
mod test {
281
    #![allow(clippy::unwrap_used)]
282
    use crate::{
283
        broadcast, event_has_subscribers, EventReactor, StreamExt, TorEvent, TorEventKind,
284
    };
285
    use once_cell::sync::OnceCell;
286
    use std::sync::{Mutex, MutexGuard};
287
    use std::time::Duration;
288
    use tokio::runtime::Runtime;
289

            
290
    // HACK(eta): these tests need to run effectively singlethreaded, since they mutate global
291
    //            state. They *also* need to share the same tokio runtime, which the
292
    //            #[tokio::test] thing doesn't do (it makes a new runtime per test), because of
293
    //            the need to have a background singleton EventReactor.
294
    //
295
    //            To hack around this, we just have a global runtime protected by a mutex!
296
    static TEST_MUTEX: OnceCell<Mutex<Runtime>> = OnceCell::new();
297

            
298
    /// Locks the mutex, and makes sure the event reactor is initialized.
299
    fn test_setup() -> MutexGuard<'static, Runtime> {
300
        let mutex = TEST_MUTEX.get_or_init(|| Mutex::new(Runtime::new().unwrap()));
301
        let runtime = mutex
302
            .lock()
303
            .expect("mutex poisoned, probably by other failing tests");
304
        if let Some(reactor) = EventReactor::new() {
305
            runtime.handle().spawn(reactor.run());
306
        }
307
        runtime
308
    }
309

            
310
    #[test]
311
    fn subscriptions() {
312
        let rt = test_setup();
313

            
314
        rt.block_on(async move {
315
            // shouldn't have any subscribers at the start
316
            assert!(!event_has_subscribers(TorEventKind::Empty));
317

            
318
            let mut rx = EventReactor::receiver().unwrap();
319
            // creating a receiver shouldn't result in any subscriptions
320
            assert!(!event_has_subscribers(TorEventKind::Empty));
321

            
322
            rx.subscribe(TorEventKind::Empty);
323
            // subscription should work
324
            assert!(event_has_subscribers(TorEventKind::Empty));
325

            
326
            rx.unsubscribe(TorEventKind::Empty);
327
            // unsubscribing should work
328
            assert!(!event_has_subscribers(TorEventKind::Empty));
329

            
330
            // subscription should be idempotent
331
            rx.subscribe(TorEventKind::Empty);
332
            rx.subscribe(TorEventKind::Empty);
333
            rx.subscribe(TorEventKind::Empty);
334
            assert!(event_has_subscribers(TorEventKind::Empty));
335

            
336
            rx.unsubscribe(TorEventKind::Empty);
337
            assert!(!event_has_subscribers(TorEventKind::Empty));
338

            
339
            rx.subscribe(TorEventKind::Empty);
340
            assert!(event_has_subscribers(TorEventKind::Empty));
341

            
342
            std::mem::drop(rx);
343
            // dropping the receiver should auto-unsubscribe
344
            assert!(!event_has_subscribers(TorEventKind::Empty));
345
        });
346
    }
347

            
348
    #[test]
349
    fn empty_recv() {
350
        let rt = test_setup();
351

            
352
        rt.block_on(async move {
353
            let mut rx = EventReactor::receiver().unwrap();
354
            // attempting to read from a receiver with no subscriptions should return None
355
            let result = rx.next().await;
356
            assert!(result.is_none());
357
        });
358
    }
359

            
360
    #[test]
361
    fn receives_events() {
362
        let rt = test_setup();
363

            
364
        rt.block_on(async move {
365
            let mut rx = EventReactor::receiver().unwrap();
366
            rx.subscribe(TorEventKind::Empty);
367
            // HACK(eta): give the event reactor time to run
368
            tokio::time::sleep(Duration::from_millis(100)).await;
369
            broadcast(TorEvent::Empty);
370

            
371
            let result = rx.next().await;
372
            assert_eq!(result, Some(TorEvent::Empty));
373
        });
374
    }
375

            
376
    #[test]
377
    fn does_not_send_to_no_subscribers() {
378
        let rt = test_setup();
379

            
380
        rt.block_on(async move {
381
            // this event should just get dropped on the floor, because no subscribers exist
382
            broadcast(TorEvent::Empty);
383

            
384
            let mut rx = EventReactor::receiver().unwrap();
385
            rx.subscribe(TorEventKind::Empty);
386

            
387
            // this shouldn't have an event to receive now
388
            let result = tokio::time::timeout(Duration::from_millis(100), rx.next()).await;
389
            assert!(result.is_err());
390
        });
391
    }
392
}